Package flumotion :: Package component :: Package producers :: Package looper :: Module looper
[hide private]

Source Code for Module flumotion.component.producers.looper.looper

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  from flumotion.common import errors, messages 
 26  from flumotion.common.i18n import N_, gettexter 
 27  from flumotion.component import feedcomponent 
 28   
 29  __version__ = "$Rev: 8442 $" 
 30  T_ = gettexter() 
 31   
 32   
class LooperMedium(feedcomponent.FeedComponentMedium):
    """
    Medium exposing the looper component's remote interface.

    Every remote_* method simply forwards to the component stored in
    self.comp.
    """

    def __init__(self, comp):
        # NOTE(review): this body (source line 36) was missing from the
        # extracted listing; chaining up to the base class is the presumed
        # original -- confirm against the upstream file.
        feedcomponent.FeedComponentMedium.__init__(self, comp)

    def remote_restartLoop(self):
        """
        Restart the loop with a non-flushing seek.

        Returns whatever the component's do_seek() returns.
        """
        return self.comp.do_seek(False)

    def remote_getNbIterations(self):
        """
        Return the component's current iteration counter.
        """
        return self.comp.nbiterations

    # NOTE(review): the 'def' line (source line 44) was missing from the
    # extracted listing; the method name is reconstructed from the naming
    # of the sibling remote_* methods -- confirm against the upstream file.
    def remote_getFileInformation(self):
        """
        Return the component's fileinformation attribute.
        """
        return self.comp.fileinformation


# How to start the first segment:
# 1) Make your pipeline, but don't link the sinks
# 2) Block the source pads of what would be the sinks' peers
# 3) When both block functions fire, link the pads, then do a segment seek
# 4) Then you can unblock pads and the sinks will receive exactly one
#    new segment with all gst versions
#
# To loop a segment, when you get the segment_done message
# asynchronously, just do a new segment seek.


