From 434602ddd2654d48cb71a6fcbf5ee1a96644f023 Mon Sep 17 00:00:00 2001 From: Jonathan Baecker Date: Tue, 28 May 2019 17:43:57 +0200 Subject: [PATCH] lets try without Queue and Thread --- ffplayout.py | 116 ++++++++++++++++----------------------------------- 1 file changed, 37 insertions(+), 79 deletions(-) diff --git a/ffplayout.py b/ffplayout.py index ddae4a9e..950f4a02 100755 --- a/ffplayout.py +++ b/ffplayout.py @@ -34,10 +34,10 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formatdate from logging.handlers import TimedRotatingFileHandler +from shutil import copyfileobj from subprocess import PIPE, CalledProcessError, Popen, check_output from threading import Thread from types import SimpleNamespace -from queue import Queue # ------------------------------------------------------------------------------ # read variables from config file @@ -273,7 +273,7 @@ def is_int(value): # compare clip play time with real time, # to see if we are sync -def check_sync(begin, buffer): +def check_sync(begin): time_now = get_time('full_sec') # around 2.5 seconds is in ffmpeg buffer @@ -294,7 +294,6 @@ def check_sync(begin, buffer): if _general.stop and abs(time_distance) > _general.threshold: logger.error('Sync tolerance value exceeded, program terminated!') - buffer.put(None) sys.exit(1) @@ -587,9 +586,8 @@ def build_filtergraph(first, duration, seek, out, ad, ad_last, ad_next, dummy): # ------------------------------------------------------------------------------ # read values from json playlist -class GetSourceIter(object): - def __init__(self, buffer): - self.buffer = buffer +class GetSourceIter: + def __init__(self): self.last_time = get_time('full_sec') if 0 <= self.last_time < _playlist.start: @@ -829,7 +827,7 @@ class GetSourceIter(object): self.last = False if self.has_begin: - check_sync(self.begin, self.buffer) + check_sync(self.begin) self.map_extension(node) self.url_or_live_source() @@ -875,66 +873,10 @@ class GetSourceIter(object): yield self.src_cmd, self.filtergraph -# independent thread for clip preparation -def play_clips(buffer, GetSourceIter): - # send current file to buffer stdin - get_source = GetSourceIter(buffer) - - for src_cmd, filtergraph in get_source.next(): - if _pre_comp.copy: - ff_pre_settings = _pre_comp.copy_settings - else: - ff_pre_settings = filtergraph + [ - '-pix_fmt', 'yuv420p', '-r', str(_pre_comp.fps), - '-c:v', 'mpeg2video', '-intra', - '-b:v', '{}k'.format(_pre_comp.v_bitrate), - '-minrate', '{}k'.format(_pre_comp.v_bitrate), - '-maxrate', '{}k'.format(_pre_comp.v_bitrate), - '-bufsize', '{}k'.format(_pre_comp.v_bufsize), - '-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2', - '-threads', '2', '-f', 'mpegts', '-' - ] - - try: - if src_cmd[0] == '-i': - current_file = src_cmd[1] - else: - current_file = src_cmd[3] - - logger.info('play: "{}"'.format(current_file)) - - decoder = Popen( - [ - 'ffmpeg', '-v', 'error', '-hide_banner', '-nostats' - ] + src_cmd + list(ff_pre_settings), - stdout=PIPE - ) - - # 65536 is the linux pipe buffer size, - # but this number is not divisible by 188 (the mpeg-ts packet size) - # so we take the next smaller number 65424 for reading - while True: - data = decoder.stdout.read(65424) - if not data: - break - - buffer.put(data) - - except Exception: - logger.error(traceback.format_exc()) - - finally: - decoder.wait() - - else: - buffer.put(None) - - def main(): + get_source = GetSourceIter() year = get_date(False).split('-')[0] - # the Queue connects pre- and post- compression - buffer = Queue(maxsize=56) try: if _playout.preview: # preview playout to player @@ -943,7 +885,8 @@ def main(): '-hide_banner', '-nostats', '-i', 'pipe:0'], stderr=None, stdin=PIPE, - stdout=None + stdout=None, + bufsize=0 ) else: # playout to rtmp @@ -967,24 +910,39 @@ def main(): '-metadata', 'year=' + year ] + list(_playout.post_comp_extra) + [_playout.out_addr], - stdin=PIPE, + stdin=PIPE, bufsize=0 ) - play_thread = Thread( - name='play_clips', target=play_clips, args=( - buffer, GetSourceIter,) - ) - play_thread.daemon = True - play_thread.start() + for src_cmd, filtergraph in get_source.next(): + if _pre_comp.copy: + ff_pre_settings = _pre_comp.copy_settings + else: + ff_pre_settings = filtergraph + [ + '-pix_fmt', 'yuv420p', '-r', str(_pre_comp.fps), + '-c:v', 'mpeg2video', '-intra', + '-b:v', '{}k'.format(_pre_comp.v_bitrate), + '-minrate', '{}k'.format(_pre_comp.v_bitrate), + '-maxrate', '{}k'.format(_pre_comp.v_bitrate), + '-bufsize', '{}k'.format(_pre_comp.v_bufsize), + '-c:a', 's302m', '-strict', '-2', + '-ar', '48000', '-ac', '2', + '-threads', '2', '-f', 'mpegts', '-' + ] - # get data from Queue and write them to post process - while True: - data = buffer.get() - if not data: - playout.terminate() - break + if src_cmd[0] == '-i': + current_file = src_cmd[1] + else: + current_file = src_cmd[3] - playout.stdin.write(data) + logger.info('play: "{}"'.format(current_file)) + + with Popen( + [ + 'ffmpeg', '-v', 'error', '-hide_banner', '-nostats' + ] + src_cmd + list(ff_pre_settings), + stdout=PIPE, bufsize=0 + ) as decoder: + copyfileobj(decoder.stdout, playout.stdin) finally: playout.wait()