Package flumotion :: Package manager :: Module admin
[hide private]

Source Code for Module flumotion.manager.admin

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_admin -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle administrative clients 
 24  """ 
 25   
 26  import re 
 27  import os 
 28  import errno 
 29  from StringIO import StringIO 
 30   
 31  from twisted.internet import reactor 
 32  from twisted.python import failure 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.manager import base 
 37  from flumotion.common import errors, interfaces, log, planet, registry, debug 
 38  from flumotion.common import common 
 39  from flumotion.common.python import makedirs 
 40  from flumotion.monitor.nagios import util 
 41   
 42  # make Result and Message proxyable 
 43  from flumotion.common import messages 
 44   
 45  # make ComponentState proxyable 
 46  from flumotion.twisted import flavors 
 47  from flumotion.common import componentui 
 48   
 49  __version__ = "$Rev: 8731 $" 
 50   
 51   
 52  # FIXME: rename to Avatar since we are in the admin. namespace ? 
 53   
 54   
55 -class AdminAvatar(base.ManagerAvatar):
56 """ 57 I am an avatar created for an administrative client interface. 58 A reference to me is given (for example, to gui.AdminInterface) 59 when logging in and requesting an "admin" avatar. 60 I live in the manager. 61 """ 62 logCategory = 'admin-avatar' 63 64 # override pb.Avatar implementation so we can run admin actions 65
def perspectiveMessageReceived(self, broker, message, args, kwargs):
    """
    Handle a PB message sent by the admin client.

    This overrides the pb.Avatar implementation so that admin actions can
    be recorded: the arguments are unserialised here, every message that
    is not in the benign list is reported through vishnu.adminAction, and
    the call is then handed to
    base.ManagerAvatar.perspectiveMessageReceivedUnserialised.

    @param broker:  the broker the message arrived on; used to unserialise
                    args and kwargs
    @param message: name of the remote method being invoked
    @type  message: str
    @param args:    serialised positional arguments
    @param kwargs:  serialised keyword arguments
    """
    unserialisedArgs = broker.unserialize(args)
    unserialisedKwargs = broker.unserialize(kwargs)

    # 'ping' is the only message that is not reported as an admin action
    isBenign = message in ('ping', )
    if not isBenign:
        self.vishnu.adminAction(self.remoteIdentity, message,
                                unserialisedArgs, unserialisedKwargs)

    dispatch = base.ManagerAvatar.perspectiveMessageReceivedUnserialised
    return dispatch(self, broker, message,
                    unserialisedArgs, unserialisedKwargs)
77 78 ### pb.Avatar IPerspective methods 79
81 """ 82 Get the planet state. 83 84 @rtype: L{flumotion.common.planet.ManagerPlanetState} 85 """ 86 self.debug("returning planet state %r" % self.vishnu.state) 87 return self.vishnu.state
88
90 """ 91 Get the worker heaven state. 92 93 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState} 94 """ 95 self.debug("returning worker heaven state %r" % self.vishnu.state) 96 return self.vishnu.workerHeaven.state
97
def perspective_componentStart(self, componentState):
    """
    Start the given component, which should be sleeping beforehand.

    The request is delegated to vishnu.componentCreate.

    @type componentState: L{planet.ManagerComponentState}

    @returns: whatever vishnu.componentCreate returns
    """
    logLine = 'perspective_componentStart(%r)' % componentState
    self.debug(logLine)
    result = self.vishnu.componentCreate(componentState)
    return result
107
def perspective_componentStop(self, componentState):
    """
    Stop the given component.

    If the component was sad, its sad state is cleared as well, since the
    stop was explicitly requested by the admin.  The request is delegated
    to vishnu.componentStop.

    @type componentState: L{planet.ManagerComponentState}

    @returns: whatever vishnu.componentStop returns
    """
    logLine = 'perspective_componentStop(%r)' % componentState
    self.debug(logLine)
    result = self.vishnu.componentStop(componentState)
    return result
118
def perspective_componentRestart(self, componentState):
    """
    Restart the given component: stop it, then start it again.

    The start is chained as a callback on the object returned by
    perspective_componentStop, so it only runs once the stop has fired
    successfully.

    @type componentState: L{planet.ManagerComponentState}

    @returns: the object returned by perspective_componentStop, with the
              start chained onto it
    """
    self.debug('perspective_componentRestart(%r)' % componentState)

    def startAgain(*ignored):
        # the result of the stop is irrelevant; just start the component
        return self.perspective_componentStart(componentState)

    stopping = self.perspective_componentStop(componentState)
    stopping.addCallback(startAgain)
    return stopping
130 131 # Generic interface to call into a component 132
def perspective_componentCallRemote(self, componentState, methodName,
                                    *args, **kwargs):
    """
    Call a method on the given component on behalf of an admin client.

    A methodName of "start" is not sent to the component; it is forwarded
    to L{perspective_componentStart} instead.

    @param componentState: state of the component to call the method on
    @type  componentState: L{planet.ManagerComponentState}
    @param methodName:     name of the method to call.  Gets proxied to
                           L{flumotion.component.component.BaseComponentMedium}'s
                           remote_(methodName)
    @type  methodName:     str

    @rtype: L{twisted.internet.defer.Deferred}

    @raise errors.UnknownComponentError:  if vishnu has no mapper for the
                                          component state
    @raise errors.SleepingComponentError: if the mapper has no avatar
    @raise errors.RemoteMethodError:      if issuing the remote call raises
                                          synchronously
    """
    assert isinstance(componentState, planet.ManagerComponentState), \
        "%r is not a componentState" % componentState

    if methodName == "start":
        self.warning('forwarding "start" to perspective_componentStart')
        return self.perspective_componentStart(componentState)

    m = self.vishnu.getComponentMapper(componentState)
    if not m:
        self.warning('Component not mapped. Maybe deleted.')
        raise errors.UnknownComponentError(componentState)

    avatar = m.avatar
    if not avatar:
        self.warning('No avatar for %s, cannot call remote' %
                     componentState.get('name'))
        raise errors.SleepingComponentError(componentState)

    # XXX: Maybe we need to have a prefix, so we can limit what an
    # admin interface can call on a component
    try:
        return avatar.mindCallRemote(methodName, *args, **kwargs)
    except Exception as e:
        # format the message once and reuse it for the log and the error
        reason = log.getExceptionMessage(e)
        self.warning("exception on remote call %s: %s" % (methodName,
                                                          reason))
        raise errors.RemoteMethodError(methodName, reason)
176
178 """ 179 List components in the planet. Returns a list of avatar ids. 180 """ 181 componentStates = self.vishnu.state.getComponents() 182 avatar_ids = [common.componentId(c.get('parent').get('name'), 183 c.get('name')) 184 for c in componentStates] 185 return avatar_ids
186
def perspective_componentInvoke(self, avatarId, methodName,
                                *args, **kwargs):
    """
    Call a remote method on the component identified by its avatar id.

    The component state is looked up in the planet state with
    util.findComponent and the call is then delegated to
    L{perspective_componentCallRemote}.

    @param avatarId:   the component avatar id
    @type  avatarId:   str
    @param methodName: name of the method to call
    @type  methodName: str

    @raise errors.UnknownComponentError: if no component matches avatarId
    """
    found = util.findComponent(self.vishnu.state, avatarId)
    if found:
        return self.perspective_componentCallRemote(found, methodName,
                                                    *args, **kwargs)

    self.warning('No component with avatar id %s' % avatarId)
    raise errors.UnknownComponentError(avatarId)
203
def perspective_upstreamList(self, avatarId):
    """
    List a component and its upstream components along with
    types and worker hosts.

    The upstream graph is walked level by level through each component's
    config 'eater' entries.  Every component is reported once, even when
    several downstream components eat from it, and a component that was
    already visited is never expanded again, so a cyclic configuration
    cannot make the walk loop forever.

    @param avatarId: the component avatar id
    @type  avatarId: str

    @returns: one (name, type, host) tuple per component, starting with
              the requested component; host is "unknown" when no worker
              in the worker heaven state matches the component's
              workerName
    @rtype:   list of (str, str, str)

    @raise errors.UnknownComponentError: if the component, or one of its
                                         upstream components, cannot be
                                         retrieved
    """

    def get_eaters_ids(eaters_dic):
        # NOTE(review): the dict key is used as the flow part of the
        # avatar id, exactly as before -- confirm against the config
        # schema that the key really is a flow name.
        avatars = []
        for flow in eaters_dic.keys():
            for c in eaters_dic[flow]:
                # c[0] is presumably a "component:feed" id; only the
                # part before the first colon is used
                name = c[0].split(':')[0]
                avatars.append('/%s/%s' % (flow, name))
        return avatars

    def create_response(components, workers):
        comps = []
        for c in components:
            workerName = c.get('workerName')
            host = "unknown"
            for w in workers:
                if workerName == w.get('name'):
                    host = w.get('host')
                    break
            comps.append((c.get('name'), c.get('type'), host))
        return comps

    component = util.findComponent(self.vishnu.state, avatarId)
    if not component:
        self.warning('No component with avatar id %s' % avatarId)
        raise errors.UnknownComponentError(avatarId)

    comps = [component]
    visited = set([avatarId])
    pending = get_eaters_ids(component.get('config').get('eater', {}))
    while pending:
        upcoming = []
        for i in pending:
            if i in visited:
                continue
            visited.add(i)
            try:
                compState = util.findComponent(self.vishnu.state, i)
                eaters = compState.get('config').get('eater', {})
            except Exception as e:
                self.debug(log.getExceptionMessage(e))
                emsg = "Error retrieving component '%s'" % i
                raise errors.UnknownComponentError(emsg)
            comps.append(compState)
            # collect ids per component instead of merging the eater
            # dicts: merging drops feeds when two components use the
            # same eater key
            upcoming.extend(get_eaters_ids(eaters))
        pending = upcoming

    workers = self.vishnu.workerHeaven.state.get('workers')
    return create_response(comps, workers)
def perspective_workerCallRemote(self, workerName, methodName,
                                 *args, **kwargs):
    """
    Call a remote method on the worker.
    This is used so that admin clients can call methods from the interface
    to the worker.

    @param workerName: the worker to call
    @type  workerName: str
    @param methodName: Name of the method to call.  Gets proxied to
                       L{flumotion.worker.medium.WorkerMedium}'s
                       remote_(methodName)
    @type  methodName: str

    @returns: the result of the worker avatar's mindCallRemote; if issuing
              the call raises synchronously, a L{failure.Failure} wrapping
              an L{errors.RemoteMethodError} is returned instead
    """
    self.debug('AdminAvatar.workerCallRemote(%r, %r)' % (
        workerName, methodName))
    workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName)

    # XXX: Maybe we need a prefix, so we can limit what an admin
    # interface can call on a worker
    try:
        return workerAvatar.mindCallRemote(methodName, *args, **kwargs)
    except Exception as e:
        # format the message once and reuse it for the log and the error
        reason = log.getExceptionMessage(e)
        self.warning("exception on remote call: %s" % reason)
        return failure.Failure(errors.RemoteMethodError(methodName,
                                                        reason))
286
def perspective_getEntryByType(self, componentType, entryType):
    """
    Get the entry point for a piece of bundled code in a component by type.

    @param componentType: the component
    @type  componentType: a string
    @param entryType:     location of the entry point
    @type  entryType:     a string

    @returns: a (filename, methodName) tuple; filename is the entry
              location joined onto the component registry entry's base
    @raise errors.NoBundleError: if the registry lookup of the component
                                 or of the entry raises KeyError
    """
    assert componentType is not None

    self.debug('getting entry of type %s for component type %s',
               entryType, componentType)

    try:
        reg = registry.getRegistry()
        componentRegistryEntry = reg.getComponent(componentType)
        # FIXME: add logic here for default entry points and functions
        entry = componentRegistryEntry.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            componentType, entryType))
        description = "entry type %s in component type %s" % (
            entryType, componentType)
        raise errors.NoBundleError(description)

    filename = os.path.join(componentRegistryEntry.base, entry.location)
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        filename, function))
    return (filename, function)
317
def perspective_getPlugEntry(self, plugType, entryType):
    """
    Get the entry point for a piece of bundled code in a plug by type.

    @param plugType:  the plug
    @type  plugType:  a string
    @param entryType: location of the entry point
    @type  entryType: a string

    @returns: a (filename, methodName) tuple; filename is the entry's
              location exactly as stored in the registry
    @raise errors.NoBundleError: if the registry lookup of the plug or of
                                 the entry raises KeyError
    """
    assert plugType is not None

    self.debug('getting entry of type %s for plug type %s',
               entryType, plugType)

    try:
        reg = registry.getRegistry()
        plugRegistryEntry = reg.getPlug(plugType)
        entry = plugRegistryEntry.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            plugType, entryType))
        description = "entry type %s in plug type %s" % (
            entryType, plugType)
        raise errors.NoBundleError(description)

    location = entry.location
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        location, function))
    return (location, function)
345
def perspective_getConfiguration(self):
    """
    Get the configuration of the manager as an XML string.

    The value is produced by vishnu.getConfiguration.

    @rtype: str
    """
    configuration = self.vishnu.getConfiguration()
    return configuration
353
def perspective_getScenarioByType(self, scenarioType, entryType):
    """
    Remote method that gets the entry point of a scenario of a given type.

    @param scenarioType: the scenario
    @type  scenarioType: a string
    @param entryType:    location of the entry point
    @type  entryType:    a string

    @returns: a (filename, methodName) tuple; filename is the entry
              location joined onto the scenario registry entry's base
    @raise errors.NoBundleError: if the registry lookup of the scenario
                                 or of the entry raises KeyError
    """
    assert scenarioType is not None

    self.debug('getting entry of type %s for scenario type %s',
               entryType, scenarioType)

    try:
        scenarioRegistryEntry = registry.getRegistry().getScenarioByType(
            scenarioType)
        # FIXME: add logic here for default entry points and functions
        entry = scenarioRegistryEntry.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            scenarioType, entryType))
        # this is a scenario lookup, so say so in the error
        raise errors.NoBundleError("entry type %s in scenario type %s" %
                                   (entryType, scenarioType))

    filename = os.path.join(scenarioRegistryEntry.getBase(),
                            entry.getLocation())
    # use the accessor for both the log line and the return value
    function = entry.getFunction()
    self.debug('entry point is in file path %s and function %s' % (
        filename, function))

    return (filename, function)
385
def perspective_getScenarios(self):
    """
    Get all the scenarios defined on the registry.

    @rtype: List of L{IScenarioAssistantPlugin}
    """
    return registry.getRegistry().getScenarios()
394
def _saveFlowFile(self, filename):
    """Opens a file that the flow should be written to.

    The file is <configDir>/flows/<filename>.xml; the flows directory is
    created with mode 0770 when missing, and the file is opened under a
    umask of 0007.  The call is reported through vishnu.adminAction.

    Note that the returned file object might be an existing file,
    opened in append mode; if the loadConfiguration operation
    succeeds, the file should first be truncated before writing.

    @param filename: name to save the flow as, without the .xml extension;
                     only letters, digits, '_' and '-' are accepted
    @type  filename: str

    @returns: the file object, opened in append mode
    @raise errors.ConfigError: if the config dir or filename contains
                               characters outside the accepted set
    """
    self.vishnu.adminAction(self.remoteIdentity,
                            '_saveFlowFile', (), {})

    def ensure_sane(name, extra=''):
        # \Z rather than $: $ also matches just before a trailing
        # newline, which would let a name like "flow\n" through
        if not re.match('^[a-zA-Z0-9_' + extra + '-]+\\Z', name):
            raise errors.ConfigError(
                'Invalid planet or saveAs name: %s' % name)

    ensure_sane(self.vishnu.configDir, '/')
    ensure_sane(filename)
    directory = os.path.join(self.vishnu.configDir, "flows")
    self.debug('told to save flow as %s/%s.xml', directory, filename)
    try:
        makedirs(directory, 0o770)
    except OSError as e:
        # an already existing directory is fine; anything else is
        # re-raised with its original traceback
        if e.errno != errno.EEXIST:
            raise
    prev = os.umask(0o007)
    try:
        output = open(os.path.join(directory, filename + '.xml'), 'a')
    finally:
        # the umask is process-wide: restore it even when open() fails
        os.umask(prev)
    return output
def perspective_loadConfiguration(self, xml, saveAs=None):
    """
    Load the given XML configuration into the manager. If the
    optional saveAs parameter is passed, the XML snippet will be
    saved to disk in the manager's flows directory.

    The flow file is opened before loading starts, but it is only
    truncated and rewritten once loading has succeeded; on failure it is
    closed and left as it was.

    @param xml:    the XML configuration snippet.
    @type  xml:    str
    @param saveAs: The name of a file to save the XML as.
    @type  saveAs: str

    @returns: the result of vishnu.loadComponentConfigurationXML; when
              saveAs is given, the flow-file handling is chained onto it
              with addCallbacks
    """
    output = None
    if saveAs:
        output = self._saveFlowFile(saveAs)

    try:
        # Update the registry if needed, so that new/changed component
        # types can be parsed.
        registry.getRegistry().verify()

        f = StringIO(xml)
        try:
            res = self.vishnu.loadComponentConfigurationXML(
                f, self.remoteIdentity)
        finally:
            f.close()
    except Exception:
        # loading blew up synchronously: do not leak the flow file handle
        if output is not None:
            output.close()
        raise

    if output is not None:

        def flowLoaded(result):
            self.debug('loadConfiguration succeeded, writing flow to %r',
                       output)
            try:
                output.truncate(0)
                output.write(xml)
            finally:
                output.close()
            return result

        # not called "failure", so the twisted.python.failure module
        # imported by this file is not shadowed
        def flowLoadFailed(reason):
            self.debug('loadConfiguration failed, leaving %r as it was',
                       output)
            output.close()
            return reason

        res.addCallbacks(flowLoaded, flowLoadFailed)

    return res
def perspective_loadComponent(self, componentType, componentId,
                              componentLabel, properties, workerName,
                              plugs=None, eaters=None,
                              isClockMaster=None, virtualFeeds=None):
    """
    Load a component into the manager configuration.
    Returns a deferred that will be called with the component state.

    The call is delegated to vishnu.loadComponent together with this
    avatar's remote identity; plugs, eaters and virtualFeeds are passed
    as empty lists when they are missing or empty.

    @param componentType:  The registered type of the component to be added
    @type  componentType:  str
    @param componentId:    The identifier of the component to add,
                           should be created by the function
                           L{flumotion.common.common.componentId}
    @type  componentId:    str
    @param componentLabel: The human-readable label of the component.
                           if None, no label will be set.
    @type  componentLabel: str or None
    @param properties:     List of property name-value pairs.
                           See L{flumotion.common.config.buildPropertyDict}
    @type  properties:     list of (str, object)
    @param workerName:     the name of the worker where the added
                           component should run.
    @type  workerName:     str
    @param plugs:          List of plugs, as type-propertyList pairs.
                           See L{flumotion.manager.config.buildPlugsSet}.
    @type  plugs:          [(str, [(str, object)])]
    @param eaters:         List of (eater name, feed ID) pairs.
                           See L{flumotion.manager.config.buildEatersDict}
    @type  eaters:         [(str, str)]
    @param isClockMaster:  True if the component to be added must be
                           a clock master. Passing False here means
                           that the manager will choose what
                           component, if any, will be clock master
                           for this flow.
    @type  isClockMaster:  bool
    @param virtualFeeds:   List of (virtual feed, feeder name) pairs.
                           See L{flumotion.manager.config.buildVirtualFeeds}
    @type  virtualFeeds:   [(str, str)]
    """
    # any falsy value for the optional lists becomes a fresh empty list
    if not plugs:
        plugs = []
    if not eaters:
        eaters = []
    if not virtualFeeds:
        virtualFeeds = []

    return self.vishnu.loadComponent(
        self.remoteIdentity, componentType, componentId, componentLabel,
        properties, workerName, plugs, eaters, isClockMaster,
        virtualFeeds)
510
def perspective_deleteFlow(self, flowName):
    """
    Delete the flow with the given name; delegates to vishnu.deleteFlow.

    @param flowName: name of the flow to delete
    @type  flowName: str

    @returns: whatever vishnu.deleteFlow returns
    """
    result = self.vishnu.deleteFlow(flowName)
    return result
513
def perspective_deleteComponent(self, componentState):
    """Delete a component from the manager.

    A component can only be deleted when it is sleeping or sad. It
    is the caller's job to ensure this is the case; calling this
    function on a running component will raise a ComponentBusyError.

    The deletion itself is delegated to vishnu.deleteComponent.

    @returns: a deferred that will fire when all listeners have been
              notified of the component removal
    """
    result = self.vishnu.deleteComponent(componentState)
    return result
525
def perspective_getVersions(self):
    """
    Get version information, as reported by debug.getVersions().

    @returns: whatever L{flumotion.common.debug.getVersions} returns
    """
    versions = debug.getVersions()
    return versions
528
def perspective_cleanComponents(self):
    """
    Empty the planet; delegates to vishnu.emptyPlanet.

    @returns: whatever vishnu.emptyPlanet returns
    """
    result = self.vishnu.emptyPlanet()
    return result
531
def perspective_getWizardEntries(self, types=None, provides=None,
                                 accepts=None):
    """
    Fetches the wizard entries which match the parameters sent in.

    A filter that is None is not applied.  Wizard entries of all
    registered components are returned first, followed by those of all
    registered plugs.

    @param types:    list of component types to fetch, is usually
                     something like ['video-producer'] or ['audio-encoder']
    @type  types:    list of strings
    @param provides: formats provided, eg ['jpeg', 'speex']
    @type  provides: list of strings
    @param accepts:  formats accepted, eg ['theora']
    @type  accepts:  list of strings

    @returns: L{componentui.WizardEntryState}
    """

    def hasMediaType(formats, wanted):
        # true when at least one format's media type is in wanted
        for fmt in formats:
            if fmt.media_type in wanted:
                return True
        return False

    def isSelected(wizard):
        if types is not None and wizard.type not in types:
            return False
        if provides is not None:
            if not hasMediaType(wizard.provides, provides):
                return False
        if accepts is not None:
            if not hasMediaType(wizard.accepts, accepts):
                return False
        return True

    reg = registry.getRegistry()
    entries = []
    for component in reg.getComponents():
        entries.extend([w for w in component.wizards if isSelected(w)])
    for plug in reg.getPlugs():
        entries.extend([w for w in plug.wizards if isSelected(w)])

    return entries
def perspective_getComponentEntry(self, componentType):
    """Fetches a ComponentRegistryEntry given a componentType

    @param componentType: component type
    @type  componentType: string
    @returns: the component, or None when the registry lookup raises
              KeyError
    @rtype: L{ComponentRegistryEntry}
    """
    try:
        return registry.getRegistry().getComponent(componentType)
    except KeyError:
        return None
589
def perspective_invokeOnComponents(self, componentType, methodName,
                                   *args, **kwargs):
    """
    Invoke a method on components of the given type; delegates to
    vishnu.invokeOnComponents with all arguments passed through.

    @param componentType: type of the components to invoke the method on
    @param methodName:    name of the method to invoke

    @returns: whatever vishnu.invokeOnComponents returns
    """
    invoke = self.vishnu.invokeOnComponents
    return invoke(componentType, methodName, *args, **kwargs)
594 595
class AdminHeaven(base.ManagerHeaven):
    """
    I am the manager-side interface towards administrative clients.
    Each client that logs in gets its own L{AdminAvatar}, created by me,
    to handle its requests.
    I live in the manager.
    """

    implements(interfaces.IHeaven)

    # class used to create an avatar for each admin client
    avatarClass = AdminAvatar
    logCategory = "admin-heaven"
606