1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import time
25 import tempfile
26
27 import gobject
28 import gst
29
30 from twisted.internet import reactor
31
32 from flumotion.component import feedcomponent
33 from flumotion.common import log, gstreamer, pygobject, messages, errors
34 from flumotion.common import documentation, format
35 from flumotion.common import eventcalendar, poller
36 from flumotion.common.i18n import N_, gettexter
37 from flumotion.common.mimetypes import mimeTypeToExtention
38 from flumotion.common.pygobject import gsignal
39
40
41
42
43
44
45
46
47
48 from flumotion.component.component import moods
49
# Public names exported by this module.
50 __all__ = ['Disker']
51 __version__ = "$Rev: 8644 $"
# Translation helper wrapped around the user-visible message strings below.
52 T_ = gettexter()
53
54
# NOTE(review): presumably the interval, in seconds, between free-disk-space
# polls -- the code consuming this constant is not visible here; confirm.
55 DISKPOLL_FREQ = 60
56
57
# Cap on the 'filelist' UI state: when the list holds exactly this many
# entries, its first entry is removed before a finished recording is appended.
58 FILELIST_SIZE = 100
59
60 """
61 Disker has a property 'ical-schedule'. This allows an ical file to be
62 specified in the config and have recordings scheduled based on events.
63 This file will be monitored for changes and events reloaded if this
64 happens.
65
66 The filename of a recording started from an ical file will be produced
67 via passing the ical event summary through strftime, so that an archive
68 can encode the date and time that it was begun.
69
70 The time passed to strftime is expressed in the timezone of the ical
71 event. In practice this will be either UTC or the local time of
72 the machine running the disker, as the ical scheduler does not
73 understand arbitrary timezones.
74 """
75
76
78
79
80
83
84
85
86
89
99
100
101
104
105
# Consumer component that writes the incoming stream to files below a
# configured directory by handing file descriptors to a multifdsink element.
106 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
107 componentMediumClass = DiskerMedium
# NOTE(review): consumer of checkOffset is not visible here; presumably read
# by the feedcomponent base class -- confirm.
108 checkOffset = True
# Pipeline description returned unchanged by the pipeline-string code.
109 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
# Open file object of the recording in progress; None when not recording.
110 file = None
# Target directory, taken from the 'directory' property.
111 directory = None
# Full path of the current (or most recent) recording.
112 location = None
# Caps whose first structure name is reported as the mime type.
# NOTE(review): the code assigning this is not visible here.
113 caps = None
# time.time() value taken when the current recording was started.
114 last_tstamp = None
115
# NOTE(review): the next three attributes are not used in the visible code;
# presumably they hold pending start/rotation state -- confirm.
116 _startFilenameTemplate = None
117 _startTimeTuple = None
118 _rotateTimeDelayedCall = None
# Reset to None each time the disk-space poll runs.
119 _pollDiskDC = None
# Optional symlink paths refreshed when a recording stops / starts.
120 _symlinkToLastRecording = None
121 _symlinkToCurrentRecording = None
122
123
124
125
126
127
128
145
146
147
161
173
174
175
177 directory = properties['directory']
178
179 self.directory = directory
180
181 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])
182
183 rotateType = properties.get('rotate-type', 'none')
184
185
186 if not rotateType in ['none', 'size', 'time']:
187 m = messages.Error(T_(N_(
188 "The configuration property 'rotate-type' should be set to "
189 "'size', time', or 'none', not '%s'. "
190 "Please fix the configuration."),
191 rotateType), mid='rotate-type')
192 self.addMessage(m)
193 raise errors.ComponentSetupHandledError()
194
195
196 if rotateType in ['size', 'time']:
197 if rotateType not in properties.keys():
198 m = messages.Error(T_(N_(
199 "The configuration property '%s' should be set. "
200 "Please fix the configuration."),
201 rotateType), mid='rotate-type')
202 self.addMessage(m)
203 raise errors.ComponentSetupHandledError()
204
205
206 if rotateType == 'size':
207 self.setSizeRotate(properties['size'])
208 self.uiState.set('rotate-type',
209 'every %sB' % \
210 format.formatStorage(properties['size']))
211 elif rotateType == 'time':
212 self.setTimeRotate(properties['time'])
213 self.uiState.set('rotate-type',
214 'every %s' % \
215 format.formatTime(properties['time']))
216 else:
217 self.uiState.set('rotate-type', 'disabled')
218
219
220 return self.pipe_template
221
246
# NOTE(review): the condition guarding this section and the definition of
# missingModule are outside this listing. Presumably this is reached only
# when scheduling was requested but a required module is unavailable: each
# missing module is reported and component setup is aborted -- confirm.
247 if not eventcalendar.HAS_ICALENDAR:
248 missingModule('icalendar')
249 if not eventcalendar.HAS_DATEUTIL:
250 missingModule('dateutil')
251
252 raise errors.ComponentSetupHandledError()
253
# 'fdsink' is the name given to the multifdsink element in pipe_template.
254 sink = self.get_element('fdsink')
255
# Switch resend-streamheader off when the installed multifdsink offers that
# property; otherwise log that stream headers are resent on caps changes.
# The sink pad's caps notifications go to self._notify_caps_cb.
256 if gstreamer.element_factory_has_property('multifdsink',
257 'resend-streamheader'):
258 sink.set_property('resend-streamheader', False)
259 else:
260 self.debug("resend-streamheader property not available, "
261 "resending streamheader when it changes in the caps")
262 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
263
# Be told when the sink drops one of its clients.
264 sink.connect('client-removed', self._client_removed_cb)
265
266
# Optional stream-marker support: the prefix (default '%03d.') is later
# %-formatted with the marker data to build the recording's filename template,
# and an event probe on the sink pad watches for the marker events.
267 react_to_marks = properties.get('react-to-stream-markers', False)
268 if react_to_marks:
269 pfx = properties.get('stream-marker-filename-prefix', '%03d.')
270 self._markerPrefix = pfx
271 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
272
273
274
276
277
278 self._pollDiskDC = None
279 s = None
280 try:
281 s = os.statvfs(self.directory)
282 except Exception, e:
283 self.debug('failed to figure out disk space: %s',
284 log.getExceptionMessage(e))
285
286 if not s:
287 free = None
288 else:
289 free = format.formatStorage(s.f_frsize * s.f_bavail)
290
291 if self.uiState.get('disk-free') != free:
292 self.debug("disk usage changed, reporting to observers")
293 self.uiState.set('disk-free', free)
294
303
309
316
326
328 if self.caps:
329 return self.caps.get_structure(0).get_name()
330
331
332
334 mime = self.getMime()
335 if mime == 'multipart/x-mixed-replace':
336 mime += ";boundary=ThisRandomString"
337 return mime
338
370
372 """
373 @param filenameTemplate: strftime format string to decide filename
374 @param timeTuple: a valid time tuple to pass to strftime,
375 defaulting to time.localtime().
376 """
# NOTE(review): the enclosing ``def`` line is not part of this listing.
# Overall: stop any running recording, derive a new output path from the
# template, open it and hand its file descriptor to the sink.
# The file extension is derived from the stream's mime type.
377 mime = self.getMime()
378 ext = mimeTypeToExtention(mime)
379
# Finish the recording in progress (if any) before starting a new one.
380 self.stopRecording()
381
# Bring the sink out of NULL state before a client fd is added further down.
382 sink = self.get_element('fdsink')
383 if sink.get_state() == gst.STATE_NULL:
384 sink.set_state(gst.STATE_READY)
385
# Build "<strftime(template)>.<ext>" inside the recording directory; the
# component's default template is used when none was passed in.
386 filename = ""
387 if not filenameTemplate:
388 filenameTemplate = self._defaultFilenameTemplate
389 filename = "%s.%s" % (format.strftime(filenameTemplate,
390 timeTuple or time.localtime()), ext)
391 self.location = os.path.join(self.directory, filename)
392
393
394
# Resolve clashes with existing files: a file whose modification time (as a
# UTC time tuple) is not later than timeTuple is overwritten; otherwise a
# numeric suffix ".1", ".2", ... is tried until a usable name is found.
# NOTE(review): when timeTuple is None the comparison relies on Python 2
# ordering of mixed types -- confirm this is intended.
395 location = self.location
396 i = 1
397 while os.path.exists(location):
398 mtimeTuple = time.gmtime(os.stat(location).st_mtime)
399 if mtimeTuple <= timeTuple:
400 self.info(
401 "Existing recording %s from previous event, overwriting",
402 location)
403 break
404
405 self.info(
406 "Existing recording %s from current event, changing name",
407 location)
408 location = self.location + '.' + str(i)
409 i += 1
410 self.location = location
411
# Open the output file; on failure log, post an error message and give up.
412 self.info("Changing filename to %s", self.location)
413 try:
414 self.file = open(self.location, 'wb')
415 except IOError, e:
416 self.warning("Failed to open output file %s: %s",
417 self.location, log.getExceptionMessage(e))
418 m = messages.Error(T_(N_(
419 "Failed to open output file '%s' for writing. "
420 "Check permissions on the file."), self.location))
421 self.addMessage(m)
422 return
# Notify plugs, register the file descriptor as a sink client, remember the
# start time and publish the new state to the UI.
423 self._recordingStarted(self.file, self.location)
424 sink.emit('add', self.file.fileno())
425 self.last_tstamp = time.time()
426 self.uiState.set('filename', self.location)
427 self.uiState.set('recording', True)
428
# Keep the optional "current recording" symlink pointing at the new file.
429 if self._symlinkToCurrentRecording:
430 self._updateSymlink(self.location,
431 self._symlinkToCurrentRecording)
432
434 if not dest.startswith('/'):
435 dest = os.path.join(self.directory, dest)
436
437
438
439 self.debug("updating symbolic link %s to point to %s", dest, src)
440 try:
441 try:
442 os.symlink(src, dest)
443 except OSError, e:
444 if e.errno == errno.EEXIST and os.path.islink(dest):
445 os.unlink(dest)
446 os.symlink(src, dest)
447 else:
448 raise
449 except Exception, e:
450 self.info("Failed to update link %s: %s", dest,
451 log.getExceptionMessage(e))
452 m = messages.Warning(T_(N_("Failed to update symbolic link "
453 "'%s'. Check your permissions."), dest),
454 debug=log.getExceptionMessage(e))
455 self.addMessage(m)
456
# NOTE(review): the enclosing ``def`` line is not part of this listing.
#
# Stop the recording in progress, if there is one: detach its file descriptor
# from the sink, notify plugs, clear the recording state in the UI, add the
# finished file to the bounded 'filelist' and refresh the optional
# "last recording" symlink.
fdsink = self.get_element('fdsink')
if fdsink.get_state() == gst.STATE_NULL:
    fdsink.set_state(gst.STATE_READY)

