Commit a1a97d34 authored by david's avatar david
Browse files

Fixed timer init and cmd chaining. #72

parent 113b3ff0
...@@ -39,7 +39,7 @@ class EngineControlInterface: ...@@ -39,7 +39,7 @@ class EngineControlInterface:
Provides ability to control the engine in various ways. Provides ability to control the engine in various ways.
""" """
config = None config = None
logger = None logger = None
engine = None engine = None
event_dispatcher = None event_dispatcher = None
sci = None sci = None
...@@ -51,10 +51,10 @@ class EngineControlInterface: ...@@ -51,10 +51,10 @@ class EngineControlInterface:
Args: Args:
config (AuraConfig): Engine configuration config (AuraConfig): Engine configuration
logger (AuraLogger): The logger logger (AuraLogger): The logger
""" """
self.engine = engine self.engine = engine
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ...")) self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
self.sci = SocketControlInterface.get_instance(event_dispatcher) self.sci = SocketControlInterface.get_instance(event_dispatcher)
...@@ -74,13 +74,13 @@ class SocketControlInterface: ...@@ -74,13 +74,13 @@ class SocketControlInterface:
Note this server only allows a single connection at once. This Note this server only allows a single connection at once. This
service is primarly utilized to store new playlogs. service is primarly utilized to store new playlogs.
""" """
PORT = 1337 PORT = 1337
ACTION_ON_METADATA = "on_metadata" ACTION_ON_METADATA = "on_metadata"
instance = None instance = None
config = None config = None
logger = None logger = None
server = None server = None
event_dispatcher = None event_dispatcher = None
...@@ -95,14 +95,14 @@ class SocketControlInterface: ...@@ -95,14 +95,14 @@ class SocketControlInterface:
""" """
if SocketControlInterface.instance: if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!")) raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self SocketControlInterface.instance = self
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher self.event_dispatcher = event_dispatcher
host = "127.0.0.1" host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host)) thread = Thread(target = self.run, args = (self.logger, host))
thread.start() thread.start()
@staticmethod @staticmethod
...@@ -119,7 +119,7 @@ class SocketControlInterface: ...@@ -119,7 +119,7 @@ class SocketControlInterface:
def run(self, logger, host): def run(self, logger, host):
""" """
Starts the socket server Starts the socket server
""" """
while True: while True:
try: try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
...@@ -131,23 +131,23 @@ class SocketControlInterface: ...@@ -131,23 +131,23 @@ class SocketControlInterface:
time.sleep(wait_time) time.sleep(wait_time)
logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}')) logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
self.server.listen() self.server.listen()
while True: while True:
(conn, client) = self.server.accept() (conn, client) = self.server.accept()
while True: while True:
r = SocketReader(conn) r = SocketReader(conn)
p = HttpStream(r) p = HttpStream(r)
data = p.body_file().read() data = p.body_file().read()
logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}')) logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
try: try:
self.process(logger, json.loads(data)) self.process(logger, json.loads(data))
conn.sendall(b'\n[ECI] processing done.\n') conn.sendall(b'\n[ECI] processing done.\n')
except Exception as e: except Exception as e:
logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e) logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)
conn.close() conn.close()
break break
...@@ -155,7 +155,7 @@ class SocketControlInterface: ...@@ -155,7 +155,7 @@ class SocketControlInterface:
""" """
Process incoming actions. Process incoming actions.
""" """
if "action" in data: if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA: if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
meta_data = data["data"] meta_data = data["data"]
meta_data["duration"] = data["track_duration"] meta_data["duration"] = data["track_duration"]
...@@ -166,7 +166,7 @@ class SocketControlInterface: ...@@ -166,7 +166,7 @@ class SocketControlInterface:
logger.error(SU.red("[ECI] Unknown action: " + data["action"])) logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else: else:
logger.error(SU.red(f'[ECI] Missing action in request: {data}')) logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
def terminate(self): def terminate(self):
...@@ -185,7 +185,8 @@ class EngineExecutor(Timer): ...@@ -185,7 +185,8 @@ class EngineExecutor(Timer):
Primarily used for automations performed by the scheduler. Primarily used for automations performed by the scheduler.
""" """
_lock = None _lock = None
logger = logging.getLogger("AuraEngine") logger = logging.getLogger("AuraEngine")
initialized = None
timer_store = {} timer_store = {}
parent_timer = None parent_timer = None
child_timer = None child_timer = None
...@@ -208,23 +209,24 @@ class EngineExecutor(Timer): ...@@ -208,23 +209,24 @@ class EngineExecutor(Timer):
func (function): The function to be called func (function): The function to be called
param (object): Parameter passt to the function param (object): Parameter passt to the function
""" """
self.initialized = False
self._lock = Lock() self._lock = Lock()
from src.engine import Engine from src.engine import Engine
now_unix = Engine.engine_time() now_unix = Engine.engine_time()
# Init parent-child relation # Init parent-child relation
self.parent_timer = parent_timer self.parent_timer = parent_timer
if self.parent_timer: if self.parent_timer:
self.parent_timer.child_timer = self self.parent_timer.child_timer = self
# Init meta data # Init meta data
self.direct_exec = False self.direct_exec = False
self.timer_type = timer_type self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}" self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time: if not due_time:
diff = 0 diff = 0
else: else:
diff = due_time - now_unix diff = due_time - now_unix
self.diff = diff self.diff = diff
...@@ -235,15 +237,15 @@ class EngineExecutor(Timer): ...@@ -235,15 +237,15 @@ class EngineExecutor(Timer):
is_stored = self.update_store() is_stored = self.update_store()
if not is_stored: if not is_stored:
self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead")) self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
else: else:
if diff < 0: if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..." msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg)) self.logger.error(SU.red(msg))
self.exec_now() self.exec_now()
elif diff == 0: elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately") self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now() self.exec_now()
else: else:
self.exec_timed() self.exec_timed()
self.start() self.start()
...@@ -252,7 +254,12 @@ class EngineExecutor(Timer): ...@@ -252,7 +254,12 @@ class EngineExecutor(Timer):
""" """
Calls the passed function `func` when the timer is ready. Calls the passed function `func` when the timer is ready.
""" """
self.join() while self.initialized == False:
timer.sleep(0.001)
self.logger.info(SU.orange("Waiting until the EngineExecutor is done with initialization..."))
if not self.direct_exec: #TODO Evaluate if we should join for direct exec too
self.join()
func() func()
...@@ -276,7 +283,8 @@ class EngineExecutor(Timer): ...@@ -276,7 +283,8 @@ class EngineExecutor(Timer):
self.wait_for_parent() self.wait_for_parent()
thread = Thread(name=self.timer_id, target=self.func, args=(self.param,)) thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
thread.start() thread.start()
self.initialized = True
def exec_timed(self): def exec_timed(self):
""" """
...@@ -285,11 +293,12 @@ class EngineExecutor(Timer): ...@@ -285,11 +293,12 @@ class EngineExecutor(Timer):
Assigns the `timer_id` as the thread name. Assigns the `timer_id` as the thread name.
""" """
def wrapper_func(param=None): def wrapper_func(param=None):
self.wait_for_parent() self.wait_for_parent()
if param: self.func(param,) if param: self.func(param,)
else: self.func() else: self.func()
super().__init__(self.diff, wrapper_func, (self.param,)) super().__init__(self.diff, wrapper_func, (self.param,))
self._name = self.timer_id self._name = self.timer_id
self.initialized = True
def update_store(self): def update_store(self):
...@@ -315,11 +324,11 @@ class EngineExecutor(Timer): ...@@ -315,11 +324,11 @@ class EngineExecutor(Timer):
return False return False
# Still waiting for execution -> update # Still waiting for execution -> update
else: else:
self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}") self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
existing_command.cancel() existing_command.cancel()
if existing_command.child_timer: if existing_command.child_timer:
self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}") self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
EngineExecutor.timer_store[self.timer_id] = self EngineExecutor.timer_store[self.timer_id] = self
self.logger.debug(f"Created command timer with ID: {self.timer_id}") self.logger.debug(f"Created command timer with ID: {self.timer_id}")
...@@ -351,7 +360,7 @@ class EngineExecutor(Timer): ...@@ -351,7 +360,7 @@ class EngineExecutor(Timer):
del_keys = [] del_keys = []
for timer in timers: for timer in timers:
if timer.dt < datetime.now() - timedelta(hours=3): if timer.dt < datetime.now() - timedelta(hours=3):
if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()): if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}") timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
del_keys.append(timer.timer_id) del_keys.append(timer.timer_id)
......
...@@ -48,7 +48,7 @@ class AuraScheduler(threading.Thread): ...@@ -48,7 +48,7 @@ class AuraScheduler(threading.Thread):
config = None config = None
logger = None logger = None
engine = None engine = None
exit_event = None exit_event = None
timeslot_renderer = None timeslot_renderer = None
programme = None programme = None
message_timer = [] message_timer = []
...@@ -70,12 +70,12 @@ class AuraScheduler(threading.Thread): ...@@ -70,12 +70,12 @@ class AuraScheduler(threading.Thread):
self.config = AuraConfig.config() self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.programme = ProgrammeService() self.programme = ProgrammeService()
self.timeslot_renderer = TimeslotRenderer(self) self.timeslot_renderer = TimeslotRenderer(self)
self.fallback = fallback_manager self.fallback = fallback_manager
self.engine = engine self.engine = engine
self.engine.scheduler = self self.engine.scheduler = self
self.is_soundsytem_init = False self.is_soundsytem_init = False
# Scheduler Initialization # Scheduler Initialization
AuraDatabaseModel.init_database() AuraDatabaseModel.init_database()
self.is_initialized = False self.is_initialized = False
...@@ -91,8 +91,8 @@ class AuraScheduler(threading.Thread): ...@@ -91,8 +91,8 @@ class AuraScheduler(threading.Thread):
def run(self): def run(self):
""" """
Called when thread is started via `start()`. It does the following: Called when thread is started via `start()`. It does the following:
1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration. 1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration.
2. Loads the latest programme from the database and sets the instance state `self.programme` with current timeslots. 2. Loads the latest programme from the database and sets the instance state `self.programme` with current timeslots.
3. Queues all timeslots of the programme, if the soundssystem is ready to accept commands. 3. Queues all timeslots of the programme, if the soundssystem is ready to accept commands.
...@@ -103,19 +103,19 @@ class AuraScheduler(threading.Thread): ...@@ -103,19 +103,19 @@ class AuraScheduler(threading.Thread):
self.config.load_config() self.config.load_config()
seconds_to_wait = int(self.config.get("fetching_frequency")) seconds_to_wait = int(self.config.get("fetching_frequency"))
self.logger.info(SU.cyan(f"== start fetching new timeslots (every {seconds_to_wait} seconds) ==")) self.logger.info(SU.cyan(f"== start fetching new timeslots (every {seconds_to_wait} seconds) =="))
# Load some stuff from the API in any case # Load some stuff from the API in any case
self.programme.refresh() self.programme.refresh()
# Queue only when the engine is ready to play # Queue only when the engine is ready to play
if self.is_initialized == True: if self.is_initialized == True:
self.queue_programme() self.queue_programme()
except Exception as e: except Exception as e:
self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), e) self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), e)
# Keep on working anyway # Keep on working anyway
EngineExecutor.log_commands() EngineExecutor.log_commands()
self.exit_event.wait(seconds_to_wait) self.exit_event.wait(seconds_to_wait)
...@@ -137,12 +137,12 @@ class AuraScheduler(threading.Thread): ...@@ -137,12 +137,12 @@ class AuraScheduler(threading.Thread):
self.queue_startup_entries() self.queue_startup_entries()
except NoActiveTimeslotException: except NoActiveTimeslotException:
# That's not good, but keep on working... # That's not good, but keep on working...
pass pass
def on_play(self, entry): def on_play(self, entry):
""" """
Event Handler which is called by the engine when some entry is actually playing. Event Handler which is called by the engine when some entry is actually playing.
Ignores entries which are part of a scheduled fallback, because they handle their Ignores entries which are part of a scheduled fallback, because they handle their
stuff by themselves. stuff by themselves.
...@@ -172,7 +172,7 @@ class AuraScheduler(threading.Thread): ...@@ -172,7 +172,7 @@ class AuraScheduler(threading.Thread):
def play_active_entry(self): def play_active_entry(self):
""" """
Plays the entry scheduled for the very current moment and forwards to the scheduled position in time. Plays the entry scheduled for the very current moment and forwards to the scheduled position in time.
Usually called when the Engine boots. Usually called when the Engine boots.
Raises: Raises:
...@@ -198,11 +198,11 @@ class AuraScheduler(threading.Thread): ...@@ -198,11 +198,11 @@ class AuraScheduler(threading.Thread):
now_unix = Engine.engine_time() now_unix = Engine.engine_time()
seconds_to_seek = now_unix - active_entry.start_unix seconds_to_seek = now_unix - active_entry.start_unix
# If the seek exceeds the length of the current track, # If the seek exceeds the length of the current track,
# there's no need to do anything - the scheduler takes care of the rest # there's no need to do anything - the scheduler takes care of the rest
if (seconds_to_seek + sleep_offset) > active_entry.duration: if (seconds_to_seek + sleep_offset) > active_entry.duration:
self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.") self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.")
else: else:
# Preload and play active entry # Preload and play active entry
PlayCommand(self.engine, [active_entry]) PlayCommand(self.engine, [active_entry])
...@@ -217,7 +217,7 @@ class AuraScheduler(threading.Thread): ...@@ -217,7 +217,7 @@ class AuraScheduler(threading.Thread):
self.logger.info("Sound-system seek response: " + response) self.logger.info("Sound-system seek response: " + response)
thread = threading.Thread(target = async_cue_seek, args = (seconds_to_seek,)) thread = threading.Thread(target = async_cue_seek, args = (seconds_to_seek,))
thread.start() thread.start()
elif active_entry.get_content_type() in ResourceClass.STREAM.types \ elif active_entry.get_content_type() in ResourceClass.STREAM.types \
or active_entry.get_content_type() in ResourceClass.LIVE.types: or active_entry.get_content_type() in ResourceClass.LIVE.types:
...@@ -227,7 +227,7 @@ class AuraScheduler(threading.Thread): ...@@ -227,7 +227,7 @@ class AuraScheduler(threading.Thread):
else: else:
self.logger.critical("Unknown Entry Type: %s" % active_entry) self.logger.critical("Unknown Entry Type: %s" % active_entry)
def get_active_playlist(self): def get_active_playlist(self):
...@@ -247,7 +247,7 @@ class AuraScheduler(threading.Thread): ...@@ -247,7 +247,7 @@ class AuraScheduler(threading.Thread):
def queue_programme(self): def queue_programme(self):
""" """
Queues the current programme (playlists as per timeslot) by creating Queues the current programme (playlists as per timeslot) by creating
timed commands to the sound-system to enable the individual tracks of playlists. timed commands to the sound-system to enable the individual tracks of playlists.
""" """
# Get a clean set of the timeslots within the scheduling window # Get a clean set of the timeslots within the scheduling window
...@@ -265,7 +265,7 @@ class AuraScheduler(threading.Thread): ...@@ -265,7 +265,7 @@ class AuraScheduler(threading.Thread):
playlist = self.programme.get_current_playlist(next_timeslot) playlist = self.programme.get_current_playlist(next_timeslot)
if playlist: if playlist:
self.queue_playlist_entries(next_timeslot, playlist.entries, False, True) self.queue_playlist_entries(next_timeslot, playlist.entries, False, True)
self.logger.info(SU.green("Finished queuing programme.")) self.logger.info(SU.green("Finished queuing programme."))
...@@ -276,7 +276,7 @@ class AuraScheduler(threading.Thread): ...@@ -276,7 +276,7 @@ class AuraScheduler(threading.Thread):
this method in any other scenario, as it doesn't respect the scheduling window. this method in any other scenario, as it doesn't respect the scheduling window.
""" """
current_timeslot = self.programme.get_current_timeslot() current_timeslot = self.programme.get_current_timeslot()
# Queue the (rest of the) currently playing timeslot upon startup # Queue the (rest of the) currently playing timeslot upon startup
if current_timeslot: if current_timeslot:
current_playlist = self.programme.get_current_playlist(current_timeslot) current_playlist = self.programme.get_current_playlist(current_timeslot)
...@@ -295,8 +295,8 @@ class AuraScheduler(threading.Thread): ...@@ -295,8 +295,8 @@ class AuraScheduler(threading.Thread):
Creates sound-system player commands for all playlist items to be executed at the scheduled time. Creates sound-system player commands for all playlist items to be executed at the scheduled time.
Since each scheduled playlist can consist of multiple entry types such as *file*, *live*, Since each scheduled playlist can consist of multiple entry types such as *file*, *live*,
and *stream*, the play-out of the timeslot is actually a bit more complex. Before any playlist and *stream*, the play-out of the timeslot is actually a bit more complex. Before any playlist
entries of the timeslot can be turned into sound, they need to be grouped, queued and pre-loaded. entries of the timeslot can be turned into sound, they need to be aggregated, queued and pre-loaded.
1. First, all entries are aggregated when they hold filesystem entries. 1. First, all entries are aggregated when they hold filesystem entries.
Given you have a playlist with 10 entries, the first 4 are consisting of files, the next two Given you have a playlist with 10 entries, the first 4 are consisting of files, the next two
...@@ -318,7 +318,7 @@ class AuraScheduler(threading.Thread): ...@@ -318,7 +318,7 @@ class AuraScheduler(threading.Thread):
Returns: Returns:
(String): Formatted string to display playlist entries in log (String): Formatted string to display playlist entries in log
""" """
entry_groups = [] entry_groups = []
entry_groups.append([]) entry_groups.append([])
previous_entry = None previous_entry = None
...@@ -333,7 +333,7 @@ class AuraScheduler(threading.Thread): ...@@ -333,7 +333,7 @@ class AuraScheduler(threading.Thread):
(previous_entry != None and \ (previous_entry != None and \
previous_entry.get_content_type() == entry.get_content_type() and \ previous_entry.get_content_type() == entry.get_content_type() and \
entry.get_content_type() in ResourceClass.FILE.types): entry.get_content_type() in ResourceClass.FILE.types):
entry_groups[index].append(entry) entry_groups[index].append(entry)
else: else:
index += 1 index += 1
...@@ -341,7 +341,7 @@ class AuraScheduler(threading.Thread): ...@@ -341,7 +341,7 @@ class AuraScheduler(threading.Thread):
entry_groups[index].append(entry) entry_groups[index].append(entry)
previous_entry = entry previous_entry = entry
self.logger.info("Built %s entry group(s)" % len(entry_groups)) self.logger.info("Built %s entry group(s)" % len(entry_groups))
# Timeslot function calls # Timeslot function calls
if len(entries) > 0 and len(entry_groups) > 0: if len(entries) > 0 and len(entry_groups) > 0:
for entries in entry_groups: for entries in entry_groups:
...@@ -359,17 +359,17 @@ class AuraScheduler(threading.Thread): ...@@ -359,17 +359,17 @@ class AuraScheduler(threading.Thread):
""" """
Ignore timeslots which are before the start of scheduling window (start of timeslot - `scheduling_window_start`) Ignore timeslots which are before the start of scheduling window (start of timeslot - `scheduling_window_start`)
or after the end of the scheduling window (end of timeslot -`scheduling_window_end`). or after the end of the scheduling window (end of timeslot -`scheduling_window_end`).
Before the scheduling window: Before the scheduling window:
- Timeslots can still be deleted in Steering and the playout will respect this - Timeslots can still be deleted in Steering and the playout will respect this
During the scheduling window: During the scheduling window:
- Timeslots and it's playlists are queued as timed commands - Timeslots and it's playlists are queued as timed commands
After the scheduling window: After the scheduling window:
- Such timeslots are ignored, because it doesn't make sense anymore to schedule them before the next