ingest server in external file

Makes it possible to use the ingest server in almost every output module. The ingest function can also be init after running the playout, with sighub.
This commit is contained in:
jb-alvarado 2022-02-12 20:34:22 +01:00
parent 99271d2bdb
commit 7babb1aff8
7 changed files with 174 additions and 279 deletions

View File

@ -58,9 +58,10 @@ processing:
output_count: 1
ingest:
helptext: Works only in combination with output -> mode = live_switch! Run a server
helptext: Works not with direct hls output, it always needs full processing! Run a server
for a ingest stream. This stream will override the normal streaming until is done.
There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app.
enable: false
stream_input: >-
-f live_flv
-listen 1

View File

@ -0,0 +1,74 @@
# This file is part of ffplayout.
#
# ffplayout is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ffplayout is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ffplayout. If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------
"""
Start a streaming server and forword it to the playout.
This stream will have the first priority and
play instead of the normal stream (playlist/folder).
"""
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep
from .filters.default import overlay_filter
from .utils import ff_proc, ffmpeg_stderr_reader, ingest, messenger, pre
def listener(que):
filter_ = (f'[0:v]fps={str(pre.fps)},scale={pre.w}:{pre.h},'
+ f'setdar=dar={pre.aspect}[v];')
filter_ += overlay_filter(0, False, False, False)
server_cmd = [
'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error'
] + ingest.stream_input + [
'-filter_complex', f'{filter_}[vout1]',
'-map', '[vout1]', '-map', '0:a'
] + pre.settings
messenger.warning(
'Ingest stream is experimental, use it at your own risk!')
messenger.debug(f'Server CMD: "{" ".join(server_cmd)}"')
while True:
with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live:
err_thread = Thread(name='stderr_server',
target=ffmpeg_stderr_reader,
args=(ff_proc.live.stderr, '[Server]'))
err_thread.daemon = True
err_thread.start()
while True:
buffer = ff_proc.live.stdout.read(pre.buffer_size)
if not buffer:
break
que.put(buffer)
sleep(.33)
def ingest_stream():
streaming_queue = Queue(maxsize=0)
rtmp_server_thread = Thread(name='ffmpeg_server',target=listener,
args=(streaming_queue,))
rtmp_server_thread.daemon = True
rtmp_server_thread.start()
return streaming_queue

View File