if self.file:
    self.file.flush()
    fdsink.emit('remove', self.file.fileno())
    self._recordingStopped(self.file, self.location)
    self.file = None
    self.uiState.set('filename', None)
    self.uiState.set('recording', False)

    # The size is informational only; a failing stat must not abort the stop.
    try:
        size = format.formatStorage(os.stat(self.location).st_size)
    except EnvironmentError:
        size = "unknown"

    # Drop the first entry once the list holds FILELIST_SIZE items.
    entries = self.uiState.get('filelist', otherwise=[])
    if len(entries) == FILELIST_SIZE:
        self.uiState.remove('filelist', entries[0])

    finished = (self.last_tstamp, self.location, size)
    self.uiState.append('filelist', finished)

    if self._symlinkToLastRecording:
        self._updateSymlink(self.location,
                            self._symlinkToLastRecording)
487
507
508
509
511
512
# NOTE(review): the enclosing ``def`` line is not part of this listing.
# NOTE(review): 4 is a magic number; presumably multifdsink's "error" client
# status -- confirm against the GStreamer documentation.
# reactor.callFromThread schedules the handler on the reactor thread, which
# suggests this code runs on a different thread -- confirm.
513 if client_status == 4:
514
515
516 reactor.callFromThread(self._client_error_cb)
517
528
534
539
541
542
543
# NOTE(review): the enclosing ``def`` line is not part of this listing.
#
# Bring the 'next-points' UI state in line with the points reported by the
# ical scheduler: tuples no longer scheduled are removed, new ones appended.
# Each tuple is (point time as naive UTC datetime, point.which, event content
# run through strftime with the event start time).
previous = self.uiState.get('next-points')[:]
scheduled = self.icalScheduler.getPoints()


