| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_http -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 import time
23
24 import gobject
25 import gst
26
27 # socket needed to get hostname
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.web import server
32 from twisted.cred import credentials
33
34 from flumotion.component import feedcomponent
35 from flumotion.common import bundle, common, gstreamer, errors, pygobject
36 from flumotion.common import messages, netutils, log, interfaces
37
38 from flumotion.twisted import fdserver
39 from flumotion.twisted.compat import implements
40 from flumotion.component.misc.porter import porterclient
41
42 # proxy import
43 from flumotion.component.component import moods
44 from flumotion.common.pygobject import gsignal
45
46 from flumotion.component.consumers.httpstreamer import resources
47 from flumotion.component.base import http
48
49 from flumotion.common.messages import N_
50 T_ = messages.gettexter('flumotion')
51
52 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
53
54 # FIXME: generalize this class and move it out here ?
57 self.sink = sink
58
59 self.no_clients = 0
60 self.clients_added_count = 0
61 self.clients_removed_count = 0
62 self.start_time = time.time()
63 # keep track of the highest number and the last epoch this was reached
64 self.peak_client_number = 0
65 self.peak_epoch = self.start_time
66 self.load_deltas = [0, 0]
67 self._load_deltas_period = 10 # seconds
68 self._load_deltas_ongoing = [time.time(), 0, 0]
69
70 # keep track of average clients by tracking last average and its time
71 self.average_client_number = 0
72 self.average_time = self.start_time
73
74 self.hostname = "localhost"
75 self.port = 0
76 self.mountPoint = "/"
77
79 # update running average of clients connected
80 now = time.time()
81 # calculate deltas
82 dt1 = self.average_time - self.start_time
83 dc1 = self.average_client_number
84 dt2 = now - self.average_time
85 dc2 = self.no_clients
86 self.average_time = now # we can update now that we used self.av
87 if dt1 == 0:
88 # first measurement
89 self.average_client_number = 0
90 else:
91 dt = dt1 + dt2
92 before = (dc1 * dt1) / dt
93 after = dc2 * dt2 / dt
94 self.average_client_number = before + after
95
97 self._updateAverage()
98
99 self.no_clients += 1
100 self.clients_added_count +=1
101
102 # >= so we get the last epoch this peak was achieved
103 if self.no_clients >= self.peak_client_number:
104 self.peak_epoch = time.time()
105 self.peak_client_number = self.no_clients
106
111
113 """
114 Periodically, update our statistics on load deltas, and update the
115 UIState with new values for total bytes, bitrate, etc.
116 """
117
118 oldtime, oldadd, oldremove = self._load_deltas_ongoing
119 add, remove = self.clients_added_count, self.clients_removed_count
120 now = time.time()
121 diff = float(now - oldtime)
122
123 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
124 self._load_deltas_ongoing = [now, add, remove]
125
126 self.update_ui_state()
127
128 self._updateCallLaterId = reactor.callLater(10, self._updateStats)
129
132
135
138
141
144
147
150
152 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
153
156
158 c = self
159
160 bytes_sent = c.getBytesSent()
161 bytes_received = c.getBytesReceived()
162 uptime = c.getUptime()
163
164 set('stream-mime', c.get_mime())
165 set('stream-url', c.getUrl())
166 set('stream-uptime', common.formatTime(uptime))
167 bitspeed = bytes_received * 8 / uptime
168 set('stream-bitrate', common.formatStorage(bitspeed) + 'bit/s')
169 set('stream-totalbytes', common.formatStorage(bytes_received) + 'Byte')
170 set('stream-bitrate-raw', bitspeed)
171 set('stream-totalbytes-raw', bytes_received)
172
173 set('clients-current', str(c.getClients()))
174 set('clients-max', str(c.getMaxClients()))
175 set('clients-peak', str(c.getPeakClients()))
176 set('clients-peak-time', c.getPeakEpoch())
177 set('clients-average', str(int(c.getAverageClients())))
178
179 bitspeed = bytes_sent * 8 / uptime
180 set('consumption-bitrate', common.formatStorage(bitspeed) + 'bit/s')
181 set('consumption-totalbytes', common.formatStorage(bytes_sent) + 'Byte')
182 set('consumption-bitrate-raw', bitspeed)
183 set('consumption-totalbytes-raw', bytes_sent)
184
191
193 """
194 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
195 """
196 d = self.callRemote('authenticate', bouncerName, keycard)
197 d.addErrback(log.warningFailure)
198 return d
199
201 """
202 @rtype: L{twisted.internet.defer.Deferred}
203 """
204 return self.callRemote('removeKeycardId', bouncerName, keycardId)
205
206 ### remote methods for manager to call on
208 self.comp.resource.expireKeycard(keycardId)
209
211 self.comp.update_ui_state()
212
214 self.comp.resource.rotateLogs()
215
217 return self.comp.getStreamData()
218
220 return self.comp.getLoadData()
221
224
225 ### the actual component is a streamer using multifdsink
227 implements(interfaces.IStreamingComponent)
228
229 checkOffset = True
230
231 # this object is given to the HTTPMedium as comp
232 logCategory = 'cons-http'
233
234 pipe_template = 'multifdsink name=sink ' + \
235 'sync=false ' + \
236 'recover-policy=3'
237 gsignal('client-removed', object, int, int, object)
238
239 componentMediumClass = HTTPMedium
240
242 reactor.debug = True
243 self.debug("HTTP streamer initialising")
244
245 self.caps = None
246 self.resource = None
247 self.mountPoint = None
248 self.burst_on_connect = False
249
250 self.description = None
251
252 self.type = None
253
254 # Used if we've slaved to a porter.
255 self._pbclient = None
256 self._porterUsername = None
257 self._porterPassword = None
258 self._porterPath = None
259
260 # Or if we're a master, we open our own port here. Also used for URLs
261 # in the porter case.
262 self.port = None
263 # We listen on this interface, if set.
264 self.iface = None
265
266 self._updateCallLaterId = None
267
268 self._pending_removals = {}
269
270 for i in ('stream-mime', 'stream-uptime', 'stream-bitrate',
271 'stream-totalbytes', 'clients-current', 'clients-max',
272 'clients-peak', 'clients-peak-time', 'clients-average',
273 'consumption-bitrate', 'consumption-totalbytes',
274 'stream-bitrate-raw', 'stream-totalbytes-raw',
275 'consumption-bitrate-raw', 'consumption-totalbytes-raw',
276 'stream-url'):
277 self.uiState.addKey(i, None)
278
280 return self.description
281
283 return self.pipe_template
284
286 props = self.config['properties']
287
288 # F0.6: remove backwards-compatible properties
289 self.fixRenamedProperties(props, [
290 ('issuer', 'issuer-class'),
291 ('mount_point', 'mount-point'),
292 ('porter_socket_path', 'porter-socket-path'),
293 ('porter_username', 'porter-username'),
294 ('porter_password', 'porter-password'),
295 ('user_limit', 'client-limit'),
296 ('bandwidth_limit', 'bandwidth-limit'),
297 ('burst_on_connect', 'burst-on-connect'),
298 ('burst_size', 'burst-size'),
299 ])
300
301 if props.get('type', 'master') == 'slave':
302 for k in 'socket-path', 'username', 'password':
303 if not 'porter-' + k in props:
304 msg = "slave mode, missing required property 'porter-%s'" % k
305 return defer.fail(errors.ConfigError(msg))
306
307 if 'burst-size' in props and 'burst-time' in props:
308 msg = 'both burst-size and burst-time set, cannot satisfy'
309 return defer.fail(errors.ConfigError(msg))
310
311 # tcp is where multifdsink is
312 version = gstreamer.get_plugin_version('tcp')
313 if version < (0, 10, 9, 1):
314 m = messages.Error(T_(N_(
315 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
316 ".".join(map(str, version)), 'multifdsink'))
317 m.add(T_(N_("Please upgrade '%s' to version %s."),
318 'gst-plugins-base', '0.10.10'))
319 self.addMessage(m)
320 self.setMood(moods.sad)
321
322 return defer.fail(
323 errors.ComponentSetupHandledError(
324 "multifdsink version not newer than 0.10.9.1"))
325
332
334 if self.burst_on_connect:
335 if self.burst_time and self.time_bursting_supported(sink):
336 self.debug("Configuring burst mode for %f second burst",
337 self.burst_time)
338 # Set a burst for configurable minimum time, plus extra to
339 # start from a keyframe if needed.
340 sink.set_property('sync-method', 4) # burst-keyframe
341 sink.set_property('burst-unit', 2) # time
342 sink.set_property('burst-value',
343 long(self.burst_time * gst.SECOND))
344
345 # We also want to ensure that we have sufficient data available
346 # to satisfy this burst; and an appropriate maximum, all
347 # specified in units of time.
348 sink.set_property('time-min',
349 long((self.burst_time + 5) * gst.SECOND))
350
351 sink.set_property('unit-type', 2) # time
352 sink.set_property('units-soft-max',
353 long((self.burst_time + 8) * gst.SECOND))
354 sink.set_property('units-max',
355 long((self.burst_time + 10) * gst.SECOND))
356 elif self.burst_size:
357 self.debug("Configuring burst mode for %d kB burst",
358 self.burst_size)
359 # If we have a burst-size set, use modern
360 # needs-recent-multifdsink behaviour to have complex bursting.
361 # In this mode, we burst a configurable minimum, plus extra
362 # so we start from a keyframe (or less if we don't have a
363 # keyframe available)
364 sink.set_property('sync-method', 'burst-keyframe')
365 sink.set_property('burst-unit', 'bytes')
366 sink.set_property('burst-value', self.burst_size * 1024)
367
368 # To use burst-on-connect, we need to ensure that multifdsink
369 # has a minimum amount of data available - assume 512 kB beyond
370 # the burst amount so that we should have a keyframe available
371 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
372
373 # And then we need a maximum still further above that - the
374 # exact value doesn't matter too much, but we want it reasonably
375 # small to limit memory usage. multifdsink doesn't give us much
376 # control here, we can only specify the max values in buffers.
377 # We assume each buffer is close enough to 4kB - true for asf
378 # and ogg, at least
379 sink.set_property('buffers-soft-max',
380 (self.burst_size + 1024) / 4)
381 sink.set_property('buffers-max',
382 (self.burst_size + 2048) / 4)
383
384 else:
385 # Old behaviour; simple burst-from-latest-keyframe
386 self.debug("simple burst-on-connect, setting sync-method 2")
387 sink.set_property('sync-method', 2)
388
389 sink.set_property('buffers-soft-max', 250)
390 sink.set_property('buffers-max', 500)
391 else:
392 self.debug("no burst-on-connect, setting sync-method 0")
393 sink.set_property('sync-method', 0)
394
395 sink.set_property('buffers-soft-max', 250)
396 sink.set_property('buffers-max', 500)
397
399 Stats.__init__(self, sink=self.get_element('sink'))
400
401 self._updateCallLaterId = reactor.callLater(10, self._updateStats)
402
403 mountPoint = properties.get('mount-point', '')
404 if not mountPoint.startswith('/'):
405 mountPoint = '/' + mountPoint
406 self.mountPoint = mountPoint
407
408 # Hostname is used for a variety of purposes. We do a best-effort guess
409 # where nothing else is possible, but it's much preferable to just
410 # configure this
411 self.hostname = properties.get('hostname', None)
412 self.iface = self.hostname # We listen on this if explicitly configured,
413 # but not if it's only guessed at by the
414 # below code.
415 if not self.hostname:
416 # Don't call this nasty, nasty, probably flaky function unless we
417 # need to.
418 self.hostname = netutils.guess_public_hostname()
419
420 self.description = properties.get('description', None)
421 if self.description is None:
422 self.description = "Flumotion Stream"
423
424 # FIXME: tie these together more nicely
425 self.resource = resources.HTTPStreamingResource(self)
426
427 # check how to set client sync mode
428 sink = self.get_element('sink')
429 self.burst_on_connect = properties.get('burst-on-connect', False)
430 self.burst_size = properties.get('burst-size', 0)
431 self.burst_time = properties.get('burst-time', 0.0)
432
433 self.setup_burst_mode(sink)
434
435 sink.connect('deep-notify::caps', self._notify_caps_cb)
436
437 # these are made threadsafe using idle_add in the handler
438 sink.connect('client-added', self._client_added_handler)
439
440 # We now require a sufficiently recent multifdsink anyway that we can
441 # use the new client-fd-removed signal
442 sink.connect('client-fd-removed', self._client_fd_removed_cb)
443 sink.connect('client-removed', self._client_removed_cb)
444
445 if properties.has_key('client-limit'):
446 self.resource.setUserLimit(int(properties['client-limit']))
447
448 if properties.has_key('bouncer'):
449 self.resource.setBouncerName(properties['bouncer'])
450
451 if properties.has_key('issuer-class'):
452 self.resource.setIssuerClass(properties['issuer-class'])
453
454 if properties.has_key('duration'):
455 self.resource.setDefaultDuration(float(properties['duration']))
456
457 if properties.has_key('domain'):
458 self.resource.setDomain(properties['domain'])
459
460 if self.config.has_key('avatarId'):
461 self.resource.setRequesterId(self.config['avatarId'])
462
463 if properties.has_key('ip-filter'):
464 filter = http.LogFilter()
465 for f in properties['ip-filter']:
466 filter.addIPFilter(f)
467 self.resource.setLogFilter(filter)
468
469 self.type = properties.get('type', 'master')
470 if self.type == 'slave':
471 # already checked for these in do_check
472 self._porterPath = properties['porter-socket-path']
473 self._porterUsername = properties['porter-username']
474 self._porterPassword = properties['porter-password']
475
476 self.port = int(properties.get('port', 8800))
477
479 return '<MultifdSinkStreamer (%s)>' % self.name
480
483
487
489 mime = self.get_mime()
490 if mime == 'multipart/x-mixed-replace':
491 mime += ";boundary=ThisRandomString"
492 return mime
493
495 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
496
498 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
499 if self.plugs[socket]:
500 plug = self.plugs[socket][-1]
501 return plug.getStreamData()
502 else:
503 return {
504 'protocol': 'HTTP',
505 'description': self.description,
506 'url' : self.getUrl()
507 }
508
510 """
511 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
512 current_clients, current_load) of our current bandwidth and user values.
513 The deltas are estimates of how much bitrate is added, removed
514 due to client connections, disconnections, per second.
515 """
516 # We calculate the estimated clients added/removed per second, then
517 # multiply by the stream bitrate
518 deltaadded, deltaremoved = self.getLoadDeltas()
519
520 bytes_received = self.getBytesReceived()
521 uptime = self.getUptime()
522 bitrate = bytes_received * 8 / uptime
523
524 bytes_sent = self.getBytesSent()
525 clients_connected = self.getClients()
526 current_load = bitrate * clients_connected
527
528 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
529 clients_connected, current_load)
530
534
538
543 # fixme: have updateState just update what changed itself
544 # without the hack above
545 self.updateState(set)
546
548 self.log('[fd %5d] client_added_handler', fd)
549 Stats.clientAdded(self)
550 self.update_ui_state()
551
553 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
554 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
555 self.warning('[fd %5d] Client removed because of write error' % fd)
556
557 self.emit('client-removed', sink, fd, reason, stats)
558 Stats.clientRemoved(self)
559 self.update_ui_state()
560
561 ### START OF THREAD-AWARE CODE (called from non-reactor threads)
562
564 caps = pad.get_negotiated_caps()
565 if caps == None:
566 return
567
568 caps_str = gstreamer.caps_repr(caps)
569 self.debug('Got caps: %s' % caps_str)
570
571 if not self.caps == None:
572 self.warning('Already had caps: %s, replacing' % caps_str)
573
574 self.debug('Storing caps: %s' % caps_str)
575 self.caps = caps
576
577 reactor.callFromThread(self.update_ui_state)
578
579 # We now use both client-removed and client-fd-removed. We call get-stats
580 # from the first callback ('client-removed'), but don't actually start
581 # removing the client until we get 'client-fd-removed'. This ensures that
582 # there's no window in which multifdsink still knows about the fd, but we've # actually closed it, so we no longer get spurious duplicates.
583 # this can be called from both application and streaming thread !
587
588 # this can be called from both application and streaming thread !
590 (stats, reason) = self._pending_removals.pop(fd)
591
592 reactor.callFromThread(self._client_removed_handler, sink, fd,
593 reason, stats)
594
595 ### END OF THREAD-AWARE CODE
596
598 if self._updateCallLaterId:
599 self._updateCallLaterId.cancel()
600 self._updateCallLaterId = None
601
602 if self.type == 'slave' and self._pbclient:
603 d1 = self._pbclient.deregisterPath(self.mountPoint)
604 d2 = feedcomponent.ParseLaunchComponent.do_stop(self)
605 return defer.DeferredList([d1,d2])
606 else:
607 return feedcomponent.ParseLaunchComponent.do_stop(self)
608
610 """
611 Provide a new set of porter login information, for when we're in slave
612 mode and the porter changes.
613 If we're currently connected, this won't disconnect - it'll just change
614 the information so that next time we try and connect we'll use the
615 new ones
616 """
617 if self.type == 'slave':
618 self._porterUsername = username
619 self._porterPassword = password
620
621 creds = credentials.UsernamePassword(self._porterUsername,
622 self._porterPassword)
623 self._pbclient.startLogin(creds, self.medium)
624
625 # If we've changed paths, we must do some extra work.
626 if path != self._porterPath:
627 self.debug("Changing porter login to use \"%s\"", path)
628 self._porterPath = path
629 self._pbclient.stopTrying() # Stop trying to connect with the
630 # old connector.
631 self._pbclient.resetDelay()
632 reactor.connectWith(
633 fdserver.FDConnector, self._porterPath,
634 self._pbclient, 10, checkPID=False)
635 else:
636 raise errors.WrongStateError(
637 "Can't specify porter details in master mode")
638
640 root = resources.HTTPRoot()
641 # TwistedWeb wants the child path to not include the leading /
642 mount = self.mountPoint[1:]
643 root.putChild(mount, self.resource)
644 if self.type == 'slave':
645 # Streamer is slaved to a porter.
646
647 # We have two things we want to do in parallel:
648 # - ParseLaunchComponent.do_start()
649 # - log in to the porter, then register our mountpoint with
650 # the porter.
651 # So, we return a DeferredList with a deferred for each of
652 # these tasks. The second one's a bit tricky: we pass a dummy
653 # deferred to our PorterClientFactory that gets fired once
654 # we've done all of the tasks the first time (it's an
655 # automatically-reconnecting client factory, and we only fire
656 # this deferred the first time)
657
658 d1 = feedcomponent.ParseLaunchComponent.do_start(self,
659 *args, **kwargs)
660
661 d2 = defer.Deferred()
662 mountpoints = [self.mountPoint]
663 self._pbclient = porterclient.HTTPPorterClientFactory(
664 server.Site(resource=root), mountpoints, d2)
665
666 creds = credentials.UsernamePassword(self._porterUsername,
667 self._porterPassword)
668 self._pbclient.startLogin(creds, self.medium)
669
670 self.debug("Starting porter login at \"%s\"", self._porterPath)
671 # This will eventually cause d2 to fire
672 reactor.connectWith(
673 fdserver.FDConnector, self._porterPath,
674 self._pbclient, 10, checkPID=False)
675
676 return defer.DeferredList([d1, d2])
677 else:
678 # Streamer is standalone.
679 try:
680 self.debug('Listening on %d' % self.port)
681 iface = self.iface or ""
682 reactor.listenTCP(self.port, server.Site(resource=root),
683 interface=iface)
684 return feedcomponent.ParseLaunchComponent.do_start(self, *args,
685 **kwargs)
686 except error.CannotListenError:
687 t = 'Port %d is not available.' % self.port
688 self.warning(t)
689 m = messages.Error(T_(N_(
690 "Network error: TCP port %d is not available."), self.port))
691 self.addMessage(m)
692 self.setMood(moods.sad)
693 return defer.fail(errors.ComponentStartHandledError(t))
694
695 pygobject.type_register(MultifdSinkStreamer)
696
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sun Mar 7 10:46:46 2010 | http://epydoc.sourceforge.net |