| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import gst
27 import gst.interfaces
28 import gobject
29
30 from twisted.internet import reactor, defer
31 from twisted.spread import pb
32
33 from flumotion.configure import configure
34 from flumotion.component import component as basecomponent
35 from flumotion.common import common, interfaces, errors, log, pygobject, messages
36 from flumotion.common import gstreamer
37
38 from flumotion.common.planet import moods
39 from flumotion.common.pygobject import gsignal
40 from flumotion.twisted.compat import implements
41
42 # FIXME: maybe move feed to component ?
43 from flumotion.worker import feed
44 from flumotion.common.messages import N_
45 T_ = messages.gettexter('flumotion')
46
48 """
49 I am a component-side medium for a FeedComponent to interface with
50 the manager-side ComponentAvatar.
51 """
52 implements(interfaces.IComponentMedium)
53 logCategory = 'feedcompmed'
54 remoteLogName = 'feedserver'
55
57 """
58 @param component: L{flumotion.component.feedcomponent.FeedComponent}
59 """
60 basecomponent.BaseComponentMedium.__init__(self, component)
61
62 self._feederFeedServer = {} # FeedId -> (fullFeedId, host, port) tuple
63 # for remote feeders
64 self._feederClientFactory = {} # fullFeedId -> client factory
65 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple
66 # for remote eaters
67 self._eaterClientFactory = {} # (componentId, feedId) -> client factory
68 self._eaterTransport = {} # (componentId, feedId) -> transport
69 self.logName = component.name
70
71 def on_feed_ready(component, feedName, isReady):
72 self.callRemote('feedReady', feedName, isReady)
73
74 def on_component_error(component, element_path, message):
75 self.callRemote('error', element_path, message)
76
77 self.comp.connect('feed-ready', on_feed_ready)
78 self.comp.connect('error', on_component_error)
79
80 # override base Errback for callRemote to stop the pipeline
81 #def callRemoteErrback(reason):
82 # self.warning('stopping pipeline because of %s' % reason)
83 # self.comp.pipeline_stop()
84
85 ### Referenceable remote methods which can be called from manager
87 return self.comp.get_element_property(elementName, property)
88
91
93 """
94 Sets the GStreamer debugging levels based on the passed debug string.
95 """
96 self.debug('Setting GStreamer debug level to %s' % debug)
97 if not debug:
98 return
99
100 for part in debug.split(','):
101 glob = None
102 value = None
103 pair = part.split(':')
104 if len(pair) == 1:
105 # assume only the value
106 value = int(pair[0])
107 elif len(pair) == 2:
108 glob, value = pair
109 else:
110 self.warning("Cannot parse GStreamer debug setting '%s'." %
111 part)
112 continue
113
114 if glob:
115 gst.debug_set_threshold_for_name(glob, value)
116 else:
117 gst.debug_set_default_threshold(value)
118
120 """
121 Tell the component the host and port for the FeedServer through which
122 it can connect a local eater to a remote feeder to eat the given
123 fullFeedId.
124
125 Called on by the manager-side ComponentAvatar.
126 """
127 # we key on the feedId because a component is part of only one flow,
128 # and doesn't even know the flow name it is part of.
129 flowName, componentName, feedName = common.parseFullFeedId(fullFeedId)
130 feedId = common.feedId(componentName, feedName)
131 self._feederFeedServer[feedId] = (fullFeedId, host, port)
132 # FIXME: drop connection if we already had one
133 return self.connectEater(feedId)
134
136 """
137 Actually eat the given feed.
138 Used on initial connection, and for reconnecting.
139 """
140 (fullFeedId, host, port) = self._feederFeedServer[feedId]
141 client = feed.FeedMedium(self.comp)
142 factory = feed.FeedClientFactory(client)
143 # FIXME: maybe copy keycard instead, so we can change requester ?
144 self.debug('connecting to FeedServer on %s:%d' % (host, port))
145 reactor.connectTCP(host, port, factory)
146 d = factory.login(self.authenticator)
147 self._feederClientFactory[fullFeedId] = factory
148 def loginCb(remoteRef):
149 self.debug('logged in to feedserver, remoteRef %r' % remoteRef)
150 client.setRemoteReference(remoteRef)
151 # now call on the remoteRef to eat
152 self.debug(
153 'COMPONENT --> feedserver: sendFeed(%s)' % fullFeedId)
154 d = remoteRef.callRemote('sendFeed', fullFeedId)
155
156 def sendFeedCb(result):
157 self.debug('COMPONENT <-- feedserver: sendFeed(%s): %r' % (
158 fullFeedId, result))
159 # FIXME: why does this not return result ?
160 return None
161
162 d.addCallback(sendFeedCb)
163 return d
164
165 d.addCallback(loginCb)
166 return d
167
169 """
170 Tell the component to feed the given feed to the receiving component
171 accessible through the FeedServer on the given host and port.
172
173 Called on by the manager-side ComponentAvatar.
174 """
175 # FIXME: check if this overwrites current config, and adapt if it
176 # does
177 self._eaterFeedServer[(componentId, feedId)] = (host, port)
178 client = feed.FeedMedium(self.comp)
179 factory = feed.FeedClientFactory(client)
180 # FIXME: maybe copy keycard instead, so we can change requester ?
181 self.debug('connecting to FeedServer on %s:%d' % (host, port))
182 reactor.connectTCP(host, port, factory)
183 d = factory.login(self.authenticator)
184 self._eaterClientFactory[(componentId, feedId)] = factory
185 def loginCb(remoteRef):
186 self.debug('logged in to feedserver, remoteRef %r' % remoteRef)
187 client.setRemoteReference(remoteRef)
188 # now call on the remoteRef to eat
189 self.debug(
190 'COMPONENT --> feedserver: receiveFeed(%s, %s)' % (
191 componentId, feedId))
192 d = remoteRef.callRemote('receiveFeed', componentId, feedId)
193
194 def receiveFeedCb(result):
195 self.debug(
196 'COMPONENT <-- feedserver: receiveFeed(%s, %s): %r' % (
197 componentId, feedId, result))
198 componentName, feedName = common.parseFeedId(feedId)
199 t = remoteRef.broker.transport
200 t.stopReading()
201 t.stopWriting()
202
203 key = (componentId, feedId)
204 self._eaterTransport[key] = t
205 remoteRef.broker.transport = None
206 fd = t.fileno()
207 self.debug('Telling component to feed feedName %s to fd %d'% (
208 feedName, fd))
209 self.comp.feedToFD(feedName, fd)
210
211 d.addCallback(receiveFeedCb)
212 return d
213
214 d.addCallback(loginCb)
215 return d
216
218 """
219 Tells the component to start providing a master clock on the given
220 UDP port.
221 Can only be called if setup() has been called on the component.
222
223 The IP address returned is the local IP the clock is listening on.
224
225 @returns: (ip, port, base_time)
226 @rtype: tuple of (str, int, long)
227 """
228 self.debug('remote_provideMasterClock(port=%r)' % port)
229 return self.comp.provide_master_clock(port)
230
232 """
233 Invoke the given methodName on the given effectName in this component.
234 The effect should implement effect_(methodName) to receive the call.
235 """
236 self.debug("calling %s on effect %s" % (methodName, effectName))
237 if not effectName in self.comp.effects:
238 raise errors.UnknownEffectError(effectName)
239 effect = self.comp.effects[effectName]
240 if not hasattr(effect, "effect_%s" % methodName):
241 raise errors.NoMethodError("%s on effect %s" % (methodName,
242 effectName))
243 method = getattr(effect, "effect_%s" % methodName)
244 try:
245 result = method(*args, **kwargs)
246 except TypeError:
247 msg = "effect method %s did not accept %s and %s" % (
248 methodName, args, kwargs)
249 self.debug(msg)
250 raise errors.RemoteRunError(msg)
251 self.debug("effect: result: %r" % result)
252 return result
253
254 from feedcomponent010 import FeedComponent
255
256 FeedComponent.componentMediumClass = FeedComponentMedium
257
259 'A component using gst-launch syntax'
260
261 DELIMITER = '@'
262
263 ### FeedComponent interface implementations
265 try:
266 unparsed = self.get_pipeline_string(self.config['properties'])
267 except errors.MissingElementError, e:
268 m = messages.Error(T_(N_(
269 "The worker does not have the '%s' element installed.\n"
270 "Please install the necessary plug-in and restart "
271 "the component.\n"), e.args[0]))
272 self.state.append('messages', m)
273 raise errors.ComponentSetupHandledError(e)
274
275 self.pipeline_string = self.parse_pipeline(unparsed)
276
277 try:
278 pipeline = gst.parse_launch(self.pipeline_string)
279
280 # Connect to the client-fd-removed signals on each feeder, so we
281 # can clean up properly on removal.
282 feeder_element_names = map(lambda n: "feeder:" + n,
283 self.feeder_names)
284 for feeder in feeder_element_names:
285 element = pipeline.get_by_name(feeder)
286 element.connect('client-fd-removed', self.removeClientCallback)
287 self.debug("Connected %s to removeClientCallback", feeder)
288
289 return pipeline
290 except gobject.GError, e:
291 self.warning('Could not parse pipeline: %s' % e.message)
292 m = messages.Error(T_(N_(
293 "GStreamer error: could not parse component pipeline.")),
294 debug=e.message)
295 self.state.append('messages', m)
296 raise errors.PipelineParseError(e.message)
297
299 FeedComponent.set_pipeline(self, pipeline)
300 self.configure_pipeline(self.pipeline, self.config['properties'])
301
302 ### ParseLaunchComponent interface for subclasses
304 """
305 Method that must be implemented by subclasses to produce the
306 gstparse string for the component's pipeline. Subclasses should
307 not chain up; this method raises a NotImplemented error.
308
309 Returns: a new pipeline string representation.
310 """
311 raise NotImplementedError('subclasses should implement '
312 'get_pipeline_string')
313
315 """
316 Method that can be implemented by subclasses if they wish to
317 interact with the pipeline after it has been created and set
318 on the component.
319
320 This could include attaching signals and bus handlers.
321 """
322 pass
323
324 ### private methods
326 """
327 Expand the given string to a full element name for an eater or feeder.
328 The full name is of the form eater:(sourceComponentName):(feedName)
329 or feeder:(componentName):feedName
330 """
331 if ' ' in block:
332 raise TypeError, "spaces not allowed in '%s'" % block
333 if not ':' in block:
334 raise TypeError, "no colons in'%s'" % block
335 if block.count(':') > 2:
336 raise TypeError, "too many colons in '%s'" % block
337
338 parts = block.split(':')
339
340 if parts[0] != 'eater' and parts[0] != 'feeder':
341 raise TypeError, "'%s' does not start with eater or feeder" % block
342
343 # we can only fill in component names for feeders
344 if not parts[1]:
345 if parts[0] == 'eater':
346 raise TypeError, "'%s' should specify feeder component" % block
347 parts[1] = self.name
348 if len(parts) == 2:
349 parts.append('')
350 if not parts[2]:
351 parts[2] = 'default'
352
353 return ":".join(parts)
354
356 """
357 Expand each @..@ block to use the full element name for eater or feeder.
358 The full name is of the form eater:(sourceComponentName):(feedName)
359 or feeder:(componentName):feedName
360 This also does some basic checking of the block.
361 """
362 assert block != ''
363
364 # verify the template has an even number of delimiters
365 if block.count(self.DELIMITER) % 2 != 0:
366 raise TypeError, "'%s' contains an odd number of '%s'" % (block, self.DELIMITER)
367
368 # when splitting, the even-indexed members will remain,
369 # and the odd-indexed members are the blocks to be substituted
370 blocks = block.split(self.DELIMITER)
371
372 for i in range(1, len(blocks) - 1, 2):
373 blocks[i] = self._expandElementName(blocks[i].strip())
374 return "@".join(blocks)
375
377 """
378 Expand the given pipeline string representation by substituting
379 blocks between '@' with a filled-in template.
380
381 @param pipeline: a pipeline string representation with variables
382 @param names: the element names to substitute for @...@ segments
383 @param template_func: function to call to get the template to use for
384 element factory info
385 @param format: the format to use when substituting
386
387 Returns: a new pipeline string representation.
388 """
389 assert pipeline != ''
390
391 deli = self.DELIMITER
392
393 if len(names) == 1:
394 part_name = names[0]
395 template = template_func(part_name)
396 named = template % {'name': part_name}
397 if pipeline.find(part_name) != -1:
398 pipeline = pipeline.replace(deli + part_name + deli, named)
399 else:
400 pipeline = format % {'tmpl': named, 'pipeline': pipeline}
401 else:
402 for part in names:
403 part_name = deli + part + deli # mmm, deli sandwich
404 if pipeline.find(part_name) == -1:
405 raise TypeError, "%s needs to be specified in the pipeline '%s'" % (part_name, pipeline)
406
407 template = template_func(part)
408 pipeline = pipeline.replace(part_name,
409 template % {'name': part})
410 return pipeline
411
413 pipeline = " ".join(pipeline.split())
414 self.debug('Creating pipeline, template is %s' % pipeline)
415
416 eater_names = self.get_eater_names()
417 if pipeline == '' and not eater_names:
418 raise TypeError, "Need a pipeline or a eater"
419
420 if pipeline == '':
421 assert eater_names
422 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink'
423
424 # we expand the pipeline based on the templates and eater/feeder names
425 # elements are named eater:(source_component_name):(feed_name)
426 # or feeder:(component_name):(feed_name)
427 eater_element_names = map(lambda n: "eater:" + n, eater_names)
428 feeder_element_names = map(lambda n: "feeder:" + n, self.feeder_names)
429 self.debug('we eat with eater elements %s' % eater_element_names)
430 self.debug('we feed with feeder elements %s' % feeder_element_names)
431 pipeline = self._expandElementNames(pipeline)
432
433 pipeline = self.parse_tmpl(pipeline, eater_element_names,
434 self.get_eater_template,
435 '%(tmpl)s ! %(pipeline)s')
436 pipeline = self.parse_tmpl(pipeline, feeder_element_names,
437 self.get_feeder_template,
438 '%(pipeline)s ! %(tmpl)s')
439 pipeline = " ".join(pipeline.split())
440
441 self.debug('pipeline for %s is %s' % (self.getName(), pipeline))
442 assert self.DELIMITER not in pipeline
443
444 return pipeline
445
447 queue = self.get_queue_string(eaterName)
448 check = ""
449 if self.checkTimestamp:
450 check += " check-imperfect-timestamp=1"
451 if self.checkOffset:
452 check += " check-imperfect-offset=1"
453 if check != "":
454 check = " ! identity name=%s-identity silent=TRUE %s" % (
455 eaterName, check)
456 depay = self.DEPAY_TMPL + check
457 if not queue:
458 return self.FDSRC_TMPL + ' ! ' + depay
459 else:
460 return self.FDSRC_TMPL + ' ! ' + queue + ' ! ' + depay
461
463 return self.FEEDER_TMPL
464
466 """
467 Return a parse-launch description of a queue, if this component
468 wants an input queue on this eater, or None if not
469 """
470 return None
471
472 ### BaseComponent interface implementation
474 """
475 Tell the component to start.
476 Whatever is using the component is responsible for making sure all
477 eaters have received their file descriptor to eat from.
478
479 @param clocking: tuple of (ip, port, base_time) of a master clock,
480 or None not to slave the clock
481 @type clocking: tuple(str, int, long) or None.
482 """
483 self.debug('ParseLaunchComponent.start')
484 if clocking:
485 self.info('slaving to master clock on %s:%d with base time %d' %
486 clocking)
487
488 if clocking:
489 self.set_master_clock(*clocking)
490
491 self.link()
492
493 return defer.succeed(None)
494
496 """
497 I am a part of a feed component for a specific group
498 of functionality.
499
500 @ivar name: name of the effect
501 @type name: string
502 @ivar component: component owning the effect
503 @type component: L{FeedComponent}
504 """
505 logCategory = "effect"
506
508 """
509 @param name: the name of the effect
510 """
511 self.name = name
512 self.setComponent(None)
513
515 """
516 Set the given component as the effect's owner.
517
518 @param component: the component to set as an owner of this effect
519 @type component: L{FeedComponent}
520 """
521 self.component = component
522 self.setUIState(component and component.uiState or None)
523
525 """
526 Set the given UI state on the effect. This method is ideal for
527 adding keys to the UI state.
528
529 @param state: the UI state for the component to use.
530 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
531 """
532 self.uiState = state
533
535 """
536 Get the component owning this effect.
537
538 @rtype: L{FeedComponent}
539 """
540 return self.component
541
543 """
544 This class provides for multi-input ParseLaunchComponents, such as muxers,
545 with a queue attached to each input.
546 """
547 QUEUE_SIZE_BUFFERS = 16
548
550 """
551 Return a gst-parse description of the muxer, which must be named 'muxer'
552 """
553 raise errors.NotImplementedError("Implement in a subclass")
554
558
560 sources = self.config['source']
561
562 pipeline = self.get_muxer_string(properties) + ' '
563 for eater in sources:
564 tmpl = '@ eater:%s @ ! muxer. '
565 pipeline += tmpl % eater
566
567 pipeline += 'muxer.'
568
569 return pipeline
570
572 # Firstly, ensure that any push in progress is guaranteed to return,
573 # by temporarily enlarging the queue
574 queuename = "eater:%s-queue" % feedId
575 queue = self.pipeline.get_by_name(queuename)
576
577 size = queue.get_property("max-size-buffers")
578 queue.set_property("max-size-buffers", size + 1)
579
580 # So, now it's guaranteed to return. However, we want to return the
581 # queue size to its original value. Doing this in a thread-safe manner
582 # is rather tricky...
583 def _block_cb(pad, blocked):
584 # This is called from streaming threads, but we don't do anything
585 # here so it's safe.
586 pass
587 def _underrun_cb(element):
588 # Called from a streaming thread. The queue element does not hold
589 # the queue lock when this is called, so we block our sinkpad,
590 # then re-check the current level.
591 pad = element.get_pad("sink")
592 pad.set_blocked_async(True, _block_cb)
593 level = element.get_property("current-level-buffers")
594 if level < self.QUEUE_SIZE_BUFFERS:
595 element.set_property('max-size-buffers',
596 self.QUEUE_SIZE_BUFFERS)
597 element.disconnect(signalid)
598 pad.set_blocked_async(False, _block_cb)
599
600 signalid = queue.connect("underrun", _underrun_cb)
601
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sun Mar 7 10:47:16 2010 | http://epydoc.sourceforge.net |