From f52a045eefb8399ab82ee7d0d6f640fd5bfcffe9 Mon Sep 17 00:00:00 2001 From: Jonathan Baecker Date: Mon, 27 May 2019 10:31:26 +0200 Subject: [PATCH] test Queue instead of external buffer tool --- ffplayout.py | 146 +++++++++++++++++++++++++++------------------------ 1 file changed, 78 insertions(+), 68 deletions(-) diff --git a/ffplayout.py b/ffplayout.py index 067a57b8..5f9ec9c1 100755 --- a/ffplayout.py +++ b/ffplayout.py @@ -26,6 +26,7 @@ import os import smtplib import socket import sys +import traceback from argparse import ArgumentParser from ast import literal_eval from datetime import date, datetime, timedelta @@ -38,6 +39,7 @@ from subprocess import PIPE, CalledProcessError, Popen, check_output from threading import Thread from time import sleep from types import SimpleNamespace +from queue import Queue # ------------------------------------------------------------------------------ # read variables from config file @@ -325,19 +327,17 @@ def calc_buffer_size(): # check if processes a well -def check_process(play_thread, playout, mbuffer): +def check_process(play_thread, playout): while True: sleep(4) if playout.poll() is not None: logger.error( 'post-process is not alive anymore, terminate ffplayout!') - mbuffer.terminate() break if not play_thread.is_alive(): logger.error( 'pre-process is not alive anymore, terminate ffplayout!') - mbuffer.terminate() break @@ -357,6 +357,9 @@ def check_sync(begin): if 0 <= time_now < _playlist.start and not begin == _playlist.start: time_distance -= 86400.0 + # TODO: this is only for debugging + print(time_distance) + # check that we are in tolerance time if abs(time_distance) > 5 + tolerance: mailer.warning( @@ -943,11 +946,11 @@ class GetSourceIter: # independent thread for clip preparation -def play_clips(out_file, GetSourceIter): +def play_clips(buffer, GetSourceIter): # send current file to buffer stdin - iter = GetSourceIter() + get_source = GetSourceIter() - for src_cmd, filtergraph in iter.next(): + for src_cmd, filtergraph in get_source.next(): if _pre_comp.copy: ff_pre_settings = _pre_comp.copy_settings else: @@ -970,85 +973,92 @@ def play_clips(out_file, GetSourceIter): logger.info('play: "{}"'.format(current_file)) - file_piper = Popen( + # TODO: stderr and stdin only when they are used + decoder = Popen( [ 'ffmpeg', '-v', 'error', '-hide_banner', '-nostats' ] + src_cmd + list(ff_pre_settings), stdout=PIPE, - bufsize=0 + stderr=PIPE, + stdin=PIPE ) - copyfileobj(file_piper.stdout, out_file) + for package in iter(decoder.stdout.readline, ''): + buffer.put(package) + + # TODO: make this nicer + except Exception: + print(traceback.format_exc()) + finally: - file_piper.wait() + decoder.wait() def main(): year = get_date(False).split('-')[0] + + # open a buffer for the streaming pipeline + # stdin get the files loop + # stdout pipes to ffmpeg rtmp streaming + # TODO: have an eye on maxsize + buffer = Queue(maxsize=12) try: - # open a buffer for the streaming pipeline - # stdin get the files loop - # stdout pipes to ffmpeg rtmp streaming - mbuffer = Popen( - [_buffer.cli] + list(_buffer.cmd) - + ['{}k'.format(calc_buffer_size())], - stdin=PIPE, - stdout=PIPE, - bufsize=0 - ) - try: - if _playout.preview: - # preview playout to player - playout = Popen([ - 'ffplay', '-v', 'error', - '-hide_banner', '-nostats', '-i', 'pipe:0'], - stdin=mbuffer.stdout, - bufsize=0 - ) + if _playout.preview: + # preview playout to player + playout = Popen([ + 'ffplay', + '-hide_banner', '-nostats', '-i', 'pipe:0'], + stderr=None, + stdin=PIPE, + stdout=None + ) + else: + # playout to rtmp + if _pre_comp.copy: + playout_pre = [ + 'ffmpeg', '-v', 'info', '-hide_banner', '-nostats', + '-re', '-i', 'pipe:0', '-c', 'copy' + ] + _playout.post_comp_copy else: - # playout to rtmp - if _pre_comp.copy: - playout_pre = [ - 'ffmpeg', '-v', 'info', '-hide_banner', '-nostats', - '-re', '-i', 'pipe:0', '-c', 'copy' - ] + _playout.post_comp_copy - else: - playout_pre = [ - 'ffmpeg', '-v', 'info', '-hide_banner', '-nostats', - '-re', '-thread_queue_size', '256', - '-fflags', '+igndts', '-i', 'pipe:0', - '-fflags', '+genpts' - ] + _playout.post_comp_video + \ - _playout.post_comp_audio + playout_pre = [ + 'ffmpeg', '-v', 'info', '-hide_banner', '-nostats', + '-re', '-thread_queue_size', '256', + '-i', 'pipe:0' + ] + _playout.post_comp_video + \ + _playout.post_comp_audio - playout = Popen( - list(playout_pre) - + [ - '-metadata', 'service_name=' + _playout.name, - '-metadata', 'service_provider=' + _playout.provider, - '-metadata', 'year=' + year - ] + list(_playout.post_comp_extra) - + [ - _playout.out_addr - ], - stdin=mbuffer.stdout, - bufsize=0 - ) - - play_thread = Thread( - name='play_clips', target=play_clips, args=( - mbuffer.stdin, - GetSourceIter, - ) + playout = Popen( + list(playout_pre) + + [ + '-metadata', 'service_name=' + _playout.name, + '-metadata', 'service_provider=' + _playout.provider, + '-metadata', 'year=' + year + ] + list(_playout.post_comp_extra) + + [ + _playout.out_addr + ], + stdin=PIPE, ) - play_thread.daemon = True - play_thread.start() - check_process(play_thread, playout, mbuffer) - finally: - playout.wait() + play_thread = Thread( + name='play_clips', target=play_clips, args=( + buffer, + GetSourceIter, + ) + ) + play_thread.daemon = True + play_thread.start() + + # TODO: this needs to be changed, + # while True is bad, it needs a check to be able to exit + # with this loop we also end up never in the check_process function + while True: + line = buffer.get() + playout.stdin.write(line) + + check_process(play_thread, playout) finally: - mbuffer.wait() + playout.wait() if __name__ == '__main__':