better live ingest handling

This commit is contained in:
jb-alvarado 2022-03-24 14:06:57 +01:00
parent aec6b7e34a
commit aa1d886ae7
5 changed files with 95 additions and 66 deletions

View File

@ -46,15 +46,15 @@ def listener(que):
messenger.debug(f'Server CMD: "{" ".join(server_cmd)}"')
while True:
with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live:
with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.server:
err_thread = Thread(name='stderr_server',
target=ffmpeg_stderr_reader,
args=(ff_proc.live.stderr, '[Server]'))
args=(ff_proc.server.stderr, '[Server]'))
err_thread.daemon = True
err_thread.start()
while True:
buffer = ff_proc.live.stdout.read(pre.buffer_size)
buffer = ff_proc.server.stdout.read(pre.buffer_size)
if not buffer:
break

View File

@ -24,9 +24,8 @@ from subprocess import PIPE, Popen
from threading import Thread
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, pre, sync_op,
terminate_processes)
from ..utils import (ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, pre, terminate_processes)
def output():
@ -43,7 +42,7 @@ def output():
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
f'Using drawtext node, listening on address: {lower_third.address}'
)
)
overlay = [
'-vf',
"null,zmq=b=tcp\\\\://'{}',drawtext=text='':fontfile='{}'".format(
@ -54,7 +53,7 @@ def output():
enc_cmd = [
'ffplay', '-hide_banner', '-nostats',
'-v', f'level+{log.ff_level}', '-i', 'pipe:0'
] + overlay
] + overlay
messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"')
@ -77,10 +76,12 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + pre.settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
kill_dec = True
with Popen(
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
@ -90,29 +91,37 @@ def output():
dec_err_thread.start()
while True:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
if kill_dec:
kill_dec = False
live_on = True
get_source.first = True
messenger.info(
"Switch from offline source to live ingest")
if ff_proc.decoder.poll() is None:
ff_proc.decoder.kill()
ff_proc.decoder.wait()
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
messenger.info(
"Switch from live ingest to offline source")
kill_dec = True
live_on = False
break
except BrokenPipeError as err:
buf_dec = ff_proc.decoder.stdout.read(
pre.buffer_size)
if buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
break
except BrokenPipeError:
messenger.error('Broken Pipe!')
messenger.debug(79 * '-')
messenger.debug(f'error: "{err}"')
messenger.debug(f'delta: "{sync_op.time_delta}"')
messenger.debug(f'node: "{node}"')
messenger.debug(f'dec_cmd: "{dec_cmd}"')
messenger.debug(79 * '-')
terminate_processes(getattr(get_source, 'stop', None))
except SystemExit:

View File

@ -24,7 +24,7 @@ from subprocess import PIPE, Popen
from threading import Thread
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
from ..utils import (ff_proc, ffmpeg_stderr_reader, ingest,
log, messenger, playout, pre, terminate_processes)
@ -45,7 +45,7 @@ def output():
enc_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level}', '-hide_banner',
'-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0'
] + playout.output_param[:-3] + ['-f', 'null', '-']
] + playout.output_param[:-3] + ['-f', 'null', '-']
messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"')
@ -68,10 +68,12 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + pre.settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
kill_dec = True
with Popen(
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
@ -81,20 +83,34 @@ def output():
dec_err_thread.start()
while True:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
if kill_dec:
kill_dec = False
live_on = True
get_source.first = True
messenger.info(
"Switch from offline source to live ingest")
if ff_proc.decoder.poll() is None:
ff_proc.decoder.kill()
ff_proc.decoder.wait()
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
messenger.info(
"Switch from live ingest to offline source")
kill_dec = True
live_on = False
break
buf_dec = ff_proc.decoder.stdout.read(
pre.buffer_size)
if buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
break
except BrokenPipeError:
messenger.error('Broken Pipe!')

View File

@ -24,8 +24,8 @@ from subprocess import PIPE, Popen
from threading import Thread
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, playout, pre, sync_op,
from ..utils import (ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, playout, pre,
terminate_processes)
@ -47,7 +47,7 @@ def output():
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
f'Using drawtext node, listening on address: {lower_third.address}'
)
)
filtering = [
'-filter_complex',
f"[0:v]null,zmq=b=tcp\\\\://'{lower_third.address}',"
@ -57,7 +57,7 @@ def output():
if playout.preview:
filtering[-1] += ',split=2[v_out1][v_out2]'
preview = ['-map', '[v_out1]', '-map', '0:a'
] + playout.preview_param + ['-map', '[v_out2]', '-map', '0:a']
] + playout.preview_param + ['-map', '[v_out2]', '-map', '0:a']
elif playout.preview:
preview = playout.preview_param
@ -66,7 +66,7 @@ def output():
enc_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level}', '-hide_banner',
'-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0'
] + filtering + preview + playout.output_param
] + filtering + preview + playout.output_param
messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"')
@ -87,10 +87,12 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + pre.settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
kill_dec = True
with Popen(
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
@ -100,29 +102,37 @@ def output():
dec_err_thread.start()
while True:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
if kill_dec:
kill_dec = False
live_on = True
get_source.first = True
messenger.info(
"Switch from offline source to live ingest")
if ff_proc.decoder.poll() is None:
ff_proc.decoder.kill()
ff_proc.decoder.wait()
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
messenger.info(
"Switch from live ingest to offline source")
kill_dec = True
live_on = False
break
except BrokenPipeError as err:
buf_dec = ff_proc.decoder.stdout.read(
pre.buffer_size)
if buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
break
except BrokenPipeError:
messenger.error('Broken Pipe!')
messenger.debug(79 * '-')
messenger.debug(f'error: "{err}"')
messenger.debug(f'delta: "{sync_op.time_delta}"')
messenger.debug(f'node: "{node}"')
messenger.debug(f'dec_cmd: "{dec_cmd}"')
messenger.debug(79 * '-')
terminate_processes(getattr(get_source, 'stop', None))
except SystemExit:

View File

@ -145,7 +145,7 @@ storage = SimpleNamespace()
lower_third = SimpleNamespace()
playout = SimpleNamespace()
ff_proc = SimpleNamespace(decoder=None, encoder=None)
ff_proc = SimpleNamespace(decoder=None, encoder=None, server=None)
def str_to_sec(time_str):
@ -727,6 +727,9 @@ def terminate_processes(custom_process=None):
if ff_proc.encoder and ff_proc.encoder.poll() is None:
ff_proc.encoder.kill()
if ff_proc.server and ff_proc.server.poll() is None:
ff_proc.server.kill()
if custom_process:
custom_process()
@ -847,15 +850,6 @@ def check_sync(delta, node=None):
sys.exit(1)
def check_node_time(node, get_source):
current_time = get_time('full_sec')
clip_length = node['out'] - node['seek']
clip_end = current_time + clip_length
if pre.mode == 'playlist' and clip_end > current_time:
get_source.first = True
def seek_in(seek):
"""
seek in clip