diff --git a/README.md b/README.md index d2d54cb0..0abde754 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,8 @@ The purpose with ffplayout is to provide a 24/7 broadcasting solution that plays - **desktop** - **HLS** - **custom** -- Multi Channel +- Multi channel +- Live rtmp ingest (experimental) Requirements ----- diff --git a/ffplayout.yml b/ffplayout.yml index f814cde1..d9e938ea 100644 --- a/ffplayout.yml +++ b/ffplayout.yml @@ -57,6 +57,13 @@ processing: loud_LRA: 11 output_count: 1 +ingest: + helptext: Works only in combination with output -> mode = live_switch! Run a rtmp server + for a ingest stream. This stream will override the normal streaming until is done. + There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app. + address: localhost + port: 1936 + playlist: helptext: Set 'playlist_mode' to 'False' if you want to play clips from the 'storage' section. Put only the root path here, for example '/playlists' subdirectories @@ -100,10 +107,10 @@ text: out: helptext: The final playout compression. Set the settings to your needs. - 'mode' has the standard options 'desktop', 'hls', 'stream'. Self made outputs - can be define, by adding script in output folder with an 'output' function inside. - 'stream_output' is for streaming output, two ffmpeg instances are fired up, for - pre- and post-processing. 'hls_output' is for direct output to hls playlist, + 'mode' has the standard options 'desktop', 'hls', 'live_switch', 'stream'. Self made + outputs can be define, by adding script in output folder with an 'output' function + inside. 'stream_output' is for streaming output, two ffmpeg instances are fired up, + for pre- and post-processing. 'hls_output' is for direct output to hls playlist, without pre- and post-processing, mode must be 'hls'. mode: 'stream' service_name: "Live Stream" diff --git a/ffplayout/output/README.md b/ffplayout/output/README.md new file mode 100644 index 00000000..d4be9b21 --- /dev/null +++ b/ffplayout/output/README.md @@ -0,0 +1,5 @@ +## Custom +ffplayout has a modularized output system, which mean you can write your own output function. Just create a python file in this folder with an **output()** function in it. In this function you ca do what ever you want. Use the other output files as references. + +#### Activating Output +To use one of the outputs you need to edit the **ffplayout.yml** config, here under **out** set your **mode** to the file name, without extension. if you need it feel free to extend the config to your needs. diff --git a/ffplayout/output/desktop.py b/ffplayout/output/desktop.py index b939bfdd..8d5dab28 100644 --- a/ffplayout/output/desktop.py +++ b/ffplayout/output/desktop.py @@ -68,7 +68,7 @@ def output(): ff_proc.encoder = Popen(enc_cmd, stderr=PIPE, stdin=PIPE, stdout=None) enc_err_thread = Thread(target=ffmpeg_stderr_reader, - args=(ff_proc.encoder.stderr, False)) + args=(ff_proc.encoder.stderr, '[Encoder]')) enc_err_thread.daemon = True enc_err_thread.start() @@ -101,7 +101,7 @@ def output(): dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder: dec_err_thread = Thread(target=ffmpeg_stderr_reader, args=(ff_proc.decoder.stderr, - True)) + '[Decoder]')) dec_err_thread.daemon = True dec_err_thread.start() diff --git a/ffplayout/output/hls.py b/ffplayout/output/hls.py index 865eb1e7..dcb0e8da 100644 --- a/ffplayout/output/hls.py +++ b/ffplayout/output/hls.py @@ -98,7 +98,7 @@ def output(): stderr_reader_thread = Thread(target=ffmpeg_stderr_reader, args=(ff_proc.encoder.stderr, - False)) + '[Encoder]')) stderr_reader_thread.daemon = True stderr_reader_thread.start() stderr_reader_thread.join() diff --git a/ffplayout/output/live_switch.py b/ffplayout/output/live_switch.py new file mode 100644 index 00000000..38b3b581 --- /dev/null +++ b/ffplayout/output/live_switch.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 + +# This file is part of ffplayout. +# +# ffplayout is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ffplayout is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with ffplayout. If not, see . + +# ------------------------------------------------------------------------------ +from platform import system +from queue import Queue +from subprocess import PIPE, Popen +from threading import Thread +from time import sleep + +from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher +from ..playlist import GetSourceFromPlaylist +from ..utils import (ff_proc, ffmpeg_stderr_reader, get_date, get_time, ingest, + log, lower_third, messenger, playlist, playout, pre, + pre_audio_codec, stdin_args, sync_op, terminate_processes) + +COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424 + + +def rtmp_server(que, pre_settings): + server_cmd = [ + 'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error', + '-f', 'live_flv', '-listen', '1', + '-i', f'rtmp://{ingest.address}:{ingest.port}/live/stream'] + pre_settings + + messenger.warning('Ingest stream is experimental, use it at your own risk!') + messenger.info(f'Start listening on "{ingest.address}:{ingest.port}"') + + while True: + with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live: + err_thread = Thread(name='stderr_server', + target=ffmpeg_stderr_reader, + args=(ff_proc.live.stderr, '[Server]')) + err_thread.daemon = True + err_thread.start() + + while True: + buffer = ff_proc.live.stdout.read(COPY_BUFSIZE) + if not buffer: + break + + que.put(buffer) + + sleep(.33) + + +def check_time(node, get_source): + current_time = get_time('full_sec') + clip_length = node['out'] - node['seek'] + clip_end = current_time + clip_length + + if playlist.mode and not stdin_args.folder and clip_end > current_time: + get_source.first = True + + +def output(): + """ + this output is for streaming to a target address, + like rtmp, rtp, svt, etc. + """ + year = get_date(False).split('-')[0] + overlay = [] + node = None + dec_cmd = [] + live_on = False + streaming_queue = Queue(maxsize=0) + + ff_pre_settings = [ + '-pix_fmt', 'yuv420p', '-r', str(pre.fps), + '-c:v', 'mpeg2video', '-g', '1', + '-b:v', f'{pre.v_bitrate}k', + '-minrate', f'{pre.v_bitrate}k', + '-maxrate', f'{pre.v_bitrate}k', + '-bufsize', f'{pre.v_bufsize}k' + ] + pre_audio_codec() + ['-f', 'mpegts', '-'] + + if lower_third.add_text and not lower_third.over_pre: + messenger.info( + f'Using drawtext node, listening on address: {lower_third.address}' + ) + overlay = [ + '-vf', + "null,zmq=b=tcp\\\\://'{}',drawtext=text='':fontfile='{}'".format( + lower_third.address.replace(':', '\\:'), lower_third.fontfile) + ] + + rtmp_server_thread = Thread(name='ffmpeg_server',target=rtmp_server, + args=(streaming_queue, ff_pre_settings)) + rtmp_server_thread.daemon = True + rtmp_server_thread.start() + + try: + enc_cmd = [ + 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner', + '-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0' + ] + overlay + [ + '-metadata', f'service_name={playout.name}', + '-metadata', f'service_provider={playout.provider}', + '-metadata', f'year={year}' + ] + playout.ffmpeg_param + playout.stream_output + + messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"') + + ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE) + + enc_err_thread = Thread(name='stderr_encoder', + target=ffmpeg_stderr_reader, + args=(ff_proc.encoder.stderr, '[Encoder]')) + enc_err_thread.daemon = True + enc_err_thread.start() + + if playlist.mode and not stdin_args.folder: + watcher = None + get_source = GetSourceFromPlaylist() + else: + messenger.info('Start folder mode') + media = MediaStore() + watcher = MediaWatcher(media) + get_source = GetSourceFromFolder(media) + + try: + for node in get_source.next(): + if watcher is not None: + watcher.current_clip = node.get('source') + + messenger.info(f'Play: {node.get("source")}') + + dec_cmd = [ + 'ffmpeg', '-v', f'level+{log.ff_level.lower()}', + '-hide_banner', '-nostats' + ] + node['src_cmd'] + node['filter'] + ff_pre_settings + + messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"') + + with Popen( + dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder: + dec_err_thread = Thread(name='stderr_decoder', + target=ffmpeg_stderr_reader, + args=(ff_proc.decoder.stderr, + '[Decoder]')) + dec_err_thread.daemon = True + dec_err_thread.start() + + while True: + buf_dec = ff_proc.decoder.stdout.read(COPY_BUFSIZE) + if not streaming_queue.empty(): + buf_live = streaming_queue.get() + ff_proc.encoder.stdin.write(buf_live) + live_on = True + + del buf_dec + elif buf_dec: + ff_proc.encoder.stdin.write(buf_dec) + else: + if live_on: + check_time(node, get_source) + live_on = False + break + + except BrokenPipeError as err: + messenger.error('Broken Pipe!') + messenger.debug(79 * '-') + messenger.debug(f'error: "{err}"') + messenger.debug(f'delta: "{sync_op.time_delta}"') + messenger.debug(f'node: "{node}"') + messenger.debug(f'dec_cmd: "{dec_cmd}"') + messenger.debug(79 * '-') + terminate_processes(watcher) + + except SystemExit: + messenger.info('Got close command') + terminate_processes(watcher) + + if ff_proc.live and ff_proc.live.poll() is None: + ff_proc.live.terminate() + + except KeyboardInterrupt: + messenger.warning('Program terminated') + terminate_processes(watcher) + + if ff_proc.live and ff_proc.live.poll() is None: + ff_proc.live.terminate() + + # close encoder when nothing is to do anymore + if ff_proc.encoder.poll() is None: + ff_proc.encoder.kill() + + if ff_proc.live and ff_proc.live.poll() is None: + ff_proc.live.kill() + + finally: + if ff_proc.encoder.poll() is None: + ff_proc.encoder.kill() + ff_proc.encoder.wait() + + if ff_proc.live and ff_proc.live.poll() is None: + ff_proc.live.kill() diff --git a/ffplayout/output/null.py b/ffplayout/output/null.py index 26d0b339..5c5341f1 100644 --- a/ffplayout/output/null.py +++ b/ffplayout/output/null.py @@ -58,7 +58,7 @@ def output(): ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE) enc_err_thread = Thread(target=ffmpeg_stderr_reader, - args=(ff_proc.encoder.stderr, False)) + args=(ff_proc.encoder.stderr, '[Encoder]')) enc_err_thread.daemon = True enc_err_thread.start() @@ -91,7 +91,7 @@ def output(): dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder: dec_err_thread = Thread(target=ffmpeg_stderr_reader, args=(ff_proc.decoder.stderr, - True)) + '[Decoder]')) dec_err_thread.daemon = True dec_err_thread.start() diff --git a/ffplayout/output/stream.py b/ffplayout/output/stream.py index 71a81e70..ea4fa861 100644 --- a/ffplayout/output/stream.py +++ b/ffplayout/output/stream.py @@ -76,7 +76,7 @@ def output(): ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE) enc_err_thread = Thread(target=ffmpeg_stderr_reader, - args=(ff_proc.encoder.stderr, False)) + args=(ff_proc.encoder.stderr, '[Encoder]')) enc_err_thread.daemon = True enc_err_thread.start() @@ -107,7 +107,7 @@ def output(): dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder: dec_err_thread = Thread(target=ffmpeg_stderr_reader, args=(ff_proc.decoder.stderr, - True)) + '[Decoder]')) dec_err_thread.daemon = True dec_err_thread.start() diff --git a/ffplayout/utils.py b/ffplayout/utils.py index d361f87c..6a1b3b04 100644 --- a/ffplayout/utils.py +++ b/ffplayout/utils.py @@ -135,6 +135,7 @@ sync_op = SimpleNamespace(time_delta=0, realtime=False) mail = SimpleNamespace() log = SimpleNamespace() pre = SimpleNamespace() +ingest = SimpleNamespace() playlist = SimpleNamespace() storage = SimpleNamespace() lower_third = SimpleNamespace() @@ -272,6 +273,9 @@ pre.v_bitrate = _cfg['processing']['width'] * _cfg['processing']['height'] / 10 pre.v_bufsize = pre.v_bitrate / 2 pre.output_count = _cfg['processing']['output_count'] +ingest.address = _cfg['ingest']['address'] +ingest.port = _cfg['ingest']['port'] + playout.mode = _cfg['out']['mode'] playout.name = _cfg['out']['service_name'] playout.provider = _cfg['out']['service_provider'] @@ -344,52 +348,27 @@ class CustomFormatter(logging.Formatter): if stdin_args.log: log.path = stdin_args.log -playout_logger = logging.getLogger('playout') -playout_logger.setLevel(log.level) -decoder_logger = logging.getLogger('decoder') -decoder_logger.setLevel(log.ff_level) -encoder_logger = logging.getLogger('encoder') -encoder_logger.setLevel(log.ff_level) +logger = logging.getLogger('playout') +logger.setLevel(log.level) if log.to_file and log.path != 'none': if log.path.is_dir(): playout_log = log.path.joinpath('ffplayout.log') - decoder_log = log.path.joinpath('decoder.log') - encoder_log = log.path.joinpath('encoder.log') else: log_dir = Path(__file__).parent.absolute().joinpath('log') log_dir.mkdir(exist_ok=True) playout_log = log_dir.joinpath('ffplayout.log') - decoder_log = log_dir.joinpath('decoder.log') - encoder_log = log_dir.joinpath('encoder.log') p_format = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s') - f_format = logging.Formatter('[%(asctime)s] %(message)s') - p_file_handler = TimedRotatingFileHandler(playout_log, when='midnight', - backupCount=log.backup_count) - d_file_handler = TimedRotatingFileHandler(decoder_log, when='midnight', - backupCount=log.backup_count) - e_file_handler = TimedRotatingFileHandler(encoder_log, when='midnight', - backupCount=log.backup_count) + handler = TimedRotatingFileHandler(playout_log, when='midnight', + backupCount=log.backup_count) - p_file_handler.setFormatter(p_format) - d_file_handler.setFormatter(f_format) - e_file_handler.setFormatter(f_format) - playout_logger.addHandler(p_file_handler) - decoder_logger.addHandler(d_file_handler) - encoder_logger.addHandler(e_file_handler) - - DEC_PREFIX = '' - ENC_PREFIX = '' + handler.setFormatter(p_format) + logger.addHandler(handler) else: console_handler = logging.StreamHandler() console_handler.setFormatter(CustomFormatter()) - playout_logger.addHandler(console_handler) - decoder_logger.addHandler(console_handler) - encoder_logger.addHandler(console_handler) - - DEC_PREFIX = '[decoder] ' - ENC_PREFIX = '[encoder] ' + logger.addHandler(console_handler) # ------------------------------------------------------------------------------ @@ -436,7 +415,7 @@ class Mailer: try: server = smtplib.SMTP(mail.server, mail.port) except socket.error as err: - playout_logger.error(err) + logger.error(err) server = None if server is not None: @@ -444,7 +423,7 @@ class Mailer: try: login = server.login(mail.s_addr, mail.s_pass) except smtplib.SMTPAuthenticationError as serr: - playout_logger.error(serr) + logger.error(serr) login = None if login is not None: @@ -504,27 +483,27 @@ class Messenger: """ log debugging messages """ - playout_logger.debug(msg.replace('\n', ' ')) + logger.debug(msg.replace('\n', ' ')) def info(self, msg): """ log and mail info messages """ - playout_logger.info(msg.replace('\n', ' ')) + logger.info(msg.replace('\n', ' ')) self._mailer.info(msg) def warning(self, msg): """ log and mail warning messages """ - playout_logger.warning(msg.replace('\n', ' ')) + logger.warning(msg.replace('\n', ' ')) self._mailer.warning(msg) def error(self, msg): """ log and mail error messages """ - playout_logger.error(msg.replace('\n', ' ')) + logger.error(msg.replace('\n', ' ')) self._mailer.error(msg) @@ -586,14 +565,14 @@ def validate_ffmpeg_libs(): check if ffmpeg contains some basic libs """ if 'libx264' not in FF_LIBS['libs']: - playout_logger.error('ffmpeg contains no libx264!') + logger.error('ffmpeg contains no libx264!') if 'libfdk-aac' not in FF_LIBS['libs']: - playout_logger.warning( + logger.warning( 'ffmpeg contains no libfdk-aac! No high quality aac...') if 'tpad' not in FF_LIBS['filters']: - playout_logger.error('ffmpeg contains no tpad filter!') + logger.error('ffmpeg contains no tpad filter!') if 'zmq' not in FF_LIBS['filters']: - playout_logger.error( + logger.error( 'ffmpeg contains no zmq filter! Text messages will not work...') @@ -712,20 +691,13 @@ def terminate_processes(watcher=None): watcher.stop() -def ffmpeg_stderr_reader(std_errors, decoder): +def ffmpeg_stderr_reader(std_errors, prefix): """ read ffmpeg stderr decoder and encoder instance and log the output """ - if decoder: - logger = decoder_logger - prefix = DEC_PREFIX - else: - logger = encoder_logger - prefix = ENC_PREFIX - def form_line(line, level): - return f'{prefix}{line.replace(level, "").rstrip()}' + return f'{prefix} {line.replace(level, "").rstrip()}' def write_log(line): if '[info]' in line: