From 7babb1aff8a748eef14e5c95fb20e6135b8661ff Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sat, 12 Feb 2022 20:34:22 +0100 Subject: [PATCH] ingest server in external file Makes it possible to use the ingest server in almost every output module. The ingest function can also be init after running the playout, with sighub. --- ffplayout.yml | 3 +- ffplayout/ingest_server.py | 74 +++++++++++ ffplayout/output/desktop.py | 38 +++--- ffplayout/output/live_switch.py | 214 -------------------------------- ffplayout/output/null.py | 40 +++--- ffplayout/output/stream.py | 38 +++--- ffplayout/utils.py | 46 +++++-- 7 files changed, 174 insertions(+), 279 deletions(-) create mode 100644 ffplayout/ingest_server.py delete mode 100644 ffplayout/output/live_switch.py diff --git a/ffplayout.yml b/ffplayout.yml index 17ec6cd1..861bdc40 100644 --- a/ffplayout.yml +++ b/ffplayout.yml @@ -58,9 +58,10 @@ processing: output_count: 1 ingest: - helptext: Works only in combination with output -> mode = live_switch! Run a server + helptext: Works not with direct hls output, it always needs full processing! Run a server for a ingest stream. This stream will override the normal streaming until is done. There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app. + enable: false stream_input: >- -f live_flv -listen 1 diff --git a/ffplayout/ingest_server.py b/ffplayout/ingest_server.py new file mode 100644 index 00000000..ad083bbb --- /dev/null +++ b/ffplayout/ingest_server.py @@ -0,0 +1,74 @@ +# This file is part of ffplayout. +# +# ffplayout is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ffplayout is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with ffplayout. If not, see . + +# ------------------------------------------------------------------------------ + +""" +Start a streaming server and forword it to the playout. +This stream will have the first priority and +play instead of the normal stream (playlist/folder). +""" +from queue import Queue +from subprocess import PIPE, Popen +from threading import Thread +from time import sleep + +from .filters.default import overlay_filter +from .utils import ff_proc, ffmpeg_stderr_reader, ingest, messenger, pre + + +def listener(que): + filter_ = (f'[0:v]fps={str(pre.fps)},scale={pre.w}:{pre.h},' + + f'setdar=dar={pre.aspect}[v];') + filter_ += overlay_filter(0, False, False, False) + + server_cmd = [ + 'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error' + ] + ingest.stream_input + [ + '-filter_complex', f'{filter_}[vout1]', + '-map', '[vout1]', '-map', '0:a' + ] + pre.settings + + messenger.warning( + 'Ingest stream is experimental, use it at your own risk!') + messenger.debug(f'Server CMD: "{" ".join(server_cmd)}"') + + while True: + with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live: + err_thread = Thread(name='stderr_server', + target=ffmpeg_stderr_reader, + args=(ff_proc.live.stderr, '[Server]')) + err_thread.daemon = True + err_thread.start() + + while True: + buffer = ff_proc.live.stdout.read(pre.buffer_size) + if not buffer: + break + + que.put(buffer) + + sleep(.33) + + +def ingest_stream(): + streaming_queue = Queue(maxsize=0) + + rtmp_server_thread = Thread(name='ffmpeg_server',target=listener, + args=(streaming_queue,)) + rtmp_server_thread.daemon = True + rtmp_server_thread.start() + + return streaming_queue diff --git a/ffplayout/output/desktop.py b/ffplayout/output/desktop.py index 696b7b90..55084049 100644 --- a/ffplayout/output/desktop.py +++ b/ffplayout/output/desktop.py @@ -20,31 +20,25 @@ This module plays the compressed output directly on the desktop. """ from importlib import import_module -from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..utils import (ff_proc, ffmpeg_stderr_reader, log, lower_third, - messenger, play, pre, pre_audio_codec, sync_op, +from ..ingest_server import ingest_stream +from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest, + log, lower_third, messenger, play, pre, sync_op, terminate_processes) -COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 - def output(): """ this output is for playing on desktop with ffplay """ overlay = [] + live_on = False + stream_queue = None - ff_pre_settings = [ - '-pix_fmt', 'yuv420p', '-r', str(pre.fps), - '-c:v', 'mpeg2video', '-g', '1', - '-b:v', f'{pre.v_bitrate}k', - '-minrate', f'{pre.v_bitrate}k', - '-maxrate', f'{pre.v_bitrate}k', - '-bufsize', f'{pre.v_bufsize}k' - ] + pre_audio_codec() + ['-f', 'mpegts', '-'] + if ingest.enable: + stream_queue = ingest_stream() if lower_third.add_text and not lower_third.over_pre: messenger.info( @@ -83,7 +77,7 @@ def output(): dec_cmd = [ 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', '-nostats' - ] + node['src_cmd'] + node['filter'] + ff_pre_settings + ] + node['src_cmd'] + node['filter'] + pre.settings messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"') @@ -96,10 +90,20 @@ def output(): dec_err_thread.start() while True: - buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE) - if not buf: + buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size) + if stream_queue and not stream_queue.empty(): + buf_live = stream_queue.get() + ff_proc.encoder.stdin.write(buf_live) + live_on = True + + del buf_dec + elif buf_dec: + ff_proc.encoder.stdin.write(buf_dec) + else: + if live_on: + check_node_time(node, get_source) + live_on = False break - ff_proc.encoder.stdin.write(buf) except BrokenPipeError as err: messenger.error('Broken Pipe!') diff --git a/ffplayout/output/live_switch.py b/ffplayout/output/live_switch.py deleted file mode 100644 index c1a40e72..00000000 --- a/ffplayout/output/live_switch.py +++ /dev/null @@ -1,214 +0,0 @@ -#!/usr/bin/env python3 - -# This file is part of ffplayout. -# -# ffplayout is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# ffplayout is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with ffplayout. If not, see . - -# ----------------------------------------------------------------------------- - -from importlib import import_module -from platform import system -from queue import Queue -from subprocess import PIPE, Popen -from threading import Thread -from time import sleep - -from ..filters.default import overlay_filter -from ..utils import (ff_proc, ffmpeg_stderr_reader, get_time, ingest, log, - lower_third, messenger, play, playout, pre, - pre_audio_codec, sync_op, terminate_processes) - -COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 - - -def rtmp_server(que, pre_settings): - filter_ = (f'[0:v]fps={str(pre.fps)},scale={pre.w}:{pre.h},' - + f'setdar=dar={pre.aspect}[v];') - filter_ += overlay_filter(0, False, False, False) - - server_cmd = [ - 'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error' - ] + ingest.stream_input + [ - '-filter_complex', f'{filter_}[vout1]', - '-map', '[vout1]', '-map', '0:a' - ] + pre_settings - - messenger.warning( - 'Ingest stream is experimental, use it at your own risk!') - messenger.debug(f'Server CMD: "{" ".join(server_cmd)}"') - - while True: - with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live: - err_thread = Thread(name='stderr_server', - target=ffmpeg_stderr_reader, - args=(ff_proc.live.stderr, '[Server]')) - err_thread.daemon = True - err_thread.start() - - while True: - buffer = ff_proc.live.stdout.read(COPY_BUFSIZE) - if not buffer: - break - - que.put(buffer) - - sleep(.33) - - -def check_time(node, get_source): - current_time = get_time('full_sec') - clip_length = node['out'] - node['seek'] - clip_end = current_time + clip_length - - if play.mode == 'playlist' and clip_end > current_time: - get_source.first = True - - -def output(): - """ - this output is for streaming to a target address, - like rtmp, rtp, svt, etc. - """ - filtering = [] - node = None - dec_cmd = [] - preview = [] - live_on = False - streaming_queue = Queue(maxsize=0) - - ff_pre_settings = [ - '-pix_fmt', 'yuv420p', '-r', str(pre.fps), - '-c:v', 'mpeg2video', '-g', '1', - '-b:v', f'{pre.v_bitrate}k', - '-minrate', f'{pre.v_bitrate}k', - '-maxrate', f'{pre.v_bitrate}k', - '-bufsize', f'{pre.v_bufsize}k' - ] + pre_audio_codec() + ['-f', 'mpegts', '-'] - - if lower_third.add_text and not lower_third.over_pre: - messenger.info( - f'Using drawtext node, listening on address: {lower_third.address}' - ) - filtering = [ - '-filter_complex', - f"[0:v]null,zmq=b=tcp\\\\://'{lower_third.address}'," - + f"drawtext=text='':fontfile='{lower_third.fontfile}'" - ] - - if playout.preview: - filtering[-1] += ',split=2[v_out1][v_out2]' - preview = ['-map', '[v_out1]', '-map', '0:a' - ] + playout.preview_param + ['-map', '[v_out2]', '-map', '0:a'] - - elif playout.preview: - preview = playout.preview_param - - rtmp_server_thread = Thread(name='ffmpeg_server',target=rtmp_server, - args=(streaming_queue, ff_pre_settings)) - rtmp_server_thread.daemon = True - rtmp_server_thread.start() - - try: - enc_cmd = [ - 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', - '-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0' - ] + filtering + preview + playout.stream_param - - messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"') - - ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE) - - enc_err_thread = Thread(name='stderr_encoder', - target=ffmpeg_stderr_reader, - args=(ff_proc.encoder.stderr, '[Encoder]')) - enc_err_thread.daemon = True - enc_err_thread.start() - - Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter - get_source = Iter() - - try: - for node in get_source.next(): - messenger.info(f'Play: {node.get("source")}') - - dec_cmd = [ - 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', - '-hide_banner', '-nostats' - ] + node['src_cmd'] + node['filter'] + ff_pre_settings - - messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"') - - with Popen( - dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder: - dec_err_thread = Thread(name='stderr_decoder', - target=ffmpeg_stderr_reader, - args=(ff_proc.decoder.stderr, - '[Decoder]')) - dec_err_thread.daemon = True - dec_err_thread.start() - - while True: - buf_dec = ff_proc.decoder.stdout.read(COPY_BUFSIZE) - if not streaming_queue.empty(): - buf_live = streaming_queue.get() - ff_proc.encoder.stdin.write(buf_live) - live_on = True - - del buf_dec - elif buf_dec: - ff_proc.encoder.stdin.write(buf_dec) - else: - if live_on: - check_time(node, get_source) - live_on = False - break - - except BrokenPipeError as err: - messenger.error('Broken Pipe!') - messenger.debug(79 * '-') - messenger.debug(f'error: "{err}"') - messenger.debug(f'delta: "{sync_op.time_delta}"') - messenger.debug(f'node: "{node}"') - messenger.debug(f'dec_cmd: "{dec_cmd}"') - messenger.debug(79 * '-') - terminate_processes(getattr(get_source, 'stop', None)) - - except SystemExit: - messenger.info('Got close command') - terminate_processes(getattr(get_source, 'stop', None)) - - if ff_proc.live and ff_proc.live.poll() is None: - ff_proc.live.terminate() - - except KeyboardInterrupt: - messenger.warning('Program terminated') - terminate_processes(getattr(get_source, 'stop', None)) - - if ff_proc.live and ff_proc.live.poll() is None: - ff_proc.live.terminate() - - # close encoder when nothing is to do anymore - if ff_proc.encoder.poll() is None: - ff_proc.encoder.kill() - - if ff_proc.live and ff_proc.live.poll() is None: - ff_proc.live.kill() - - finally: - if ff_proc.encoder.poll() is None: - ff_proc.encoder.kill() - ff_proc.encoder.wait() - - if ff_proc.live and ff_proc.live.poll() is None: - ff_proc.live.kill() diff --git a/ffplayout/output/null.py b/ffplayout/output/null.py index 3730e9e4..aa3bca1b 100644 --- a/ffplayout/output/null.py +++ b/ffplayout/output/null.py @@ -20,14 +20,12 @@ This module streams to -f null, so it is only for debugging. """ from importlib import import_module -from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..utils import (ff_proc, ffmpeg_stderr_reader, log, messenger, play, - playout, pre, pre_audio_codec, terminate_processes) - -COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 +from ..ingest_server import ingest_stream +from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest, + log, messenger, play, playout, pre, terminate_processes) def output(): @@ -35,18 +33,14 @@ def output(): this output is for streaming to a target address, like rtmp, rtp, svt, etc. """ + live_on = False + stream_queue = None + + if ingest.enable: + stream_queue = ingest_stream() messenger.info(f'Stream to null output, only usefull for debugging...') - ff_pre_settings = [ - '-pix_fmt', 'yuv420p', '-r', str(pre.fps), - '-c:v', 'mpeg2video', '-g', '1', - '-b:v', f'{pre.v_bitrate}k', - '-minrate', f'{pre.v_bitrate}k', - '-maxrate', f'{pre.v_bitrate}k', - '-bufsize', f'{pre.v_bufsize}k' - ] + pre_audio_codec() + ['-f', 'mpegts', '-'] - try: enc_cmd = [ 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', @@ -74,7 +68,7 @@ def output(): dec_cmd = [ 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', '-nostats' - ] + node['src_cmd'] + node['filter'] + ff_pre_settings + ] + node['src_cmd'] + node['filter'] + pre.settings messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"') @@ -87,10 +81,20 @@ def output(): dec_err_thread.start() while True: - buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE) - if not buf: + buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size) + if stream_queue and not stream_queue.empty(): + buf_live = stream_queue.get() + ff_proc.encoder.stdin.write(buf_live) + live_on = True + + del buf_dec + elif buf_dec: + ff_proc.encoder.stdin.write(buf_dec) + else: + if live_on: + check_node_time(node, get_source) + live_on = False break - ff_proc.encoder.stdin.write(buf) except BrokenPipeError: messenger.error('Broken Pipe!') diff --git a/ffplayout/output/stream.py b/ffplayout/output/stream.py index bb7dbb89..a94a7358 100644 --- a/ffplayout/output/stream.py +++ b/ffplayout/output/stream.py @@ -20,16 +20,14 @@ This module streams the files out to a remote target. """ from importlib import import_module -from platform import system from subprocess import PIPE, Popen from threading import Thread -from ..utils import (ff_proc, ffmpeg_stderr_reader, log, lower_third, - messenger, play, playout, pre, pre_audio_codec, sync_op, +from ..ingest_server import ingest_stream +from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest, + log, lower_third, messenger, play, playout, pre, sync_op, terminate_processes) -COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 - def output(): """ @@ -40,15 +38,11 @@ def output(): node = None dec_cmd = [] preview = [] + live_on = False + stream_queue = None - ff_pre_settings = [ - '-pix_fmt', 'yuv420p', '-r', str(pre.fps), - '-c:v', 'mpeg2video', '-g', '1', - '-b:v', f'{pre.v_bitrate}k', - '-minrate', f'{pre.v_bitrate}k', - '-maxrate', f'{pre.v_bitrate}k', - '-bufsize', f'{pre.v_bufsize}k' - ] + pre_audio_codec() + ['-f', 'mpegts', '-'] + if ingest.enable: + stream_queue = ingest_stream() if lower_third.add_text and not lower_third.over_pre: messenger.info( @@ -93,7 +87,7 @@ def output(): dec_cmd = [ 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', '-nostats' - ] + node['src_cmd'] + node['filter'] + ff_pre_settings + ] + node['src_cmd'] + node['filter'] + pre.settings messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"') @@ -106,10 +100,20 @@ def output(): dec_err_thread.start() while True: - buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE) - if not buf: + buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size) + if stream_queue and not stream_queue.empty(): + buf_live = stream_queue.get() + ff_proc.encoder.stdin.write(buf_live) + live_on = True + + del buf_dec + elif buf_dec: + ff_proc.encoder.stdin.write(buf_dec) + else: + if live_on: + check_node_time(node, get_source) + live_on = False break - ff_proc.encoder.stdin.write(buf) except BrokenPipeError as err: messenger.error('Broken Pipe!') diff --git a/ffplayout/utils.py b/ffplayout/utils.py index 14baf053..fd855ce9 100644 --- a/ffplayout/utils.py +++ b/ffplayout/utils.py @@ -241,6 +241,9 @@ def load_config(): lower_third.style = cfg['text']['style'] lower_third.regex = cfg['text']['regex'] + ingest.enable = cfg['ingest']['enable'] + ingest.stream_input = shlex.split(cfg['ingest']['stream_input']) + return cfg @@ -280,6 +283,19 @@ log.path = Path(_cfg['logging']['log_path']) log.level = _cfg['logging']['log_level'] log.ff_level = _cfg['logging']['ffmpeg_level'] + +def pre_audio_codec(): + """ + when add_loudnorm is False we use a different audio encoder, + s302m has higher quality, but is experimental + and works not well together with the loudnorm filter + """ + if pre.add_loudnorm: + return ['-c:a', 'mp2', '-b:a', '384k', '-ar', '48000', '-ac', '2'] + + return ['-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2'] + + pre.w = _cfg['processing']['width'] pre.h = _cfg['processing']['height'] pre.aspect = _cfg['processing']['aspect'] @@ -287,8 +303,16 @@ pre.fps = _cfg['processing']['fps'] pre.v_bitrate = _cfg['processing']['width'] * _cfg['processing']['height'] / 10 pre.v_bufsize = pre.v_bitrate / 2 pre.output_count = _cfg['processing']['output_count'] +pre.buffer_size = 1024 * 1024 if system() == 'Windows' else 65424 -ingest.stream_input = shlex.split(_cfg['ingest']['stream_input']) +pre.settings = [ + '-pix_fmt', 'yuv420p', '-r', str(pre.fps), + '-c:v', 'mpeg2video', '-g', '1', + '-b:v', f'{pre.v_bitrate}k', + '-minrate', f'{pre.v_bitrate}k', + '-maxrate', f'{pre.v_bitrate}k', + '-bufsize', f'{pre.v_bufsize}k' + ] + pre_audio_codec() + ['-f', 'mpegts', '-'] if stdin_args.output: playout.mode = stdin_args.output @@ -824,6 +848,15 @@ def check_sync(delta, node=None): sys.exit(1) +def check_node_time(node, get_source): + current_time = get_time('full_sec') + clip_length = node['out'] - node['seek'] + clip_end = current_time + clip_length + + if play.mode == 'playlist' and clip_end > current_time: + get_source.first = True + + def seek_in(seek): """ seek in clip @@ -949,14 +982,3 @@ def src_or_dummy(node): return node - -def pre_audio_codec(): - """ - when add_loudnorm is False we use a different audio encoder, - s302m has higher quality, but is experimental - and works not well together with the loudnorm filter - """ - if pre.add_loudnorm: - return ['-c:a', 'mp2', '-b:a', '384k', '-ar', '48000', '-ac', '2'] - - return ['-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2']