@ -20,31 +20,25 @@ This module plays the compressed output directly on the desktop.
"""
from importlib import import_module
from platform import system
from subprocess import PIPE, Popen
from threading import Thread
from ..utils import (ff_proc, ffmpeg_stderr_reader, log, lower_third,
messenger, play, pre, pre_audio_codec, sync_op,
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, play, pre, sync_op,
terminate_processes)
COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424
def output():
"""
this output is for playing on desktop with ffplay
"""
overlay = []
live_on = False
stream_queue = None
ff_pre_settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
if ingest.enable:
stream_queue = ingest_stream()
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
@ -83,7 +77,7 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + ff_pre_settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
@ -96,10 +90,20 @@ def output():
dec_err_thread.start()
while True:
buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE)
if not buf:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
live_on = False
break
ff_proc.encoder.stdin.write(buf)
except BrokenPipeError as err:
messenger.error('Broken Pipe!')

View File

@ -1,214 +0,0 @@
#!/usr/bin/env python3
# This file is part of ffplayout.
#
# ffplayout is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ffplayout is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ffplayout. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
from importlib import import_module
from platform import system
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep
from ..filters.default import overlay_filter
from ..utils import (ff_proc, ffmpeg_stderr_reader, get_time, ingest, log,
lower_third, messenger, play, playout, pre,
pre_audio_codec, sync_op, terminate_processes)
COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424
def rtmp_server(que, pre_settings):
filter_ = (f'[0:v]fps={str(pre.fps)},scale={pre.w}:{pre.h},'
+ f'setdar=dar={pre.aspect}[v];')
filter_ += overlay_filter(0, False, False, False)
server_cmd = [
'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error'
] + ingest.stream_input + [
'-filter_complex', f'{filter_}[vout1]',
'-map', '[vout1]', '-map', '0:a'
] + pre_settings
messenger.warning(
'Ingest stream is experimental, use it at your own risk!')
messenger.debug(f'Server CMD: "{" ".join(server_cmd)}"')
while True:
with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live:
err_thread = Thread(name='stderr_server',
target=ffmpeg_stderr_reader,
args=(ff_proc.live.stderr, '[Server]'))
err_thread.daemon = True
err_thread.start()
while True:
buffer = ff_proc.live.stdout.read(COPY_BUFSIZE)
if not buffer:
break
que.put(buffer)
sleep(.33)
def check_time(node, get_source):
current_time = get_time('full_sec')
clip_length = node['out'] - node['seek']
clip_end = current_time + clip_length
if play.mode == 'playlist' and clip_end > current_time:
get_source.first = True
def output():
"""
this output is for streaming to a target address,
like rtmp, rtp, svt, etc.
"""
filtering = []
node = None
dec_cmd = []
preview = []
live_on = False
streaming_queue = Queue(maxsize=0)
ff_pre_settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
f'Using drawtext node, listening on address: {lower_third.address}'
)
filtering = [
'-filter_complex',
f"[0:v]null,zmq=b=tcp\\\\://'{lower_third.address}',"
+ f"drawtext=text='':fontfile='{lower_third.fontfile}'"
]
if playout.preview:
filtering[-1] += ',split=2[v_out1][v_out2]'
preview = ['-map', '[v_out1]', '-map', '0:a'
] + playout.preview_param + ['-map', '[v_out2]', '-map', '0:a']
elif playout.preview:
preview = playout.preview_param
rtmp_server_thread = Thread(name='ffmpeg_server',target=rtmp_server,
args=(streaming_queue, ff_pre_settings))
rtmp_server_thread.daemon = True
rtmp_server_thread.start()
try:
enc_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner',
'-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0'
] + filtering + preview + playout.stream_param
messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"')
ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE)
enc_err_thread = Thread(name='stderr_encoder',
target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr, '[Encoder]'))
enc_err_thread.daemon = True
enc_err_thread.start()
Iter = import_module(f'ffplayout.player.{play.mode}').GetSourceIter
get_source = Iter()
try:
for node in get_source.next():
messenger.info(f'Play: {node.get("source")}')
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + ff_pre_settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
with Popen(
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(name='stderr_decoder',
target=ffmpeg_stderr_reader,
args=(ff_proc.decoder.stderr,
'[Decoder]'))
dec_err_thread.daemon = True
dec_err_thread.start()
while True:
buf_dec = ff_proc.decoder.stdout.read(COPY_BUFSIZE)
if not streaming_queue.empty():
buf_live = streaming_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_time(node, get_source)
live_on = False
break
except BrokenPipeError as err:
messenger.error('Broken Pipe!')
messenger.debug(79 * '-')
messenger.debug(f'error: "{err}"')
messenger.debug(f'delta: "{sync_op.time_delta}"')
messenger.debug(f'node: "{node}"')
messenger.debug(f'dec_cmd: "{dec_cmd}"')
messenger.debug(79 * '-')
terminate_processes(getattr(get_source, 'stop', None))
except SystemExit:
messenger.info('Got close command')
terminate_processes(getattr(get_source, 'stop', None))
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.terminate()
except KeyboardInterrupt:
messenger.warning('Program terminated')
terminate_processes(getattr(get_source, 'stop', None))
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.terminate()
# close encoder when nothing is to do anymore
if ff_proc.encoder.poll() is None:
ff_proc.encoder.kill()
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.kill()
finally:
if ff_proc.encoder.poll() is None:
ff_proc.encoder.kill()
ff_proc.encoder.wait()
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.kill()

View File

@ -20,14 +20,12 @@ This module streams to -f null, so it is only for debugging.
"""
from importlib import import_module
from platform import system
from subprocess import PIPE, Popen
from threading import Thread
from ..utils import (ff_proc, ffmpeg_stderr_reader, log, messenger, play,
playout, pre, pre_audio_codec, terminate_processes)
COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
log, messenger, play, playout, pre, terminate_processes)
def output():
@ -35,18 +33,14 @@ def output():
this output is for streaming to a target address,
like rtmp, rtp, svt, etc.
"""
live_on = False
stream_queue = None
if ingest.enable:
stream_queue = ingest_stream()
messenger.info(f'Stream to null output, only usefull for debugging...')
ff_pre_settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
try:
enc_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner',
@ -74,7 +68,7 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + ff_pre_settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
@ -87,10 +81,20 @@ def output():
dec_err_thread.start()
while True:
buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE)
if not buf:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
live_on = False
break
ff_proc.encoder.stdin.write(buf)
except BrokenPipeError:
messenger.error('Broken Pipe!')

View File

@ -20,16 +20,14 @@ This module streams the files out to a remote target.
"""
from importlib import import_module
from platform import system
from subprocess import PIPE, Popen
from threading import Thread
from ..utils import (ff_proc, ffmpeg_stderr_reader, log, lower_third,
messenger, play, playout, pre, pre_audio_codec, sync_op,
from ..ingest_server import ingest_stream
from ..utils import (check_node_time, ff_proc, ffmpeg_stderr_reader, ingest,
log, lower_third, messenger, play, playout, pre, sync_op,
terminate_processes)
COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424
def output():
"""
@ -40,15 +38,11 @@ def output():
node = None
dec_cmd = []
preview = []
live_on = False
stream_queue = None
ff_pre_settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
if ingest.enable:
stream_queue = ingest_stream()
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
@ -93,7 +87,7 @@ def output():
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + ff_pre_settings
] + node['src_cmd'] + node['filter'] + pre.settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
@ -106,10 +100,20 @@ def output():
dec_err_thread.start()
while True:
buf = ff_proc.decoder.stdout.read(COPY_BUFSIZE)
if not buf:
buf_dec = ff_proc.decoder.stdout.read(pre.buffer_size)
if stream_queue and not stream_queue.empty():
buf_live = stream_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_node_time(node, get_source)
live_on = False
break
ff_proc.encoder.stdin.write(buf)
except BrokenPipeError as err:
messenger.error('Broken Pipe!')

View File

@ -241,6 +241,9 @@ def load_config():
lower_third.style = cfg['text']['style']
lower_third.regex = cfg['text']['regex']
ingest.enable = cfg['ingest']['enable']
ingest.stream_input = shlex.split(cfg['ingest']['stream_input'])
return cfg
@ -280,6 +283,19 @@ log.path = Path(_cfg['logging']['log_path'])
log.level = _cfg['logging']['log_level']
log.ff_level = _cfg['logging']['ffmpeg_level']
def pre_audio_codec():
"""
when add_loudnorm is False we use a different audio encoder,
s302m has higher quality, but is experimental
and works not well together with the loudnorm filter
"""
if pre.add_loudnorm:
return ['-c:a', 'mp2', '-b:a', '384k', '-ar', '48000', '-ac', '2']
return ['-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2']
pre.w = _cfg['processing']['width']
pre.h = _cfg['processing']['height']
pre.aspect = _cfg['processing']['aspect']
@ -287,8 +303,16 @@ pre.fps = _cfg['processing']['fps']
pre.v_bitrate = _cfg['processing']['width'] * _cfg['processing']['height'] / 10
pre.v_bufsize = pre.v_bitrate / 2
pre.output_count = _cfg['processing']['output_count']
pre.buffer_size = 1024 * 1024 if system() == 'Windows' else 65424
ingest.stream_input = shlex.split(_cfg['ingest']['stream_input'])
pre.settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
if stdin_args.output:
playout.mode = stdin_args.output
@ -824,6 +848,15 @@ def check_sync(delta, node=None):
sys.exit(1)
def check_node_time(node, get_source):
current_time = get_time('full_sec')
clip_length = node['out'] - node['seek']
clip_end = current_time + clip_length
if play.mode == 'playlist' and clip_end > current_time:
get_source.first = True
def seek_in(seek):
"""
seek in clip
@ -949,14 +982,3 @@ def src_or_dummy(node):
return node
def pre_audio_codec():
"""
when add_loudnorm is False we use a different audio encoder,
s302m has higher quality, but is experimental
and works not well together with the loudnorm filter
"""
if pre.add_loudnorm:
return ['-c:a', 'mp2', '-b:a', '384k', '-ar', '48000', '-ac', '2']
return ['-c:a', 's302m', '-strict', '-2', '-ar', '48000', '-ac', '2']