def _asTuple(point):
    from flumotion.common import eventcalendar
    # Convert to UTC, then drop the tzinfo to obtain a naive datetime.
    whenUTC = point.dt.astimezone(eventcalendar.UTC).replace(tzinfo=None)
    startNaive = point.eventInstance.start.replace(tzinfo=None)
    label = format.strftime(point.eventInstance.event.content,
                            startNaive.timetuple())
    return (whenUTC, point.which, label)

upcoming = [_asTuple(point) for point in scheduled]

stale = [entry for entry in previous if entry not in upcoming]
for entry in stale:
    self.debug('removing tuple %r from next-points', entry)
    self.uiState.remove('next-points', entry)

added = [entry for entry in upcoming if entry not in previous]
for entry in added:
    self.debug('appending tuple %r to next-points', entry)
    self.uiState.append('next-points', entry)
571
# NOTE(review): the enclosing ``def`` line is not part of this listing; the
# statements use the names ``file`` and ``location``.
#
# Tell every plug registered on the DiskerPlug socket that a recording has
# started, passing the open file and its location.
plugSocket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

if plugSocket in self.plugs:
    for listener in self.plugs[plugSocket]:
        self.debug('invoking recordingStarted on plug %r on socket %s',
                   listener, plugSocket)
        listener.recordingStarted(file, location)
