Skip to content
Snippets Groups Projects
Commit 51b991a1 authored by David Trattnig's avatar David Trattnig
Browse files

Scheduled fallback logic. #44

parent a74044ef
No related branches found
No related tags found
No related merge requests found
......@@ -30,10 +30,9 @@ from sqlalchemy import BigInteger, Boolean, Column, Date
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from modules.scheduling.types import PlaylistType
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil, EngineUtil
from modules.base.utils import SimpleUtil
from modules.core.resources import ResourceUtil
......@@ -159,7 +158,6 @@ class Schedule(DB.Model, AuraDatabaseModel):
schedule_fallback_id = Column(Integer)
show_fallback_id = Column(Integer)
station_fallback_id = Column(Integer)
fallback_state = PlaylistType.DEFAULT
fadeouttimer = None # Used to fade-out the schedule, even when entries are longer
......@@ -215,6 +213,27 @@ class Schedule(DB.Model, AuraDatabaseModel):
return schedules
def get_playlist(self):
"""
Returns the assigned playlist.
"""
# TODO Refactor to avoid storing array of playlists.
if self.playlist and self.playlist[0]:
return self.playlist[0]
return None
def has_queued_entries(self):
"""
Checks if entries of this timeslot have been queued at the engine.
"""
#TODO Make logic more transparent
if hasattr(self, "queued_entries"):
if len(self.queued_entries) > 0:
return True
return False
@hybrid_property
def start_unix(self):
"""
......@@ -256,7 +275,6 @@ class Schedule(DB.Model, AuraDatabaseModel):
"show": {
"name": self.show_name,
"type": self.get_type(),
"host": self.show_hosts
},
......@@ -289,7 +307,6 @@ class Playlist(DB.Model, AuraDatabaseModel):
# data
playlist_id = Column(Integer, autoincrement=False) # , ForeignKey("schedule.playlist_id"))
show_name = Column(String(256))
fallback_type = Column(Integer)
entry_count = Column(Integer)
......@@ -490,8 +507,8 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
def volume(self):
return 100 # FIXME Make DB Column
def get_type(self):
return EngineUtil.get_channel_type(self.uri)
def get_content_type(self):
return ResourceUtil.get_content_type(self.uri)
def get_prev_entries(self):
......
......@@ -26,7 +26,7 @@ import logging
from datetime import datetime
from modules.scheduling.types import PlaylistType
# from modules.scheduling.types import PlaylistType
from modules.base.utils import SimpleUtil as SU
from modules.base.models import Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData
from modules.scheduling.calender_fetcher import CalendarFetcher
......@@ -140,13 +140,22 @@ class AuraCalendarService(threading.Thread):
schedule_db = self.store_schedule(schedule)
# Store playlists to play
self.store_playlist(schedule_db, schedule_db.playlist_id, schedule["playlist"], PlaylistType.DEFAULT.id)
self.store_playlist(schedule_db, schedule_db.playlist_id, schedule["playlist"])
if schedule_db.schedule_fallback_id:
self.store_playlist(schedule_db, schedule_db.schedule_fallback_id, schedule["schedule_fallback"], PlaylistType.TIMESLOT.id)
self.store_playlist(schedule_db, schedule_db.schedule_fallback_id, schedule["schedule_fallback"])
if schedule_db.show_fallback_id:
self.store_playlist(schedule_db, schedule_db.show_fallback_id, schedule["show_fallback"], PlaylistType.SHOW.id)
self.store_playlist(schedule_db, schedule_db.show_fallback_id, schedule["show_fallback"])
if schedule_db.station_fallback_id:
self.store_playlist(schedule_db, schedule_db.station_fallback_id, schedule["station_fallback"], PlaylistType.STATION.id)
self.store_playlist(schedule_db, schedule_db.station_fallback_id, schedule["station_fallback"])
# self.store_playlist(schedule_db, schedule_db.playlist_id, schedule["playlist"], PlaylistType.DEFAULT.id)
# if schedule_db.schedule_fallback_id:
# self.store_playlist(schedule_db, schedule_db.schedule_fallback_id, schedule["schedule_fallback"], PlaylistType.TIMESLOT.id)
# if schedule_db.show_fallback_id:
# self.store_playlist(schedule_db, schedule_db.show_fallback_id, schedule["show_fallback"], PlaylistType.SHOW.id)
# if schedule_db.station_fallback_id:
# self.store_playlist(schedule_db, schedule_db.station_fallback_id, schedule["station_fallback"], PlaylistType.STATION.id)
result.append(schedule_db)
......@@ -203,12 +212,14 @@ class AuraCalendarService(threading.Thread):
def store_playlist(self, schedule_db, playlist_id, fetched_playlist, fallbackplaylist_type=0):
# def store_playlist(self, schedule_db, playlist_id, fetched_playlist, fallbackplaylist_type=0):
def store_playlist(self, schedule_db, playlist_id, fetched_playlist):
"""
Stores the Playlist to the database.
"""
if not playlist_id or not fetched_playlist:
self.logger.debug("Playlist type %s with ID '%s' is not available!" % (fallbackplaylist_type, playlist_id))
self.logger.debug(f"Playlist ID#{playlist_id} is not available!")
# self.logger.debug("Playlist type %s with ID '%s' is not available!" % (fallbackplaylist_type, playlist_id))
return
playlist_db = Playlist.select_playlist_for_schedule(schedule_db.schedule_start, playlist_id)
......@@ -222,7 +233,7 @@ class AuraCalendarService(threading.Thread):
playlist_db.playlist_id = playlist_id
playlist_db.schedule_start = schedule_db.schedule_start
playlist_db.show_name = schedule_db.show_name
playlist_db.fallback_type = fallbackplaylist_type
# playlist_db.fallback_type = fallbackplaylist_type
if "entries" in fetched_playlist:
playlist_db.entry_count = len(fetched_playlist["entries"])
else:
......
......@@ -19,15 +19,34 @@
import logging
import os, os.path
import random
from enum import Enum
from threading import Thread, Timer
from datetime import datetime, timedelta
from accessify import private, protected
from modules.scheduling.types import PlaylistType
from modules.base.utils import SimpleUtil, EngineUtil
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
from modules.core.channels import ChannelType
from modules.core.resources import ResourceClass
from modules.core.engine import Engine
class FallbackType(Enum):
"""
Types of playlists.
"""
NONE = { "id": 0, "name": "default" } # No fallback active, default playout
SCHEDULE = { "id": 1, "name": "schedule" } # The first played when some default playlist fails
SHOW = { "id": 2, "name": "show" } # The second played when the timeslot fallback fails
STATION = { "id": 3, "name": "station" } # The last played when everything else fails
@property
def id(self):
return self.value["id"]
def __str__(self):
return str(self.value["name"])
class FallbackManager:
......@@ -40,21 +59,16 @@ class FallbackManager:
logger (AuraLogger): The logger
mail (AuraMailer): Mail service
scheduler (AuraScheduler): The scheduler
fallback_history (Dict): Holds a 24h history of played, local tracks to avoid re-play
last_fallback (Integer): Timestamp, when the last local file fallback was played
is_processing (Boolean): Flag to avoid race-conditions, as Liquidsoap sends plenty of requests at once
"""
"""
config = None
logger = None
mailer = None
scheduler = None
fallback_history = {}
last_fallback = 0
is_processing = False
message_timer = None
def __init__(self, config, logger, scheduler):
def __init__(self, config, logger, scheduler, message_timer):
"""
Constructor
......@@ -65,162 +79,103 @@ class FallbackManager:
self.logger = logger
self.mailer = AuraMailer(self.config)
self.scheduler = scheduler
self.logger = logger
# self.message_timer = message_timer
self.message_timer = []
#
# PUBLIC METHODS
#
def resolve_playlist(self, schedule):
def schedule_fallback_playlist(self, schedule, schedule_now=False):
"""
Resolves the (fallback) playlist for the given schedule in case of pro-active fallback scenarios.
A resolved playlist represents the state how it would currently be aired. For example the `FallbackManager`
evaluated, that the actually planned playlist cannot be played for various reasons (e.g. entries n/a).
Instead one of the fallback playlists should be played. If the method is called some time later,
it actually planned playlist might be valid, thus returned as the resolved playlist.
As long the adressed schedule is still within the scheduling window, the resolved playlist can
always change.
This method also updates `schedule.fallback_state` to the current fallback type (`PlaylistType`).
Evaluates the scheduled fallback and queues it using a timed thread.
Args:
schedule (Schedule): The schedule to resolve the playlist for
Returns:
(Playlist): The resolved playlist
schedule_now (Boolean): If `True` it is executed immediately
"""
playlist = None
type = None
self.logger.info("Resolving playlist for schedule #%s ..." % schedule.schedule_id)
if not self.validate_playlist(schedule, "playlist"):
if not self.validate_playlist(schedule, "schedule_fallback"):
if not self.validate_playlist(schedule, "show_fallback"):
if not self.validate_playlist(schedule, "station_fallback"):
self.logger.error(SimpleUtil.red("No (fallback) playlists for schedule #%s available - not even a single one!" % schedule.schedule_id))
return None
else:
type = PlaylistType.STATION
playlist = schedule.station_fallback
else:
type = PlaylistType.TIMESLOT
playlist = schedule.schedule_fallback
else:
type = PlaylistType.SHOW
playlist = schedule.show_fallback
else:
type = PlaylistType.DEFAULT
playlist = schedule.playlist
if type and type != PlaylistType.DEFAULT:
previous_type = schedule.fallback_state
if type == previous_type:
self.logger.info("Fallback state for schedule #%s is still '%s'" % (schedule.schedule_id, type))
else:
self.logger.warn("Detected fallback type switch from '%s' to '%s' is required for schedule %s." % (previous_type, type, str(schedule)))
schedule.fallback_state = type
return playlist[0]
timer_start = None
timer_end = None
(fallback_type, playlist) = self.get_fallback_playlist(schedule)
if playlist:
self.logger.info(f"Resolved {fallback_type.value} fallback")
def do_schedule(entries):
self.logger.info(SU.cyan(f"=== set_fallback_playlist('{entries}') ==="))
self.scheduler.engine.player.start_fallback_playlist(entries)
def do_unschedule():
self.logger.info(SU.cyan("=== clear_fallback_playlist() ==="))
self.scheduler.engine.player.stop_fallback_playlist()
if schedule_now == True:
# Update queue immediately
thread = Thread(target = do_schedule, args = (playlist.entries,))
thread.start()
else:
# Update queue at the beginning of the timeslot
timer_start = FallbackCommandTimer(schedule.start_unix, do_schedule, playlist.entries)
self.message_timer.append(timer_start)
timer_start.start()
# Update fallback channel to be cleared at the end of the timeslot
timer_end = FallbackCommandTimer(schedule.end_unix, do_unschedule)
self.message_timer.append(timer_end)
timer_end.start()
return (timer_start, timer_end)
def handle_proactive_fallback(self, scheduler, playlist):
"""
This is the 1st level strategy for fallback handling. When playlist entries are pre-rolled their
state is validated. If any of them doesn't become "ready to play" in time, some fallback entries
are queued.
"""
resolved_playlist = self.resolve_playlist(playlist.schedule)
if playlist != resolved_playlist:
self.logger.info("Switching from playlist #%s to fallback playlist #%s ..." % (playlist.playlist_id, resolved_playlist.playlist_id))
# Destroy any existing queue timers
for entry in playlist.entries:
scheduler.stop_timer(entry.switchtimer)
self.logger.info("Stopped existing timers for entries")
# Queue the fallback playlist
scheduler.queue_playlist_entries(resolved_playlist.schedule, resolved_playlist.entries, False, True)
self.logger.info("Queued fallback playlist entries (Fallback type: %s)" % playlist.type)
else:
self.logger.critical(SimpleUtil.red("For some strange reason the fallback playlist equals the currently failed one?!"))
msg = f"There is no schedule- or show-fallback defined for timeslot#{schedule.schedule_id}. "
msg += f"The station fallback will be used automatically."
self.logger.info(msg)
def get_fallback_for(self, fallbackname):
def resolve_playlist(self, schedule):
"""
Retrieves a random fallback audio source for any of the types:
- timeslot/schedule
- show
- station
Args:
fallbackname (String): Fallback type
Retrieves the currently planned (fallback) playlist. If a normal playlist is available,
this one is returned. In case of station fallback no playlist is returned.
Args:
schedule (Schedule)
Returns:
(String): Absolute path to the file
(FallbackType, Playlist)
"""
file = ""
media_type = "PLAYLIST"
active_schedule, active_playlist = self.scheduler.get_active_playlist()
planned_playlist = None
fallback_type = None
# Block access to avoid race-conditions
if self.is_processing:
return None
if self.validate_playlist(schedule, "playlist"):
planned_playlist = schedule.get_playlist()
fallback_type = FallbackType.NONE
else:
self.is_processing = True
# Get fallback track(s) by fallback-type
if fallbackname == "timeslot":
file = self.get_playlist_items(active_schedule, "schedule_fallback")
elif fallbackname == "show":
file = self.get_playlist_items(active_schedule, "show_fallback")
elif fallbackname == "station":
file = self.get_playlist_items(active_schedule, "station_fallback")
if not file:
media_type = "TRACK"
file = self.get_random_local_track()
if not file:
self.logger.critical("Got no file for station fallback! Playing default test track, to play anything at all.")
file = "../../test/content/ernie_mayne_sugar.mp3"
media_type = "DEFAULT TRACK"
else:
file = ""
self.logger.critical("Should set next fallback file for " + fallbackname + ", but this fallback is unknown!")
if file:
# Send admin email to notify about the fallback state
if not active_playlist:
active_playlist = "-"
msg = "AURA ENGINE %s FALLBACK DETECTED!\n\n" % fallbackname
msg += "Expected, active Schedule: %s \n" % active_schedule
msg += "Expected, active Playlist: %s \n\n" % active_playlist
msg += "Providing FALLBACK-%s for %s '%s'\n\n" % (media_type, fallbackname, file)
msg += "Please review the schedules or contact your Aura Engine administrator."
self.mailer.send_admin_mail("CRITICAL - Detected fallback for %s" % fallbackname, msg)
self.logger.warn("Providing fallback %s: '%s'. Sent admin email about fallback state" % (media_type, file))
(fallback_type, planned_playlist) = self.get_fallback_playlist(schedule)
self.is_processing = False
return file
return (fallback_type, planned_playlist)
def fallback_has_started(self, artist, title):
def get_fallback_playlist(self, schedule):
"""
Called when a fallback track has actually started playing
"""
self.logger.info("Now playing: fallback track '%s - %s'." % (artist, title))
Retrieves the playlist to be used in a fallback scenario.
Args:
schedule (Schedule)
Returns:
(Playlist)
"""
playlist = None
fallback_type = FallbackType.STATION
if self.validate_playlist(schedule, "schedule_fallback"):
playlist = schedule.schedule_fallback[0]
fallback_type = FallbackType.SCHEDULE
elif self.validate_playlist(schedule, "show_fallback"):
playlist = schedule.show_fallback[0]
fallback_type = FallbackType.SHOW
return (fallback_type, playlist)
......@@ -229,10 +184,21 @@ class FallbackManager:
#
def validate_playlist(self, schedule, playlist_type):
"""
Checks if a playlist is valid for play-out.
Following checks are done for all playlists:
- has one or more entries
Fallback playlists should either:
- have filesystem entries only
- reference a recording of a previous playout of a show (also filesystem)
Otherwise, if a fallback playlist contains Live or Stream entries,
the exact playout behaviour can hardly be predicted.
"""
playlist = getattr(schedule, playlist_type)
if playlist \
......@@ -240,105 +206,116 @@ class FallbackManager:
and playlist[0].entries \
and len(playlist[0].entries) > 0:
return True
# Default playlist
if playlist_type == "playlist":
return True
# Fallback playlist
elif playlist[0].entries:
is_fs_only = True
for entry in playlist[0].entries:
if entry.get_content_type() not in ResourceClass.FILE.types:
self.logger.error(SU.red("Fallback playlist of type '%s' contains not only file-system entries! \
Skipping fallback level..." % playlist_type))
is_fs_only = False
break
return is_fs_only
return False
def validate_entries(self, entries):
"""
Checks if playlist entries are valid for play-out.
"""
for entry in entries:
if entry.get_type() == ChannelType.FILESYSTEM:
audio_store = self.config.get("audiofolder")
filepath = EngineUtil.uri_to_filepath(audio_store, entry.source)
if not self.is_audio_file(filepath):
self.logger.warn("Invalid filesystem path '%s' in entry '%s'" % (filepath, str(entry)))
return False
return True
class EngineCommandTimer(Timer):
"""
Timer for timed executing of Engine commands.
"""
timer_store = {}
logger = logging.getLogger("AuraEngine")
timer_id = None
timer_type = None
param = None
diff = None
dt = None
def get_playlist_items(self, schedule, fallback_key):
def __init__(self, timer_type="BASE", due_time=None, func=None, param=None):
"""
Retrieves the list of tracks from a playlist defined by `fallback_key`.
Constructor
"""
playlist_files = ""
if hasattr(schedule, fallback_key):
playlist = getattr(schedule, fallback_key)
if len(playlist) > 0:
playlist = playlist[0]
if playlist and playlist.entries:
for entry in playlist.entries:
playlist_files += entry.source + "\n"
now_unix = Engine.engine_time()
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
diff = due_time - now_unix
if diff < 0.0:
msg = f"Trying to create timer in the past: {self.timer_id}"
self.logger.error(SU.red(msg))
raise Exception(msg)
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
def wrapper_func(param=None):
# Remove from cache
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineCommandTimer.timer_store[self.timer_id]
# Call actual function
if param: func(param,)
else: func()
Timer.__init__(self, diff, wrapper_func, (param,))
self.update_cache()
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
return playlist_files
def update_cache(self):
"""
Adds the instance to the cache and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineCommandTimer.timer_store:
existing_command = EngineCommandTimer.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
EngineCommandTimer.timer_store[self.timer_id] = self
def get_random_local_track(self):
def print_active_timers(self):
"""
Retrieves a random audio track from the local station-fallback directory.
Returns:
(String): Absolute path to an audio file
Prints a list of active timers to the log.
"""
dir = self.config.fallback_music_folder
files = os.listdir(dir)
audio_files = list(filter(lambda f: self.is_audio_file(os.path.join(dir, f)), files))
if not dir or not audio_files:
self.logger.error("Folder 'fallback_music_folder = %s' is empty!" % dir)
return None
# If last played fallback is > 24 hours ago, ignore play history
# This should save used memory if the engine runs for a long time
if self.last_fallback < SimpleUtil.timestamp() - (60*60*24):
self.fallback_history = {}
self.logger.info("Cleared fallback history.")
self.last_fallback = SimpleUtil.timestamp()
# Retrieve files which haven't been played yet
history = set(self.fallback_history.keys())
left_audio_files = list( set(audio_files) - (history) )
self.logger.info("Left fallback audio-files: %d/%d" % (len(left_audio_files), len(audio_files)))
# If nothing left, clear history and start with all files again
if not len(left_audio_files):
self.fallback_history = {}
left_audio_files = audio_files
for id, timer in EngineCommandTimer.timer_store.values():
EngineCommandTimer.logger.info(str(timer))
# Select random track from directory
i = random.randint(0, len(left_audio_files)-1)
file = os.path.join(dir, left_audio_files[i])
# Store track in history, to avoid playing it multiple times
if file:
self.fallback_history[left_audio_files[i]] = SimpleUtil.timestamp()
return file
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] COMMAND TIMER due at {str(self.dt)} (alive: {self.is_alive()})"
def is_audio_file(self, file):
"""
Checks if the passed file is an audio file i.e. has a file-extension
known for audio files.
Args:
dir (String):
file (File): the file object.
Returns:
(Boolean): True, if it's an audio file.
class FallbackCommandTimer(EngineCommandTimer):
"""
Timer for executing timed scheduling of fallback playlists.
"""
def __init__(self, diff=None, func=None, param=None):
"""
audio_extensions = [".wav", ".flac", ".mp3", ".ogg", ".m4a"]
ext = os.path.splitext(file)[1]
Constructor
"""
super().__init__("FALLBACK", diff, func, param)
self.logger.info("Executing scheduled fallback playlist update '%s' in %s seconds..." % \
(str(func.__name__), str(diff)))
if os.path.isfile(file):
if any(ext in s for s in audio_extensions):
return True
return False
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment