Revert "remove encoder/decoder log, add experimental ingest stream, add readme to output."

This reverts commit 8eeecd096c.
This commit is contained in:
jonathan 2022-01-24 17:32:57 +01:00
parent 8eeecd096c
commit a639e3722f
10 changed files with 64 additions and 260 deletions

View File

@ -44,8 +44,7 @@ The purpose with ffplayout is to provide a 24/7 broadcasting solution that plays
- **desktop**
- **HLS**
- **custom**
- Multi channel
- Live rtmp ingest (experimental)
- Multi Channel
Requirements
-----

View File

@ -57,13 +57,6 @@ processing:
loud_LRA: 11
output_count: 1
ingest:
helptext: Works only in combination with output -> mode = live_switch! Run a rtmp server
for a ingest stream. This stream will override the normal streaming until is done.
There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app.
address: localhost
port: 1936
playlist:
helptext: Set 'playlist_mode' to 'False' if you want to play clips from the 'storage'
section. Put only the root path here, for example '/playlists' subdirectories
@ -107,10 +100,10 @@ text:
out:
helptext: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'live_switch', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'stream_output' is for streaming output, two ffmpeg instances are fired up,
for pre- and post-processing. 'hls_output' is for direct output to hls playlist,
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made outputs
can be define, by adding script in output folder with an 'output' function inside.
'stream_output' is for streaming output, two ffmpeg instances are fired up, for
pre- and post-processing. 'hls_output' is for direct output to hls playlist,
without pre- and post-processing, mode must be 'hls'.
mode: 'stream'
service_name: "Live Stream"

View File

@ -1,5 +0,0 @@
## Custom
ffplayout has a modularized output system, which mean you can write your own output function. Just create a python file in this folder with an **output()** function in it. In this function you ca do what ever you want. Use the other output files as references.
#### Activating Output
To use one of the outputs you need to edit the **ffplayout.yml** config, here under **out** set your **mode** to the file name, without extension. if you need it feel free to extend the config to your needs.

View File

@ -68,7 +68,7 @@ def output():
ff_proc.encoder = Popen(enc_cmd, stderr=PIPE, stdin=PIPE, stdout=None)
enc_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr, '[Encoder]'))
args=(ff_proc.encoder.stderr, False))
enc_err_thread.daemon = True
enc_err_thread.start()
@ -101,7 +101,7 @@ def output():
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.decoder.stderr,
'[Decoder]'))
True))
dec_err_thread.daemon = True
dec_err_thread.start()

View File

@ -98,7 +98,7 @@ def output():
stderr_reader_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr,
'[Encoder]'))
False))
stderr_reader_thread.daemon = True
stderr_reader_thread.start()
stderr_reader_thread.join()

View File

