Skip to content
Snippets Groups Projects
Commit 4099a3b8 authored by David Trattnig's avatar David Trattnig
Browse files

Fixed timer init and cmd chaining. #72

parent 9fd502e5
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,7 @@ class EngineControlInterface:
Provides ability to control the engine in various ways.
"""
config = None
logger = None
logger = None
engine = None
event_dispatcher = None
sci = None
......@@ -51,10 +51,10 @@ class EngineControlInterface:
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
"""
self.engine = engine
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.logger = logging.getLogger("AuraEngine")
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
self.sci = SocketControlInterface.get_instance(event_dispatcher)
......@@ -74,13 +74,13 @@ class SocketControlInterface:
Note this server only allows a single connection at once. This
service is primarly utilized to store new playlogs.
"""
"""
PORT = 1337
ACTION_ON_METADATA = "on_metadata"
instance = None
config = None
logger = None
logger = None
server = None
event_dispatcher = None
......@@ -95,14 +95,14 @@ class SocketControlInterface:
"""
if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher
self.event_dispatcher = event_dispatcher
host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host))
thread.start()
thread.start()
@staticmethod
......@@ -119,7 +119,7 @@ class SocketControlInterface:
def run(self, logger, host):
"""
Starts the socket server
"""
"""
while True:
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......@@ -131,23 +131,23 @@ class SocketControlInterface:
time.sleep(wait_time)
logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
self.server.listen()
self.server.listen()
while True:
(conn, client) = self.server.accept()
while True:
r = SocketReader(conn)
p = HttpStream(r)
data = p.body_file().read()
data = p.body_file().read()
logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
try:
self.process(logger, json.loads(data))
conn.sendall(b'\n[ECI] processing done.\n')
except Exception as e:
logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)
conn.close()
conn.close()
break
......@@ -155,7 +155,7 @@ class SocketControlInterface:
"""
Process incoming actions.
"""
if "action" in data:
if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
meta_data = data["data"]
meta_data["duration"] = data["track_duration"]
......@@ -166,7 +166,7 @@ class SocketControlInterface:
logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else:
logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
def terminate(self):
......@@ -185,7 +185,8 @@ class EngineExecutor(Timer):
Primarily used for automations performed by the scheduler.
"""
_lock = None
logger = logging.getLogger("AuraEngine")
logger = logging.getLogger("AuraEngine")
initialized = None
timer_store = {}
parent_timer = None
child_timer = None
......@@ -208,23 +209,24 @@ class EngineExecutor(Timer):
func (function): The function to be called
param (object): Parameter passt to the function
"""
self.initialized = False
self._lock = Lock()
from src.engine import Engine
now_unix = Engine.engine_time()
now_unix = Engine.engine_time()
# Init parent-child relation
# Init parent-child relation
self.parent_timer = parent_timer
if self.parent_timer:
self.parent_timer.child_timer = self
# Init meta data
self.direct_exec = False
self.timer_type = timer_type
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time:
diff = 0
else:
else:
diff = due_time - now_unix
self.diff = diff
......@@ -235,15 +237,15 @@ class EngineExecutor(Timer):
is_stored = self.update_store()
if not is_stored:
self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
else:
else:
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
else:
self.exec_timed()
self.start()
......@@ -252,7 +254,12 @@ class EngineExecutor(Timer):
"""
Calls the passed function `func` when the timer is ready.
"""
self.join()
while self.initialized == False:
timer.sleep(0.001)
self.logger.info(SU.orange("Waiting until the EngineExecutor is done with initialization..."))
if not self.direct_exec: #TODO Evaluate if we should join for direct exec too
self.join()
func()
......@@ -276,7 +283,8 @@ class EngineExecutor(Timer):
self.wait_for_parent()
thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
thread.start()
self.initialized = True
def exec_timed(self):
"""
......@@ -285,11 +293,12 @@ class EngineExecutor(Timer):
Assigns the `timer_id` as the thread name.
"""
def wrapper_func(param=None):
self.wait_for_parent()
self.wait_for_parent()
if param: self.func(param,)
else: self.func()
super().__init__(self.diff, wrapper_func, (self.param,))
self._name = self.timer_id
self.initialized = True
def update_store(self):
......@@ -315,11 +324,11 @@ class EngineExecutor(Timer):
return False
# Still waiting for execution -> update
else:
else:
self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
existing_command.cancel()
existing_command.cancel()
if existing_command.child_timer:
self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
EngineExecutor.timer_store[self.timer_id] = self
self.logger.debug(f"Created command timer with ID: {self.timer_id}")
......@@ -351,7 +360,7 @@ class EngineExecutor(Timer):
del_keys = []
for timer in timers:
if timer.dt < datetime.now() - timedelta(hours=3):
if timer.dt < datetime.now() - timedelta(hours=3):
if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
del_keys.append(timer.timer_id)
......
......@@ -48,7 +48,7 @@ class AuraScheduler(threading.Thread):
config = None
logger = None
engine = None
exit_event = None
exit_event = None
timeslot_renderer = None
programme = None
message_timer = []
......@@ -70,12 +70,12 @@ class AuraScheduler(threading.Thread):
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.programme = ProgrammeService()
self.timeslot_renderer = TimeslotRenderer(self)
self.timeslot_renderer = TimeslotRenderer(self)
self.fallback = fallback_manager
self.engine = engine
self.engine.scheduler = self
self.is_soundsytem_init = False
# Scheduler Initialization
AuraDatabaseModel.init_database()
self.is_initialized = False
......@@ -91,8 +91,8 @@ class AuraScheduler(threading.Thread):
def run(self):
"""
Called when thread is started via `start()`. It does the following:
1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration.
1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration.
2. Loads the latest programme from the database and sets the instance state `self.programme` with current timeslots.
3. Queues all timeslots of the programme, if the soundssystem is ready to accept commands.
......@@ -103,19 +103,19 @@ class AuraScheduler(threading.Thread):
self.config.load_config()
seconds_to_wait = int(self.config.get("fetching_frequency"))
self.logger.info(SU.cyan(f"== start fetching new timeslots (every {seconds_to_wait} seconds) =="))
# Load some stuff from the API in any case
self.programme.refresh()
# Queue only when the engine is ready to play
if self.is_initialized == True:
if self.is_initialized == True:
self.queue_programme()
except Exception as e:
self.logger.critical(SU.red(f"Unhandled error while fetching & scheduling new programme! ({str(e)})"), e)
# Keep on working anyway
EngineExecutor.log_commands()
EngineExecutor.log_commands()
self.exit_event.wait(seconds_to_wait)
......@@ -137,12 +137,12 @@ class AuraScheduler(threading.Thread):
self.queue_startup_entries()
except NoActiveTimeslotException:
# That's not good, but keep on working...
pass
pass
def on_play(self, entry):
"""
Event Handler which is called by the engine when some entry is actually playing.
Event Handler which is called by the engine when some entry is actually playing.
Ignores entries which are part of a scheduled fallback, because they handle their
stuff by themselves.
......@@ -172,7 +172,7 @@ class AuraScheduler(threading.Thread):
def play_active_entry(self):
"""
Plays the entry scheduled for the very current moment and forwards to the scheduled position in time.
Plays the entry scheduled for the very current moment and forwards to the scheduled position in time.
Usually called when the Engine boots.
Raises:
......@@ -198,11 +198,11 @@ class AuraScheduler(threading.Thread):
now_unix = Engine.engine_time()
seconds_to_seek = now_unix - active_entry.start_unix
# If the seek exceeds the length of the current track,
# If the seek exceeds the length of the current track,
# there's no need to do anything - the scheduler takes care of the rest
if (seconds_to_seek + sleep_offset) > active_entry.duration:
self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.")
else:
else:
# Preload and play active entry
PlayCommand(self.engine, [active_entry])
......@@ -217,7 +217,7 @@ class AuraScheduler(threading.Thread):
self.logger.info("Sound-system seek response: " + response)
thread = threading.Thread(target = async_cue_seek, args = (seconds_to_seek,))
thread.start()
thread.start()
elif active_entry.get_content_type() in ResourceClass.STREAM.types \
or active_entry.get_content_type() in ResourceClass.LIVE.types:
......@@ -227,7 +227,7 @@ class AuraScheduler(threading.Thread):
else:
self.logger.critical("Unknown Entry Type: %s" % active_entry)
def get_active_playlist(self):
......@@ -247,7 +247,7 @@ class AuraScheduler(threading.Thread):
def queue_programme(self):
"""
Queues the current programme (playlists as per timeslot) by creating
timed commands to the sound-system to enable the individual tracks of playlists.
timed commands to the sound-system to enable the individual tracks of playlists.
"""
# Get a clean set of the timeslots within the scheduling window
......@@ -265,7 +265,7 @@ class AuraScheduler(threading.Thread):
playlist = self.programme.get_current_playlist(next_timeslot)
if playlist:
self.queue_playlist_entries(next_timeslot, playlist.entries, False, True)
self.logger.info(SU.green("Finished queuing programme."))
......@@ -276,7 +276,7 @@ class AuraScheduler(threading.Thread):
this method in any other scenario, as it doesn't respect the scheduling window.
"""
current_timeslot = self.programme.get_current_timeslot()
# Queue the (rest of the) currently playing timeslot upon startup
if current_timeslot:
current_playlist = self.programme.get_current_playlist(current_timeslot)
......@@ -295,8 +295,8 @@ class AuraScheduler(threading.Thread):
Creates sound-system player commands for all playlist items to be executed at the scheduled time.
Since each scheduled playlist can consist of multiple entry types such as *file*, *live*,
and *stream*, the play-out of the timeslot is actually a bit more complex. Before any playlist
entries of the timeslot can be turned into sound, they need to be grouped, queued and pre-loaded.
and *stream*, the play-out of the timeslot is actually a bit more complex. Before any playlist
entries of the timeslot can be turned into sound, they need to be aggregated, queued and pre-loaded.
1. First, all entries are aggregated when they hold filesystem entries.
Given you have a playlist with 10 entries, the first 4 are consisting of files, the next two
......@@ -318,7 +318,7 @@ class AuraScheduler(threading.Thread):
Returns:
(String): Formatted string to display playlist entries in log
"""
"""
entry_groups = []
entry_groups.append([])
previous_entry = None
......@@ -333,7 +333,7 @@ class AuraScheduler(threading.Thread):
(previous_entry != None and \
previous_entry.get_content_type() == entry.get_content_type() and \
entry.get_content_type() in ResourceClass.FILE.types):
entry_groups[index].append(entry)
else:
index += 1
......@@ -341,7 +341,7 @@ class AuraScheduler(threading.Thread):
entry_groups[index].append(entry)
previous_entry = entry
self.logger.info("Built %s entry group(s)" % len(entry_groups))
# Timeslot function calls
if len(entries) > 0 and len(entry_groups) > 0:
for entries in entry_groups:
......@@ -359,17 +359,17 @@ class AuraScheduler(threading.Thread):
"""
Ignore timeslots which are before the start of scheduling window (start of timeslot - `scheduling_window_start`)
or after the end of the scheduling window (end of timeslot -`scheduling_window_end`).
Before the scheduling window:
- Timeslots can still be deleted in Steering and the playout will respect this
During the scheduling window:
- Timeslots and it's playlists are queued as timed commands
After the scheduling window:
- Such timeslots are ignored, because it doesn't make sense anymore to schedule them before the next
timeslot starts
"""
if not timeslots:
return timeslots
......@@ -393,7 +393,7 @@ class AuraScheduler(threading.Thread):
self.logger.info("Shutting down scheduler ...")
self.programme.terminate()
self.exit_event.set()
......@@ -417,15 +417,16 @@ class TimeslotCommand(EngineExecutor):
Args:
engine (Engine): The engine
timeslot (Timeslot): The timeslot which is starting at this time
"""
self.config = AuraConfig()
"""
self.config = AuraConfig()
self.engine = engine
fade_out_time = float(self.config.get("fade_out_time"))
start_fade_out = timeslot.end_unix - fade_out_time
self.logger.info(f"Fading out timeslot in {start_fade_out} seconds at {timeslot.timeslot_end} | Timeslot: {timeslot}")
self.logger.info(f"Fading out timeslot in {start_fade_out} seconds at {timeslot.timeslot_end} | Timeslot: {timeslot}")
# Initialize the "fade in" EngineExecuter and instatiate a connected child EngineExecuter for "fade out" when the parent is ready
super().__init__("TIMESLOT", None, timeslot.start_unix, self.do_start_timeslot, timeslot)
EngineExecutor("TIMESLOT", self, start_fade_out, self.do_end_timeslot, timeslot)
self.on_ready(lambda: EngineExecutor("TIMESLOT", self, start_fade_out, self.do_end_timeslot, timeslot))
def do_start_timeslot(self, timeslot):
......@@ -439,7 +440,7 @@ class TimeslotCommand(EngineExecutor):
def do_end_timeslot(self, timeslot):
"""
Initiates the end of the timeslot.
"""
"""
self.logger.info(SU.cyan(f"=== on_timeslot_end('{timeslot}') ==="))
self.engine.event_dispatcher.on_timeslot_end(timeslot)
......@@ -448,12 +449,12 @@ class TimeslotCommand(EngineExecutor):
self.engine.player.stop(recent_entry, TransitionType.FADE)
else:
self.logger.warning(SU.red(f"Interestingly timeslot {timeslot} has no entry to be faded out?"))
class PlayCommand(EngineExecutor):
"""
Command for triggering start and end of timeslot events.
Command for triggering timed preloading and playing as a child command.
"""
engine = None
config = None
......@@ -471,14 +472,15 @@ class PlayCommand(EngineExecutor):
start_preload = entries[0].start_unix - self.config.get("preload_offset")
start_play = entries[0].start_unix
preload_timer = super().__init__("PLAY", None, start_preload, self.do_preload, entries)
# Initialize the "preload" EngineExecuter and attach a child `PlayCommand` to the "on_ready" event handler
preload_timer = super().__init__("PRELOAD", None, start_preload, self.do_preload, entries)
self.on_ready(lambda: EngineExecutor("PLAY", self, start_play, self.do_play, entries))
def do_preload(self, entries):
"""
Preload the entries.
"""
"""
try:
if entries[0].get_content_type() in ResourceClass.FILE.types:
self.logger.info(SU.cyan("=== preload_group('%s') ===" % ResourceUtil.get_entries_string(entries)))
......@@ -496,18 +498,16 @@ class PlayCommand(EngineExecutor):
def do_play(self, entries):
"""
Play the entries.
"""
"""
self.logger.info(SU.cyan("=== play('%s') ===" % ResourceUtil.get_entries_string(entries)))
if entries[-1].status != EntryPlayState.READY:
# Let 'em play anyway ...
# Let 'em play anyway ...
self.logger.critical(SU.red("PLAY: The entry/entries are not yet ready to be played (Entries: %s)" % ResourceUtil.get_entries_string(entries)))
while (entries[-1].status != EntryPlayState.READY):
while (entries[-1].status != EntryPlayState.READY):
self.logger.info("PLAY: Wait a little until preloading is done ...")
time.sleep(2)
self.engine.player.play(entries[0], TransitionType.FADE)
self.engine.player.play(entries[0], TransitionType.FADE)
self.logger.info(self.engine.scheduler.timeslot_renderer.get_ascii_timeslots())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment