lets try without Queue and Thread

This commit is contained in:
Jonathan Baecker 2019-05-28 17:43:57 +02:00
parent facaa4144a
commit 434602ddd2

View File

@ -34,10 +34,10 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from logging.handlers import TimedRotatingFileHandler
from shutil import copyfileobj
from subprocess import PIPE, CalledProcessError, Popen, check_output
from threading import Thread
from types import SimpleNamespace
from queue import Queue
# ------------------------------------------------------------------------------
# read variables from config file
@ -273,7 +273,7 @@ def is_int(value):
# compare clip play time with real time,
# to see if we are sync
def check_sync(begin, buffer):
def check_sync(begin):
time_now = get_time('full_sec')
# around 2.5 seconds is in ffmpeg buffer
@ -294,7 +294,6 @@ def check_sync(begin, buffer):
if _general.stop and abs(time_distance) > _general.threshold:
logger.error('Sync tolerance value exceeded, program terminated!')
buffer.put(None)
sys.exit(1)
@ -587,9 +586,8 @@ def build_filtergraph(first, duration, seek, out, ad, ad_last, ad_next, dummy):
# ------------------------------------------------------------------------------
# read values from json playlist
class GetSourceIter(object):
def __init__(self, buffer):
self.buffer = buffer
class GetSourceIter:
def __init__(self):
self.last_time = get_time('full_sec')
if 0 <= self.last_time < _playlist.start:
@ -829,7 +827,7 @@ class GetSourceIter(object):
self.last = False
if self.has_begin:
check_sync(self.begin, self.buffer)
check_sync(self.begin)
self.map_extension(node)
self.url_or_live_source()
@ -875,66 +873,10 @@ class GetSourceIter(object):
yield self.src_cmd, self.filtergraph
# independent thread for clip preparation
def play_clips(buffer, GetSourceIter):
# send current file to buffer stdin
get_source = GetSourceIter(buffer)
for src_cmd, filtergraph in get_source.next():
if _pre_comp.copy:
ff_pre_settings = _pre_comp.copy_settings
else:
ff_pre_settings = filtergraph + [
'-pix_fmt', 'yuv420p', '-r', str(_pre_comp.fps),
'-c:v', 'mpeg2video', '-intra',
'-b:v', '{}k'.format(_pre_comp.v_bitrate),
'-minrate', '{}k'.format(_pre_comp.v_bitrate),
'-maxrate', '{}k'.format(_pre_comp.v_bitrate),
'-bufsize', '{}k'.format(_pre_comp.v_bufsize),
'-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2',
'-threads', '2', '-f', 'mpegts', '-'
]
try:
if src_cmd[0] == '-i':
current_file = src_cmd[1]
else:
current_file = src_cmd[3]
logger.info('play: "{}"'.format(current_file))
decoder = Popen(
[
'ffmpeg', '-v', 'error', '-hide_banner', '-nostats'
] + src_cmd + list(ff_pre_settings),
stdout=PIPE
)
# 65536 is the linux pipe buffer size,
# but this number is not divisible by 188 (the mpeg-ts packet size)
# so we take the next smaller number 65424 for reading
while True:
data = decoder.stdout.read(65424)
if not data:
break
buffer.put(data)
except Exception:
logger.error(traceback.format_exc())
finally:
decoder.wait()
else:
buffer.put(None)
def main():
get_source = GetSourceIter()
year = get_date(False).split('-')[0]
# the Queue connects pre- and post- compression
buffer = Queue(maxsize=56)
try:
if _playout.preview:
# preview playout to player
@ -943,7 +885,8 @@ def main():
'-hide_banner', '-nostats', '-i', 'pipe:0'],
stderr=None,
stdin=PIPE,
stdout=None
stdout=None,
bufsize=0
)
else:
# playout to rtmp
@ -967,24 +910,39 @@ def main():
'-metadata', 'year=' + year
] + list(_playout.post_comp_extra)
+ [_playout.out_addr],
stdin=PIPE,
stdin=PIPE, bufsize=0
)
play_thread = Thread(
name='play_clips', target=play_clips, args=(
buffer, GetSourceIter,)
)
play_thread.daemon = True
play_thread.start()
for src_cmd, filtergraph in get_source.next():
if _pre_comp.copy:
ff_pre_settings = _pre_comp.copy_settings
else:
ff_pre_settings = filtergraph + [
'-pix_fmt', 'yuv420p', '-r', str(_pre_comp.fps),
'-c:v', 'mpeg2video', '-intra',
'-b:v', '{}k'.format(_pre_comp.v_bitrate),
'-minrate', '{}k'.format(_pre_comp.v_bitrate),
'-maxrate', '{}k'.format(_pre_comp.v_bitrate),
'-bufsize', '{}k'.format(_pre_comp.v_bufsize),
'-c:a', 's302m', '-strict', '-2',
'-ar', '48000', '-ac', '2',
'-threads', '2', '-f', 'mpegts', '-'
]
# get data from Queue and write them to post process
while True:
data = buffer.get()
if not data:
playout.terminate()
break
if src_cmd[0] == '-i':
current_file = src_cmd[1]
else:
current_file = src_cmd[3]
playout.stdin.write(data)
logger.info('play: "{}"'.format(current_file))
with Popen(
[
'ffmpeg', '-v', 'error', '-hide_banner', '-nostats'
] + src_cmd + list(ff_pre_settings),
stdout=PIPE, bufsize=0
) as decoder:
copyfileobj(decoder.stdout, playout.stdin)
finally:
playout.wait()