| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
6 # All rights reserved.
7
8 # This file may be distributed and/or modified under the terms of
9 # the GNU General Public License version 2 as published by
10 # the Free Software Foundation.
11 # This file is distributed without any warranty; without even the implied
12 # warranty of merchantability or fitness for a particular purpose.
13 # See "LICENSE.GPL" in the source distribution for more information.
14
15 # Licensees having purchased or holding a valid Flumotion Advanced
16 # Streaming Server license may use this file in accordance with the
17 # Flumotion Advanced Streaming Server Commercial License Agreement.
18 # See "LICENSE.Flumotion" in the source distribution for more information.
19
20 # Headers in this file shall remain intact.
21
22 """
23 model abstraction for administration clients supporting different views
24 """
25
26 import sys
27 import gobject
28
29 from twisted.spread import pb
30 from twisted.internet import error, defer, reactor
31 from twisted.cred import error as crederror
32 from twisted.python import rebuild, reflect, failure
33
34 from flumotion.common import common, errors, interfaces, log, pygobject
35 from flumotion.common import keycards, worker, planet, medium, package, messages
36 # serializable worker and component state
37 from flumotion.twisted import flavors
38 from flumotion.twisted.defer import defer_generator_method
39
40 from flumotion.configure import configure
41 from flumotion.common import reload, connection
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44 from flumotion.twisted.compat import implements
45
46 from flumotion.common.pygobject import gsignal, gproperty
47
48 from flumotion.common.messages import N_
49 T_ = messages.gettexter('flumotion')
50
52 perspectiveInterface = interfaces.IAdminMedium
53
55 """
56 @type medium: AdminModel
57 """
58 fpb.ReconnectingFPBClientFactory.__init__(self)
59 self.medium = medium
60 self.maxDelay = 20
61
62 self.extraTenacious = extraTenacious
63 self.hasBeenConnected = 0
64
66 self.hasBeenConnected = 1
67
68 fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)
69
71 """
72 @param reason: L{twisted.spread.pb.failure.Failure}
73 """
74 if reason.check(error.DNSLookupError):
75 self.debug('DNS lookup error')
76 if not self.extraTenacious:
77 self.medium.connectionFailed(reason)
78 return
79 elif (reason.check(error.ConnectionRefusedError)
80 or reason.check(error.ConnectError)):
81 # If we're logging in for the first time, we want to make this a
82 # real error; we present a dialog, etc.
83 # However, if we fail later on (e.g. manager shut down, and
84 # hasn't yet been restarted), we want to keep trying to reconnect,
85 # so we just log a message.
86 self.debug("Error connecting: %s", log.getFailureMessage(reason))
87 if self.hasBeenConnected:
88 self.log("we've been connected before though, so going "
89 "to retry")
90 # fall through
91 elif self.extraTenacious:
92 self.log("trying again due to +100 tenacity")
93 # fall through
94 else:
95 self.log("telling medium about connection failure")
96 self.medium.connectionFailed(reason)
97 # return
98 return
99
100 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self,
101 connector, reason)
102 # delay is now updated
103 self.debug("will try reconnect in %f seconds" % self.delay)
104
105 # vmethod implementation
107 yield d
108
109 try:
110 try:
111 result = d.value()
112 assert result
113 except Exception, e:
114 if self.extraTenacious:
115 self.debug('connection problem: %s',
116 log.getExceptionMessage(e))
117 self.debug('we are tenacious, so trying again later')
118 self.disconnect()
119 yield None
120 else:
121 raise
122 # if it's not a reference, we need to respond to a
123 # challenge...
124 if not isinstance(result, pb.RemoteReference):
125 keycard = result
126 keycard.setPassword(self.passwd)
127 self.log("_loginCallback: responding to challenge")
128 d = self.login(keycard, self.medium, interfaces.IAdminMedium)
129 yield d
130 result = d.value()
131
132 self.medium.setRemoteReference(result)
133
134 except errors.ConnectionFailedError:
135 self.debug("emitting connection-failed")
136 self.medium.emit('connection-failed', "I failed my master")
137 self.debug("emitted connection-failed")
138
139 except errors.ConnectionRefusedError:
140 self.debug("emitting connection-refused")
141 self.medium.emit('connection-refused')
142 self.debug("emitted connection-refused")
143
144 except crederror.UnauthorizedLogin:
145 # FIXME: unauthorized login emit !
146 self.debug("emitting connection-refused")
147 self.medium.emit('connection-refused')
148 self.debug("emitted connection-refused")
149
150 except Exception, e:
151 self.medium.emit('connection-error', e)
152 self.medium._defaultErrback(failure.Failure(e))
153
154 gotDeferredLogin = defer_generator_method(gotDeferredLogin)
155
156 # FIXME: stop using signals, we can provide a richer interface with actual
157 # objects and real interfaces for the views a model communicates with
159 """
160 I live in the admin client.
161 I am a data model for any admin view implementing a UI to
162 communicate with one manager.
163 I send signals when things happen.
164
165 Manager calls on us through L{flumotion.manager.admin.AdminAvatar}
166 """
167 gsignal('connected')
168 gsignal('disconnected')
169 gsignal('connection-refused')
170 gsignal('connection-failed', str)
171 gsignal('connection-error', object)
172 gsignal('component-property-changed', str, str, object)
173 gsignal('reloading', str)
174 gsignal('message', str)
175 gsignal('update')
176
177 logCategory = 'adminmodel'
178
179 implements(interfaces.IAdminMedium, flavors.IStateListener)
180
181 # Public instance variables (read-only)
182 planet = None
183
185 self.__gobject_init__()
186
187 # All of these instance variables are private. Cuidado cabrones!
188 self.authenticator = authenticator
189 self.host = self.port = self.use_insecure = None
190
191 self.managerId = '<uninitialized>'
192
193 self.state = 'disconnected'
194 self.clientFactory = self._makeFactory(authenticator)
195 # 20 secs max for an admin to reconnect
196 self.clientFactory.maxDelay = 20
197
198 self._components = {} # dict of components
199 self.planet = None
200 self._workerHeavenState = None
201
202 # a method so mock testing frameworks can override it
204 # FIXME: this needs further refactoring, so we only ever pass
205 # an authenticator. For that we need to fix all users of this
206 # class too
207 factory = AdminClientFactory(self)
208 factory.startLogin(authenticator)
209 return factory
210
213 'Connect to a host.'
214 self.host = host
215 self.port = port
216 self.use_insecure = use_insecure
217
218 # the intention here is to give an id unique to the manager --
219 # if a program is adminning multiple managers, this id should
220 # tell them apart (and identify duplicates)
221 info = connection.PBConnectionInfo(host, port, not use_insecure,
222 self.authenticator)
223 self.managerId = str(info)
224
225 self.info('Connecting to manager %s with %s',
226 self.managerId, use_insecure and 'TCP' or 'SSL')
227 if keep_trying:
228 self.info('AdminClientFactory, now with extra tenacity')
229 self.clientFactory.extraTenacious = True
230
231 if use_insecure:
232 reactor.connectTCP(host, port, self.clientFactory)
233 else:
234 from twisted.internet import ssl
235 reactor.connectSSL(host, port, self.clientFactory,
236 ssl.ClientContextFactory())
237
238 def connected(model, d, ids):
239 map(model.disconnect, ids)
240 d.callback(model)
241
242 def connection_refused(model, d, ids):
243 map(model.disconnect, ids)
244 d.errback(errors.ConnectionRefusedError())
245
246 def connection_failed(model, reason, d, ids):
247 map(model.disconnect, ids)
248 d.errback(errors.ConnectionFailedError(reason))
249
250 def connection_error(model, exception, d, ids):
251 map(model.disconnect, ids)
252 d.errback(exception)
253
254 d = defer.Deferred()
255 ids = []
256 ids.append(self.connect('connected', connected, d, ids))
257 ids.append(self.connect('connection-refused',
258 connection_refused, d, ids))
259 ids.append(self.connect('connection-failed',
260 connection_failed, d, ids))
261 ids.append(self.connect('connection-error',
262 connection_error, d, ids))
263 return d
264
265 # default Errback
266 # FIXME: we can set it up with a list of types not to warn for ?
268 self.debug('Possibly unhandled deferred failure: %r (%s)' % (
269 failure, failure.getErrorMessage()))
270 return failure
271
273 self.debug('asked to log in again')
274 self.clientFactory.stopTrying()
275 # this also makes it try to connect again
276 self.clientFactory.resetDelay()
277 self.connectToHost(self.host, self.port, self.use_insecure)
278
279 # FIXME: give these three sensible names
282
286
287 # used in fgc
291
def connectionFailed(self, failure):
    """
    Turn a connection failure into a 'connection-failed' signal.

    Called by the client factory.  The failure is mapped to a
    human-readable message, which is emitted as the argument of the
    'connection-failed' signal.

    @param failure: the reason the connection attempt failed
    @type  failure: L{twisted.python.failure.Failure}
    """
    if failure.check(error.DNSLookupError):
        message = "Could not look up host '%s'." % self.host
    elif failure.check(error.ConnectionRefusedError, error.ConnectError):
        # ConnectError is checked as well as ConnectionRefusedError, the
        # same pair the client factory's clientConnectionFailed tests for,
        # so generic connect errors get the host/port message rather than
        # being reported as an unexpected failure.
        message = ("Could not connect to host '%s' on port %d."
                   % (self.host, self.port))
    else:
        message = ("Unexpected failure.\nDebug information: %s"
                   % log.getFailureMessage(failure))
    self.debug('emitting connection-failed')
    self.emit('connection-failed', message)
    self.debug('emitted connection-failed')
