From ae427b948be359b6f4e77613562780b58014964a Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 26 Jan 2022 12:03:59 +0100 Subject: [PATCH] Modularize play mode, so it is possible to create a custom video source generator. --- ffplayout/output/desktop.py | 26 ++++++------------ ffplayout/output/hls.py | 44 ++++++++++++++++-------------- ffplayout/output/live_switch.py | 31 ++++++++------------- ffplayout/output/null.py | 31 ++++++++------------- ffplayout/output/stream.py | 26 ++++++------------ ffplayout/player/Readme.md | 7 +++++ ffplayout/player/__init__.py | 0 ffplayout/{ => player}/folder.py | 31 ++++++++++++--------- ffplayout/{ => player}/playlist.py | 6 ++-- 9 files changed, 89 insertions(+), 113 deletions(-) create mode 100644 ffplayout/player/Readme.md create mode 100644 ffplayout/player/__init__.py rename ffplayout/{ => player}/folder.py (87%) rename ffplayout/{ => player}/playlist.py (99%) diff --git a/ffplayout/output/desktop.py b/ffplayout/output/desktop.py index 8d5dab28..184e72f5 100644 --- a/ffplayout/output/desktop.py +++ b/ffplayout/output/desktop.py @@ -19,15 +19,14 @@ This module plays the compressed output directly on the desktop. """ +from importlib import import_module from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher -from ..playlist import GetSourceFromPlaylist from ..utils import (ff_proc, ffmpeg_stderr_reader, log, lower_third, - messenger, playlist, pre, pre_audio_codec, stdin_args, - sync_op, terminate_processes) + messenger, play, pre, pre_audio_codec, sync_op, + terminate_processes) COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 @@ -72,20 +71,11 @@ def output(): enc_err_thread.daemon = True enc_err_thread.start() - if playlist.mode and not stdin_args.folder: - watcher = None - get_source = GetSourceFromPlaylist() - else: - messenger.info('Start folder mode') - media = MediaStore() - watcher = MediaWatcher(media) - get_source = GetSourceFromFolder(media) + Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter + get_source = Iter() try: for node in get_source.next(): - if watcher is not None: - watcher.current_clip = node.get('source') - messenger.info( f'Play for {node["out"] - node["seek"]:.2f} ' f'seconds: {node.get("source")}') @@ -119,15 +109,15 @@ def output(): messenger.debug(f'node: "{node}"') messenger.debug(f'dec_cmd: "{dec_cmd}"') messenger.debug(79 * '-') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except SystemExit: messenger.info('Got close command') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except KeyboardInterrupt: messenger.warning('Program terminated') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) # close encoder when nothing is to do anymore if ff_proc.encoder.poll() is None: diff --git a/ffplayout/output/hls.py b/ffplayout/output/hls.py index dcb0e8da..ed4437a2 100644 --- a/ffplayout/output/hls.py +++ b/ffplayout/output/hls.py @@ -16,19 +16,30 @@ # ------------------------------------------------------------------------------ """ -This module write the files compression directly to a hls (m3u8) playlist. +This module write the files compression directly to a hls (m3u8) playlist, +without pre- and post-processing. + +Example config: + +out: + stream_output: >- + -flags +cgop + -f hls + -hls_time 6 + -hls_list_size 600 + -hls_flags append_list+delete_segments+omit_endlist+program_date_time + -hls_segment_filename /var/www/srs/live/stream-%09d.ts /var/www/srs/live/stream.m3u8 + """ import re +from importlib import import_module from pathlib import Path from subprocess import PIPE, Popen from threading import Thread -from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher -from ..playlist import GetSourceFromPlaylist from ..utils import (ff_proc, ffmpeg_stderr_reader, get_date, log, messenger, - playlist, playout, stdin_args, sync_op, - terminate_processes) + play, playout, sync_op, terminate_processes) def clean_ts(): @@ -38,7 +49,7 @@ def clean_ts(): then it checks if files on hard drive are older then this first *.ts and if so delete them """ - m3u8_files = [p for p in playout.hls_output if 'm3u8' in p] + m3u8_files = [p for p in playout.stream_output if 'm3u8' in p] for m3u8_file in m3u8_files: messenger.debug(f'cleanup *.ts files from: "{m3u8_file}"') @@ -67,20 +78,11 @@ def output(): sync_op.realtime = True try: - if playlist.mode and not stdin_args.folder: - watcher = None - get_source = GetSourceFromPlaylist() - else: - messenger.info('Start folder mode') - media = MediaStore() - watcher = MediaWatcher(media) - get_source = GetSourceFromFolder(media) + Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter + get_source = Iter() try: for node in get_source.next(): - if watcher is not None: - watcher.current_clip = node.get('source') - messenger.info(f'Play: {node.get("source")}') cmd = [ @@ -90,7 +92,7 @@ def output(): '-metadata', f'service_name={playout.name}', '-metadata', f'service_provider={playout.provider}', '-metadata', f'year={year}' - ] + playout.ffmpeg_param + playout.hls_output + ] + playout.ffmpeg_param + playout.stream_output messenger.debug(f'Encoder CMD: "{" ".join(cmd)}"') @@ -109,15 +111,15 @@ def output(): except BrokenPipeError: messenger.error('Broken Pipe!') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except SystemExit: messenger.info('Got close command') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except KeyboardInterrupt: messenger.warning('Program terminated') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) # close encoder when nothing is to do anymore if ff_proc.encoder.poll() is None: diff --git a/ffplayout/output/live_switch.py b/ffplayout/output/live_switch.py index 23a9bc64..49a62eae 100644 --- a/ffplayout/output/live_switch.py +++ b/ffplayout/output/live_switch.py @@ -15,7 +15,9 @@ # You should have received a copy of the GNU General Public License # along with ffplayout. If not, see . -# ------------------------------------------------------------------------------ +# ----------------------------------------------------------------------------- + +from importlib import import_module from platform import system from queue import Queue from subprocess import PIPE, Popen @@ -23,11 +25,9 @@ from threading import Thread from time import sleep from ..filters.default import overlay_filter -from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher -from ..playlist import GetSourceFromPlaylist from ..utils import (ff_proc, ffmpeg_stderr_reader, get_date, get_time, ingest, - log, lower_third, messenger, playlist, playout, pre, - pre_audio_codec, stdin_args, sync_op, terminate_processes) + log, lower_third, messenger, play, playout, pre, + pre_audio_codec, sync_op, terminate_processes) COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 @@ -71,7 +71,7 @@ def check_time(node, get_source): clip_length = node['out'] - node['seek'] clip_end = current_time + clip_length - if playlist.mode and not stdin_args.folder and clip_end > current_time: + if play.mode == 'playlist' and clip_end > current_time: get_source.first = True @@ -131,20 +131,11 @@ def output(): enc_err_thread.daemon = True enc_err_thread.start() - if playlist.mode and not stdin_args.folder: - watcher = None - get_source = GetSourceFromPlaylist() - else: - messenger.info('Start folder mode') - media = MediaStore() - watcher = MediaWatcher(media) - get_source = GetSourceFromFolder(media) + Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter + get_source = Iter() try: for node in get_source.next(): - if watcher is not None: - watcher.current_clip = node.get('source') - messenger.info(f'Play: {node.get("source")}') dec_cmd = [ @@ -187,18 +178,18 @@ def output(): messenger.debug(f'node: "{node}"') messenger.debug(f'dec_cmd: "{dec_cmd}"') messenger.debug(79 * '-') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except SystemExit: messenger.info('Got close command') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) if ff_proc.live and ff_proc.live.poll() is None: ff_proc.live.terminate() except KeyboardInterrupt: messenger.warning('Program terminated') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) if ff_proc.live and ff_proc.live.poll() is None: ff_proc.live.terminate() diff --git a/ffplayout/output/null.py b/ffplayout/output/null.py index 5c5341f1..4ef312c0 100644 --- a/ffplayout/output/null.py +++ b/ffplayout/output/null.py @@ -19,15 +19,13 @@ This module streams to -f null, so it is only for debugging. """ +from importlib import import_module from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher -from ..playlist import GetSourceFromPlaylist -from ..utils import (ff_proc, ffmpeg_stderr_reader, log, messenger, playlist, - playout, pre, pre_audio_codec, stdin_args, - terminate_processes) +from ..utils import (ff_proc, ffmpeg_stderr_reader, log, messenger, play, + playout, pre, pre_audio_codec, terminate_processes) COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 @@ -38,9 +36,11 @@ def output(): like rtmp, rtp, svt, etc. """ + messenger.info(f'Stream to null output, only usefull for debugging...') + ff_pre_settings = [ '-pix_fmt', 'yuv420p', '-r', str(pre.fps), - '-c:v', 'mpeg2video', '-intra', + '-c:v', 'mpeg2video', '-g', '1', '-b:v', f'{pre.v_bitrate}k', '-minrate', f'{pre.v_bitrate}k', '-maxrate', f'{pre.v_bitrate}k', @@ -62,20 +62,11 @@ def output(): enc_err_thread.daemon = True enc_err_thread.start() - if playlist.mode and not stdin_args.folder: - watcher = None - get_source = GetSourceFromPlaylist() - else: - messenger.info('Start folder mode') - media = MediaStore() - watcher = MediaWatcher(media) - get_source = GetSourceFromFolder(media) + Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter + get_source = Iter() try: for node in get_source.next(): - if watcher is not None: - watcher.current_clip = node.get('source') - messenger.info( f'Play for {node["out"] - node["seek"]:.2f} ' f'seconds: {node.get("source")}') @@ -103,15 +94,15 @@ def output(): except BrokenPipeError: messenger.error('Broken Pipe!') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except SystemExit: messenger.info('Got close command') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except KeyboardInterrupt: messenger.warning('Program terminated') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) # close encoder when nothing is to do anymore if ff_proc.encoder.poll() is None: diff --git a/ffplayout/output/stream.py b/ffplayout/output/stream.py index ea4fa861..1399c618 100644 --- a/ffplayout/output/stream.py +++ b/ffplayout/output/stream.py @@ -19,15 +19,14 @@ This module streams the files out to a remote target. """ +from importlib import import_module from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher -from ..playlist import GetSourceFromPlaylist from ..utils import (ff_proc, ffmpeg_stderr_reader, get_date, log, lower_third, - messenger, playlist, playout, pre, pre_audio_codec, - stdin_args, sync_op, terminate_processes) + messenger, play, playout, pre, pre_audio_codec, sync_op, + terminate_processes) COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 @@ -80,20 +79,11 @@ def output(): enc_err_thread.daemon = True enc_err_thread.start() - if playlist.mode and not stdin_args.folder: - watcher = None - get_source = GetSourceFromPlaylist() - else: - messenger.info('Start folder mode') - media = MediaStore() - watcher = MediaWatcher(media) - get_source = GetSourceFromFolder(media) + Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter + get_source = Iter() try: for node in get_source.next(): - if watcher is not None: - watcher.current_clip = node.get('source') - messenger.info(f'Play: {node.get("source")}') dec_cmd = [ @@ -125,15 +115,15 @@ def output(): messenger.debug(f'node: "{node}"') messenger.debug(f'dec_cmd: "{dec_cmd}"') messenger.debug(79 * '-') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except SystemExit: messenger.info('Got close command') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) except KeyboardInterrupt: messenger.warning('Program terminated') - terminate_processes(watcher) + terminate_processes(getattr(get_source, 'stop', None)) # close encoder when nothing is to do anymore if ff_proc.encoder.poll() is None: diff --git a/ffplayout/player/Readme.md b/ffplayout/player/Readme.md new file mode 100644 index 00000000..353a7971 --- /dev/null +++ b/ffplayout/player/Readme.md @@ -0,0 +1,7 @@ +Here you have the possibility to add you own player module. Defaults are: playing a playlist, or the content of a folder. + +If you need your own module, create a python file with the desire name. Inside it need a generator class with the name: **GetSourceIter**. + +Check **folder.py** and **playlist.py** to get an idea how it needs to work. + +After creating the custom module, set in config **play: -> mode:** the file name of your module without extension. diff --git a/ffplayout/player/__init__.py b/ffplayout/player/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ffplayout/folder.py b/ffplayout/player/folder.py similarity index 87% rename from ffplayout/folder.py rename to ffplayout/player/folder.py index f15cb341..2d667b72 100644 --- a/ffplayout/folder.py +++ b/ffplayout/player/folder.py @@ -27,8 +27,8 @@ from pathlib import Path from watchdog.events import PatternMatchingEventHandler from watchdog.observers import Observer -from .filters.default import build_filtergraph -from .utils import (MediaProbe, ff_proc, get_float, messenger, stdin_args, +from ..filters.default import build_filtergraph +from ..utils import (MediaProbe, ff_proc, get_float, messenger, stdin_args, storage) # ------------------------------------------------------------------------------ @@ -163,13 +163,14 @@ class MediaWatcher: self.observer.join() -class GetSourceFromFolder: +class GetSourceIter: """ give next clip, depending on shuffle mode """ - def __init__(self, media): - self._media = media + def __init__(self): + self.media = MediaStore() + self.watcher = MediaWatcher(self.media) self.last_played = [] self.index = 0 @@ -179,28 +180,31 @@ class GetSourceFromFolder: self.node_last = None self.node_next = None + def stop(self): + self.watcher.stop() + def next(self): """ generator for getting always a new file """ while True: - while self.index < len(self._media.store): + while self.index < len(self.media.store): if self.node_next: self.node = deepcopy(self.node_next) self.probe = deepcopy(self.next_probe) else: - self.probe.load(self._media.store[self.index]) + self.probe.load(self.media.store[self.index]) duration = get_float(self.probe.format.get('duration'), 0) self.node = { 'in': 0, 'seek': 0, 'out': duration, 'duration': duration, - 'source': self._media.store[self.index], + 'source': self.media.store[self.index], 'probe': self.probe } - if self.index < len(self._media.store) - 1: - self.next_probe.load(self._media.store[self.index + 1]) + if self.index < len(self.media.store) - 1: + self.next_probe.load(self.media.store[self.index + 1]) next_duration = get_float( self.next_probe.format.get('duration'), 0) self.node_next = { @@ -208,17 +212,18 @@ class GetSourceFromFolder: 'seek': 0, 'out': next_duration, 'duration': next_duration, - 'source': self._media.store[self.index + 1], + 'source': self.media.store[self.index + 1], 'probe': self.next_probe } else: - self._media.rand() + self.media.rand() self.node_next = None - self.node['src_cmd'] = ['-i', self._media.store[self.index]] + self.node['src_cmd'] = ['-i', self.media.store[self.index]] self.node['filter'] = build_filtergraph( self.node, self.node_last, self.node_next) + self.watcher.current_clip = self.node.get('source') yield self.node self.index += 1 self.node_last = deepcopy(self.node) diff --git a/ffplayout/playlist.py b/ffplayout/player/playlist.py similarity index 99% rename from ffplayout/playlist.py rename to ffplayout/player/playlist.py index ce7573bd..3a73b941 100644 --- a/ffplayout/playlist.py +++ b/ffplayout/player/playlist.py @@ -29,8 +29,8 @@ from threading import Thread import requests -from .filters.default import build_filtergraph -from .utils import (MediaProbe, check_sync, get_date, get_delta, +from ..filters.default import build_filtergraph +from ..utils import (MediaProbe, check_sync, get_date, get_delta, get_float, get_time, messenger, playlist, sec_to_time, src_or_dummy, stdin_args, storage, sync_op, valid_json) @@ -255,7 +255,7 @@ class PlaylistReader: self.error = True -class GetSourceFromPlaylist: +class GetSourceIter: """ read values from json playlist, get current clip in time,