@ -1,211 +0,0 @@
#!/usr/bin/env python3
# This file is part of ffplayout.
#
# ffplayout is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ffplayout is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ffplayout. If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------
from platform import system
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep
from ..folder import GetSourceFromFolder, MediaStore, MediaWatcher
from ..playlist import GetSourceFromPlaylist
from ..utils import (ff_proc, ffmpeg_stderr_reader, get_date, get_time, ingest,
log, lower_third, messenger, playlist, playout, pre,
pre_audio_codec, stdin_args, sync_op, terminate_processes)
COPY_BUFSIZE = 1024 * 1024 if system() == 'Windows' else 65424
def rtmp_server(que, pre_settings):
server_cmd = [
'ffmpeg', '-hide_banner', '-nostats', '-v', 'level+error',
'-f', 'live_flv', '-listen', '1',
'-i', f'rtmp://{ingest.address}:{ingest.port}/live/stream'] + pre_settings
messenger.warning('Ingest stream is experimental, use it at your own risk!')
messenger.info(f'Start listening on "{ingest.address}:{ingest.port}"')
while True:
with Popen(server_cmd, stderr=PIPE, stdout=PIPE) as ff_proc.live:
err_thread = Thread(name='stderr_server',
target=ffmpeg_stderr_reader,
args=(ff_proc.live.stderr, '[Server]'))
err_thread.daemon = True
err_thread.start()
while True:
buffer = ff_proc.live.stdout.read(COPY_BUFSIZE)
if not buffer:
break
que.put(buffer)
sleep(.33)
def check_time(node, get_source):
current_time = get_time('full_sec')
clip_length = node['out'] - node['seek']
clip_end = current_time + clip_length
if playlist.mode and not stdin_args.folder and clip_end > current_time:
get_source.first = True
def output():
"""
this output is for streaming to a target address,
like rtmp, rtp, svt, etc.
"""
year = get_date(False).split('-')[0]
overlay = []
node = None
dec_cmd = []
live_on = False
streaming_queue = Queue(maxsize=0)
ff_pre_settings = [
'-pix_fmt', 'yuv420p', '-r', str(pre.fps),
'-c:v', 'mpeg2video', '-g', '1',
'-b:v', f'{pre.v_bitrate}k',
'-minrate', f'{pre.v_bitrate}k',
'-maxrate', f'{pre.v_bitrate}k',
'-bufsize', f'{pre.v_bufsize}k'
] + pre_audio_codec() + ['-f', 'mpegts', '-']
if lower_third.add_text and not lower_third.over_pre:
messenger.info(
f'Using drawtext node, listening on address: {lower_third.address}'
)
overlay = [
'-vf',
"null,zmq=b=tcp\\\\://'{}',drawtext=text='':fontfile='{}'".format(
lower_third.address.replace(':', '\\:'), lower_third.fontfile)
]
rtmp_server_thread = Thread(name='ffmpeg_server',target=rtmp_server,
args=(streaming_queue, ff_pre_settings))
rtmp_server_thread.daemon = True
rtmp_server_thread.start()
try:
enc_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}', '-hide_banner',
'-nostats', '-re', '-thread_queue_size', '160', '-i', 'pipe:0'
] + overlay + [
'-metadata', f'service_name={playout.name}',
'-metadata', f'service_provider={playout.provider}',
'-metadata', f'year={year}'
] + playout.ffmpeg_param + playout.stream_output
messenger.debug(f'Encoder CMD: "{" ".join(enc_cmd)}"')
ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE)
enc_err_thread = Thread(name='stderr_encoder',
target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr, '[Encoder]'))
enc_err_thread.daemon = True
enc_err_thread.start()
if playlist.mode and not stdin_args.folder:
watcher = None
get_source = GetSourceFromPlaylist()
else:
messenger.info('Start folder mode')
media = MediaStore()
watcher = MediaWatcher(media)
get_source = GetSourceFromFolder(media)
try:
for node in get_source.next():
if watcher is not None:
watcher.current_clip = node.get('source')
messenger.info(f'Play: {node.get("source")}')
dec_cmd = [
'ffmpeg', '-v', f'level+{log.ff_level.lower()}',
'-hide_banner', '-nostats'
] + node['src_cmd'] + node['filter'] + ff_pre_settings
messenger.debug(f'Decoder CMD: "{" ".join(dec_cmd)}"')
with Popen(
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(name='stderr_decoder',
target=ffmpeg_stderr_reader,
args=(ff_proc.decoder.stderr,
'[Decoder]'))
dec_err_thread.daemon = True
dec_err_thread.start()
while True:
buf_dec = ff_proc.decoder.stdout.read(COPY_BUFSIZE)
if not streaming_queue.empty():
buf_live = streaming_queue.get()
ff_proc.encoder.stdin.write(buf_live)
live_on = True
del buf_dec
elif buf_dec:
ff_proc.encoder.stdin.write(buf_dec)
else:
if live_on:
check_time(node, get_source)
live_on = False
break
except BrokenPipeError as err:
messenger.error('Broken Pipe!')
messenger.debug(79 * '-')
messenger.debug(f'error: "{err}"')
messenger.debug(f'delta: "{sync_op.time_delta}"')
messenger.debug(f'node: "{node}"')
messenger.debug(f'dec_cmd: "{dec_cmd}"')
messenger.debug(79 * '-')
terminate_processes(watcher)
except SystemExit:
messenger.info('Got close command')
terminate_processes(watcher)
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.terminate()
except KeyboardInterrupt:
messenger.warning('Program terminated')
terminate_processes(watcher)
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.terminate()
# close encoder when nothing is to do anymore
if ff_proc.encoder.poll() is None:
ff_proc.encoder.kill()
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.kill()
finally:
if ff_proc.encoder.poll() is None:
ff_proc.encoder.kill()
ff_proc.encoder.wait()
if ff_proc.live and ff_proc.live.poll() is None:
ff_proc.live.kill()

View File

@ -58,7 +58,7 @@ def output():
ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE)
enc_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr, '[Encoder]'))
args=(ff_proc.encoder.stderr, False))
enc_err_thread.daemon = True
enc_err_thread.start()
@ -91,7 +91,7 @@ def output():
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.decoder.stderr,
'[Decoder]'))
True))
dec_err_thread.daemon = True
dec_err_thread.start()