306
308 self.debug("setRemoteReference %r" % remoteReference)
309 def writeConnection():
310 if not (self.authenticator.username
311 and self.authenticator.password):
312 self.log('not caching connection information')
313 return
314 s = ''.join(['<connection>',
315 '<host>%s</host>' % self.host,
316 '<manager>%s</manager>' % self.planet.get('name'),
317 '<port>%d</port>' % self.port,
318 '<use_insecure>%d</use_insecure>'
319 % (self.use_insecure and 1 or 0),
320 '<user>%s</user>' % self.authenticator.username,
321 '<passwd>%s</passwd>' % self.authenticator.password,
322 '</connection>'])
323
324 import os
325 import md5
326 sum = md5.new(s).hexdigest()
327 f = os.path.join(configure.registrydir, '%s.connection' % sum)
328 try:
329 h = open(f, 'w')
330 h.write(s)
331 h.close()
332 except Exception, e:
333 self.info('failed to write connection cache file %s: %s',
334 f, log.getExceptionMessage(e))
335
336 # chain up
337 medium.PingingMedium.setRemoteReference(self, remoteReference)
338
339 # fixme: push the disconnect notification upstream
340 def remoteDisconnected(remoteReference):
341 self.debug("emitting disconnected")
342 self.state = 'disconnected'
343 self.emit('disconnected')
344 self.debug("emitted disconnected")
345 self.remote.notifyOnDisconnect(remoteDisconnected)
346
347 d = self.callRemote('getPlanetState')
348 yield d
349 self.planet = d.value()
350 # monkey, Monkey, MONKEYPATCH!!!!!
351 self.planet.admin = self
352 self.debug('got planet state')
353
354 d = self.callRemote('getWorkerHeavenState')
355 yield d
356 self._workerHeavenState = d.value()
357 self.debug('got worker state')
358
359 writeConnection()
360
361 self.debug('Connected to manager and retrieved all state')
362 self.state = 'connected'
363 self.emit('connected')
364 setRemoteReference = defer_generator_method(setRemoteReference)
365
366 ### pb.Referenceable methods
369
370 # IStateListener interface
373
376
377 # if a flow gets added to a planet, add ourselves as a listener
378
381
382 ### model functions; called by UI's to send requests to manager or comp
383
384 ## view management functions
385 # FIXME: what is this crap ? strings as enums ?
387 return self.state == 'connected'
388
390 self.debug('shutting down')
391 if self.state != 'disconnected':
392 self.clientFactory.disconnect()
393 self.clientFactory.stopTrying()
394
395 ## generic remote call methods
def componentCallRemote(self, componentState, methodName, *args, **kwargs):
    """
    Call the given method on the given component with the given args.

    If the call fails, a warning is logged and a Warning message is
    appended to the component state's 'messages'; the failure is then
    passed on to errbacks added by the caller.

    @param componentState: component to call the method on
    @type  componentState: L{flumotion.common.planet.AdminComponentState}
    @param methodName:     name of method to call; serialized to a
                           remote_methodName on the worker's medium

    @rtype: L{twisted.internet.defer.Deferred}
    """
    assert isinstance(componentState, planet.AdminComponentState), \
        "componentState %r is of the wrong type calling %s" % (
            componentState, methodName)
    componentName = componentState.get('name')

    self.debug('Calling remote method %s on component %s' % (
        methodName, componentName))
    d = self.callRemote('componentCallRemote',
                        componentState, methodName,
                        *args, **kwargs)
    d.addCallback(self._callRemoteCallback, methodName, componentName)

    def errback(failure):
        if failure.check(errors.NoMethodError):
            # getErrorMessage() always returns a string, whereas
            # failure.value may be an exception instance that cannot be
            # concatenated to a string
            msg = "Remote method '%s' does not exist." % methodName
            msg += "\n" + failure.getErrorMessage()
        else:
            msg = log.getFailureMessage(failure)

        # FIXME: we probably need a nicer way of getting component
        # messages shown from the admin model, but this allows us to
        # make sure every type of admin has these messages
        self.warning(msg)
        m = messages.Warning(T_(N_("Internal error in component.")),
                             debug=msg)
        componentState.observe_append('messages', m)
        return failure

    d.addErrback(errback)
    # FIXME: dialog for other errors ?
    return d
