Skip to content
Snippets Groups Projects
Commit 7aaa3022 authored by David Trattnig's avatar David Trattnig
Browse files

First working version of Engine's scheduling.

parent c22ea569
Branches topic/tank_connection_david
No related tags found
No related merge requests found
......@@ -41,11 +41,12 @@ import threading
from operator import attrgetter
from modules.base.simpleutil import SimpleUtil
from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
from libraries.database.broadcasts import Schedule, Playlist, AuraDatabaseModel
from libraries.exceptions.exception_logger import ExceptionLogger
from libraries.enum.auraenumerations import ScheduleEntryType, TimerType
from libraries.enum.auraenumerations import ScheduleEntryType, TimerType, TerminalColors
def alchemyencoder(obj):
......@@ -67,7 +68,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
"""
Aura Scheduler Class
- Gets data from Steering and Tanks
- Retrieves data from Steering and Tank
- Stores and fires events for LiquidSoap
Attributes:
......@@ -79,7 +80,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
programme: The current radio programme to be played as defined in the local engine database
active_entry(Show, Track): This is a Tuple consisting of the currently played `Show` and `Track`
message_timer(Array<threading.Timer>): The message queue of tracks to be played
message_timer(Array<threading.Timer>): The message queue of Liquidsoap commands for playlists/tracks to be played
"""
redismessenger = None
job_result = {}
......@@ -115,34 +116,32 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# init threading
threading.Thread.__init__(self)
# init messenger.. probably not needed anymore
# init messenger.. FIXME probably not needed anymore
self.redismessenger.set_channel('scheduler')
self.redismessenger.set_section('execjob')
#self.redismessenger.send('Scheduler started', '0000', 'success', 'initApp', None, 'appinternal')
# Create exit event
self.exit_event = threading.Event()
# Start thread to load new programme info every hour
self.start()
def run(self):
"""
Called when thread is started via `start()`. It calls `self.fetch_new_programme()`
periodically depending on the `fetching_frequency` define engine configuration.
Called when thread is started via `start()`. It does following:
1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration.
2. Loads the latest programme from the database and sets the instance state `self.programme` with current schedules.
3. Queues all playlists of the programm, other than the playlist currently to be played (This is triggered by Liquidsoap itself).
"""
while not self.exit_event.is_set():
seconds_to_wait = int(self.config.get("fetching_frequency"))
next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait)
self.logger.info("Fetch new programmes every %ss. Next fetching in %ss." % (str(seconds_to_wait), str(next_time)))
# empty database
# self.logger.info("emptying database")
# ScheduleEntry.truncate()
self.fetch_new_programme()
self.queue_programme()
self.print_message_queue()
self.exit_event.wait(seconds_to_wait)
......@@ -153,41 +152,66 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
def get_active_entry(self):
"""
Retrieves the current `Show` and `Track` tuple being played.
Externally called via `LiquidSoapCommunicator`.
Retrieves the current `PlaylistEntry` which should be played as per programme.
Publically called via `LiquidSoapCommunicator`.
Important note: This method also updates the state variable `active_entry`.
Returns:
(Show, Entry): The show and track to be played next.
(PlaylistEntry): The track which is (or should) currently being played
"""
# now_unix = time.mktime(datetime.datetime.now().timetuple())
# lastentry = None
# # Load programme if necessary
# if self.programme is None:
# self.logger.info("Next track requested: Need to load programme from database first.")
# self.load_programme_from_db()
# # Get the entry currently being played
# for show in self.programme:
# for entry in show.playlist:
# # check if lastentry is set and if act entry is in the future
# if lastentry is not None and entry.start_unix > now_unix:
# # return entry if so
# return (show,entry) # actsource = entry.source
# lastentry = entry
now_unix = time.mktime(datetime.datetime.now().timetuple())
# Load programme if necessary
if not self.programme:
self.logger.info("Next track requested: Need to load programme from database first.")
self.load_programme_from_db()
# Check for scheduled playlist
current_schedule, current_playlist = self.get_active_playlist()
if not current_playlist:
if not current_schedule:
self.logger.critical("There's no active playlist nor schedule. It's probably time to play some fallback...")
else:
self.logger.warning("There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule.")
return None
time_start = SimpleUtil.fmt_time(current_playlist.start_unix)
time_end = SimpleUtil.fmt_time(current_playlist.start_unix+current_playlist.duration)
time_now = SimpleUtil.fmt_time(now_unix)
self.logger.info("Current Playlist (%d:%d) for show '%s' scheduled to be played at %s until %s (Now: %s)" % (current_playlist.playlist_id, current_playlist.artificial_id, current_playlist.show_name, time_start, time_end, time_now))
# Iterate over playlist entries and store the current one
time_marker = current_playlist.start_unix
current_entry = None
for entry in current_playlist.entries:
self.logger.info(entry)
if entry.start_unix < now_unix < entry.start_unix + entry.duration:
current_entry = entry
break
time_marker += entry.duration
if current_entry:
time_start = SimpleUtil.fmt_time(current_entry.start_unix)
time_end = SimpleUtil.fmt_time(current_entry.start_unix+current_entry.duration)
time_now = SimpleUtil.fmt_time(now_unix)
self.logger.info("Track '%s' is expected playing from %s to %s (Now: %s)" % (current_entry.filename, time_start, time_end, time_now))
if not self.active_entry:
self.logger.warn("Activate track '%s' and [>> FFWD] to current point in time" % (current_entry.filename))
elif self.active_entry.filename != current_entry.filename:
self.logger.critical("--- SOMETHING UNEXPECTED IS PLAYING: %s --vs-- %s" % (self.active_entry.filename, current_entry.filename))
self.active_entry = current_entry
return (current_entry)
else:
# Nothing playing ... fallback will kick-in
self.logger.warning("There's no entry scheduled for playlist '%s'. Is currently -nothing- or a fallback playing?" % str(current_playlist))
return None
# return None, None
# FIXME active_entry logic
if not self.active_entry:
self.logger.warning("No active entry set! Is currently nothing or a fallback playing?")
return (None, None)
else:
return self.active_entry
# FIXME Review relevance.
def get_act_programme_as_string(self):
"""
Fetches the latest programme and returns it as `String`.
......@@ -217,7 +241,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
def print_message_queue(self):
"""
Prints the current message queue i.e. tracks in the queue to be played.
Prints the current message queue i.e. playlists in the queue to be played.
"""
message_queue = ""
messages = sorted(self.message_timer, key=attrgetter('diff'))
......@@ -227,7 +251,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
for msg in messages:
message_queue += str(msg)+"\n"
self.logger.info("Message Queue: " + message_queue)
self.logger.info("Message Queue: \n" + message_queue)
# ------------------------------------------------------------------------------------------ #
......@@ -252,7 +276,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
def get_next_file_for(self, fallbackname):
"""
Evaluates the next fallback file to be played for a given fallback-type.
Evaluates the next **fallback files/folders** to be played for a given fallback-type.
Valid fallback-types are:
* timeslot
......@@ -262,45 +286,221 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
Returns:
(String): Absolute path to the file to be played as a fallback.
"""
self.logger.critical("HAVE TO <GET> NEXT FILE FOR: " + fallbackname)
(show, entry) = self.get_active_entry()
self.logger.critical(str(show) + " " + str(entry))
file = None
# next_entry = None
# if not self.active_entry:
# self.get_active_entry()
# next_entry = self.active_entry
# else:
# next_entry = self.get_next_entry()
if fallbackname == "timeslot":
file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
file = "/home/david/Music/ab.mp3"
elif fallbackname == "show":
file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
elif fallbackname == "station":
file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3"
else:
file = ""
self.logger.critical("Should set next fallback file for " + fallbackname + ", but this playlist is unknown!")
self.logger.critical("Should set next fallback file for " + fallbackname + ", but this fallback is unknown!")
if file:
self.logger.info("Got next file '%s' (type: %s)" % (file, fallbackname))
#set_next_file_thread = SetNextFile(fallbackname, show)
#set_next_file_thread.start()
self.logger.info("Got next fallback file for '" + fallbackname + "': " + file)
# self.redismessenger.set_next_file_for(playlistname, file)
#self.redismessenger.set_next_file_for(playlistname, file)
return file
#
# PRIVATE METHODS
#
def get_active_playlist(self):
"""
Retrieves the schedule and playlist currently to be played as per
schedule. If the current point in time has no playlist assigned,
only the matching schedule is returned.
Returns:
(Schedule, Playlist): The current schedule and playlist tuple.
"""
now_unix = time.mktime(datetime.datetime.now().timetuple())
current_schedule = None
current_playlist = None
# Iterate over all shows and playlists and find the one to be played right now
for schedule in self.programme:
if schedule.start_unix < now_unix < schedule.end_unix:
current_schedule = schedule
for playlist in schedule.playlist:
if playlist.start_unix < now_unix < playlist.end_unix:
current_playlist = playlist
break
break
return (current_schedule, current_playlist)
def get_next_playlists(self):
"""
Retrieves the playlists to be played after the current one.
Returns:
([Playlist]): The next playlists
"""
now_unix = time.mktime(datetime.datetime.now().timetuple())
next_playlists = []
for schedule in self.programme:
if schedule.end_unix > now_unix:
for playlist in schedule.playlist:
if playlist.start_unix > now_unix:
next_playlists.append(playlist)
return next_playlists
def get_next_entry(self):
"""
Retrieves the playlist entry to be played next.
Returns:
(Playlist): The next playlist track
"""
next_entry = None
current_schedule, current_playlist = self.get_active_playlist()
if not self.active_entry:
self.logger.warn("For some reason there is no active playlist entry set... Fetching it now!")
self.get_active_entry()
if not self.active_entry:
self.logger.warn("Looks like nothing is currently scheduled...")
return None
# Check if there is a next entry in the current playlist
for i, entry in enumerate(self.active_entry.playlist.entries):
if entry is self.active_entry:
if i+1 < len(self.active_entry.playlist.entries):
next_entry = self.active_entry.playlist.entries[i+1]
break
# It might be in the next playlist...
if not next_entry:
next_playlist = None
found_current = False
# Iterate over all schedule and playlists and find the one to be played next
for schedule in self.programme:
for playlist in schedule.playlist:
if playlist is current_playlist:
found_current = True
elif found_current:
next_playlist = playlist
break
if next_playlist:
next_entry = next_playlist.entries[0]
if not next_entry:
self.logger.fatal("There is no next playlist-entry in the programme!")
return next_entry
def queue_programme(self):
"""
Queues the current programme (playlists as per schedule) by creating
timed commands to Liquidsoap to enable the individuals tracks of playlists.
"""
active_schedule, active_playlist = self.get_active_playlist()
playlists = self.get_next_playlists()
s = "\n\n PLAYING NOW:"
s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────"
if active_schedule:
s += "\n│ Playing schedule %s " % active_schedule
if active_playlist:
s += "\n│ └── Playlist %s " % active_playlist
else:
s += "\n│ └── %s No Playlist active. Did it finish before the end of the schedule? %s" % (TerminalColors.ORANGE.value, TerminalColors.ENDC.value)
else:
s += "\n│ Nothing. "
s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────"
s += "\n PLAYING NEXT:"
s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────"
if not playlists:
s += "\n│ Nothing. "
else:
for next_playlist in playlists:
s += "\n│ Queued schedule %s " % next_playlist.schedule
s += "\n│ └── Playlist %s " % next_playlist
if next_playlist.end_unix > next_playlist.schedule.end_unix:
s += "\n│ %s ↑↑↑ Playlist #%s ends after Schedule #%s!%s " % (TerminalColors.RED.value, next_playlist.playlist_id, next_playlist.schedule.schedule_id, TerminalColors.ENDC.value)
self.schedule_playlist(next_playlist)
s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────\n\n"
self.logger.info(s)
def schedule_playlist(self, playlist):
"""
Creates a schedule at the planned time for the LiquidSoap player.
Args:
playlist(Playlist): The playlist to be scheduled for playout
"""
now_unix = time.mktime(datetime.datetime.now().timetuple())
diff = playlist.start_unix - now_unix
def func(playlist):
self.logger.info("=== Executing scheduled LQS command: activate_playlist(...) ===")
self.liquidsoapcommunicator.activate_playlist(playlist)
planned_timer = self.is_something_planned_at_time(playlist.start_unix)
timer = None
if planned_timer:
# Check if the playlist_id's are different
if planned_timer.entry.playlist_id != playlist.playlist_id:
# If not, stop and remove the old timer, create a new one
self.stop_timer(planned_timer)
timer = self.create_timer(diff, func, [playlist], switcher=True)
else:
# If the playlists do not differ => reuse the old timer and do nothing
self.logger.info("Playlist %s is already scheduled - No new timer created!" % playlist)
else:
# If nothing is planned at given time, create a new timer
timer = self.create_timer(diff, func, [playlist], switcher=True)
if timer:
playlist.switchtimer = timer
def fetch_new_programme(self):
"""
Fetch the latest programme from `AuraCalendarService`.
In case no programme is successfully returned, it is tried
to retrieve the programme from Engine's database.
Fetch the latest programme from `AuraCalendarService` which stores it to the database.
After that, the programme is in turn loaded from the database and stored in `self.programme`.
"""
self.logger.info("Trying to fetch new program...")
# Fetch programme from API endpoints
self.logger.info("Trying to fetch new programe from API endpoints...")
acs = AuraCalendarService(self.config)
queue = acs.get_queue()
acs.start() # start fetching thread
response = queue.get() # wait for the end
self.logger.info("... Programme fetch via API done!")
# Reset last successful fetch state
lsf = self.last_successful_fetch
......@@ -308,27 +508,25 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
if response is None:
self.logger.warning("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.")
elif type(response) is list:
self.programme = response
if self.programme is not None and len(self.programme) > 0:
self.last_successful_fetch = datetime.datetime.now()
self.logger.info("+++ Successfully fetched current programme from API +++")
if len(self.programme) == 0:
self.logger.critical("Programme fetched from Steering/Tank has no entries!")
# return self.get_act_programme_as_string()
elif response.startswith("fetching_aborted"):
# TODO Check why the 16th entry is logged only
self.logger.warning("Trying to load programme from database, because fetching was being aborted from AuraCalendarService! Reason: " + response[16:])
else:
self.logger.warning("Trying to load programme from database, because i got an unknown response from AuraCalendarService: " + response)
# if somehow the programme could not be fetched => try to load it from database
#if self.last_successful_fetch is None:
# Always load latest programme from the database
self.last_successful_fetch = lsf
self.load_programme_from_db()
self.logger.info("Finished loading current programme from database")
for schedule in self.programme:
self.logger.debug("\tSchedule %s with Playlist %s" % (str(schedule), str(schedule.playlist[0])))
......@@ -340,30 +538,32 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
"""
self.programme = Schedule.select_act_programme()
if self.programme is None or len(self.programme) == 0:
if not self.programme:
self.logger.critical("Could not load programme from database. We are in big trouble my friend!")
return
planned_entries = []
# FIXME That's very likely not needed - review!
# planned_entries = []
for schedule in self.programme:
# playlist to play
schedule.playlist = Playlist.select_playlist(schedule.playlist_id)
# show fallback is played when playlist fails
schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
# timeslot fallback is played when show fallback fails
schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
# station fallback is played when timeslot fallback fails
schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)
for p in schedule.playlist:
planned_entries.append(p)
# for schedule in self.programme:
# # playlist to play
# #schedule.playlist = [Playlist.select_playlist_for_schedule(schedule.schedule_start, schedule.playlist_id)]
# # show fallback is played when playlist fails
# #schedule.showfallback = Playlist.select_playlist(schedule.show_fallback_id)
# # timeslot fallback is played when show fallback fails
# #schedule.timeslotfallback = Playlist.select_playlist(schedule.timeslot_fallback_id)
# # station fallback is played when timeslot fallback fails
# #schedule.stationfallback = Playlist.select_playlist(schedule.station_fallback_id)
# for p in schedule.playlist:
# planned_entries.append(p)
# FIXME Same playlists are repeated over time - test with different schedules/timeslots/playlists
# Therefore only passing the first playlist for now:
self.logger.warn("ONLY PASSING 1ST PLAYLIST OF PROGRAMME")
self.enable_entries(planned_entries[0])
self.print_message_queue()
# self.logger.warn("ONLY PASSING 1ST PLAYLIST OF PROGRAMME")
# self.enable_entries(planned_entries[0])
......@@ -375,7 +575,6 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
Args:
playlist(Playlist): The playlist to be scheduled for playout
"""
now_unix = time.mktime(datetime.datetime.now().timetuple())
time_marker = playlist.start_unix
......@@ -384,27 +583,40 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
old_entry = None
for entry in playlist.entries:
time_marker += 1 # FIXME ???
diff=3
entry.start_unix = time_marker
self.enable_timer(diff, entry, old_entry)
old_entry = entry
# time_marker += 1 # FIXME ???
# Since we also get entries from the past, filter these out
if time_marker > now_unix:
# # Since we also get entries from the past, filter these out
# if time_marker > now_unix:
# when do we have to start?
diff = time_marker - now_unix
diff = 3 # FIXME test
entry.start_unix = time_marker
# # when do we have to start?
# diff = time_marker - now_unix
# diff = 3 # FIXME test
# entry.start_unix = time_marker
# enable the three timer
self.enable_timer(diff, entry, old_entry)
old_entry = entry
# # enable the three timer
# self.enable_timer(diff, entry, old_entry)
# old_entry = entry
# ------------------------------------------------------------------------------------------ #
def enable_timer(self, diff, entry, old_entry):
# create the activation threads and run them after <diff> seconds
"""
Create threads to send track-activation messages to LiquidSoap.
Those tracks can be delayed by `diff` seconds.
Args:
diff (Integer): seconds after tracks should be activated
"""
self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry))
entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry])
self.enable_fading(diff, entry, old_entry)
# FIXME Fade In/Out logic: Not sure if that's functional
#self.enable_fading(diff, entry, old_entry)
# ------------------------------------------------------------------------------------------ #
def enable_fading(self, diff, new_entry, old_entry):
......@@ -428,7 +640,6 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# ------------------------------------------------------------------------------------------ #
def add_or_update_timer(self, diff, func, parameters):
timer = None
# FIXME check we there's an array passed
entry = parameters[0]
planned_timer = self.is_something_planned_at_time(entry.start_unix)
......@@ -533,30 +744,30 @@ class AuraScheduler(ExceptionLogger, threading.Thread):
# ------------------------------------------------------------------------------------------ #
class SetNextFile(threading.Thread):
fallbackname = None
show = None
def __init__(self, fallbackname, show):
threading.Thread.__init__(self)
self.fallbackname = fallbackname
self.show = show
def run(self):
if self.fallbackname == "show":
self.detect_next_file_for(self.show.showfallback)
elif self.fallbackname == "timeslow":
self.detect_next_file_for(self.show.timeslotfallback)
elif self.fallbackname == "station":
self.detect_next_file_for(self.show.stationfallback)
def detect_next_file_for(self, playlist):
return ""
#if playlist.startswith("pool"):
# self.find_next_file_in_pool(playlist)
#def find_next_file_in_pool(self, pool):
# return ""
# class SetNextFile(threading.Thread):
# fallbackname = None
# show = None
# def __init__(self, fallbackname, show):
# threading.Thread.__init__(self)
# self.fallbackname = fallbackname
# self.show = show
# def run(self):
# if self.fallbackname == "show":
# self.detect_next_file_for(self.show.showfallback)
# elif self.fallbackname == "timeslow":
# self.detect_next_file_for(self.show.timeslotfallback)
# elif self.fallbackname == "station":
# self.detect_next_file_for(self.show.stationfallback)
# def detect_next_file_for(self, playlist):
# return ""
# #if playlist.startswith("pool"):
# # self.find_next_file_in_pool(playlist)
# #def find_next_file_in_pool(self, pool):
# # return ""
# ------------------------------------------------------------------------------------------ #
class CallFunctionTimer(threading.Timer):
......@@ -569,8 +780,12 @@ class CallFunctionTimer(threading.Timer):
switcher = False
def __init__(self, diff, func, param, fadein=False, fadeout=False, switcher=False):
self.logger = logging.getLogger("AuraEngine")
self.logger.debug("CallFunctionTimer: Executing LiquidSoap command '%s' in %s seconds..." % (str(func.__name__), str(diff)))
threading.Timer.__init__(self, diff, func, param)
# TODO Review usage of the fading-attributes:
if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher:
raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")
......@@ -582,9 +797,6 @@ class CallFunctionTimer(threading.Timer):
self.fadeout = fadeout
self.switcher = switcher
self.logger = logging.getLogger("AuraEngine")
self.logger.debug(str(self))
# ------------------------------------------------------------------------------------------ #
def __str__(self):
if self.fadein:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment