From 122f6538897eabf8692e02c11f99f9c92282784e Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Wed, 9 Feb 2022 13:41:23 +0100 Subject: [PATCH 01/12] Style(format): log messages --- src/scheduling/scheduler.py | 70 ++++++++++--------------------------- 1 file changed, 18 insertions(+), 52 deletions(-) diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py index 3be93a5..87a052f 100644 --- a/src/scheduling/scheduler.py +++ b/src/scheduling/scheduler.py @@ -95,11 +95,8 @@ class AuraScheduler(threading.Thread): try: self.config.load_config() seconds_to_wait = int(self.config.get("fetching_frequency")) - self.logger.info( - SU.cyan( - f"== start fetching new timeslots (every {seconds_to_wait} seconds) ==" - ) - ) + msg = f"== start fetching new timeslots (every {seconds_to_wait} seconds) ==" + self.logger.info(SU.cyan(msg)) # Load some stuff from the API in any case self.programme.refresh() @@ -109,11 +106,8 @@ class AuraScheduler(threading.Thread): self.queue_programme() except Exception as e: - self.logger.critical( - SU.red( - f"Unhandled error while fetching & scheduling new programme! ({str(e)})" - ) - ) + msg = f"Unhandled error while fetching & scheduling new programme! ({str(e)})" + self.logger.critical(SU.red(msg)) # Keep on working anyway EngineExecutor.log_commands() @@ -479,11 +473,8 @@ class TimeslotCommand(EngineExecutor): if recent_entry: self.engine.player.stop(recent_entry, TransitionType.FADE) else: - self.logger.warning( - SU.red( - f"Interestingly timeslot {timeslot} has no entry to be faded out?" - ) - ) + msg = f"Interestingly timeslot {timeslot} has no entry to be faded out?" + self.logger.warning(SU.red(msg)) class PlayCommand(EngineExecutor): @@ -508,9 +499,8 @@ class PlayCommand(EngineExecutor): preload_offset = self.config.get("preload_offset") start_preload = entries[0].start_unix - preload_offset start_play = entries[0].start_unix - self.logger.debug( - f"Preloading entries at {SU.fmt_time(start_preload)}, {preload_offset} seconds before playing it at {SU.fmt_time(start_play)}" - ) + msg = f"Preloading entries at {SU.fmt_time(start_preload)}, {preload_offset} seconds before playing it at {SU.fmt_time(start_play)}" + self.logger.debug(msg) # Initialize the "preload" EngineExecuter and attach a child `PlayCommand` to the "on_ready" event handler super().__init__("PRELOAD", None, start_preload, self.do_preload, entries) EngineExecutor("PLAY", self, start_play, self.do_play, entries) @@ -519,55 +509,31 @@ class PlayCommand(EngineExecutor): """ Preload the entries. """ + entries_str = ResourceUtil.get_entries_string(entries) try: if entries[0].get_content_type() in ResourceClass.FILE.types: - self.logger.info( - SU.cyan( - "=== preload_group('%s') ===" - % ResourceUtil.get_entries_string(entries) - ) - ) + self.logger.info(SU.cyan(f"=== preload_group('{entries_str}') ===")) self.engine.player.preload_group(entries, ChannelType.QUEUE) else: - self.logger.info( - SU.cyan( - "=== preload('%s') ===" - % ResourceUtil.get_entries_string(entries) - ) - ) + self.logger.info(SU.cyan(f"=== preload('{entries_str}') ===")) self.engine.player.preload(entries[0]) except LoadSourceException as e: - self.logger.critical( - SU.red( - "Could not preload entries %s" - % ResourceUtil.get_entries_string(entries) - ), - e, - ) + self.logger.critical(SU.red(f"Could not preload entries {entries_str}"), e) if entries[-1].status != EntryPlayState.READY: - self.logger.critical( - SU.red( - "Entries didn't reach 'ready' state during preloading (Entries: %s)" - % ResourceUtil.get_entries_string(entries) - ) - ) + msg = f"Entries didn't reach 'ready' state during preloading (Entries: {entries_str})" + self.logger.critical(SU.red(msg)) def do_play(self, entries): """ Play the entries. """ - self.logger.info( - SU.cyan("=== play('%s') ===" % ResourceUtil.get_entries_string(entries)) - ) + entries_str = ResourceUtil.get_entries_string(entries) + self.logger.info(SU.cyan(f"=== play('{entries_str}') ===")) if entries[-1].status != EntryPlayState.READY: # Let 'em play anyway ... - self.logger.critical( - SU.red( - "PLAY: The entry/entries are not yet ready to be played (Entries: %s)" - % ResourceUtil.get_entries_string(entries) - ) - ) + msg = f"PLAY: The entry/entries are not yet ready to be played (Entries: {entries_str})" + self.logger.critical(SU.red(msg)) while entries[-1].status != EntryPlayState.READY: self.logger.info("PLAY: Wait a little bit until preloading is done ...") time.sleep(2) -- GitLab From 6079153933e865e908762131d0688215089b24a4 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Wed, 9 Feb 2022 19:13:26 +0100 Subject: [PATCH 02/12] Feat(channel): Resolve by string --- src/channels.py | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/channels.py b/src/channels.py index ce3c44b..d1b5dd1 100644 --- a/src/channels.py +++ b/src/channels.py @@ -62,10 +62,48 @@ class ChannelResolver: Helpers for resolving channel enumerations. """ + @staticmethod + def channel_by_string(channel: str): + """ + Returns the channel enum for a given channel string from Liquidsoap. + """ + if not channel: + return None + + if channel == Channel.QUEUE_A.value: + return Channel.QUEUE_A + elif channel == Channel.QUEUE_B.value: + return Channel.QUEUE_B + elif channel == Channel.HTTP_A.value: + return Channel.HTTP_A + elif channel == Channel.HTTP_B.value: + return Channel.HTTP_B + elif channel == Channel.LIVE_0.value: + return Channel.LIVE_0 + elif channel == Channel.LIVE_1.value: + return Channel.LIVE_1 + elif channel == Channel.LIVE_2.value: + return Channel.LIVE_2 + elif channel == Channel.LIVE_3.value: + return Channel.LIVE_3 + elif channel == Channel.LIVE_4.value: + return Channel.LIVE_4 + elif channel == Channel.FALLBACK_QUEUE_A.value: + return Channel.FALLBACK_QUEUE_A + elif channel == Channel.FALLBACK_QUEUE_B.value: + return Channel.FALLBACK_QUEUE_B + elif channel == Channel.FALLBACK_FOLDER.value: + return Channel.FALLBACK_FOLDER + elif channel == Channel.FALLBACK_PLAYLIST.value: + return Channel.FALLBACK_PLAYLIST + else: + return None + + @staticmethod def live_channel_for_resource(channel: str): """ - Returns the channel enum for a given channel string. + Returns the channel enum for a given live channel string from Tank. """ if not channel: return None @@ -85,6 +123,7 @@ class ChannelResolver: return None + class ChannelType(Enum): """ Engine channel types mapped to `Entry` source types. -- GitLab From 98469c1b06edede5a356cad1d247d49dd315e075 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Wed, 9 Feb 2022 19:14:43 +0100 Subject: [PATCH 03/12] Feat(metadata): Annotate Liquidsoap URIs --- src/resources.py | 63 +++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/src/resources.py b/src/resources.py index fdb4431..be1794a 100644 --- a/src/resources.py +++ b/src/resources.py @@ -20,11 +20,11 @@ from enum import Enum - class ResourceType(Enum): """ Media content types. """ + FILE = "file:" STREAM_HTTP = "http:" STREAM_HTTPS = "https:" @@ -37,25 +37,18 @@ class ResourceClass(Enum): """ Media content classes. """ - FILE = { - "id": "fs", - "numeric": 0, - "types": [ResourceType.FILE] - } + + FILE = {"id": "fs", "numeric": 0, "types": [ResourceType.FILE]} STREAM = { "id": "fs", "numeric": 0, - "types": [ResourceType.STREAM_HTTP, ResourceType.STREAM_HTTPS] - } - LIVE = { - "id": "http", - "numeric": 1, - "types": [ResourceType.LINE] + "types": [ResourceType.STREAM_HTTP, ResourceType.STREAM_HTTPS], } + LIVE = {"id": "http", "numeric": 1, "types": [ResourceType.LINE]} PLAYLIST = { "id": "https", "numeric": 2, - "types": [ResourceType.PLAYLIST, ResourceType.POOL] + "types": [ResourceType.PLAYLIST, ResourceType.POOL], } @property @@ -70,13 +63,11 @@ class ResourceClass(Enum): return str(self.value["id"]) - class ResourceUtil(Enum): """ Utilities for different resource types. """ - @staticmethod def get_content_type(uri): """ @@ -99,7 +90,6 @@ class ResourceUtil(Enum): if uri.startswith(ResourceType.LINE.value): return ResourceType.LINE - @staticmethod def get_content_class(content_type): """ @@ -120,7 +110,6 @@ class ResourceUtil(Enum): if content_type in ResourceClass.PLAYLIST.types: return ResourceClass.PLAYLIST - @staticmethod def generate_m3u_file(target_file, audio_store_path, entries, entry_extension): """ @@ -133,18 +122,21 @@ class ResourceUtil(Enum): entry_extension (String): The file extension of the playlist entries """ file = open(target_file, "w") - fb = [ "#EXTM3U" ] + fb = ["#EXTM3U"] for entry in entries: if ResourceUtil.get_content_type(entry.source) == ResourceType.FILE: - path = ResourceUtil.source_to_filepath(audio_store_path, entry.source, entry_extension) - fb.append(f"#EXTINF:{entry.duration},{entry.meta_data.artist} - {entry.meta_data.title}") + path = ResourceUtil.source_to_filepath( + audio_store_path, entry.source, entry_extension + ) + fb.append( + f"#EXTINF:{entry.duration},{entry.meta_data.artist} - {entry.meta_data.title}" + ) fb.append(path) file.writelines(fb) file.close() - @staticmethod def source_to_filepath(base_dir, source, source_extension): """ @@ -169,7 +161,6 @@ class ResourceUtil(Enum): else: return base_dir + "/" + path + source_extension - @staticmethod def get_entries_string(entries): """ @@ -179,24 +170,36 @@ class ResourceUtil(Enum): if isinstance(entries, list): for entry in entries: s += str(entry) - if entry != entries[-1]: s += ", " + if entry != entries[-1]: + s += ", " else: s = str(entries) return s - @staticmethod - def lqs_annotate_cuein(uri, cue_in): + def lqs_annotate(uri, annotations): """ - Wraps the given URI with a Liquidsoap Cue In annotation. + Wraps the given URI with the passed annotation dictionary. Args: - uri (String): The path to the audio source - cue_in (Float): The value in seconds where the cue in should start + uri (String): The path to the audio source + annotations (dict): The value in seconds where the cue in should start Returns: (String): The annotated URI """ - if cue_in > 0.0: - uri = "annotate:liq_cue_in=\"%s\":%s" % (str(cue_in), uri) + metadata = "" + for k, v in annotations.items(): + metadata += f"{k}={v}," + uri = f"annotate:{metadata[:-1]}:{uri}" return uri + + @staticmethod + def generate_metadata(entry): + """ + Generates Liquidsoap meta-data based on an entry. + + Args: + entry (Entry): Playlist entry + """ + return {"timeslot_id": entry.playlist.timeslot.timeslot_id} -- GitLab From cad64698433451d68a4816d0b5a6bb13b4517775 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Thu, 10 Feb 2022 10:57:01 +0100 Subject: [PATCH 04/12] Doc(event): Improve description --- src/plugins/trackservice.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/plugins/trackservice.py b/src/plugins/trackservice.py index 27caaaf..e969359 100644 --- a/src/plugins/trackservice.py +++ b/src/plugins/trackservice.py @@ -77,7 +77,14 @@ class TrackServiceHandler: def on_play(self, entry): """ - Some `PlaylistEntry` started playing. This is likely only a LIVE or STREAM entry. + Event Handler which is called by the engine when some play command to Liquidsoap is issued. + This does not indicate that Liquidsoap started playing actually, only that the command has + been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead. + + This event is not issued when media is played by Liquidsoap in fallback scenarios. + + Args: + entry (PlaylistEntry): """ content_class = ResourceUtil.get_content_class(entry.get_content_type()) if content_class == ResourceClass.FILE: -- GitLab From fd91cc7f752df33580920b02016548e9c74528ed Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Thu, 10 Feb 2022 12:38:11 +0100 Subject: [PATCH 05/12] Refact(timeslot): End via latest, used channel --- src/engine.py | 116 +++++++++++++++++++----------------- src/events.py | 19 +++--- src/scheduling/models.py | 1 + src/scheduling/scheduler.py | 22 +++---- 4 files changed, 87 insertions(+), 71 deletions(-) diff --git a/src/engine.py b/src/engine.py index 2d70aa6..670d4bd 100644 --- a/src/engine.py +++ b/src/engine.py @@ -250,7 +250,8 @@ class Player: # QUEUE if entry.get_content_type() in ResourceClass.FILE.types: - is_ready = self.queue_push(entry.channel, entry.source) + metadata = ResourceUtil.generate_metadata(entry) + is_ready = self.queue_push(entry.channel, entry.source, metadata) # STREAM elif entry.get_content_type() in ResourceClass.STREAM.types: @@ -293,8 +294,8 @@ class Player: # Choose and save the input channel entry.previous_channel, entry.channel = channels - - if self.queue_push(entry.channel, entry.source) == True: + metadata = ResourceUtil.generate_metadata(entry) + if self.queue_push(entry.channel, entry.source, metadata) == True: entry.status = EntryPlayState.READY self.event_dispatcher.on_queue(entries) @@ -356,36 +357,28 @@ class Player: self.event_dispatcher.on_play(entry) - def stop(self, entry, transition): + def stop(self, channel: Channel, transition: TransitionType): """ - Stops the currently playing entry. + Stops the currently playing channel. Args: - entry (Entry): The entry to stop playing - transition (TransitionType): The type of transition to use e.g. fade-out. + channel (Channel): The channel to stop playing or fade-out + transition (TransitionType): The type of transition to use e.g. fade-out """ with suppress(LQConnectionError): self.connector.enable_transaction() - - if not entry.channel: - self.logger.warn( - SU.red( - "Trying to stop entry %s, but it has no channel assigned" - % entry - ) - ) + if not channel: + self.logger.warn(SU.red(f"Cannot stop, no channel passed")) return if transition == TransitionType.FADE: - self.mixer.fade_out(entry.channel) + self.mixer.fade_out(channel) else: - self.mixer.channel_volume(entry.channel, 0) + self.mixer.channel_volume(channel, 0) - self.logger.info( - SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry)) - ) + self.logger.info(SU.pink(f"Stopped channel '{channel}' with {transition}")) self.connector.disable_transaction() - self.event_dispatcher.on_stop(entry) + self.event_dispatcher.on_stop(channel) # def start_fallback_playlist(self, entries): # """ @@ -534,13 +527,14 @@ class Player: # Channel Type - Queue # - def queue_push(self, channel, source): + def queue_push(self, channel, source, metadata): """ Adds an filesystem URI to the given `ChannelType.QUEUE` channel. Args: channel (Channel): The channel to push the file to - source (String): The URI of the file + source (String): The URI of the file + metadata (dict): additional meta data to be wrapped with the URI Returns: (Boolean): `True` if successful @@ -556,6 +550,8 @@ class Player: extension = self.config.get("audio_source_extension") filepath = ResourceUtil.source_to_filepath(audio_store, source, extension) self.logger.info(SU.pink(f"{channel}.queue_push('{filepath}')")) + if metadata: + filepath = ResourceUtil.lqs_annotate(filepath, metadata) result = self.connector.send_lqc_command(channel, "queue_push", filepath) self.logger.info("%s.queue_push result: %s" % (channel, result)) self.connector.disable_transaction() @@ -714,8 +710,7 @@ class PlayoutState: # retrieved from Liquidsoap in case Engine didn't do a fresh boot, but a # restart after a crash (See #77). self.state = { - "current": None, - "previous": None, + "playout_type": None, "timeslot": None, } @@ -728,29 +723,44 @@ class PlayoutState: Some new timeslot has just started. """ self.state["timeslot"] = timeslot + self.logger.info(SU.green(f"Starting timeslot {timeslot}")) def on_timeslot_end(self, timeslot): """ - The timeslot has ended and the state is updated. The method ensures that any intermediate state - update doesn't get overwritten. + The timeslot has ended and the state is updated. The method ensures that any + intermediate state update doesn't get overwritten. """ + self.logger.info(SU.green(f"Ended timeslot {timeslot}")) if self.state["timeslot"] == timeslot: self.state["timeslot"] = None def on_play(self, entry): """ - Event Handler which is called by the engine when some entry is actually playing. + Event Handler which is called by the engine when some play command to Liquidsoap is issued. + This does not indicate that Liquidsoap started playing actually, only that the command has + been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead. + + This event is not issued when media is played by Liquidsoap in fallback scenarios. Args: - source (String): The `PlaylistEntry` object + entry (PlaylistEntry): """ + + # Store latest, active channel to the timeslot. Used for fade-out at end of timeslot. + timeslot = entry.playlist.timeslot + timeslot.latest_channel = entry.channel + msg = f"Store channel '{entry.channel}' to timeslot #{timeslot.timeslot_id}" + self.logger.info(SU.pink(msg)) + + # Store playout-state change content_class = ResourceUtil.get_content_class(entry.get_content_type()) if content_class == ResourceClass.FILE: # Files are handled by "on_metadata" called via Liquidsoap return - state = self.type_for_channel(entry.channel) + # Only update upon first state change of same type - if state != self.current(): + state = self.type_for_channel(entry.channel) + if state != self.state.get("playout_type"): self.update_playout_state(entry.channel) def on_metadata(self, data): @@ -759,37 +769,28 @@ class PlayoutState: This does not include live or stream sources, since they ain't have metadata and are triggered from engine core (see `on_play(..)`). + This event is called for both, files played in NORMAL and FALLBACK (silence detector) state. + Args: - data (dict): A collection of metadata related to the current track + data (dict): A collection of metadata related to the current track passed by Liquidsoap """ - channel = data.get("source") + channel_str = data.get("source") + channel = ChannelResolver.channel_by_string(channel_str) state = self.type_for_channel(channel) - # Only update upon first state change of same type - if state != self.current(): - state = self.update_playout_state(channel) - # If we turn into a fallback state we issue an event - if state is not PlayoutState.types.DEFAULT: - timeslot = self.state["timeslot"] - if timeslot: - DB.session.merge(timeslot) - self.event_dispatcher.on_fallback_active(timeslot, state) + # Only update playout state upon first state change of same type + if state != self.state.get("playout_type"): + self.update_playout_state(channel) # # METHODS # - def current(self): - """ - Returns the current playout state, like default or fallback. - """ - return self.state.get("current") - def is_default(self): """ Returns the `True` in case Engine is in default state. """ - return self.state.get("current") == PlayoutState.types.DEFAULT + return self.state.get("playout_type") == PlayoutState.types.DEFAULT def update_playout_state(self, channel): """ @@ -799,16 +800,23 @@ class PlayoutState: (PlayoutState): The current playout state """ playout_state = self.type_for_channel(channel) - self.state["previous"] = self.state["current"] - self.state["current"] = playout_state + self.state["playout_type"] = playout_state self.logger.info(SU.green(f"Play-out mode changed: {playout_state}")) + + # If we turn into a fallback state we issue an event + if playout_state is not PlayoutState.types.DEFAULT: + timeslot = self.state["timeslot"] + # if timeslot: + # DB.session.merge(timeslot) + self.event_dispatcher.on_fallback_active(timeslot, playout_state) + return playout_state - def type_for_channel(self, source): + def type_for_channel(self, channel): """ - Retrieves the matching fallback type for the given source. + Retrieves the matching play-out type for the given channel (default vs. fallback). """ - if source in [str(i) for i in PlayoutState.types.FALLBACK.channels]: + if str(channel) in [str(i) for i in PlayoutState.types.FALLBACK.channels]: return PlayoutState.types.FALLBACK return PlayoutState.types.DEFAULT diff --git a/src/events.py b/src/events.py index 3bf6c7f..48e7139 100644 --- a/src/events.py +++ b/src/events.py @@ -211,12 +211,15 @@ class EngineEventDispatcher: This does not indicate that Liquidsoap started playing actually, only that the command has been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead. + This event is not issued when media is played by Liquidsoap in fallback scenarios. + Args: - source (String): The `PlaylistEntry` object + entry (PlaylistEntry): """ def func(self, entry): self.logger.debug("on_play(..)") + self.engine.playout_state.on_play(entry) # Assign timestamp indicating start play time. Use the actual playtime when possible. entry.entry_start_actual = datetime.datetime.now() self.scheduler.on_play(entry) @@ -231,8 +234,10 @@ class EngineEventDispatcher: This does not include live or stream sources, since they ain't have metadata and are triggered from engine core (see `on_play(..)`). + This event is called for both, files played in NORMAL and FALLBACK (silence detector) state. + Args: - data (dict): A collection of metadata related to the current track + data (dict): A collection of metadata related to the current track passed by Liquidsoap """ def func(self, data): @@ -243,16 +248,16 @@ class EngineEventDispatcher: thread = Thread(target=func, args=(self, data)) thread.start() - def on_stop(self, entry): + def on_stop(self, channel): """ - The entry on the assigned channel has been stopped playing. + The passed channel has stopped playing. """ - def func(self, entry): + def func(self, channel): self.logger.debug("on_stop(..)") - self.call_event("on_stop", entry) + self.call_event("on_stop", channel) - thread = Thread(target=func, args=(self, entry)) + thread = Thread(target=func, args=(self, channel)) thread.start() def on_fallback_active(self, timeslot, fallback_type): diff --git a/src/scheduling/models.py b/src/scheduling/models.py index 17b5a5d..b865504 100644 --- a/src/scheduling/models.py +++ b/src/scheduling/models.py @@ -254,6 +254,7 @@ class Timeslot(DB.Model, AuraDatabaseModel): # Transients active_entry = None + latest_channel = None @staticmethod def for_datetime(date_time): diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py index 87a052f..ec2f87d 100644 --- a/src/scheduling/scheduler.py +++ b/src/scheduling/scheduler.py @@ -133,12 +133,14 @@ class AuraScheduler(threading.Thread): def on_play(self, entry): """ - Event Handler which is called by the engine when some entry is actually playing. - Ignores entries which are part of a scheduled fallback, because they handle their - stuff by themselves. + Event Handler which is called by the engine when some play command to Liquidsoap is issued. + This does not indicate that Liquidsoap started playing actually, only that the command has + been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead. + + This event is not issued when media is played by Liquidsoap in fallback scenarios. Args: - source (String): The `PlaylistEntry` object + entry (PlaylistEntry): """ if entry.channel in ChannelType.FALLBACK_QUEUE.channels: return @@ -468,13 +470,13 @@ class TimeslotCommand(EngineExecutor): """ self.logger.info(SU.cyan(f"=== on_timeslot_end('{timeslot}') ===")) self.engine.event_dispatcher.on_timeslot_end(timeslot) - - recent_entry = timeslot.get_recent_entry() - if recent_entry: - self.engine.player.stop(recent_entry, TransitionType.FADE) + channel = timeslot.latest_channel + if channel: + self.logger.info(f"Stopping channel {channel}") + self.engine.player.stop(channel, TransitionType.FADE) else: - msg = f"Interestingly timeslot {timeslot} has no entry to be faded out?" - self.logger.warning(SU.red(msg)) + msg = f"There is no channel to stop for timeslot #{timeslot.timeslot_id}" + self.logger.error(SU.red(msg)) class PlayCommand(EngineExecutor): -- GitLab From 4fe12f49eaa359ecc3ac766a07ad2d537583df3e Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Thu, 10 Feb 2022 13:45:01 +0100 Subject: [PATCH 06/12] Refactor(queue): Clean on stop --- src/channels.py | 13 +++++++++++-- src/engine.py | 36 ++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/channels.py b/src/channels.py index d1b5dd1..e039878 100644 --- a/src/channels.py +++ b/src/channels.py @@ -99,7 +99,6 @@ class ChannelResolver: else: return None - @staticmethod def live_channel_for_resource(channel: str): """ @@ -123,7 +122,6 @@ class ChannelResolver: return None - class ChannelType(Enum): """ Engine channel types mapped to `Entry` source types. @@ -252,6 +250,17 @@ class ChannelRouter: else: return None + def is_any_queue(self, channel): + """ + Evaluates if the channel is of any queue type. + """ + if ( + self.type_of_channel(channel) == ChannelType.QUEUE + or self.type_of_channel(channel) == ChannelType.FALLBACK_QUEUE + ): + return True + return False + def type_for_resource(self, resource_type): """ Retrieves a `ChannelType` for the given `ResourceType`. diff --git a/src/engine.py b/src/engine.py index 670d4bd..cb6208e 100644 --- a/src/engine.py +++ b/src/engine.py @@ -335,26 +335,6 @@ class Player: # Update active channel for the current channel type self.channel_router.set_active(channel_type, entry.channel) - - # Dear filesystem channels, please leave the room as you would like to find it! - if ( - entry.previous_channel - and entry.previous_channel in ChannelType.QUEUE.channels - and entry.previous_channel in ChannelType.FALLBACK_QUEUE.channels - ): - - def clean_up(): - # Wait a little, if there is some long fade-out. Note, this also means, - # this channel should not be used for at least some seconds (including clearing time). - time.sleep(2) - self.connector.enable_transaction() - mixer.channel_activate(entry.previous_channel.value, False) - res = self.queue_clear(entry.previous_channel) - self.logger.info("Clear Queue Response: " + res) - self.connector.disable_transaction() - - Thread(target=clean_up).start() - self.event_dispatcher.on_play(entry) def stop(self, channel: Channel, transition: TransitionType): @@ -380,6 +360,22 @@ class Player: self.connector.disable_transaction() self.event_dispatcher.on_stop(channel) + # Dear filesystem channels, please leave the room as you would like to find it! + if self.channel_router.is_any_queue(channel): + + def clean_up(): + # Wait a little, if there is some long fade-out. Note, this also + # means, this channel should not be used for at least some seconds + # (including clearing time). + time.sleep(2) + self.connector.enable_transaction() + self.mixer.channel_activate(channel.value, False) + res = self.queue_clear(channel) + self.logger.info("Clear Queue Response: " + res) + self.connector.disable_transaction() + + Thread(target=clean_up).start() + # def start_fallback_playlist(self, entries): # """ # Sets any scheduled fallback playlist and performs a fade-in. -- GitLab From 5d6eca595242c66148e61aa5cc2a6b3ac1fd17ad Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Thu, 10 Feb 2022 15:31:16 +0100 Subject: [PATCH 07/12] Fix(doc): Typo --- src/channels.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/channels.py b/src/channels.py index e039878..db47f32 100644 --- a/src/channels.py +++ b/src/channels.py @@ -272,7 +272,7 @@ class ChannelRouter: def channel_swap(self, channel_type): """ - Returns the currently inactive channel for a given type. For example if the currently some + Returns the currently inactive channel for a given type. For example if currently some file on channel QUEUE A is playing, the channel QUEUE B is returned for being used to queue new entries. -- GitLab From a50bbc232e51fa5bed74f8fef45e9cda054e9724 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Fri, 11 Feb 2022 17:10:34 +0100 Subject: [PATCH 08/12] Debug(timeslot): Add log --- src/scheduling/programme.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/scheduling/programme.py b/src/scheduling/programme.py index c6f8580..32614a9 100644 --- a/src/scheduling/programme.py +++ b/src/scheduling/programme.py @@ -394,6 +394,7 @@ class ProgrammeStore(): if "station_fallback_id" in timeslot: timeslot_db.station_fallback_id = timeslot["station_fallback_id"] + self.logger.debug(SU.pink(f"Store/Update TIMESLOT havetoadd={havetoadd} - data: "+str(timeslot))) timeslot_db.store(add=havetoadd, commit=True) return timeslot_db -- GitLab From a54ce9b83f5f72c76555043774abe43d1e3cc0be Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Fri, 11 Feb 2022 19:06:15 +0100 Subject: [PATCH 09/12] Refact(log): Add more info, remove obsolete code --- src/scheduling/scheduler.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/scheduling/scheduler.py b/src/scheduling/scheduler.py index ec2f87d..0fae8ba 100644 --- a/src/scheduling/scheduler.py +++ b/src/scheduling/scheduler.py @@ -145,9 +145,8 @@ class AuraScheduler(threading.Thread): if entry.channel in ChannelType.FALLBACK_QUEUE.channels: return - current_timeslot = self.programme.get_current_timeslot() - if current_timeslot: - current_timeslot.set_active_entry(entry) + # Nothing to do atm + # # METHODS @@ -446,11 +445,12 @@ class TimeslotCommand(EngineExecutor): f"Fading out timeslot in {start_fade_out} seconds at {SU.fmt_time(timeslot.end_unix - fade_out_time)} | Timeslot: {timeslot}" ) # Initialize the "fade in" EngineExecuter and instantiate a connected child EngineExecuter for "fade out" when the parent is ready + timeslot_id = timeslot.timeslot_id super().__init__( - "TIMESLOT", None, timeslot.start_unix, self.do_start_timeslot, timeslot + f"TIMESLOT#{timeslot_id}", None, timeslot.start_unix, self.do_start_timeslot, timeslot ) EngineExecutor( - "TIMESLOT", + f"TIMESLOT#{timeslot_id}", self, timeslot.end_unix - fade_out_time, self.do_end_timeslot, @@ -502,10 +502,11 @@ class PlayCommand(EngineExecutor): start_preload = entries[0].start_unix - preload_offset start_play = entries[0].start_unix msg = f"Preloading entries at {SU.fmt_time(start_preload)}, {preload_offset} seconds before playing it at {SU.fmt_time(start_play)}" - self.logger.debug(msg) + self.logger.info(msg) # Initialize the "preload" EngineExecuter and attach a child `PlayCommand` to the "on_ready" event handler - super().__init__("PRELOAD", None, start_preload, self.do_preload, entries) - EngineExecutor("PLAY", self, start_play, self.do_play, entries) + timeslot_id = entries[0].playlist.timeslot.timeslot_id + super().__init__(f"PRELOAD#{timeslot_id}", None, start_preload, self.do_preload, entries) + EngineExecutor(f"PLAY#{timeslot_id}", self, start_play, self.do_play, entries) def do_preload(self, entries): """ -- GitLab From 21811e57b8ecfc6bece2afefc12d60839d6e4ec1 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Fri, 11 Feb 2022 19:13:03 +0100 Subject: [PATCH 10/12] Fix(connection): Workaround for transaction issue This solves issues with concurrent Liquidsoap operations. This temporarly solves: - engine-core#5 This relates to: - engine#65 - engine#76 --- src/client/connector.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/client/connector.py b/src/client/connector.py index 0662aa6..b8387ff 100644 --- a/src/client/connector.py +++ b/src/client/connector.py @@ -39,7 +39,7 @@ class PlayerConnector(): connection_attempts = 0 disable_logging = False event_dispatcher = None - + has_connection = None def __init__(self, event_dispatcher): @@ -53,7 +53,7 @@ class PlayerConnector(): self.logger = logging.getLogger("AuraEngine") self.client = LiquidSoapPlayerClient(self.config, "engine.sock") self.event_dispatcher = event_dispatcher - + self.has_connection = False def send_lqc_command(self, namespace, command, *args): @@ -196,13 +196,17 @@ class PlayerConnector(): # ------------------------------------------------------------------------------------------ # def __open_conn(self, socket): # already connected - if self.transaction > 1: + # if self.transaction > 1: + # return + + if self.has_connection: return self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value) # try to connect socket.connect() + self.has_connection = True # ------------------------------------------------------------------------------------------ # def __close_conn(self, socket): @@ -215,7 +219,7 @@ class PlayerConnector(): return # say bye - socket.byebye() + # socket.byebye() # debug msg self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value) -- GitLab From 549a54bc4941546f4dadb0cd55ca886e6c580002 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Fri, 11 Feb 2022 19:14:21 +0100 Subject: [PATCH 11/12] Refactor(channel): clear esp with multi timeslots --- src/engine.py | 111 +++++++++++++++++++------------------------------- 1 file changed, 42 insertions(+), 69 deletions(-) diff --git a/src/engine.py b/src/engine.py index cb6208e..392b898 100644 --- a/src/engine.py +++ b/src/engine.py @@ -321,20 +321,24 @@ class Player: channel_type = self.channel_router.type_of_channel(entry.channel) mixer = self.mixer - # if channel_type == ChannelType.FALLBACK_QUEUE: - # mixer = self.mixer_fallback # Instant activation or fade-in self.connector.enable_transaction() if transition == TransitionType.FADE: mixer.channel_select(entry.channel.value, True) mixer.fade_in(entry.channel, entry.volume) + msg = f"Select channel '{entry.channel}' with {transition}" + self.logger.info(SU.pink(msg)) else: mixer.channel_activate(entry.channel.value, True) + msg = f"Activate channel '{entry.channel}'" + self.logger.info(SU.pink(msg)) self.connector.disable_transaction() # Update active channel for the current channel type self.channel_router.set_active(channel_type, entry.channel) + prev_channel = entry.previous_channel + self.queue_clear(prev_channel) self.event_dispatcher.on_play(entry) def stop(self, channel: Channel, transition: TransitionType): @@ -356,60 +360,10 @@ class Player: else: self.mixer.channel_volume(channel, 0) - self.logger.info(SU.pink(f"Stopped channel '{channel}' with {transition}")) + self.logger.info(SU.pink(f"Mute channel '{channel}' with {transition}")) self.connector.disable_transaction() self.event_dispatcher.on_stop(channel) - # Dear filesystem channels, please leave the room as you would like to find it! - if self.channel_router.is_any_queue(channel): - - def clean_up(): - # Wait a little, if there is some long fade-out. Note, this also - # means, this channel should not be used for at least some seconds - # (including clearing time). - time.sleep(2) - self.connector.enable_transaction() - self.mixer.channel_activate(channel.value, False) - res = self.queue_clear(channel) - self.logger.info("Clear Queue Response: " + res) - self.connector.disable_transaction() - - Thread(target=clean_up).start() - - # def start_fallback_playlist(self, entries): - # """ - # Sets any scheduled fallback playlist and performs a fade-in. - - # Args: - # entries ([Entry]): The playlist entries - # """ - # self.preload_group(entries, ChannelType.FALLBACK_QUEUE) - # self.play(entries[0], TransitionType.FADE) - # self.event_dispatcher.on_fallback_updated(entries) - - # def stop_fallback_playlist(self): - # """ - # Performs a fade-out and clears any scheduled fallback playlist. - # """ - # dirty_channel = self.channel_router.get_active(ChannelType.FALLBACK_QUEUE) - - # self.logger.info(f"Fading out channel '{dirty_channel}'") - # self.connector.enable_transaction() - # self.mixer_fallback.fade_out(dirty_channel) - # self.connector.disable_transaction() - - # def clean_up(): - # # Wait a little, if there is some long fade-out. Note, this also means, - # # this channel should not be used for at least some seconds (including clearing time). - # time.sleep(2) - # self.connector.enable_transaction() - # self.mixer_fallback.channel_activate(dirty_channel.value, False) - # res = self.queue_clear(dirty_channel) - # self.logger.info("Clear Fallback Queue Response: " + res) - # self.connector.disable_transaction() - # self.event_dispatcher.on_fallback_cleaned(dirty_channel) - # Thread(target=clean_up).start() - # # Channel Type - Stream # @@ -588,30 +542,49 @@ class Player: return result - def queue_clear(self, channel): + def queue_clear(self, channel: Channel): """ - Removes all tracks currently queued in the given `ChannelType.QUEUE` channel. + Dear queue channels, please leave the room as you + would like to find it: Every queue needs to be empty, + before it can receive new items. Otherwise old times + continue to play, when such 'dirty' channel is selected. - Args: - channel (Channel): The channel to push the file to + Note, the channel is cleared asynchronously. - Returns: - (String): Liquidsoap response + Args: + channel (Channel): The channel to clear """ - if ( - channel not in ChannelType.QUEUE.channels - and channel not in ChannelType.FALLBACK_QUEUE.channels - ): + if not self.channel_router.is_any_queue(channel): raise InvalidChannelException - self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel)) + def clean_up(): + # Wait some moments, if there is some long fade-out. Note, this also + # means, this channel should not be used for at least some seconds + # (including clearing time). + clear_timeout = 10 + self.logger.info(f"Clearing channel '{channel}' in {clear_timeout} seconds") + time.sleep(clear_timeout) - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "queue_clear") - self.logger.info("%s.clear result: %s" % (channel, result)) - self.connector.disable_transaction() + # Deactivate channel + self.connector.enable_transaction() + response = self.mixer.channel_activate(channel.value, False) + self.connector.disable_transaction() + msg = f"Deactivate channel '{channel}' with message '{response}'" + self.logger.info(SU.pink(msg)) - return result + # Remove all items from queue + self.connector.enable_transaction() + result = self.connector.send_lqc_command(channel, "queue_clear") + self.connector.disable_transaction() + msg = f"Clear queue channel '{channel}' with result: {result}" + self.logger.info(SU.pink(msg)) + + # FIXME Workaround to get clearing somewhat working (See engine-core#16) + self.mixer.channel_activate(channel.value, True) + self.mixer.channel_activate(channel.value, False) + + + Thread(target=clean_up).start() # # Channel Type - Playlist -- GitLab From a1c5610044875d6e01f606938dfed2063a79aa36 Mon Sep 17 00:00:00 2001 From: David Trattnig Date: Fri, 11 Feb 2022 19:15:03 +0100 Subject: [PATCH 12/12] Refact: Obsolete code --- src/scheduling/models.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/scheduling/models.py b/src/scheduling/models.py index b865504..9b3baf6 100644 --- a/src/scheduling/models.py +++ b/src/scheduling/models.py @@ -253,7 +253,6 @@ class Timeslot(DB.Model, AuraDatabaseModel): is_repetition = Column(Boolean()) # Transients - active_entry = None latest_channel = None @staticmethod @@ -290,21 +289,6 @@ class Timeslot(DB.Model, AuraDatabaseModel): ) return timeslots - def set_active_entry(self, entry): - """ - Sets the currently playing entry. - - Args: - entry (PlaylistEntry): The entry playing right now - """ - self.active_entry = entry - - def get_recent_entry(self): - """ - Retrieves the most recent played or currently playing entry. This is used to fade-out - the timeslot, when there is no other entry is following the current one. - """ - return self.active_entry @hybrid_property def start_unix(self): -- GitLab