438
440 self.debug('Called remote method %s on component %s successfully' % (
441 methodName, componentName))
442 return result
443
def workerCallRemote(self, workerName, methodName, *args, **kwargs):
    """
    Call the given method on the given worker with the given args.

    Failures are routed through _callRemoteErrback before they reach
    errbacks added by the caller.

    @param workerName: name of the worker to call the method on
    @param methodName: name of method to call; serialized to a
                       remote_methodName on the worker's medium

    @rtype: L{twisted.internet.defer.Deferred}
    """
    # abbreviated representation of the arguments, for the log only
    argsRepr = common.argRepr(args, kwargs, max=20)
    description = 'calling remote method %s(%s) on worker %s' % (
        methodName, argsRepr, workerName)
    self.debug(description)

    deferred = self.callRemote('workerCallRemote', workerName,
                               methodName, *args, **kwargs)
    deferred.addErrback(self._callRemoteErrback, "worker",
                        workerName, methodName)
    return deferred
462
def _callRemoteErrback(self, failure, type, name, methodName):
    """
    Log a failed remote call and pass the failure on.

    @param failure:    the failure of the remote call
    @param type:       kind of object the call was made on, e.g. "worker"
    @param name:       name of the object the call was made on
    @param methodName: name of the remote method that was called

    @returns: the failure, so errbacks added later still receive it
    """
    # goes through the logging system instead of being printed to stdout
    self.debug("remote call errback: failure %r" % failure)
    if failure.check(errors.NoMethodError):
        # name the kind of object actually called, rather than always
        # blaming a component
        self.warning("method '%s' on %s '%s' does not exist, "
                     "%s bug" % (methodName, type, name, type))
    else:
        self.debug("passing through failure on remote call to %s(%s): %r" %
                   (name, methodName, failure))

    # FIXME: throw up some sort of dialog with debug info
    return failure