class Looper(feedcomponent.ParseLaunchComponent):
    """
    Producer that decodes an Ogg file (Theora video, Vorbis audio) and
    plays it in a loop using segment seeks.

    The pads feeding the two feeders are unlinked and blocked when the
    pipeline is configured.  Once every pad reports being blocked they are
    linked again and a flushing segment seek is issued; each later
    SEGMENT_DONE message triggers a new, non-flushing segment seek.
    """

    componentMediumClass = LooperMedium

    def init(self):
        """
        Set up the instance attributes and register the UI state keys.
        """
        self.initial_seek = False
        self.nbiterations = 0
        # NOTE(review): never assigned anywhere else in this class as shown,
        # so LooperMedium.remote_getFileInformation() would return None.
        self.fileinformation = None
        self.timeoutid = 0
        self.pads_awaiting_block = []
        self.pads_to_link = []
        self.bus = None
        self.uiState.addKey('info-location', '')
        self.uiState.addKey('info-duration', 0)
        self.uiState.addKey('info-audio', None)
        self.uiState.addKey('info-video', None)
        self.uiState.addKey('num-iterations', 0)
        self.uiState.addKey('position', 0)

    def do_check(self):
        """
        Log a warning for gst-python versions known to crash the looper,
        then run the ticket 349 check.

        Returns the deferred from checks.checkTicket349(); every message
        carried by its result is added to this component.
        """

        def on_result(result):
            for m in result.messages:
                self.addMessage(m)

        from flumotion.component.producers import checks
        version = checks.get_pygst_version(gst)
        if (0, 10, 11, 0) <= version < (0, 10, 14, 0):
            # if it's going to segfault it won't have time to deliver
            # messages to manager, otherwise we don't need to show it!
            # but we can add a log message
            self.warning('the version of gst-python you are using is known to '
                         'cause segfault in the looper component, please '
                         'update to the latest release')
            self.warning('... just so you know, in case it crashes')

        d = checks.checkTicket349()
        d.addCallback(on_result)
        return d

    def get_pipeline_string(self, properties):
        """
        Read the component properties, start file discovery and build the
        pipeline description.

        Properties read: 'width' (default 240), 'height' (default derived
        from the width with a 720:576 ratio), 'framerate' (a
        (numerator, denominator) pair, default (25, 2)) and 'location'.

        Returns the pipeline description string.
        """
        # setup the properties
        self.bus = None
        self.videowidth = properties.get('width', 240)
        self.videoheight = properties.get(
            'height', int(576 * self.videowidth/720.))
        self.videoframerate = properties.get('framerate', (25, 2))
        self.filelocation = properties.get('location')

        vstruct = gst.structure_from_string(
            "video/x-raw-yuv,width=%(width)d,height=%(height)d" %
            dict(width=self.videowidth, height=self.videoheight))
        vstruct['framerate'] = gst.Fraction(self.videoframerate[0],
                                            self.videoframerate[1])

        vcaps = gst.Caps(vstruct)

        self.run_discoverer()

        template = (
            'filesrc location=%(location)s'
            ' ! oggdemux name=demux'
            ' demux. ! queue ! theoradec name=theoradec'
            ' ! identity name=videolive single-segment=true silent=true'
            ' ! videorate name=videorate'
            ' ! videoscale'
            ' ! %(vcaps)s'
            ' ! identity name=vident sync=true silent=true'
            ' ! @feeder:video@'
            ' demux. ! queue ! vorbisdec name=vorbisdec'
            ' ! identity name=audiolive single-segment=true silent=true'
            ' ! audioconvert'
            ' ! audio/x-raw-int,width=16,depth=16,signed=(boolean)true'
            ' ! identity name=aident sync=true silent=true'
            ' ! @feeder:audio@'
            % dict(location=self.filelocation, vcaps=vcaps))

        return template

    def make_message_for_gstreamer_error(self, gerror, debug):
        """
        Translate a GStreamer error into a component message.

        Resource errors are reported as a failure to open the configured
        file; every other error is handed to the base class.
        """
        if gerror.domain == 'gst-resource-error-quark':
            return messages.Error(T_(N_(
                "Could not open file '%s' for reading."), self.filelocation),
                debug='%s\n%s' % (gerror.message, debug),
                mid=gerror.domain, priority=40)
        base = feedcomponent.ParseLaunchComponent
        # The method is looked up on the class, so the instance has to be
        # passed explicitly; leaving it out raises TypeError.
        return base.make_message_for_gstreamer_error(self, gerror, debug)

    def run_discoverer(self):
        """
        Start discovery of the configured file; the UI state 'info-*' keys
        are filled in when the 'discovered' signal fires.
        """

        def discovered(d, ismedia):
            self.uiState.set('info-location', self.filelocation)
            self.uiState.set('info-duration',
                             max(d.audiolength, d.videolength))
            if d.is_audio:
                self.uiState.set('info-audio',
                                 "%d channel(s) %dHz" % (d.audiochannels,
                                                         d.audiorate))
            if d.is_video:
                self.uiState.set('info-video',
                                 "%d x %d at %d/%d fps" % (d.videowidth,
                                                           d.videoheight,
                                                           d.videorate.num,
                                                           d.videorate.denom))

        from gst.extend import discoverer
        d = discoverer.Discoverer(self.filelocation)
        d.connect('discovered', discovered)
        d.discover()

    def on_segment_done(self):
        """
        Start the next iteration with a non-flushing seek and publish the
        updated iteration count.
        """
        self.do_seek(False)
        self.nbiterations += 1
        self.uiState.set('num-iterations', self.nbiterations)

    def on_pads_blocked(self):
        """
        Link the pending pads, issue the initial flushing seek, unblock the
        pads and reset the iteration count.
        """
        for src, sink in self.pads_to_link:
            src.link(sink)
        self.do_seek(True)
        for src, sink in self.pads_to_link:
            # the unblock notification is of no interest, hence the no-op
            src.set_blocked_async(False, lambda *x: None)
        self.pads_to_link = []
        self.nbiterations = 0
        self.uiState.set('num-iterations', self.nbiterations)

    def configure_pipeline(self, pipeline, properties):
        """
        Unlink and block the source pads of the 'aident' and 'vident'
        elements and watch the pipeline's bus for the messages that drive
        the loop.
        """
        # Only messages posted by the pipeline itself are dispatched.
        handlers = {(pipeline, gst.MESSAGE_SEGMENT_DONE):
                        self.on_segment_done,
                    (pipeline, gst.MESSAGE_APPLICATION):
                        self.on_pads_blocked}

        def on_message(bus, message):
            key = (message.src, message.type)
            if key in handlers:
                handlers[key]()

        def blocked(x, is_blocked):
            if x not in self.pads_awaiting_block:
                return
            self.pads_awaiting_block.remove(x)
            if not self.pads_awaiting_block:
                s = gst.Structure('pads-blocked')
                m = gst.message_new_application(pipeline, s)
                # marshal to the main thread
                pipeline.post_message(m)

        self.oggdemux = pipeline.get_by_name("demux")

        for name in 'aident', 'vident':
            e = pipeline.get_by_name(name)
            src = e.get_pad('src')
            sink = src.get_peer()
            src.unlink(sink)
            src.set_blocked_async(True, blocked)
            self.pads_awaiting_block.append(src)
            self.pads_to_link.append((src, sink))

        self.bus = pipeline.get_bus()
        self.bus.add_signal_watch()

        self.bus.connect('message', on_message)

    def do_seek(self, flushing):
        """
        Restarts the looping.

        A segment seek over the whole file is sent to the demuxer; the
        flush flag is added when flushing is true.

        Returns True if the seeking was accepted,
        Returns False otherwise
        """
        self.debug("restarting looping")
        if flushing:
            flags = gst.SEEK_FLAG_SEGMENT | gst.SEEK_FLAG_FLUSH
        else:
            flags = gst.SEEK_FLAG_SEGMENT | 0
        return self.oggdemux.seek(1.0, gst.FORMAT_TIME, flags,
                                  gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_END, 0)

    def do_setup(self):
        """
        Install a 500 ms timer that publishes the pipeline position in the
        UI state 'position' key.  Does nothing if a timer is already set.
        """

        def check_time():
            self.log("checking position")
            try:
                pos, _format = self.pipeline.query_position(gst.FORMAT_TIME)
            except Exception:
                # best effort: a failed query only skips this update
                self.debug("position query didn't succeed")
            else:
                self.uiState.set('position', pos)
            # NOTE(review): placed at function level so the timer survives a
            # failed query; the extracted listing lost the indentation --
            # confirm against the upstream file.
            return True

        if not self.timeoutid:
            self.timeoutid = gobject.timeout_add(500, check_time)

    def do_stop(self):
        """
        Remove the bus signal watch and the position timer, and reset the
        iteration count.
        """
        if self.bus:
            self.bus.remove_signal_watch()
        self.bus = None

        if self.timeoutid:
            gobject.source_remove(self.timeoutid)
            self.timeoutid = 0

        self.nbiterations = 0
259