Commit 8ae1dc27 authored by David Trattnig's avatar David Trattnig
Browse files

Engine core refactoring with improved scheduling.

parent 22b0f8b4
......@@ -85,15 +85,11 @@ class Aura:
# Check if the database has to be re-created
if self.config.get("recreate_db") is not None:
AuraScheduler(self.config)
AuraScheduler(self.config, None)
# Create scheduler and Liquidsoap communicator
self.liquidsoapcommunicator = LiquidSoapCommunicator(self.config)
self.scheduler = AuraScheduler(self.config)
# Give both a reference of each other
self.liquidsoapcommunicator.scheduler = self.scheduler
self.scheduler.liquidsoapcommunicator = self.liquidsoapcommunicator
self.scheduler = AuraScheduler(self.config, self.liquidsoapcommunicator)
# Create the Redis adapter
self.messenger = ServerRedisAdapter(self.config)
......@@ -107,7 +103,6 @@ class Aura:
# And finally wait for redis message / start listener thread
self.messenger.start()
self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__))
#
......
......@@ -75,6 +75,9 @@ fade_out_time="2.5"
# all these settings from here to the bottom require a restart of the liquidsoap server
# Liquidsoap execution delay; Crucial to keep things in sync
lqs_delay_offset=1
[user]
# the user and group under which this software will run
daemongroup="david"
......
......@@ -133,8 +133,9 @@ class Guru():
self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?")
self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?")
self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing")
self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?")
self.parser.add_argument("-ts", "--adapt-trackservice-title", action="store", dest="adapt_trackservice_title", default=False, metavar="INFO", help="Update trackservice entry due to fallback")
self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing")
if len(sys.argv) == 1:
raise ValueError("No Argument passed!")
......
......@@ -377,20 +377,6 @@ class Playlist(DB.Model, AuraDatabaseModel):
return total
@hybrid_property
def current_entry(self):
"""
Retrieves the entry to be played at the very, current point in time.
"""
now_unix = SimpleUtil.timestamp()
for entry in self.entries:
if entry.start_unix < now_unix < entry.end_unix:
return entry
return None
def as_dict(self):
"""
Returns the playlist as a dictionary for serialization.
......@@ -435,6 +421,9 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
duration = Column(BigInteger)
filename = Column(String(1024))
entry_start = Column(DateTime)
queue_state = None # Assigned when entry is about to be queued
channel = None # Assigned when entry is actually played
# relationships
playlist = relationship("Playlist", uselist=False, back_populates="entries")
......@@ -454,26 +443,16 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
@hybrid_property
def end_unix(self):
return time.mktime(self.entry_start.timetuple()) + self.duration
return time.mktime(self.entry_end.timetuple())
@hybrid_property
def volume(self):
return 100
return 100 # FIXME Make DB Column
@hybrid_property
def type(self):
return EngineUtil.get_channel_type(self.uri)
@hybrid_property
def channel(self):
type = EngineUtil.get_channel_type(self.uri)
if type == ChannelType.FILESYSTEM:
return Channel.FILESYSTEM_A
elif type == ChannelType.STREAM:
return Channel.STREAM_A
else:
return "foo:bar"
#FIXME Extend & finalize!!
def get_prev_entries(self):
"""
......@@ -489,17 +468,25 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
return prev_entries
def get_next_entries(self):
def get_next_entries(self, schedule_sensitive=True):
"""
Retrieves all following entries as part of the current entry's playlist.
Args:
schedule_sensitive (Boolean): If `True` entries which start after \
the end of the schedule are excluded
Returns:
(List): List of PlaylistEntry
"""
next_entries = []
for entry in self.playlist.entries:
if entry.entry_start > self.entry_start:
next_entries.append(entry)
if schedule_sensitive:
if entry.entry_start < self.playlist.schedule.schedule_end:
next_entries.append(entry)
else:
next_entries.append(entry)
return next_entries
......@@ -523,8 +510,8 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.filename[-15:]
return "PlaylistEntry ID#%s [%s - %s | %ssec | Track: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track)
track = self.filename[-25:]
return "PlaylistEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track)
......@@ -606,7 +593,7 @@ class TrackService(DB.Model, AuraDatabaseModel):
- Scenario 1: No fallback, all info is gathered via the playlist entry
- Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry
- Scenario 3: This type of fallback didn't get scheduled; a local audio-file is played
- Scenario 3: This type of fallback didn't get scheduled; a single entry is played
"""
if self.playlist_entry:
return self.playlist_entry.as_dict()
......@@ -784,8 +771,8 @@ class SingleEntry(DB.Model, AuraDatabaseModel):
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.filename[-15:]
return "SingleEntry ID#%s [%s - %s | %ssec | Track: ...%s]" % (str(self.id), time_start, time_end, self.duration, track)
track = self.filename[-25:]
return "SingleEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.id), time_start, time_end, self.duration, track)
......
# Meta
__version__ = '0.6.1'
__author__ = "David Trattnig and Gottfried Gaisbauer"
__copyright__ = "Copyright 2017-2020, Aura Engine Team"
__credits__ = ["David Trattnig", "Gottfried Gaisbauer", "Michael Liebler"]
__license__ = "GNU Affero General Public License (AGPL) Version 3"
__version__ = "0.6.1"
__version_info__ = (0, 6, 1)
__author__ = 'David Trattnig <david.trattnig@subsquare.at>'
\ No newline at end of file
__maintainer__ = "David Trattnig"
__email__ = "david.trattnig@subsquare.at"
__status__ = "Development"
\ No newline at end of file
......@@ -112,3 +112,8 @@ class TimerType(Enum):
FADEIN = "fadein"
FADEOUT = "fadeout"
class EntryQueueState(Enum):
OKAY = "ok"
CUT = "cut"
OUT_OF_SCHEDULE = "oos"
......@@ -29,6 +29,8 @@
class NoProgrammeLoadedException(Exception):
pass
class NoActiveScheduleException(Exception):
pass
# Mixer Exceptions
......@@ -39,6 +41,8 @@ class InvalidChannelException(Exception):
class PlaylistException(Exception):
pass
class NoActiveEntryException(Exception):
pass
# Monitoring Exceptions
......
......@@ -92,8 +92,8 @@ class Padavan:
elif self.args.init_player:
self.init_player()
elif self.args.adapt_trackservice_title:
self.adapt_trackservice_title(self.args.adapt_trackservice_title)
elif self.args.on_play:
self.on_play(self.args.on_play)
elif self.args.recreatedb:
self.recreatedb()
......@@ -217,12 +217,12 @@ class Padavan:
def adapt_trackservice_title(self, info):
def on_play(self, info):
"""
Updates the tracks-service with the info on currently played fallback track.
Event handler to be called when some entry started playing.
"""
self.stringreply = self.send_and_wait_redis("aura", "adapt_trackservice_title " + info, RedisChannel.GNF_REPLY)
self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY)
# ------------------------------------------------------------------------------------------ #
def recreatedb(self):
......
......@@ -105,9 +105,9 @@ class LiquidSoapPlayerClient(LiquidSoapClient):
Liquidsoap server response
"""
if channel == Channel.FILESYSTEM_A.value:
self.command(channel.value, 'clear_filesystem_0')
self.command(channel, 'clear_filesystem_0')
elif channel == Channel.FILESYSTEM_B.value:
self.command(channel.value, 'clear_filesystem_1')
self.command(channel, 'clear_filesystem_1')
else:
return "Invalid filesystem channel '%s'" % channel
......
......@@ -175,18 +175,9 @@ class ServerRedisAdapter(threading.Thread, RedisMessenger):
#playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)
elif item["data"].find("adapt_trackservice_title") >= 0:
source = item["data"].split("|+|+|")[1]
artist = item["data"].split("|+|+|")[2]
title = item["data"].split("|+|+|")[3]
if not artist:
artist = ""
if not title:
title = ""
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.liquidsoapcommunicator.store_trackservice_info, source)
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.adapt_trackservice_title, source, artist, title)
elif item["data"].find("on_play") >= 0:
source = item["data"].split("on_play ")[1]
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.liquidsoapcommunicator.on_play, source)
elif item["data"] == "recreate_db":
self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)
......
......@@ -299,8 +299,8 @@ class RedisMessenger():
# return next.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def adapt_trackservice_title(self, info):
# result = self.rstore.db.get('adapt_trackservice_title')
# def on_play(self, info):
# result = self.rstore.db.get('on_play')
# if result is None:
# result = b""
......
......@@ -24,8 +24,9 @@ import datetime
import threading
import meta
from modules.base.enum import Channel, ChannelType
from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil
from modules.base.exceptions import NoActiveScheduleException
from modules.base.enum import Channel, ChannelType
from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil
class StartupThread(threading.Thread):
......@@ -37,99 +38,33 @@ class StartupThread(threading.Thread):
logger = None
active_entry = None
liquidsoapcommunicator = None
scheduler = None
def __init__(self, liquidsoapcommunicator, active_entry):
def __init__(self, liquidsoapcommunicator):
"""
Initialize the thread.
"""
threading.Thread.__init__(self)
self.logger = logging.getLogger("AuraEngine")
self.liquidsoapcommunicator = liquidsoapcommunicator
self.active_entry = active_entry
self.scheduler = liquidsoapcommunicator.scheduler
def run(self):
"""
Starts the LiquidSoap player including the current show.
Boots the soundsystem.
"""
try:
# Sleep needed, because the socket is created too slow by liquidsoap
time.sleep(1)
self.logger.info("Waited 1s for liquidsoap. Jez soit a si gspian")
self.liquidsoapcommunicator.enable_transaction()
# Wait another second. lqs really starts slow.. be prepared you liquidsoap you!
time.sleep(1)
self.logger.info(SimpleUtil.green("Engine Core ------[ connected ]-------- Liquidsoap"))
self.set_start_parameters()
self.liquidsoapcommunicator.start()
self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__))
self.scheduler.on_ready()
# Display the current programme
programme = self.liquidsoapcommunicator.scheduler.get_ascii_programme()
self.logger.info(programme)
# Start playing
self.set_active_show()
self.liquidsoapcommunicator.disable_transaction()
# The rest of the system now can use liquidsoap connection
self.liquidsoapcommunicator.is_liquidsoap_running = True
except NoActiveScheduleException as e:
self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.")
except Exception as e:
self.logger.critical(TerminalColors.RED.value+"Liquidsoap connection ERROR! Restart LQ Server! Reason: "+str(e)+TerminalColors.ENDC.value)
self.logger.error("Error while initializing the soundsystem: " + str(e))
def set_active_show(self):
"""
Sets and resumes the show which should be playing at the time of starting
the LiquidSoap player.
"""
if self.active_entry is not None:
channel_type = self.active_entry.type
self.logger.info("Engine Startup: Play '%s' via channel type '%s'" % (str(self.active_entry), channel_type))
# TODO Skip active entry if not enough time left; wait and play next one instead
self.liquidsoapcommunicator.play(self.active_entry, False)
if channel_type == ChannelType.FILESYSTEM:
# Have to seek? Calc how many seconds were missed
now_unix = time.mktime(datetime.datetime.now().timetuple())
seconds_to_seek = now_unix - self.active_entry.start_unix
# And seek these seconds forward
if seconds_to_seek > 0:
# Without plenty of timeout (10s) the seek doesn't work
seconds_to_seek += 10
time.sleep(10)
channel = self.liquidsoapcommunicator.active_channel[ChannelType.FILESYSTEM]
response = self.liquidsoapcommunicator.playlist_seek(channel, seconds_to_seek)
self.logger.info("LiquidSoap seek response: " + response)
else:
self.logger.warning("No active entry in the scheduler! Is a programme loaded?")
def set_start_parameters(self):
"""
Set initial parameters for the LiquidSoap player startup.
"""
self.liquidsoapcommunicator.mixer_start()
# Setting init params like a blank file..
install_dir = self.liquidsoapcommunicator.config.get("install_dir")
channel = self.liquidsoapcommunicator.active_channel[ChannelType.FILESYSTEM]
self.liquidsoapcommunicator.playlist_push(channel, install_dir + "/configuration/blank.flac")
# .. or the radio fro stream (it is overwritten as soon as one http overtake is planned)
#self.liquidsoapcommunicator.set_http_url("http://stream.fro.at/fro-128.ogg")
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from collections import deque
from modules.base.exceptions import NoActiveEntryException
from modules.base.utils import SimpleUtil
from libraries.database.broadcasts import SingleEntry, SingleEntryMetaData, PlaylistEntry, PlaylistEntryMetaData, TrackService
class PlayerStateService:
"""
PlayerStateService keeps a short history of currently playing entries. It stores the recent
active entries to a local cache `entry_history` being able to manage concurrently playing entries.
It also is in charge of storing relevant meta information of the currently playing entry to
the TrackService table.
"""
config = None
logger = None
entry_history = None
def __init__(self, config):
"""
Constructor
Args:
config (AuraConfig): Holds the engine configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.entry_history = deque([None, None, None])
#
# PUBLIC METHODS
#
def set_active_entry(self, entry):
"""
Saves the currently playing entry to the local cache.
"""
self.entry_history.pop()
self.entry_history.appendleft(entry)
self.logger.info("Active entry history:\n"+str(self.entry_history))
def get_active_entry(self):
"""
Retrieves the currently playing `Entry` from the local cache.
"""
return self.entry_history[0]
def store_trackservice_entry(self, source):
"""
Stores the given entry the Track Service.
Args:
source (String): The URI of the currently playing source
Raises:
(NoActiveEntryException): In case currently nothing is playing
"""
active_entry = self.get_active_entry()
if not active_entry:
raise NoActiveEntryException
if active_entry.filename == source:
trackservice = TrackService(active_entry)
trackservice.store(add=True, commit=True)
active_entry.trackservice_id = trackservice.id
active_entry.store(add=False, commit=True)
self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice))
else:
msg = "Active entry source '%s' != '%s' activated source." % (active_entry.filename, source)
self.logger.critical(SimpleUtil.red(msg))
# def adapt_trackservice_title(self, filename):
# """
# Updates the track-service entry with the info from a fallback track/playlist.
# """
# liquidsoap_offset = int(self.config.lqs_delay_offset)
# scheduled_entry = self.get_active_entry(liquidsoap_offset)
# entry = SingleEntry()
# meta = SingleEntryMetaData()
# # # Validate artist and title
# # if not title:
# # title = self.config.get("fallback_title_not_available")
# # Create Entry
# entry.filename = filename
# entry.duration = self.fallback_manager.get_track_duration(filename)
# if not entry.duration:
# self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry)))
# # Create track service log for local station fallback (type=4)
# trackservice = TrackService(entry, 4)
# trackservice.store(add=True, commit=True)
# entry.store(add=True, commit=True)
# # Create Meta
# meta.artist = "----FIXME"
# meta.album = ""
# meta.title = "----TODO"
# meta.single_entry_id = entry.id
# meta.store(add=True, commit=True)
# # Reference each other
# entry.meta_data_id = meta.id
# entry.trackservice_id = trackservice.id
# entry.store(add=False, commit=True)
# msg = "Track Service active track '%s' updated with fallback source '%s - %s'!" % (scheduled_entry, meta.artist, meta.title)
# self.logger.info(msg)
# return msg
#
# PRIVATE METHODS
#
......@@ -50,14 +50,20 @@ inputs = ref []
%include "in_soundcard.liq"
# fill the mixer
mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_filesystem_2, input_filesystem_3, input_filesystem_4, input_http_0, input_http_1], !inputs))
mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_http_0, input_http_1], !inputs))
# mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_filesystem_2, input_filesystem_3, input_filesystem_4, input_http_0, input_http_1], !inputs))
# output source with fallbacks
stripped_stream = strip_blank(id='strip_blank', track_sensitive=false, max_blank=fallback_max_blank, min_noise=fallback_min_noise, threshold=fallback_threshold, mixer)
# enable fallback
output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)])
# output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)])
#output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)])
output_source = mksafe(stripped_stream)
ignore(timeslot_fallback)
ignore(show_fallback)
ignore(station_fallback)
##################
# create outputs #
......
......@@ -204,9 +204,9 @@ def fallback_create(~skip=true, name, requestor)
# Tell the system when a new track is played
def do_meta(meta) =
filename = meta["filename"]
artist = meta["artist"]
title = meta["title"]
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --adapt-trackservice-title "|+|+|#{filename}|+|+|#{artist}|+|+|#{title}|+|+|"')
# artist = meta["artist"]
# title = meta["title"]
system('#{list.assoc(default="", "install_dir", ini)}/guru.py --on_play "#{filename}"')
end
source = on_metadata(do_meta, source)
......
......@@ -28,10 +28,21 @@ input_filesystem_1 = request.equeue(id="in_filesystem_1")
# input_filesystem_3 = request.equeue(id="in_filesystem_3")
# input_filesystem_4 = request.equeue(id="in_filesystem_4")
#input_fs = cue_cut(mksafe(request.equeue(id="fs")))
#req = request.queue(id="req")
# Update Trackservice
def do_meta_filesystem(meta) =