581
# NOTE(review): the enclosing ``def`` line is not part of this listing; the
# statements use the names ``file`` and ``location``.
#
# Tell every plug registered on the DiskerPlug socket that a recording has
# stopped, passing the file and its location.
plugSocket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

if plugSocket in self.plugs:
    for listener in self.plugs[plugSocket]:
        self.debug('invoking recordingStopped on plug %r on socket %s',
                   listener, plugSocket)
        listener.recordingStopped(file, location)
591
592
593
# NOTE(review): the enclosing ``def`` line is not part of this listing.
# Reacts to custom downstream events whose structure is named 'FluStreamMark':
# action 'start' starts a marker recording with the event's 'prog_id',
# action 'stop' ends it. Other events are not acted upon.
595 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM:
596 evt_struct = event.get_structure()
597 if evt_struct.get_name() == 'FluStreamMark':
598 if evt_struct['action'] == 'start':
599 self._onMarkerStart(evt_struct['prog_id'])
600 elif evt_struct['action'] == 'stop':
601 self._onMarkerStop()
# NOTE(review): presumably returned for every event so the probe never drops
# data -- the nesting level is not recoverable from this listing; confirm.
602 return True
603
606
608 tmpl = self._defaultFilenameTemplate
609 if self._markerPrefix:
610 try:
611 tmpl = '%s%s' % (self._markerPrefix % data,
612 self._defaultFilenameTemplate)
613 except TypeError, err:
614 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
615 '%r <-- %r.'),
616 self._markerPrefix, data),
617 mid='expand-marker-prefix')
618 self.addMessage(m)
619 self.warning('Failed expanding filename prefix: '
620 '%r <-- %r; %r' %
621 (self._markerPrefix, data, err))
622 self.changeFilename(tmpl)
623
629