View File

@ -76,7 +76,7 @@ def output():
ff_proc.encoder = Popen(enc_cmd, stdin=PIPE, stderr=PIPE)
enc_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.encoder.stderr, '[Encoder]'))
args=(ff_proc.encoder.stderr, False))
enc_err_thread.daemon = True
enc_err_thread.start()
@ -107,7 +107,7 @@ def output():
dec_cmd, stdout=PIPE, stderr=PIPE) as ff_proc.decoder:
dec_err_thread = Thread(target=ffmpeg_stderr_reader,
args=(ff_proc.decoder.stderr,
'[Decoder]'))
True))
dec_err_thread.daemon = True
dec_err_thread.start()

View File

@ -135,7 +135,6 @@ sync_op = SimpleNamespace(time_delta=0, realtime=False)
mail = SimpleNamespace()
log = SimpleNamespace()
pre = SimpleNamespace()
ingest = SimpleNamespace()
playlist = SimpleNamespace()
storage = SimpleNamespace()
lower_third = SimpleNamespace()
@ -273,9 +272,6 @@ pre.v_bitrate = _cfg['processing']['width'] * _cfg['processing']['height'] / 10
pre.v_bufsize = pre.v_bitrate / 2
pre.output_count = _cfg['processing']['output_count']
ingest.address = _cfg['ingest']['address']
ingest.port = _cfg['ingest']['port']
playout.mode = _cfg['out']['mode']
playout.name = _cfg['out']['service_name']
playout.provider = _cfg['out']['service_provider']
@ -348,27 +344,52 @@ class CustomFormatter(logging.Formatter):
if stdin_args.log:
log.path = stdin_args.log
logger = logging.getLogger('playout')
logger.setLevel(log.level)
playout_logger = logging.getLogger('playout')
playout_logger.setLevel(log.level)
decoder_logger = logging.getLogger('decoder')
decoder_logger.setLevel(log.ff_level)
encoder_logger = logging.getLogger('encoder')
encoder_logger.setLevel(log.ff_level)
if log.to_file and log.path != 'none':
if log.path.is_dir():
playout_log = log.path.joinpath('ffplayout.log')
decoder_log = log.path.joinpath('decoder.log')
encoder_log = log.path.joinpath('encoder.log')
else:
log_dir = Path(__file__).parent.absolute().joinpath('log')
log_dir.mkdir(exist_ok=True)
playout_log = log_dir.joinpath('ffplayout.log')
decoder_log = log_dir.joinpath('decoder.log')
encoder_log = log_dir.joinpath('encoder.log')
p_format = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
handler = TimedRotatingFileHandler(playout_log, when='midnight',
backupCount=log.backup_count)
f_format = logging.Formatter('[%(asctime)s] %(message)s')
p_file_handler = TimedRotatingFileHandler(playout_log, when='midnight',
backupCount=log.backup_count)
d_file_handler = TimedRotatingFileHandler(decoder_log, when='midnight',
backupCount=log.backup_count)
e_file_handler = TimedRotatingFileHandler(encoder_log, when='midnight',
backupCount=log.backup_count)
handler.setFormatter(p_format)
logger.addHandler(handler)
p_file_handler.setFormatter(p_format)
d_file_handler.setFormatter(f_format)
e_file_handler.setFormatter(f_format)
playout_logger.addHandler(p_file_handler)
decoder_logger.addHandler(d_file_handler)
encoder_logger.addHandler(e_file_handler)
DEC_PREFIX = ''
ENC_PREFIX = ''
else:
console_handler = logging.StreamHandler()
console_handler.setFormatter(CustomFormatter())
logger.addHandler(console_handler)
playout_logger.addHandler(console_handler)
decoder_logger.addHandler(console_handler)
encoder_logger.addHandler(console_handler)
DEC_PREFIX = '[decoder] '
ENC_PREFIX = '[encoder] '
# ------------------------------------------------------------------------------
@ -415,7 +436,7 @@ class Mailer:
try:
server = smtplib.SMTP(mail.server, mail.port)
except socket.error as err:
logger.error(err)
playout_logger.error(err)
server = None
if server is not None:
@ -423,7 +444,7 @@ class Mailer:
try:
login = server.login(mail.s_addr, mail.s_pass)
except smtplib.SMTPAuthenticationError as serr:
logger.error(serr)
playout_logger.error(serr)
login = None
if login is not None:
@ -483,27 +504,27 @@ class Messenger:
"""
log debugging messages
"""
logger.debug(msg.replace('\n', ' '))
playout_logger.debug(msg.replace('\n', ' '))
def info(self, msg):
"""
log and mail info messages
"""
logger.info(msg.replace('\n', ' '))
playout_logger.info(msg.replace('\n', ' '))
self._mailer.info(msg)
def warning(self, msg):
"""
log and mail warning messages
"""
logger.warning(msg.replace('\n', ' '))
playout_logger.warning(msg.replace('\n', ' '))
self._mailer.warning(msg)
def error(self, msg):
"""
log and mail error messages
"""
logger.error(msg.replace('\n', ' '))
playout_logger.error(msg.replace('\n', ' '))
self._mailer.error(msg)
@ -565,14 +586,14 @@ def validate_ffmpeg_libs():
check if ffmpeg contains some basic libs
"""
if 'libx264' not in FF_LIBS['libs']:
logger.error('ffmpeg contains no libx264!')
playout_logger.error('ffmpeg contains no libx264!')
if 'libfdk-aac' not in FF_LIBS['libs']:
logger.warning(
playout_logger.warning(
'ffmpeg contains no libfdk-aac! No high quality aac...')
if 'tpad' not in FF_LIBS['filters']:
logger.error('ffmpeg contains no tpad filter!')
playout_logger.error('ffmpeg contains no tpad filter!')
if 'zmq' not in FF_LIBS['filters']:
logger.error(
playout_logger.error(
'ffmpeg contains no zmq filter! Text messages will not work...')
@ -691,13 +712,20 @@ def terminate_processes(watcher=None):
watcher.stop()
def ffmpeg_stderr_reader(std_errors, prefix):
def ffmpeg_stderr_reader(std_errors, decoder):
"""
read ffmpeg stderr decoder and encoder instance
and log the output
"""
if decoder:
logger = decoder_logger
prefix = DEC_PREFIX
else:
logger = encoder_logger
prefix = ENC_PREFIX
def form_line(line, level):
return f'{prefix} {line.replace(level, "").rstrip()}'
return f'{prefix}{line.replace(level, "").rstrip()}'
def write_log(line):
if '[info]' in line:

View File

@ -20,7 +20,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# set time zone
_TZ = ZoneInfo("Europe/Berlin")
# fake date and time
SOURCE_TIME = [2022, 1, 24, 12, 12, 0]
SOURCE_TIME = [2022, 1, 5, 5, 57, 10]
FAKE_DELTA = -2.2