# # Aura Engine (https://gitlab.servus.at/aura/engine) # # Copyright (C) 2017-2020 - The Aura Engine Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import time import logging from contextlib import suppress from threading import Thread import meta from src.base.config import AuraConfig from src.base.utils import SimpleUtil as SU from src.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, LoadSourceException from src.core.resources import ResourceClass, ResourceUtil from src.core.channels import ChannelType, TransitionType, LiquidsoapResponse, EntryPlayState, ResourceType, ChannelRouter from src.core.events import EngineEventDispatcher from src.core.control import EngineControlInterface from src.core.mixer import Mixer, MixerType from src.core.client.connector import PlayerConnector class Engine(): """ The Engine. """ instance = None engine_time_offset = 0.0 logger = None eci = None channels = None channel_router = None scheduler = None event_dispatcher = None plugins = None connector = None def __init__(self): """ Constructor """ if Engine.instance: raise Exception("Engine is already running!") Engine.instance = self self.logger = logging.getLogger("AuraEngine") self.config = AuraConfig.config() Engine.engine_time_offset = float(self.config.get("lqs_delay_offset")) self.plugins = dict() self.channel_router = ChannelRouter(self.config, self.logger) self.start() def start(self): """ Starts the engine. Called when the connection to the sound-system implementation has been established. """ self.event_dispatcher = EngineEventDispatcher(self) self.eci = EngineControlInterface(self, self.event_dispatcher) self.connector = PlayerConnector(self.event_dispatcher) self.event_dispatcher.on_initialized() while not self.is_connected(): self.logger.info(SU.yellow("Waiting for Liquidsoap to be running ...")) time.sleep(2) self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap")) self.player = Player(self.connector, self.event_dispatcher) self.event_dispatcher.on_boot() self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__)) self.event_dispatcher.on_ready() # # Basic Methods # def is_connected(self): """ Checks if there's a valid connection to Liquidsoap. """ has_connection = False try: self.uptime() has_connection = True except LQConnectionError as e: self.logger.info("Liquidsoap is not running so far") except Exception as e: self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e)) return has_connection def engine_state(self): """ Retrieves the state of all inputs and outputs. """ state = self.connector.send_lqc_command("engine", "state") return state def version(self): """ Get the version of Liquidsoap. """ data = self.connector.send_lqc_command("version", "") return data def uptime(self): """ Retrieves the uptime of Liquidsoap. """ self.connector.enable_transaction() data = self.connector.send_lqc_command("uptime", "") self.connector.disable_transaction() return data @staticmethod def engine_time(): """ Liquidsoap is slow in executing commands, therefore it's needed to schedule actions by (n) seconds in advance, as defined in the configuration file by the property `lqs_delay_offset`. it's important to note that this method requires the class variable `EngineUtil.engine_time_offset` to be set on Engine initialization. Returns: (Integer): the Unix epoch timestamp including the offset """ return SU.timestamp() + Engine.engine_time_offset @staticmethod def get_instance(): """ Returns the one and only engine. """ return Engine.instance def terminate(self): """ Terminates the engine and all related processes. """ if self.eci: self.eci.terminate() # # PLAYER # class Player: """ Engine Player. """ config = None logger = None connector = None channels = None channel_router = None event_dispatcher = None mixer = None mixer_fallback = None def __init__(self, connector, event_dispatcher): """ Constructor Args: config (AuraConfig): The configuration """ self.config = AuraConfig.config() self.logger = logging.getLogger("AuraEngine") self.event_dispatcher = event_dispatcher self.connector = connector self.channel_router = ChannelRouter(self.config, self.logger) self.mixer = Mixer(self.config, MixerType.MAIN, self.connector) self.mixer_fallback = Mixer(self.config, MixerType.FALLBACK, self.connector) def preload(self, entry): """ Pre-Load the entry. This is required before the actual `play(..)` can happen. Be aware when using this method to queue a very short entry (shorter than ``) this may result in sitations with incorrect timing. In this case bundle multiple short entries as one queue using `preload_playlist(self, entries)`. It's important to note, that his method is blocking until loading has finished. If this method is called asynchronously, the progress on the preloading state can be looked up in `entry.state`. Args: entries ([Entry]): An array holding filesystem entries """ entry.status = EntryPlayState.LOADING self.logger.info("Loading entry '%s'" % entry) is_ready = False # LIVE if entry.get_content_type() in ResourceClass.LIVE.types: entry.channel = "linein_" + entry.source.split("line://")[1] is_ready = True else: channel_type = self.channel_router.type_for_resource(entry.get_content_type()) entry.previous_channel, entry.channel = self.channel_router.channel_swap(channel_type) # QUEUE if entry.get_content_type() in ResourceClass.FILE.types: is_ready = self.queue_push(entry.channel, entry.source) # STREAM elif entry.get_content_type() in ResourceClass.STREAM.types: is_ready = self.stream_load_entry(entry) if is_ready: entry.status = EntryPlayState.READY self.event_dispatcher.on_queue([entry]) def preload_group(self, entries, channel_type=ChannelType.QUEUE): """ Pre-Load multiple filesystem entries at once. This call is required before the actual `play(..)` can happen. Due to their nature, non-filesystem entries cannot be queued using this method. In this case use `preload(self, entry)` instead. This method also allows queuing of very short files, such as jingles. It's important to note, that his method is blocking until loading has finished. If this method is called asynchronously, the progress on the preloading state can be looked up in `entry.state`. Args: entries ([Entry]): An array holding filesystem entries channel_type (ChannelType): The type of channel where it should be queued (optional) """ channels = None # Validate entry type for entry in entries: if entry.get_content_type() != ResourceType.FILE: raise InvalidChannelException # Determine channel channels = self.channel_router.channel_swap(channel_type) # Queue entries for entry in entries: entry.status = EntryPlayState.LOADING self.logger.info("Loading entry '%s'" % entry) # Choose and save the input channel entry.previous_channel, entry.channel = channels if self.queue_push(entry.channel, entry.source) == True: entry.status = EntryPlayState.READY self.event_dispatcher.on_queue(entries) return channels def play(self, entry, transition): """ Plays a new `Entry`. In case of a new timeslot (or some intented, immediate transition), a clean channel is selected and transitions between old and new channel is performed. This method expects that the entry is pre-loaded using `preload(..)` or `preload_group(self, entries)` before being played. In case the pre-roll has happened for a group of entries, only the first entry of the group needs to be passed. Args: entry (PlaylistEntry): The audio source to be played transition (TransitionType): The type of transition to use e.g. fade-in or instant volume level. queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so; otherwise a new channel of the same type is activated """ with suppress(LQConnectionError): channel_type = self.channel_router.type_of_channel(entry.channel) mixer = self.mixer if channel_type == ChannelType.FALLBACK_QUEUE: mixer = self.mixer_fallback # Instant activation or fade-in self.connector.enable_transaction() if transition == TransitionType.FADE: mixer.channel_select(entry.channel.value, True) mixer.fade_in(entry.channel, entry.volume) else: mixer.channel_activate(entry.channel.value, True) self.connector.disable_transaction() # Update active channel for the current channel type self.channel_router.set_active(channel_type, entry.channel) # Dear filesystem channels, please leave the room as you would like to find it! if entry.previous_channel and \ entry.previous_channel in ChannelType.QUEUE.channels and \ entry.previous_channel in ChannelType.FALLBACK_QUEUE.channels: def clean_up(): # Wait a little, if there is some long fade-out. Note, this also means, # this channel should not be used for at least some seconds (including clearing time). time.sleep(2) self.connector.enable_transaction() mixer.channel_activate(entry.previous_channel.value, False) res = self.queue_clear(entry.previous_channel) self.logger.info("Clear Queue Response: " + res) self.connector.disable_transaction() Thread(target=clean_up).start() self.event_dispatcher.on_play(entry) def stop(self, entry, transition): """ Stops the currently playing entry. Args: entry (Entry): The entry to stop playing transition (TransitionType): The type of transition to use e.g. fade-out. """ with suppress(LQConnectionError): self.connector.enable_transaction() if not entry.channel: self.logger.warn(SU.red("Trying to stop entry %s, but it has no channel assigned" % entry)) return if transition == TransitionType.FADE: self.mixer.fade_out(entry.channel) else: self.mixer.channel_volume(entry.channel, 0) self.logger.info(SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry))) self.connector.disable_transaction() self.event_dispatcher.on_stop(entry) def start_fallback_playlist(self, entries): """ Sets any scheduled fallback playlist and performs a fade-in. Args: entries ([Entry]): The playlist entries """ self.preload_group(entries, ChannelType.FALLBACK_QUEUE) self.play(entries[0], TransitionType.FADE) self.event_dispatcher.on_fallback_updated(entries) def stop_fallback_playlist(self): """ Performs a fade-out and clears any scheduled fallback playlist. """ dirty_channel = self.channel_router.get_active(ChannelType.FALLBACK_QUEUE) self.logger.info(f"Fading out channel '{dirty_channel}'") self.connector.enable_transaction() self.mixer_fallback.fade_out(dirty_channel) self.connector.disable_transaction() def clean_up(): # Wait a little, if there is some long fade-out. Note, this also means, # this channel should not be used for at least some seconds (including clearing time). time.sleep(2) self.connector.enable_transaction() self.mixer_fallback.channel_activate(dirty_channel.value, False) res = self.queue_clear(dirty_channel) self.logger.info("Clear Fallback Queue Response: " + res) self.connector.disable_transaction() self.event_dispatcher.on_fallback_cleaned(dirty_channel) Thread(target=clean_up).start() # # Channel Type - Stream # def stream_load_entry(self, entry): """ Loads the given stream entry and updates the entries's status codes. Args: entry (Entry): The entry to be pre-loaded Returns: (Boolean): `True` if successfull """ self.stream_load(entry.channel, entry.source) time.sleep(1) retry_delay = self.config.get("input_stream_retry_delay") max_retries = self.config.get("input_stream_max_retries") retries = 0 while not self.stream_is_ready(entry.channel, entry.source): self.logger.info("Loading Stream ...") if retries >= max_retries: raise LoadSourceException("Could not connect to stream while waiting for %s seconds!" % str(retries*retry_delay)) time.sleep(retry_delay) retries += 1 return True def stream_load(self, channel, url): """ Preloads the stream URL on the given channel. Note this method is blocking some serious amount of time; hence it's worth being called asynchroneously. Args: channel (Channel): The stream channel url (String): The stream URL Returns: (Boolean): `True` if successful """ result = None self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "stream_stop") if result != LiquidsoapResponse.SUCCESS.value: self.logger.error("%s.stop result: %s" % (channel, result)) raise LQStreamException("Error while stopping stream!") result = self.connector.send_lqc_command(channel, "stream_set_url", url) if result != LiquidsoapResponse.SUCCESS.value: self.logger.error("%s.set_url result: %s" % (channel, result)) raise LQStreamException("Error while setting stream URL!") # Liquidsoap ignores commands sent without a certain timeout time.sleep(2) result = self.connector.send_lqc_command(channel, "stream_start") self.logger.info("%s.start result: %s" % (channel, result)) self.connector.disable_transaction() return result def stream_is_ready(self, channel, url): """ Checks if the stream on the given channel is ready to play. Note this method is blocking some serious amount of time even when successfull; hence it's worth being called asynchroneously. Args: channel (Channel): The stream channel url (String): The stream URL Returns: (Boolean): `True` if successful """ result = None self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "stream_status") self.logger.info("%s.status result: %s" % (channel, result)) if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value): return False lqs_url = result.split(" ")[1] if not url == lqs_url: self.logger.error("Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url)) return False self.connector.disable_transaction() stream_buffer = self.config.get("input_stream_buffer") self.logger.info("Ready to play stream, but wait %s seconds until the buffer is filled..." % str(stream_buffer)) time.sleep(round(float(stream_buffer))) return True # # Channel Type - Queue # def queue_push(self, channel, source): """ Adds an filesystem URI to the given `ChannelType.QUEUE` channel. Args: channel (Channel): The channel to push the file to source (String): The URI of the file Returns: (Boolean): `True` if successful """ if channel not in ChannelType.QUEUE.channels and \ channel not in ChannelType.FALLBACK_QUEUE.channels: raise InvalidChannelException self.connector.enable_transaction() audio_store = self.config.get("audio_source_folder") extension = self.config.get("audio_source_extension") filepath = ResourceUtil.source_to_filepath(audio_store, source, extension) self.logger.info(SU.pink(f"{channel}.queue_push('{filepath}')")) result = self.connector.send_lqc_command(channel, "queue_push", filepath) self.logger.info("%s.queue_push result: %s" % (channel, result)) self.connector.disable_transaction() # If successful, Liquidsoap returns a resource ID of the queued track resource_id = -1 try: resource_id = int(result) except ValueError: self.logger.error(SU.red("Got an invalid resource ID: '%s'" % result)) return False return resource_id >= 0 def queue_seek(self, channel, seconds_to_seek): """ Forwards the player of the given `ChannelType.QUEUE` channel by (n) seconds. Args: channel (Channel): The channel to push the file to seconds_to_seeks (Float): The seconds to skip Returns: (String): Liquidsoap response """ if channel not in ChannelType.QUEUE.channels and \ channel not in ChannelType.FALLBACK_QUEUE.channels: raise InvalidChannelException self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "queue_seek", str(seconds_to_seek)) self.logger.info("%s.seek result: %s" % (channel, result)) self.connector.disable_transaction() return result def queue_clear(self, channel): """ Removes all tracks currently queued in the given `ChannelType.QUEUE` channel. Args: channel (Channel): The channel to push the file to Returns: (String): Liquidsoap response """ if channel not in ChannelType.QUEUE.channels and \ channel not in ChannelType.FALLBACK_QUEUE.channels: raise InvalidChannelException self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel)) self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "queue_clear") self.logger.info("%s.clear result: %s" % (channel, result)) self.connector.disable_transaction() return result # # Channel Type - Playlist # def playlist_set_uri(self, channel, playlist_uri): """ Sets the URI of a playlist. Args: channel (Channel): The channel to push the file to playlist_uri (String): The path to the playlist Returns: (String): Liquidsoap response """ self.logger.info(SU.pink("Setting URI of playlist '%s' to '%s'" % (channel, playlist_uri))) self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "playlist_uri_set", playlist_uri) self.logger.info("%s.playlist_uri result: %s" % (channel, result)) self.connector.disable_transaction() return result def playlist_clear_uri(self, channel): """ Clears the URI of a playlist. Args: channel (Channel): The channel to push the file to Returns: (String): Liquidsoap response """ self.logger.info(SU.pink("Clearing URI of playlist '%s'" % (channel))) self.connector.enable_transaction() result = self.connector.send_lqc_command(channel, "playlist_uri_clear") self.logger.info("%s.playlist_uri_clear result: %s" % (channel, result)) self.connector.disable_transaction() return result class EngineSplash: @staticmethod def splash_screen(component, version): """ Prints the engine logo and version info. """ return """\n █████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗ ██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝ ███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗ ██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝ ██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝ %s v%s - Ready to play! \n""" % (component, version)