test Queue instead of external buffer tool

This commit is contained in:
Jonathan Baecker 2019-05-27 10:31:26 +02:00
parent c5f8ebc7a7
commit f52a045eef

View File

@ -26,6 +26,7 @@ import os
import smtplib
import socket
import sys
import traceback
from argparse import ArgumentParser
from ast import literal_eval
from datetime import date, datetime, timedelta
@ -38,6 +39,7 @@ from subprocess import PIPE, CalledProcessError, Popen, check_output
from threading import Thread
from time import sleep
from types import SimpleNamespace
from queue import Queue
# ------------------------------------------------------------------------------
# read variables from config file
@ -325,19 +327,17 @@ def calc_buffer_size():
# check if processes a well
def check_process(play_thread, playout, mbuffer):
def check_process(play_thread, playout):
while True:
sleep(4)
if playout.poll() is not None:
logger.error(
'post-process is not alive anymore, terminate ffplayout!')
mbuffer.terminate()
break
if not play_thread.is_alive():
logger.error(
'pre-process is not alive anymore, terminate ffplayout!')
mbuffer.terminate()
break
@ -357,6 +357,9 @@ def check_sync(begin):
if 0 <= time_now < _playlist.start and not begin == _playlist.start:
time_distance -= 86400.0
# TODO: this is only for debugging
print(time_distance)
# check that we are in tolerance time
if abs(time_distance) > 5 + tolerance:
mailer.warning(
@ -943,11 +946,11 @@ class GetSourceIter:
# independent thread for clip preparation
def play_clips(out_file, GetSourceIter):
def play_clips(buffer, GetSourceIter):
# send current file to buffer stdin
iter = GetSourceIter()
get_source = GetSourceIter()
for src_cmd, filtergraph in iter.next():
for src_cmd, filtergraph in get_source.next():
if _pre_comp.copy:
ff_pre_settings = _pre_comp.copy_settings
else:
@ -970,85 +973,92 @@ def play_clips(out_file, GetSourceIter):
logger.info('play: "{}"'.format(current_file))
file_piper = Popen(
# TODO: stderr and stdin only when they are used
decoder = Popen(
[
'ffmpeg', '-v', 'error', '-hide_banner', '-nostats'
] + src_cmd + list(ff_pre_settings),
stdout=PIPE,
bufsize=0
stderr=PIPE,
stdin=PIPE
)
copyfileobj(file_piper.stdout, out_file)
for package in iter(decoder.stdout.readline, ''):
buffer.put(package)
# TODO: make this nicer
except Exception:
print(traceback.format_exc())
finally:
file_piper.wait()
decoder.wait()
def main():
year = get_date(False).split('-')[0]
# open a buffer for the streaming pipeline
# stdin get the files loop
# stdout pipes to ffmpeg rtmp streaming
# TODO: have an eye on maxsize
buffer = Queue(maxsize=12)
try:
# open a buffer for the streaming pipeline
# stdin get the files loop
# stdout pipes to ffmpeg rtmp streaming
mbuffer = Popen(
[_buffer.cli] + list(_buffer.cmd)
+ ['{}k'.format(calc_buffer_size())],
stdin=PIPE,
stdout=PIPE,
bufsize=0
)
try:
if _playout.preview:
# preview playout to player
playout = Popen([
'ffplay', '-v', 'error',
'-hide_banner', '-nostats', '-i', 'pipe:0'],
stdin=mbuffer.stdout,
bufsize=0
)
if _playout.preview:
# preview playout to player
playout = Popen([
'ffplay',
'-hide_banner', '-nostats', '-i', 'pipe:0'],
stderr=None,
stdin=PIPE,
stdout=None
)
else:
# playout to rtmp
if _pre_comp.copy:
playout_pre = [
'ffmpeg', '-v', 'info', '-hide_banner', '-nostats',
'-re', '-i', 'pipe:0', '-c', 'copy'
] + _playout.post_comp_copy
else:
# playout to rtmp
if _pre_comp.copy:
playout_pre = [
'ffmpeg', '-v', 'info', '-hide_banner', '-nostats',
'-re', '-i', 'pipe:0', '-c', 'copy'
] + _playout.post_comp_copy
else:
playout_pre = [
'ffmpeg', '-v', 'info', '-hide_banner', '-nostats',
'-re', '-thread_queue_size', '256',
'-fflags', '+igndts', '-i', 'pipe:0',
'-fflags', '+genpts'
] + _playout.post_comp_video + \
_playout.post_comp_audio
playout_pre = [
'ffmpeg', '-v', 'info', '-hide_banner', '-nostats',
'-re', '-thread_queue_size', '256',
'-i', 'pipe:0'
] + _playout.post_comp_video + \
_playout.post_comp_audio
playout = Popen(
list(playout_pre)
+ [
'-metadata', 'service_name=' + _playout.name,
'-metadata', 'service_provider=' + _playout.provider,
'-metadata', 'year=' + year
] + list(_playout.post_comp_extra)
+ [
_playout.out_addr
],
stdin=mbuffer.stdout,
bufsize=0
)
play_thread = Thread(
name='play_clips', target=play_clips, args=(
mbuffer.stdin,
GetSourceIter,
)
playout = Popen(
list(playout_pre)
+ [
'-metadata', 'service_name=' + _playout.name,
'-metadata', 'service_provider=' + _playout.provider,
'-metadata', 'year=' + year
] + list(_playout.post_comp_extra)
+ [
_playout.out_addr
],
stdin=PIPE,
)
play_thread.daemon = True
play_thread.start()
check_process(play_thread, playout, mbuffer)
finally:
playout.wait()
play_thread = Thread(
name='play_clips', target=play_clips, args=(
buffer,
GetSourceIter,
)
)
play_thread.daemon = True
play_thread.start()
# TODO: this needs to be changed,
# while True is bad, it needs a check to be able to exit
# with this loop we also end up never in the check_process function
while True:
line = buffer.get()
playout.stdin.write(line)
check_process(play_thread, playout)
finally:
mbuffer.wait()
playout.wait()
if __name__ == '__main__':