Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion-launch: A gst-launch analog for Flumotion. 
 24   
 25  The goal of flumotion-launch is to provide an easy way for testing 
 26  flumotion components, without involving much of Flumotion's core code. 
 27   
 28  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 29  into a component graph, and starts the components. An example would be:: 
 30   
 31    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 32   
 33  You can also set properties:: 
 34   
 35    flumotion-launch videotest framerate=15/2 
 36   
 37  You can link specific feeders as well:: 
 38   
 39    flumotion-launch firewire .audio ! vorbis-encoder 
 40    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 41   
 42  Components can be backreferenced using their names:: 
 43   
 44    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 45                     audiotest0. ! ogg-muxer0. 
 46   
 47  In addition, components can have plugs:: 
 48   
 49    flumotion-launch http-streamer /requestlogger-file,logfile=/dev/stdout 
 50   
 51  Compound properties can be specified with: 
 52   
 53    propname=[subname1=value1,subname2=[subsubname1=subsubvalue1]] 
 54   
 55  Characters '\', '[' and ']' can be escaped with '\' 
 56  to remove there special meaning. 
 57   
 58  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 59  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 60  There is no depgraph, no feed server, no job process. Although it might 
 61  be useful in the future to add a way to use the standard interfaces to 
 62  start components via admin, manager, worker, and job instances, this 
 63  low-level interface is useful in debugging problems and should be kept. 
 64  """ 
 65   
 66   
 67  import os 
 68  import sys 
 69   
 70  from twisted.python import reflect 
 71  from twisted.internet import reactor, defer 
 72   
 73  from flumotion.common import log, common, registry, errors, messages 
 74  from flumotion.common import i18n 
 75  from flumotion.common.options import OptionParser 
 76  from flumotion.configure import configure 
 77  from flumotion.twisted import flavors 
 78   
 79  from flumotion.launch import parse 
 80   
 81  from gettext import gettext as _ 
 82   
 83  __version__ = "$Rev: 8290 $" 
 84  _headings = { 
 85      messages.ERROR: _('Error'), 
 86      messages.WARNING: _('Warning'), 
 87      messages.INFO: _('Note')} 
 88   
 89   
90 -def err(x):
91 sys.stderr.write(x + '\n') 92 raise SystemExit(1)
93 94
95 -class ComponentWrapper(object, log.Loggable):
96 logCategory = "compwrapper" 97
98 - def __init__(self, config):
99 self.name = config['name'] 100 self.config = config 101 self.procedure = self._getProcedure(config['type']) 102 self.component = None
103
104 - def _getProcedure(self, type):
105 r = registry.getRegistry() 106 c = r.getComponent(type) 107 try: 108 entry = c.getEntryByType('component') 109 except KeyError: 110 err('Component %s has no component entry' % self.name) 111 importname = entry.getModuleName(c.getBase()) 112 try: 113 module = reflect.namedAny(importname) 114 except Exception, e: 115 err('Could not load module %s for component %s: %s' 116 % (importname, self.name, e)) 117 return getattr(module, entry.getFunction())
118
119 - def instantiate(self):
120 errors = [] 121 122 def haveError(value): 123 translator = i18n.Translator() 124 localedir = os.path.join(configure.localedatadir, 'locale') 125 # FIXME: add locales as messages from domains come in 126 translator.addLocaleDir(configure.PACKAGE, localedir) 127 print "%s: %s" % (_headings[value.level], 128 translator.translate(value)) 129 if value.debug: 130 print "Debug information:", value.debug 131 errors.append(value)
132 133 self.component = self.procedure(self.config, 134 haveError=haveError) 135 return not bool(errors)
136
137 - def provideMasterClock(self, port):
138 # rtype: defer.Deferred 139 d = self.component.provide_master_clock(port) 140 return d
141
142 - def set_master_clock(self, ip, port, base_time):
143 return self.component.set_master_clock(ip, port, base_time)
144
145 - def stop(self):
146 return self.component.stop()
147
148 - def feedToFD(self, feedName, fd):
149 self.debug('feedToFD(feedName=%s, %d)' % (feedName, fd)) 150 return self.component.feedToFD(feedName, fd, os.close)
151
152 - def eatFromFD(self, eaterAlias, feedId, fd):
153 self.debug('eatFromFD(eaterAlias=%s, feedId=%s, %d)', 154 eaterAlias, feedId, fd) 155 return self.component.eatFromFD(eaterAlias, feedId, fd)
156 157
158 -def make_pipes(wrappers):
159 fds = {} # feedcompname:feeder => (fd, start()) 160 wrappersByName = dict([(wrapper.name, wrapper) 161 for wrapper in wrappers]) 162 163 def starter(wrapper, feedName, write): 164 return lambda: wrapper.feedToFD(feedName, write)
165 for wrapper in wrappers: 166 eaters = wrapper.config.get('eater', {}) 167 for eaterName in eaters: 168 for feedId, eaterAlias in eaters[eaterName]: 169 compName, feederName = common.parseFeedId(feedId) 170 read, write = os.pipe() 171 log.debug('launch', '%s: read from fd %d, write to fd %d', 172 feedId, read, write) 173 start = starter(wrappersByName[compName], feederName, write) 174 fds[feedId] = (read, start) 175 return fds 176 177
178 -def start_components(wrappers, fds):
179 # figure out the links and start the components 180 181 def provide_clock(): 182 # second phase: clocking 183 need_sync = [x for x in wrappers if x.config['clock-master']] 184 185 if need_sync: 186 master = None 187 for x in need_sync: 188 if x.config['clock-master'] == x.config['avatarId']: 189 master = x 190 break 191 assert master 192 need_sync.remove(master) 193 d = master.provideMasterClock(7600 - 1) # hack! 194 195 def addNeedSync(clocking): 196 return need_sync, clocking
197 d.addCallback(addNeedSync) 198 return d 199 else: 200 return defer.succeed((None, None)) 201 202 def do_start(synchronization, wrapper): 203 need_sync, clocking = synchronization 204 205 # start it up, with clocking data only if it needs it 206 eaters = wrapper.config.get('eater', {}) 207 for eaterName in eaters: 208 for feedId, eaterAlias in eaters[eaterName]: 209 read, start = fds[feedId] 210 wrapper.eatFromFD(eaterAlias, feedId, read) 211 start() 212 if (not need_sync) or (wrapper not in need_sync) or (not clocking): 213 clocking = None 214 if clocking: 215 wrapper.set_master_clock(*clocking) 216 return synchronization 217 218 def do_stop(failure): 219 for wrapper in wrappers: 220 wrapper.stop() 221 return failure 222 223 for wrapper in wrappers: 224 if not wrapper.instantiate(): 225 # we don't have a ComponentState, so we cheat and give the 226 # exception a wrapper 227 return defer.fail(errors.ComponentStartError(wrapper)) 228 d = provide_clock() 229 for wrapper in wrappers: 230 d.addCallback(do_start, wrapper) 231 d.addErrback(do_stop) 232 return d 233 234
235 -def main(args):
236 from flumotion.common import setup 237 setup.setupPackagePath() 238 from flumotion.configure import configure 239 log.debug('launch', 'Running Flumotion version %s' % 240 configure.version) 241 import twisted.copyright 242 log.debug('launch', 'Running against Twisted version %s' % 243 twisted.copyright.version) 244 from flumotion.project import project 245 for p in project.list(): 246 log.debug('launch', 'Registered project %s version %s' % ( 247 p, project.get(p, 'version'))) 248 249 parser = OptionParser(domain="flumotion-launch") 250 251 log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args)) 252 options, args = parser.parse_args(args) 253 254 i18n.installGettext() 255 256 # verbose overrides --debug 257 if options.verbose: 258 log.setFluDebug("*:3") 259 260 # handle all options 261 if options.version: 262 print common.version("flumotion-launch") 263 return 0 264 265 if options.debug: 266 log.setFluDebug(options.debug) 267 268 # note parser versus parse 269 configs = parse.parse_args(args[1:]) 270 271 # load the modules, make the component 272 wrappers = [ComponentWrapper(config) for config in configs] 273 274 # make socket pairs 275 fds = make_pipes(wrappers) 276 277 reactor.running = False 278 reactor.failure = False 279 reactor.callLater(0, lambda: setattr(reactor, 'running', True)) 280 281 d = start_components(wrappers, fds) 282 283 def errback(failure): 284 log.debug('launch', log.getFailureMessage(failure)) 285 print "Error occurred: %s" % failure.getErrorMessage() 286 failure.printDetailedTraceback() 287 reactor.failure = True 288 if reactor.running: 289 print "Stopping reactor." 290 reactor.stop()
291 d.addErrback(errback) 292 293 if not reactor.failure: 294 print 'Running the reactor. Press Ctrl-C to exit.' 295 296 log.debug('launch', 'Starting reactor') 297 reactor.run() 298 299 log.debug('launch', 'Reactor stopped') 300 301 if reactor.failure: 302 return 1 303 else: 304 return 0 305