474
475 ## component remote methods
477 """
478 @type componentState: L{flumotion.common.planet.AdminComponentState}
479 """
480 return self.componentCallRemote(componentState, 'setElementProperty',
481 element, property, value)
482
484 """
485 @type componentState: L{flumotion.common.planet.AdminComponentState}
486 """
487 return self.componentCallRemote(componentState, 'getElementProperty',
488 element, property)
489
490 ## reload methods for everything
492 name = reflect.filenameToModuleName(__file__)
493
494 self.info('Reloading admin code')
495 self.debug("rebuilding '%s'" % name)
496 rebuild.rebuild(sys.modules[name])
497 self.debug("reloading '%s'" % name)
498 reload.reload()
499 self.info('Reloaded admin code')
500
502 # XXX: reload admin.py too
503 d = defer.execute(self.reloadAdmin)
504
505 d = d.addCallback(lambda result, self: self.reloadManager(), self)
506 d.addErrback(self._defaultErrback)
507 # stack callbacks so that a new one only gets sent after the previous
508 # one has completed
509 for name in self._components.keys():
510 d = d.addCallback(lambda result, name: self.reloadComponent(name), name)
511 d.addErrback(self._defaultErrback)
512 return d
513
514 # used by other admin clients
515 # FIXME: isn't it great how hard it is to guess what duckport is ?
517 name = reflect.filenameToModuleName(__file__)
518
519 self.info("rebuilding '%s'" % name)
520 rebuild.rebuild(sys.modules[name])
521
522 d = self.reloadManager()
523 yield d
524 try:
525 d.value()
526 duckport.write('Reloaded manager')
527 except Exception, e:
528 duckport.write('Failed to reload manager: %s' % e)
529
530 for name in self._components.keys():
531 d = self.reloadComponent(name)
532 yield d
533 try:
534 d.value()
535 duckport.write('Reloaded component %s' % name)
536 except Exception, e:
537 duckport.write('Failed to reload component %s: %s' % (name, e))
538 duckport.close()
539 reload_async = defer_generator_method(reload_async)
540
def reloadManager(self):
    """
    Ask the manager to reload its code.

    The 'reloading' signal is emitted with 'manager' as argument before
    the remote call is made.

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.emit('reloading', 'manager')
    self.info("reloading manager code")

    def logReloaded(result):
        # runs when the remote reloadManager call has succeeded
        self.info("reloaded manager code")

    deferred = self.callRemote('reloadManager')
    deferred.addCallback(logReloaded)
    deferred.addErrback(self._defaultErrback)
    return deferred
556
def reloadComponent(self, componentState):
    """
    Ask the manager to reload the code of one component.

    The 'reloading' signal is emitted with the component's name before
    the remote call is made.

    @type componentState: L{flumotion.common.planet.AdminComponentState}

    @rtype: L{twisted.internet.defer.Deferred}
    """
    componentName = componentState.get('name')
    self.info("reloading component %s code" % componentName)
    self.emit('reloading', componentName)

    def logReloaded(result):
        # the name is looked up again when the remote call has succeeded
        self.info("reloaded component %s code" % componentState.get('name'))

    deferred = self.callRemote('reloadComponent', componentState)
    deferred.addCallback(logReloaded)
    deferred.addErrback(self._defaultErrback)
    return deferred
575
576 ## manager remote methods
578 return self.callRemote('loadConfiguration', xml_string)
579
581 return self.callRemote('getConfiguration')
582
584 return self.callRemote('cleanComponents')
585
586 # function to get remote code for admin parts
587 # FIXME: rename slightly ?
588 # FIXME: still have hard-coded os.path.join stuff in here for md5sum,
589 # move to bundleloader ?
591 """
592 Do everything needed to set up the entry point for the given
593 component and type, including transferring and setting up bundles.
594
595 Caller is responsible for adding errbacks to the deferred.
596
597 Returns: a deferred returning (entryPath, filename, methodName) with
598 entryPath: the full local path to the bundle's base
599 fileName: the relative location of the bundled file
600 methodName: the method to instantiate with
601 """
602 d = self.callRemote('getEntryByType', componentState, type)
603 yield d
604
605 fileName, methodName = d.value()
606
607 self.debug("entry for %r of type %s is in file %s and method %s" % (
608 componentState, type, fileName, methodName))
609 d = self.bundleLoader.getBundles(fileName=fileName)
610 yield d
611
612 name, bundlePath = d.value()[-1]
613 yield (bundlePath, fileName, methodName)
614 getEntry = defer_generator_method(getEntry)
615
616 ## worker remote methods
618 d = self.workerCallRemote(workerName, 'checkElements', elements)
619 d.addErrback(self._defaultErrback)
620 return d
621
623 d = self.workerCallRemote(workerName, 'checkImport', moduleName)
624 d.addErrback(self._defaultErrback)
625 return d
626
628 """
629 Run the given function and args on the given worker. If the
630 worker does not already have the module, or it is out of date,
631 it will be retrieved from the manager.
632
633 @rtype: L{twisted.internet.defer.Deferred} firing an
634 L{flumotion.common.messages.Result}
635 """
636 return self.workerCallRemote(workerName, 'runFunction', moduleName,
637 functionName, *args, **kwargs)
638
639 # FIXME: this should not be allowed to be called, move away
640 # by abstracting callers further
644 getComponents = get_components
645
647 self._workerHeavenState = state
648
651
652 pygobject.type_register(AdminModel)
653
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sun Mar 7 10:47:31 2010 | http://epydoc.sourceforge.net |