diff --git a/src/aura_engine/base/exceptions.py b/src/aura_engine/base/exceptions.py deleted file mode 100644 index 8cbb5fdecedfa0c0ebabf61ef3a3a1ef84982e17..0000000000000000000000000000000000000000 --- a/src/aura_engine/base/exceptions.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A collection of exceptions. -""" - -# Scheduler Exceptions - - -class NoProgrammeLoadedException(Exception): - """ - Exception thrown when no programme data could be loaded. - """ - - pass - - -class NoActiveTimeslotException(Exception): - """ - Exception thrown when there is no timeslot active. - """ - - pass - - -# Soundsystem and Mixer Exceptions - - -class LoadSourceException(Exception): - """ - Exception thrown when some source could not be loaded. - """ - - pass - - -class InvalidChannelException(Exception): - """ - Exception thrown when the given channel is invalid. - """ - - pass - - -class PlaylistException(Exception): - """ - Exception thrown when the playlist is invalid. - """ - - pass - - -class NoActiveEntryException(Exception): - """ - Exception thrown when there is no playlist entry active. - """ - - pass - - -# Liquidsoap Exceptions - - -class LQConnectionError(Exception): - """ - Exception thrown when there is a connection problem with Liquidsoap. - """ - - pass - - -class LQStreamException(Exception): - """ - Exception thrown when there is a problem with an audio stream in Liquidsoap. - """ - - pass diff --git a/src/aura_engine/client/connector.py b/src/aura_engine/client/connector.py deleted file mode 100644 index 4785d6c7fd45596636eede240fee11b45eb1c91a..0000000000000000000000000000000000000000 --- a/src/aura_engine/client/connector.py +++ /dev/null @@ -1,265 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import logging -import time - -from aura_engine.base.config import AuraConfig -from aura_engine.base.exceptions import LQConnectionError -from aura_engine.base.utils import SimpleUtil as SU -from aura_engine.base.utils import TerminalColors -from aura_engine.client.playerclient import LiquidSoapPlayerClient - - -class PlayerConnector: - """ - Establishes a Socket connection to Liquidsoap. - - #TODO Refactor class: https://gitlab.servus.at/aura/engine/-/issues/65 - """ - - client = None - logger = None - transaction = 0 - connection_attempts = 0 - disable_logging = False - event_dispatcher = None - has_connection = None - - def __init__(self, event_dispatcher): - """ - Constructor - - Args: - config (AuraConfig): The configuration - """ - self.config = AuraConfig.config() - self.logger = logging.getLogger("AuraEngine") - self.client = LiquidSoapPlayerClient(self.config, "engine.sock") - self.event_dispatcher = event_dispatcher - self.has_connection = False - - def send_lqc_command(self, namespace, command, *args): - """ - Ein Kommando an Liquidsoap senden - @type lqs_instance: object - @param lqs_instance: Instance of LiquidSoap Client - @type namespace: string - @param namespace: Namespace of function - @type command: string - @param command: Function name - @type args: list - @param args: List of parameters - @rtype: string - @return: Response from LiquidSoap - """ - lqs_instance = self.client - try: - if not self.disable_logging: - if command == "": - self.logger.debug( - "LiquidSoapCommunicator is calling " + str(namespace) + str(args) - ) - else: - self.logger.debug( - "LiquidSoapCommunicator is calling " - + str(namespace) - + "." - + str(command) - + str(args) - ) - - # call wanted function ... - - # FIXME REFACTOR all calls in a common way - if command in [ - "queue_push", - "queue_seek", - "queue_clear", - "playlist_uri_set", - "playlist_uri_clear", - "stream_set_url", - "stream_start", - "stream_stop", - "stream_status", - "set_track_metadata", - ]: - - func = getattr(lqs_instance, command) - result = func(str(namespace), *args) - - elif namespace == "mixer": # or namespace == "mixer_fallback": - func = getattr(lqs_instance, command) - result = func(str(namespace), *args) - else: - func = getattr(lqs_instance, namespace) - result = func(command, *args) - - if not self.disable_logging: - self.logger.debug("LiquidSoapCommunicator got response " + str(result)) - - self.connection_attempts = 0 - - return result - - except LQConnectionError as e: - self.logger.error( - "Connection Error when sending " + str(namespace) + "." + str(command) + str(args) - ) - if self.try_to_reconnect(): - time.sleep(0.2) - self.connection_attempts += 1 - if self.connection_attempts < 5: - # reconnect - self.__open_conn(self.client) - self.logger.info( - "Trying to resend " + str(namespace) + "." + str(command) + str(args) - ) - # grab return value - retval = self.send_lqc_command(namespace, command, *args) - # disconnect - self.__close_conn(self.client) - # return the val - return retval - else: - if command == "": - msg = ( - "Rethrowing Exception while trying to send " - + str(namespace) - + str(args) - ) - else: - msg = ( - "Rethrowing Exception while trying to send " - + str(namespace) - + "." - + str(command) - + str(args) - ) - - self.logger.info(msg) - self.disable_transaction(socket=self.client, force=True) - raise e - else: - self.event_dispatcher.on_critical( - "Critical Liquidsoap connection issue", - "Could not connect to Liquidsoap after multiple attempts", - e, - ) - raise e - - def try_to_reconnect(self): - self.enable_transaction() - return self.transaction > 0 - - def enable_transaction(self, socket=None): - # set socket to playout if nothing else is given - if socket is None: - socket = self.client - - self.transaction = self.transaction + 1 - - self.logger.debug( - TerminalColors.WARNING.value - + "Enabling transaction! cnt: " - + str(self.transaction) - + TerminalColors.ENDC.value - ) - - if self.transaction > 1: - return - - try: - self.__open_conn(socket) - except FileNotFoundError: - self.disable_transaction(socket=socket, force=True) - subject = "CRITICAL Exception when connecting to Liquidsoap" - msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?" - self.logger.critical(SU.red(msg)) - # Not using this for now, as it should be triggered by "on_sick(..)" as well - if False: - self.event_dispatcher.on_critical(subject, msg, None) - - def disable_transaction(self, socket=None, force=False): - if not force: - # nothing to disable - if self.transaction == 0: - return - - # decrease transaction counter - self.transaction = self.transaction - 1 - - # debug msg - self.logger.debug( - TerminalColors.WARNING.value - + "DISabling transaction! cnt: " - + str(self.transaction) - + TerminalColors.ENDC.value - ) - - # return if connection is still needed - if self.transaction > 0: - return - else: - self.logger.debug( - TerminalColors.WARNING.value - + "Forcefully DISabling transaction! " - + TerminalColors.ENDC.value - ) - - # close conn and set transactioncounter to 0 - self.__close_conn(socket) - self.transaction = 0 - - def __open_conn(self, socket): - # already connected - # if self.transaction > 1: - # return - - if self.has_connection: - return - - self.logger.debug( - TerminalColors.GREEN.value - + "LiquidSoapCommunicator opening conn" - + TerminalColors.ENDC.value - ) - - # try to connect - socket.connect() - self.has_connection = True - - def __close_conn(self, socket): - # set socket to playout - if socket is None: - socket = self.client - - # do not disconnect if a transaction is going on - if self.transaction > 0: - return - - # say bye - # socket.byebye() - - # debug msg - self.logger.debug( - TerminalColors.BLUE.value - + "LiquidSoapCommunicator closed conn" - + TerminalColors.ENDC.value - ) diff --git a/src/aura_engine/client/playerclient.py b/src/aura_engine/client/playerclient.py deleted file mode 100644 index c7557bb276320a5ef1a7b8891037b016d725ff52..0000000000000000000000000000000000000000 --- a/src/aura_engine/client/playerclient.py +++ /dev/null @@ -1,489 +0,0 @@ -# -# Aura Engine (https://gitlab.servus.at/aura/engine) -# -# Copyright (C) 2017-2020 - The Aura Engine Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -from aura_engine.core.client import CoreConnection - - -class LiquidSoapPlayerClient(CoreConnection): - - # TODO Refactor class: https://gitlab.servus.at/aura/engine/-/issues/65 - - # - # Mixer - # - - # def mixer(self, mixer_id, command, *args): - # if command == "status": - # return self.mixer_status(mixer_id, *args) - - # if command == "inputs": - # return self.mixer_inputs(mixer_id) - - # if command == "volume": - # return self.mixer_volume(mixer_id, *args) - - # if command == "select": - # if len(args) == 2: - # return self.mixer_select(mixer_id, args[0], args[1]) - - # if command == "activate": - # if len(args) == 2: - # return self.mixer_activate(mixer_id, args[0], args[1]) - - # return "LiquidSoapPlayerClient does not understand mixer."+command+str(args) - - def mixer_outputs(self, mixer_id): - # send command - self.command(mixer_id, "outputs") - return self.message - - def mixer_inputs(self, mixer_id): - # send command - self.command(mixer_id, "inputs") - - # convert to list and return it - return self.message.strip().split(" ") - - def mixer_status(self, mixer_id, pos=""): - """ - Get state of a source in the mixer - @type pos: string - @param pos: Mixerposition - @rtype: string - @return: Response from LiquidSoap - """ - self.command(mixer_id, "status", str(pos)) - return self.message - - # def input_status(self, input_id): - # """ - # Retrieves the status of the given input - - # Args: - # input_id (_type_): _description_ - # """ - # self.command(input_id, "status") - # return self.message - - def mixer_volume(self, mixer_id, pos, volume): - """ - Sets some mixer channel to the given volume - - Args: - pos (Integer): The channel number - volume (Integer): The volume - - Returns: - (String): Liquidsoap server response - """ - self.command(mixer_id, "volume", str(pos) + " " + str(volume)) - return self.message - - def mixer_select(self, mixer_id, pos, select): - """ - Selects some mixer channel or vice versa. - - Args: - pos (Integer): The channel number - select (Boolean): Select or deselect - - Returns: - (String): Liquidsoap server response - """ - self.command(mixer_id, "select", str(pos) + " " + str(select).lower()) - return self.message - - def mixer_activate(self, mixer_id, pos, activate): - """ - Selects some mixer channel and increases the volume to 100 or vice versa. - - Args: - pos (Integer): The channel number - activate (Boolean): Activate or deactivate - - Returns: - (String): Liquidsoap server response - """ - self.command(mixer_id, "activate", str(pos) + " " + str(activate).lower()) - return self.message - - # - # Channel (general) - # - - def set_track_metadata(self, channel, json_meta): - """ - Sets additional metadata for the current track - """ - self.command(channel, "set_track_metadata", json_meta) - return self.message - - # - # Queues - # - - def queue_push(self, channel, uri): - """ - Pushes the passed file URI to the `equeue` playlist channel. - - Args: - channel (String): Liquidsoap Source ID - uri (String): Path to the file - """ - self.command(channel, "push", uri) - return self.message - - def queue_seek(self, channel, duration): - """ - Forward the playing `equeue` track/playlist of the given channel. - - Args: - channel (String): Liquidsoap Source ID - duration (Integer): Seek duration ins seconds - - Returns: - Liquidsoap server response - """ - self.command(channel, "seek", str(duration)) - return self.message - - def queue_clear(self, channel): - """ - Clears all `equeue` playlist entries of the given channel. - - Args: - channel (String): Liquidsoap Source ID - duration (Integer): Seek duration ins seconds - - Returns: - Liquidsoap server response - """ - self.command(channel, "clear") - return self.message - - # - # Playlist - # - - def playlist_uri_set(self, channel, uri): - """ - Sets the URI of a playlist source. - - Args: - channel (String): Liquidsoap Source ID - uri (String): URI to the playlist file - - Returns: - Liquidsoap server response - """ - self.command(channel, "uri", uri) - return self.message - - def playlist_uri_clear(self, channel): - """ - Clears the URI of a playlist source. - - Args: - channel (String): Liquidsoap Source ID - uri (String): URI to the playlist file - - Returns: - Liquidsoap server response - """ - self.command(channel, "clear") - return self.message - - # - # Stream - # - - def stream_set_url(self, channel, url): - """ - Sets the URL on the given HTTP channel. - """ - self.command(channel, "url", url) - return self.message - - def stream_start(self, channel): - """ - Starts the HTTP stream set with `stream_set_url` on the given channel. - """ - self.command(channel, "start") - return self.message - - def stream_stop(self, channel): - """ - Stops the HTTP stream on the given channel. - """ - self.command(channel, "stop") - return self.message - - def stream_status(self, channel): - """ - Returns the status of the HTTP stream on the given channel. - """ - self.command(channel, "status") - return self.message - - # - # General Entries - # - - def entry_status(self, rid): - """ - Retrieves the status of a given entry. - - Args: - rid (String): Resource ID (RID) - - Returns: - Liquidsoap server response - """ - self.command("request", "status", str(rid)) - return self.message - - # - # Other - # - - def uptime(self, command=""): # no command will come - """ - Retrieves how long the engine is running already. - """ - return self.command("", "uptime") - - def version(self, command=""): # no command will come - """ - Retrieves the Liquidsoap version. - """ - return self.command("", "version") - - def engine(self, command, *args): - """ - Retrieves the state of all input and outputs. - """ - if command == "version": - return self.engine_version() - if command == "state": - return self.engine_state() - if command == "update_config": - return self.engine_update_config(str(args[0])) - if command == "set_track_metadata": - return self.engine_set_track_metadata(str(args[0])) - - return "LiquidSoapPlayerClient does not understand engine." + command + str(args) - - def engine_version(self): - """ - Retrieves the state of all input and outputs. - """ - self.command("aura_engine", "version") - return self.message - - def engine_state(self): - """ - Retrieves the state of all input and outputs. - """ - self.command("aura_engine", "status") - return self.message - - def engine_update_config(self, json_config): - """ - Updates the config - """ - self.command("aura_engine", "update_config", json_config) - return self.message - - # def skip(self, namespace="playlist", pos=""): - # """ - # Source skippen - # @type namespace: string - # @param namespace: Namespace der Source - # @type pos: string - # @param pos: Die Position - optional - Position des Channels vom Mixer benötigt - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('skip', namespace, pos) - # return self.message - - # def remove(self, pos, namespace="playlist"): - # """ - # Track aus der secondary_queue oder der Playlist entfernen - # @type pos: string - # @param pos: Die Position - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('remove', namespace, str(pos)) - # return self.message - - # def insert(self, uri, pos='0', namespace="playlist"): - # """ - # Track einfügen - # @type uri: string - # @param uri: Uri einer Audiodatei - # @type pos: string - # @param pos: Die Position - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('insert', namespace, str(pos) + ' ' + uri) - # return self.message - - # def move(self, fromPos, toPos, namespace="playlist"): - # """ - # Track von Position fromPos nach Position toPos verschieben - # @type fromPos: string/int - # @param fromPos: Position des zu verschiebenden Tracks - # @type toPos: string - # @param toPos: Die Position zu der verschoben werden soll - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('move', namespace, str(fromPos) + ' ' + str(toPos)) - # return self.message - - # def play(self, namespace="playlist"): - # """ - # Source abspielen - funktioniert nur bei Playlist - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('play', namespace) - # return self.message - - # def pause(self, namespace="playlist"): - # """ - # Source pausieren/stoppen - funktioniert nur bei Playlist - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('pause', namespace) - # return self.message - - # def flush(self, namespace="playlist"): - # """ - # Playlist leeren - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('flush', namespace) - # return self.message - - # def playlistData(self): - # """ - # Metadaten der Playlist ausgeben - # @rtype: string - # @return: Ein Json-String - # """ - # self.command('data', 'playlist') - # return self.message - - # def get_queue(self, namespace="ch1", queue='queue'): - # """ - # Queue eines Kanals ausgeben - # @type namespace: string - # @param namespace: Namespace der Source - # @type queue: string - # @param queue: Name des queues (queue, primary_queue, secondary_queue) - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command(queue, namespace) - # return self.message - - # def loadPlaylist(self, uri, params="", namespace="playlist"): - # """ - # Playlist laden - # @type uri: string - # @param uri: Uri einer Playlist im XSPF-Format - # @type params: string - # @param params: obsolete - # @type namespace: string - # @param namespace: Namespace der Source - hier nur playlist - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('load', namespace, uri + params) - # return self.message - - # def currentTrack(self, namespace="request"): - # """ - # Das oder die ID(s) der gerade abgespielten requests erhalten - # @type namespace: string - # @param namespace: Namespace der Source - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers (als String) - # """ - # self.command('on_air', namespace) - # return self.message - - def volume(self, pos, volume, namespace="mixer"): - """ - Lautstärke eines Kanals setzen - @type pos: int/string - @param pos: Die Position/ Nummer des Kanals (playlist=0) - @type volume: int/string - @param volume: Zahl von 1 -100 - @type namespace: string - @param namespace: Namespace der Source (immer mixer) - @rtype: string - @return: Die Antwort des Liquidsoap-Servers - """ - self.command("volume", namespace, str(pos) + " " + str(volume)) - return self.message - - # def playlist_remaining(self): - # """ - # Wie lange läuft der aktuelle Track der Playlist noch - # @rtype: string - # @return: Die Antwort des Liquidsoap-Servers - # """ - # self.command('remaining', 'playlist') - # return self.message - - # def list_channels(self): - # """ - # Channels auflisten (Simple JSON) - # """ - # # Liquidsoap Kommando - - # channels = self.sendLqcCommand(self.lqc, 'mixer', 'inputs') - - # if not isinstance(channels, list): - # self.error('02') - # elif len(channels) < 1: - # self.warning('01') - # else: - # self.success('00', channels) - - # self.notifyClient() diff --git a/src/aura_engine/core/client.py b/src/aura_engine/core/client.py index 867ec52ff7d69500e39248a87da7f0c21eaca275..b41f403c67a82c46447725aa3b357ad5f0394167 100644 --- a/src/aura_engine/core/client.py +++ b/src/aura_engine/core/client.py @@ -21,18 +21,156 @@ Message and connection handling to Engine Core (Liquidsoap). """ - import logging import socket import urllib.parse -from multiprocessing import Lock from aura_engine.base.config import AuraConfig -from aura_engine.base.exceptions import LQConnectionError -from aura_engine.base.lang import private +from aura_engine.base.lang import private, synchronized from aura_engine.base.utils import SimpleUtil as SU +class CoreConnectionError(Exception): + """ + Exception thrown when there is a connection problem with Liquidsoap. + """ + + pass + + +class CoreClient: + """ + Client managing communication with Engine Core (Liquidsoap). + """ + + skip_log_commands = ("aura_engine.status", "volume") + + instance = None + logger = None + connection = None + event_dispatcher = None + conn = None + + def __init__(self): + """ + Initialize the connection. + """ + self.logger = logging.getLogger("AuraEngine") + self.config = AuraConfig.config() + self.conn = CoreConnection() + + @staticmethod + def get_instance(): + """ + Get an instance of the client singleton. + """ + if not CoreClient.instance: + CoreClient.instance = CoreClient() + return CoreClient.instance + + def set_event_dispatcher(self, event_dispatcher): + """ + Set an instance of the event dispatcher. + """ + self.event_dispatcher = event_dispatcher + + @synchronized + def connect(self): + """ + Open connection. + + @synchronized + """ + try: + if not self.conn.is_connected(): + self.conn.open() + except CoreConnectionError as e: + self.logger.critical(SU.red(e.message)) + self.event_dispatcher.on_critical("Client connection error", e.message, str(e)) + + @synchronized + def disconnect(self): + """ + Close the connection. + + @synchronized + """ + if not self.conn.is_connected(): + self.conn.close() + + @synchronized + def exec(self, namespace: str, action: str, args: str = "") -> str: + """ + Execute a command. + + Args: + namespace (str): The namespace for the command to execute. + action (str): The action to execute. + args (str, optional): Arguments passed with the action. Defaults to "". + + Raises: + CoreConnectionError: Raised when there is a connection or communication error. + + Returns: + str: result of the command (optional). + + @synchronized + + """ + response = None + + if not self.conn.is_connected(): + self.conn.open() + try: + command = self.build_command(namespace, action, args) + self.log_debug(command, f"[>>] {command}") + response = self.conn.send(command) + if response: + self.log_debug(command, f"[<<] {response}") + except CoreConnectionError as e: + msg = "Error while issuing command to Liquidsoap" + self.event_dispatcher.on_critical("Core client connection issue", msg, str(e)) + raise CoreConnectionError(msg, e) + return response + + @private + def build_command(self, namespace: str, action: str, args: str) -> str: + """ + Construct a command string for sending to Liquidsoap. + + Args: + namespace (str): The namespace for the command to execute. + action (str): The action to execute. + args (str, optional): Arguments passed with the action. Defaults to "". + + Returns: + str: The command string + + @private + """ + args = str(args).strip() + args = " " + urllib.parse.unquote(args) if args != "" else "" + namespace = str(namespace) + "." if namespace else "" + command = f"{namespace}{action}{args}" + return command + + @private + def log_debug(self, command: str, log_message: str): + """ + Check if the command is excluded from debug logging. + + This is meant to avoid log-pollution by status and fade commands. + + @private + + """ + if self.config.get("log_level") == "debug": + cmds = CoreClient.skip_log_commands + base_cmd = command.split(" ")[0] + if not base_cmd.startswith(cmds): + self.logger.debug(log_message) + + class CoreConnection: """ Handles connections and sends commands to Engine Core (Liquidsoap). @@ -41,30 +179,32 @@ class CoreConnection: ENCODING = "UTF-8" logger = None - skip_logging = ["aura_engine.status"] - config = None socket_path = None socket = None - mutex = None connected = None message = None - def __init__(self, config, socket_filename): + def __init__(self): """ Initialize the connection. """ self.logger = logging.getLogger("AuraEngine") - self.config = AuraConfig.config() - socket_path = config.get("socket_dir") + "/" + socket_filename + config = AuraConfig.config() + socket_path = config.get("socket_dir") + "/engine.sock" self.socket_path = config.to_abs_path(socket_path) - self.logger.debug("LiquidSoapClient using socketpath: " + self.socket_path) + self.logger.debug(f"Using socket at '{self.socket_path}'") - self.mutex = Lock() self.connected = False self.message = "" self.socket = None - def connect(self): + def is_connected(self): + """ + Return `True` if a connection is established. + """ + return self.connected + + def open(self): """ Connect to Liquidsoap socket. """ @@ -72,23 +212,21 @@ class CoreConnection: self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.connect(self.socket_path) + except FileNotFoundError as e: + msg = f"Socket file at '{self.socket_path}' not found. Is Liquidsoap running?" + self.connected = False + raise CoreConnectionError(msg, e) except socket.error as e: msg = f"Cannot connect to socket at '{self.socket_path}'" - self.logger.critical(SU.red(msg), e) self.connected = False + raise CoreConnectionError(msg, e) except Exception as e: - msg = "Unknown error while connecting to socket at '{self.socket_path}'" - self.logger.critical(SU.red(msg), e) + msg = f"Unknown error while connecting to socket at '{self.socket_path}'" self.connected = False + raise CoreConnectionError(msg, e) else: + self.connection_attempts = 0 self.connected = True - return True - - def is_connected(self): - """ - Return `True` if a connection is established. - """ - return self.connected def close(self): """ @@ -100,49 +238,37 @@ class CoreConnection: self.socket.close() self.connected = False - def command(self, namespace: str, command: str, param: str = "") -> str: + def send(self, command: str) -> str: """ Send command to Liquidsoap. Args: - namespace (str): Command namespace - command (str): Command - param (str, optional): Optional parameters + command (str): The command string to be executed Raises: - LQConnectionError: Thrown when not connected + CoreConnectionError: Thrown when not connected Returns: str: Result of the command """ - param = param.strip() - param = " " + urllib.parse.unquote(param) if param != "" else "" - namespace += "." if namespace else "" + result = None + command += "\n" - if self.connected: - message = f"{namespace}{command}{param}\n" - self.log_debug(namespace, command, f"Send message:\n{message}") - - try: - self.socket.sendall(message.encode()) - self.log_debug(namespace, command, "Waiting for reply...") - self.read() - self.log_debug(namespace, command, f"Got reply: {self.message}") - except BrokenPipeError: - msg = "Broken Pipe. Reconnecting..." - self.logger.error(SU.red(msg)) - self.connect() - raise - except Exception as e: - msg = f"Unexpected error while sending command '{message}'" - self.logger.error(SU.red(msg), e) - raise + try: + self.socket.sendall(command.encode()) + result = self.read() + except BrokenPipeError: + msg = "Broken Pipe while sending command" + self.logger.info(SU.pink(msg)) + self.connected = False + raise CoreConnectionError(msg) + except Exception as e: + msg = "Unknown Error while sending command" + self.logger.error(SU.red(msg), e) + self.connected = False + raise CoreConnectionError(msg) - return self.message - else: - msg = "LiquidsoapClient not connected to LiquidSoap Server" - self.logger.error(SU.red(msg)) - raise LQConnectionError(msg) + return str(result) @private def read_all(self, timeout: int = 2) -> str: @@ -155,21 +281,20 @@ class CoreConnection: Returns: str: The response + @private + """ data = "" try: self.socket.settimeout(timeout) - self.mutex.acquire() while True: data += self.socket.recv(1).decode(CoreConnection.ENCODING) if data.find("END\r\n") != -1 or data.find("Bye!\r\n") != -1: data.replace("END\r\n", "") break - self.mutex.release() except Exception as e: msg = "Unknown error while socket.read_all()" self.logger.error(SU.red(msg), e) - self.mutex.release() return data @private @@ -180,6 +305,8 @@ class CoreConnection: Returns: str: message read from socket + @private + """ if self.connected: ret = self.read_all().splitlines() @@ -197,13 +324,3 @@ class CoreConnection: self.logger.error(SU.red(msg), e) return self.message return None - - @private - def log_debug(self, namespace: str, command: str, msg: str) -> bool: - """ - Check if the command is excluded from debug logging. - - This is meant to avoid log-pollution by status and fade commands. - """ - if f"{namespace}{command}" not in self.skip_logging: - self.logger.debug(f"[{namespace}.{command}] {msg}") diff --git a/src/aura_engine/engine.py b/src/aura_engine/engine.py index 1197d18aca02572f91aa2923d72f4be3d6ccda41..24822510130fcaa0ae2966c4a9aaa2dfe62cfb70 100644 --- a/src/aura_engine/engine.py +++ b/src/aura_engine/engine.py @@ -30,15 +30,13 @@ from threading import Thread from aura_engine.base.api import LiquidsoapUtil as LU from aura_engine.base.config import AuraConfig -from aura_engine.base.exceptions import ( - InvalidChannelException, - LoadSourceException, - LQConnectionError, - LQStreamException, -) from aura_engine.base.lang import DotDict from aura_engine.base.utils import SimpleUtil as SU -from aura_engine.channels import ( +from aura_engine.control import EngineControlInterface +from aura_engine.core.client import CoreClient, CoreConnectionError +from aura_engine.events import EngineEventDispatcher +from aura_engine.resources import ResourceClass, ResourceUtil +from src.aura_engine.channels import ( Channel, ChannelResolver, ChannelRouter, @@ -48,11 +46,23 @@ from aura_engine.channels import ( ResourceType, TransitionType, ) -from aura_engine.client.connector import PlayerConnector -from aura_engine.control import EngineControlInterface -from aura_engine.events import EngineEventDispatcher -from aura_engine.mixer import Mixer, MixerType -from aura_engine.resources import ResourceClass, ResourceUtil +from src.aura_engine.mixer import Mixer, MixerType + + +class InvalidChannelException(Exception): + """ + Exception thrown when the given channel is invalid. + """ + + pass + + +class LoadSourceException(Exception): + """ + Exception thrown when some source could not be loaded or updated. + """ + + pass class Engine: @@ -64,11 +74,9 @@ class Engine: engine_time_offset = 0.0 logger = None eci = None - channels = None scheduler = None event_dispatcher = None - plugins = None - connector = None + client = None playout_state = None def __init__(self): @@ -81,7 +89,6 @@ class Engine: self.logger = logging.getLogger("AuraEngine") self.config = AuraConfig.config() Engine.engine_time_offset = float(self.config.get("engine_latency_offset")) - self.plugins = dict() self.start() def start(self): @@ -92,7 +99,8 @@ class Engine: """ self.event_dispatcher = EngineEventDispatcher(self) self.eci = EngineControlInterface(self, self.event_dispatcher) - self.connector = PlayerConnector(self.event_dispatcher) + self.client = CoreClient.get_instance() + self.client.set_event_dispatcher(self.event_dispatcher) self.event_dispatcher.on_initialized() while not self.is_connected(): @@ -105,7 +113,7 @@ class Engine: else: self.logger.info(SU.red("Error while updating playout config")) - self.player = Player(self.connector, self.event_dispatcher) + self.player = Player(self.client, self.event_dispatcher) self.event_dispatcher.on_boot() self.logger.info(EngineSplash.splash_screen(self.config)) @@ -123,10 +131,10 @@ class Engine: try: self.uptime() has_connection = True - except LQConnectionError: - self.logger.info("Liquidsoap is not running so far") + except CoreConnectionError: + self.logger.debug("Liquidsoap is not running so far") except Exception as e: - self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e)) + self.logger.error("Cannot check if Liquidsoap is running. \nReason: {e.message}", e) return has_connection @@ -134,7 +142,7 @@ class Engine: """ Retrieve the state of all inputs and outputs. """ - state = self.connector.send_lqc_command("engine", "state") + state = self.client.exec("aura_engine", "status") state = DotDict(LU.json_to_dict(state)) # Initialize state @@ -173,7 +181,7 @@ class Engine: "fallback_show_name": self.config.get("fallback_show_name"), } json_config = json.dumps(playout_config, ensure_ascii=False) - res = self.connector.send_lqc_command("engine", "update_config", json_config) + res = self.client.exec("aura_engine", "update_config", json_config) return res def init_version(self) -> dict: @@ -192,7 +200,7 @@ class Engine: with open(os.path.join("", "VERSION")) as version_file: ctrl_version = version_file.read().strip() - versions = self.connector.send_lqc_command("engine", "version") + versions = self.client.exec("aura_engine", "version") versions = DotDict(json.loads(versions)) self.config.set("version_control", ctrl_version) self.config.set("version_core", versions.core) @@ -202,9 +210,7 @@ class Engine: """ Retrieve the uptime of Liquidsoap. """ - self.connector.enable_transaction() - data = self.connector.send_lqc_command("uptime", "") - self.connector.disable_transaction() + data = self.client.exec("", "uptime") return data @staticmethod @@ -253,27 +259,27 @@ class Player: config = None logger = None - connector = None + client = None channels = None channel_router = None event_dispatcher = None mixer = None - def __init__(self, connector, event_dispatcher): + def __init__(self, client, event_dispatcher): """ Initialize the player. Args: - connector (Connector): Connector for the playout + client (CoreClient): Client for connecting to Engine Core (Liquidsoap) event_dispatcher (EventDispather): Dispatcher for issuing events """ self.config = AuraConfig.config() self.logger = logging.getLogger("AuraEngine") self.event_dispatcher = event_dispatcher - self.connector = connector + self.client = client self.channel_router = ChannelRouter() - self.mixer = Mixer(self.config, MixerType.MAIN, self.connector) + self.mixer = Mixer(self.config, MixerType.MAIN, self.client) def preload(self, entry): """ @@ -298,7 +304,7 @@ class Player: def set_metadata(): track_meta = ResourceUtil.generate_track_metadata(entry, True) json_meta = json.dumps(track_meta, ensure_ascii=False) - res = self.connector.send_lqc_command(entry.channel, "set_track_metadata", json_meta) + res = self.client.exec(entry.channel, "set_track_metadata", json_meta) self.logger.info(f"Response for '{entry.channel}.set_track_metadata': {res}") if res not in LiquidsoapResponse.SUCCESS.value: msg = f"Error while setting metadata on {entry.channel} to:\n{json_meta}" @@ -308,7 +314,7 @@ class Player: if entry.get_content_type() in ResourceClass.LIVE.types: entry.channel = ChannelResolver.live_channel_for_resource(entry.source) if entry.channel is None: - self.logger.critical(SU.red("Invalid live channel '{entry.source}' requested!")) + self.logger.critical(SU.red("No live channel for '{entry.source}' source")) entry.previous_channel = None set_metadata() is_ready = True @@ -364,7 +370,7 @@ class Player: channels = self.channel_router.get_free_channel(channel_type) for entry in entries: entry.status = EntryPlayState.LOADING - self.logger.info("Loading entry '%s'", entry) + self.logger.info(f"Loading entry '{entry}'") # Choose and save the input channel entry.previous_channel, entry.channel = channels @@ -394,11 +400,10 @@ class Player: otherwise a new channel of the same type is activated """ - with suppress(LQConnectionError): + with suppress(CoreConnectionError): mixer = self.mixer # Instant activation or fade-in - self.connector.enable_transaction() if transition == TransitionType.FADE: mixer.channel_select(entry.channel.value, True) mixer.fade_in(entry.channel, entry.volume) @@ -408,7 +413,6 @@ class Player: mixer.channel_activate(entry.channel.value, True) msg = f"Activate channel '{entry.channel}'" self.logger.info(SU.pink(msg)) - self.connector.disable_transaction() # Update active channel for the current channel type self.channel_router.set_active(entry.channel) @@ -434,8 +438,7 @@ class Player: channel (Channel): The channel to stop playing or fade-out transition (TransitionType): The type of transition to use e.g. fade-out """ - with suppress(LQConnectionError): - self.connector.enable_transaction() + with suppress(CoreConnectionError): if not channel: self.logger.warn(SU.red("Cannot stop, no channel passed")) return @@ -446,7 +449,6 @@ class Player: self.mixer.channel_volume(channel, 0) self.logger.info(SU.pink(f"Mute channel '{channel}' with {transition}")) - self.connector.disable_transaction() self.event_dispatcher.on_stop(channel) # @@ -474,10 +476,8 @@ class Player: while not self.stream_is_ready(entry.channel, entry.source): self.logger.info("Loading Stream ...") if retries >= max_retries: - raise LoadSourceException( - "Could not connect to stream while waiting for %s seconds!" - % str(retries * retry_delay) - ) + msg = f"Stream connection failed after {retries * retry_delay} seconds!" + raise LoadSourceException(msg) time.sleep(retry_delay) retries += 1 @@ -494,32 +494,32 @@ class Player: channel (Channel): The stream channel url (String): The stream URL + Raises: + (LoadSourceException): When stream cannot be set or stopped. + Returns: (Boolean): `True` if successful """ result = None - - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "stream_stop") + result = self.client.exec(channel, "stop") if result not in LiquidsoapResponse.SUCCESS.value: - self.logger.error("%s.stop result: %s" % (channel, result)) - raise LQStreamException("Error while stopping stream!") + self.logger.error(f"{channel}.stop result: {result}") + raise LoadSourceException("Error while stopping stream!") - result = self.connector.send_lqc_command(channel, "stream_set_url", url) + result = self.client.exec(channel, "url", url) if result not in LiquidsoapResponse.SUCCESS.value: - self.logger.error("%s.set_url result: %s" % (channel, result)) - raise LQStreamException("Error while setting stream URL!") + self.logger.error(f"{channel}.url result: {result}") + raise LoadSourceException("Error while setting stream URL!") - # Liquidsoap ignores commands sent without a certain timeout + # TODO Review: Liquidsoap ignores commands sent without a certain timeout time.sleep(2) - result = self.connector.send_lqc_command(channel, "stream_start") - self.logger.info("%s.start result: %s" % (channel, result)) + result = self.client.exec(channel, "start") + self.logger.info(f"{channel}.start result: {result}") - self.connector.disable_transaction() return result def stream_is_ready(self, channel, url): @@ -538,29 +538,21 @@ class Player: """ result = None - - self.connector.enable_transaction() - - result = self.connector.send_lqc_command(channel, "stream_status") - self.logger.info("%s.status result: %s" % (channel, result)) + result = self.client.exec(channel, "status") + self.logger.info(f"{channel}.status result: {result}") if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value): return False lqs_url = result.split(" ")[1] if not url == lqs_url: - self.logger.error( - "Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url) - ) + msg = f"Wrong URL '{lqs_url}' set for channel '{channel}', expected: '{url}'." + self.logger.error(msg) return False - self.connector.disable_transaction() - stream_buffer = self.config.get("input_stream_buffer") - self.logger.info( - "Ready to play stream, but wait %s seconds until the buffer is filled..." - % str(stream_buffer) - ) + msg = f"Ready to play stream, but wait {stream_buffer} seconds to fill buffer..." + self.logger.info(msg) time.sleep(round(float(stream_buffer))) return True @@ -587,23 +579,22 @@ class Player: ): raise InvalidChannelException - self.connector.enable_transaction() audio_store = self.config.abs_audio_store_path() extension = self.config.get("audio_source_extension") filepath = ResourceUtil.source_to_filepath(audio_store, source, extension) - self.logger.info(SU.pink(f"{channel}.queue_push('{filepath}')")) + self.logger.info(SU.pink(f"{channel}.push('{filepath}')")) if metadata: filepath = ResourceUtil.lqs_annotate(filepath, metadata) - result = self.connector.send_lqc_command(channel, "queue_push", filepath) - self.logger.info("%s.queue_push result: %s" % (channel, result)) - self.connector.disable_transaction() + result = self.client.exec(channel, "push", filepath) + self.logger.info(f"{channel}.push result: {result}") # If successful, Liquidsoap returns a resource ID of the queued track resource_id = -1 try: resource_id = int(result) except ValueError: - self.logger.error(SU.red("Got an invalid resource ID: '%s'" % result)) + msg = SU.red(f"Got invalid resource ID: '{result}'") + self.logger.error(msg) return False return resource_id >= 0 @@ -626,10 +617,8 @@ class Player: ): raise InvalidChannelException - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "queue_seek", str(seconds_to_seek)) - self.logger.info("%s.seek result: %s" % (channel, result)) - self.connector.disable_transaction() + result = self.client.exec(channel, "seek", str(seconds_to_seek)) + self.logger.info(f"{channel}.seek result: {result}") return result @@ -655,70 +644,22 @@ class Player: # means, this channel should not be used for at least some seconds # (including clearing time). clear_timeout = 10 - self.logger.info(f"Clearing channel '{channel}' in {clear_timeout} seconds") + self.logger.info(f"Clear channel '{channel}' in {clear_timeout} seconds") time.sleep(clear_timeout) # Deactivate channel - self.connector.enable_transaction() response = self.mixer.channel_activate(channel.value, False) - self.connector.disable_transaction() msg = f"Deactivate channel '{channel}' with result '{response}'" self.logger.info(SU.pink(msg)) # Remove all items from queue - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "queue_clear") - self.connector.disable_transaction() - msg = f"Clear queue channel '{channel}' with result '{result}'" + result = self.client.exec(channel, "clear") + + msg = f"Cleared queue channel '{channel}' with result '{result}'" self.logger.info(SU.pink(msg)) Thread(target=clean_up).start() - # - # Channel Type - Playlist - # - - def playlist_set_uri(self, channel, playlist_uri): - """ - Set the URI of a playlist. - - Args: - channel (Channel): The channel to push the file to - playlist_uri (String): The path to the playlist - - Returns: - (String): Liquidsoap response - - """ - self.logger.info(SU.pink("Setting URI of playlist '%s' to '%s'" % (channel, playlist_uri))) - - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "playlist_uri_set", playlist_uri) - self.logger.info("%s.playlist_uri result: %s" % (channel, result)) - self.connector.disable_transaction() - - return result - - def playlist_clear_uri(self, channel): - """ - Clear the URI of a playlist. - - Args: - channel (Channel): The channel to push the file to - - Returns: - (String): Liquidsoap response - - """ - self.logger.info(SU.pink("Clearing URI of playlist '%s'" % (channel))) - - self.connector.enable_transaction() - result = self.connector.send_lqc_command(channel, "playlist_uri_clear") - self.logger.info("%s.playlist_uri_clear result: %s" % (channel, result)) - self.connector.disable_transaction() - - return result - class EngineSplash: """Print the splash and version information on boot.""" diff --git a/src/aura_engine/mixer.py b/src/aura_engine/mixer.py index ca51cded366b8e003a989e0f302b7493c59bbf0e..dc72c1912bb18f2ba7eb0ed645cba0aa23ac6a56 100644 --- a/src/aura_engine/mixer.py +++ b/src/aura_engine/mixer.py @@ -26,8 +26,8 @@ import time from enum import Enum from aura_engine.base.api import LiquidsoapUtil as LU -from aura_engine.base.exceptions import LQConnectionError from aura_engine.base.utils import SimpleUtil as SU +from aura_engine.core.client import CoreConnectionError class MixerType(Enum): @@ -63,13 +63,13 @@ class Mixer: config = None logger = None - connector = None + client = None mixer_id = None channels = None fade_in_active = None fade_out_active = None - def __init__(self, config, mixer_id, connector): + def __init__(self, config, mixer_id, client): """ Initialize the mixer object. @@ -82,7 +82,7 @@ class Mixer: self.mixer_id = mixer_id self.fade_in_active = None self.fade_out_active = None - self.connector = connector + self.client = client self.mixer_initialize() # @@ -98,14 +98,13 @@ class Mixer: - Initialize default channels per type """ - self.connector.enable_transaction() + time.sleep(1) # TODO Check is this is still required channels = self.mixer_channels_reload() # TODO Graceful reboot: At some point the current track playing could # resume inside Liquidsoap in case only Engine restarted (See #77). for channel in channels: self.channel_volume(channel, "0") - self.connector.disable_transaction() def mixer_status(self): """ @@ -113,22 +112,17 @@ class Mixer: """ cnt = 0 inputstate = {} - - self.connector.enable_transaction() inputs = self.mixer_channels() - for channel in inputs: inputstate[channel] = self.channel_status(cnt) cnt = cnt + 1 - - self.connector.disable_transaction() return inputstate def mixer_outputs(self): """ Retrieve the state of all inputs and outputs. """ - outputs = self.connector.send_lqc_command(self.mixer_id.value, "mixer_outputs") + outputs = self.client.exec(self.mixer_id.value, "outputs") outputs = LU.json_to_dict(outputs) return outputs @@ -137,7 +131,8 @@ class Mixer: Retrieve all mixer channels. """ if self.channels is None or len(self.channels) == 0: - self.channels = self.connector.send_lqc_command(self.mixer_id.value, "mixer_inputs") + channel_str = self.client.exec(self.mixer_id.value, "inputs") + self.channels = channel_str.split(" ") return self.channels def mixer_channels_selected(self): @@ -147,7 +142,6 @@ class Mixer: cnt = 0 activeinputs = [] - self.connector.enable_transaction() inputs = self.mixer_channels() for channel in inputs: @@ -156,7 +150,6 @@ class Mixer: activeinputs.append(channel) cnt = cnt + 1 - self.connector.disable_transaction() return activeinputs def mixer_channels_except(self, input_type): @@ -167,9 +160,8 @@ class Mixer: activemixer_copy = self.mixer_channels().copy() activemixer_copy.remove(input_type) except ValueError as e: - self.logger.error( - "Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e)) - ) + msg = f"Requested channel type '{input_type}' not in channel-list" + self.logger.error(SU.red(msg), e) except AttributeError: self.logger.critical("Empty channel list") @@ -200,9 +192,8 @@ class Mixer: channels = self.mixer_channels() index = channels.index(channel) if index < 0: - self.logger.critical( - f"There's no valid channel number for channel ID '{channel.value}'" - ) + msg = f"There's no valid channel number for channel ID '{channel.value}'" + self.logger.critical(SU.red(msg)) return None return index @@ -217,7 +208,7 @@ class Mixer: (String): Channel status info as a String """ - return self.connector.send_lqc_command(self.mixer_id.value, "mixer_status", channel_number) + return self.client.exec(self.mixer_id.value, "status", channel_number) def channel_select(self, channel, select): """ @@ -236,14 +227,13 @@ class Mixer: try: index = channels.index(channel) if len(channel) < 1: - self.logger.critical("Cannot select channel. There are no channels!") + self.logger.critical(SU.red("Cannot select channel because there are no channels")) else: - message = self.connector.send_lqc_command( - self.mixer_id.value, "mixer_select", index, select - ) + select = "true" if select else "false" + message = self.client.exec(self.mixer_id.value, "select", f"{index} {select}") return message except Exception as e: - self.logger.critical("Ran into exception when selecting channel. Reason: " + str(e)) + self.logger.critical(SU.red("Ran into exception when selecting channel"), e) def channel_activate(self, channel, activate): """ @@ -266,14 +256,13 @@ class Mixer: try: index = channels.index(channel) if len(channel) < 1: - self.logger.critical("Cannot activate channel. There are no channels!") + self.logger.critical(SU.red("Cannot activate channel. There are no channels!")) else: - message = self.connector.send_lqc_command( - self.mixer_id.value, "mixer_activate", index, activate - ) + activate = "true" if activate else "false" + message = self.client.exec(self.mixer_id.value, "activate", f"{index} {activate}") return message except Exception as e: - self.logger.critical("Ran into exception when activating channel. Reason: " + str(e)) + self.logger.critical(SU.red("Ran into exception when activating channel."), e) def channel_current_volume(self, channel): """ @@ -286,8 +275,9 @@ class Mixer: if volume: return int(volume.split("%")[0]) else: - self.logger.error(f"Invalid volume for channel {channel.value} (status: '{status}'") - return 0 + msg = f"Invalid volume for channel {channel.value} (status: '{status}'" + self.logger.error(SU.red(msg)) + return -1 def channel_volume(self, channel, volume): """ @@ -307,38 +297,23 @@ class Mixer: channels = self.mixer_channels() index = channels.index(channel) except ValueError as e: - msg = f"Cannot set volume of channel '{channel}' to {volume}. Reason: {str(e)}" - self.logger.error(SU.red(msg)) + msg = f"Cannot set volume of channel '{channel}' to {volume}" + self.logger.error(SU.red(msg), e) return - try: - if len(channel) < 1: - msg = f"Cannot set volume of channel '{channel}', because no channels available" - self.logger.warning(SU.red(msg)) - else: - playout_volume = str(int(volume) / 100) # 100% volume equals 1 - message = self.connector.send_lqc_command( - self.mixer_id.value, "mixer_volume", str(index), playout_volume - ) - - if not self.connector.disable_logging: - if message.find("volume=" + str(volume) + "%"): - msg = f"Set volume of channel '{channel}' to {volume}" - self.logger.info(SU.pink(msg)) - else: - msg(f"Error setting volume of channel '{channel}': {message}") - self.logger.warning(SU.red(msg)) + # try: + if len(channel) < 1: + msg = f"Cannot set volume of channel '{channel}', because no channels available" + self.logger.error(SU.red(msg)) + else: + playout_volume = str(int(volume) / 100) # 100% volume equals 1 + args = f"{str(index)} {playout_volume}" + message = self.client.exec(self.mixer_id.value, "volume", args) + if not message.find(f"volume={volume}%"): + msg(f"Error setting volume of channel '{channel}': {message}") + self.logger.error(SU.red(msg)) - return message - except AttributeError as e: # (LQConnectionError, AttributeError): - self.connector.disable_transaction(force=True) - msg = SU.red( - "Ran into exception when setting volume of channel " - + channel - + ". Reason: " - + str(e) - ) - self.logger.error(msg) + return message # # Fading @@ -358,18 +333,14 @@ class Mixer: """ try: current_volume = self.channel_current_volume(channel) - if current_volume == volume: - self.logger.warning( - f"Current volume for channel {channel.value} is already at target volume of" - f" {volume}% SKIPPING..." - ) + msg = f"Skip fade in of {channel}: Already at target volume of {volume}%" + self.logger.info(msg) return elif current_volume > volume: - self.logger.warning( - f"Current volume {current_volume}% of channel {channel.value} exceeds target" - f" volume of {volume}% SKIPPING..." - ) + msg = f"Skip fade in of {channel}: Current volume of {current_volume}% exceeds \ + target volume of {volume}%" + self.logger.info(msg) return fade_in_time = float(self.config.get("fade_in_time")) @@ -377,36 +348,17 @@ class Mixer: if fade_in_time > 0: self.fade_in_active = True target_volume = volume - step = fade_in_time / target_volume - - msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % ( - channel, - str(step), - str(target_volume), - ) + msg = f"Fade in '{channel}' to {target_volume} ({step}s steps)" self.logger.info(SU.pink(msg)) - - # Enable logging, which might have been disabled in a previous fade-out - # TODO refactor - self.connector.disable_logging = True - # self.connector.client.disable_logging = True - for i in range(target_volume): self.channel_volume(channel.value, i + 1) time.sleep(step) - - msg = "Finished with fading-in '%s'." % channel + msg = f"Fade in of '{channel}' done" self.logger.info(SU.pink(msg)) - self.fade_in_active = False - if not self.fade_out_active: - # TODO refactor - self.connector.disable_logging = False - # self.connector.client.disable_logging = False - - except LQConnectionError as e: - self.logger.critical(str(e)) + except CoreConnectionError as e: + self.logger.critical(SU.red(e.message), e) return False return True @@ -428,41 +380,23 @@ class Mixer: volume = current_volume if current_volume == 0: - self.logger.warning( - f"Current volume for channel {channel.value} is already at target volume of" - f" 0%. SKIPPING..." - ) + msg = f"Channel {channel} already at target volume of 0%. SKIPPING..." + self.logger.info(msg) return fade_out_time = float(self.config.get("fade_out_time")) if fade_out_time > 0: step = abs(fade_out_time) / current_volume - - msg = "Starting to fading-out '%s'. Step is %ss." % (channel, str(step)) + msg = f"Start to fade out '{channel}' ({step}s step)" self.logger.info(SU.pink(msg)) - - # Disable logging... it is going to be enabled again after fadein and -out is - # finished - # TODO refactor - self.connector.disable_logging = True - # self.connector.client.disable_logging = True - for i in range(volume): self.channel_volume(channel.value, volume - i - 1) time.sleep(step) - - msg = "Finished with fading-out '%s'" % channel + msg = f"Finished with fading-out '{channel}'" self.logger.info(SU.pink(msg)) - # Enable logging again - self.fade_out_active = False - if not self.fade_in_active: - self.connector.disable_logging = False - # TODO refactor - # self.connector.client.disable_logging = False - - except LQConnectionError as e: - self.logger.critical(str(e)) + except CoreConnectionError as e: + self.logger.critical(SU.red(e.message), e) return False return True diff --git a/src/aura_engine/plugins/mailer.py b/src/aura_engine/plugins/mailer.py index eabbe20d7dea57a5227e60c3d618442b97945677..2500e6bc7c6ef59bd14212f1d1e29e0bffbf488f 100644 --- a/src/aura_engine/plugins/mailer.py +++ b/src/aura_engine/plugins/mailer.py @@ -99,12 +99,11 @@ class AuraMailer: ) self.mail.notify_admin(subject, message) - def on_critical(self, subject, message, data=None): + def on_critical(self, data): """ Call when some critical event occurs. """ - if not data: - data = "" + (subject, message, data) = data self.mail.notify_admin(subject, message + "\n\n" + str(data)) @@ -142,9 +141,8 @@ class MailService: """ if self.admin_mails_enabled == "false": - self.logger.warning( - SU.red("No admin mail sent, because doing so is disabled in engine.ini!") - ) + # msg = "No admin mail sent, because doing so is disabled in engine.ini!" + # self.logger.warning(SU.red(msg)) return False admin_mails = self.admin_mails.split() @@ -162,12 +160,8 @@ class MailService: """ if self.coordinator_mails_enabled == "false": - self.logger.warning( - SU.yellow( - "No programme coordinator mail sent, because doing so is disabled in" - " engine.ini!" - ) - ) + # msg = "No programme coordinator mail sent, because it is disabled in engine.ini!" + # self.logger.warning(SU.yellow(msg)) return False coordinator_mails = self.coordinator_mails.split() diff --git a/src/aura_engine/plugins/monitor.py b/src/aura_engine/plugins/monitor.py index f91aa4f83b680e1bdf6b35967dfc6be91c7a772e..1aee8aa046e441d0819825e5bde145412f534c26 100644 --- a/src/aura_engine/plugins/monitor.py +++ b/src/aura_engine/plugins/monitor.py @@ -99,9 +99,6 @@ class AuraMonitor: self.status["api"]["engine"] = dict() self.already_invalid = False - # Register as an engine plugin - self.engine.plugins["monitor"] = self - # Heartbeat settings self.heartbeat_running = False self.heartbeat_server = self.config.get("heartbeat_server") @@ -240,12 +237,9 @@ class AuraMonitor: liq_version = self.config.get("version_liquidsoap") self.status["engine"]["version"] = ctrl_version - - self.engine.player.connector.enable_transaction() self.status["lqs"]["version"] = {"core": core_version, "liquidsoap": liq_version} self.status["lqs"]["outputs"] = self.engine.player.mixer.mixer_outputs() self.status["lqs"]["mixer"] = self.engine.player.mixer.mixer_status() - self.engine.player.connector.disable_transaction() self.status["api"]["steering"]["url"] = self.config.get("api_steering_status") self.status["api"]["steering"]["available"] = self.validate_url_connection( self.config.get("api_steering_status") @@ -268,9 +262,7 @@ class AuraMonitor: """ Refresh the vital status info which are required for the engine to survive. """ - self.engine.player.connector.enable_transaction() self.status["lqs"]["status"] = self.engine.update_playout_state() - self.engine.player.connector.disable_transaction() self.status["lqs"]["available"] = self.status["lqs"]["status"] is not None self.status["audio_source"] = self.validate_directory(self.config.abs_audio_store_path()) diff --git a/src/aura_engine/scheduling/scheduler.py b/src/aura_engine/scheduling/scheduler.py index cfbafc02cf81608dc624377b55e6ccd8a17bfbbd..4cd57ffb37b91c8e5b583bfe501aa25f32a78488 100644 --- a/src/aura_engine/scheduling/scheduler.py +++ b/src/aura_engine/scheduling/scheduler.py @@ -26,17 +26,24 @@ import threading import time from aura_engine.base.config import AuraConfig -from aura_engine.base.exceptions import LoadSourceException, NoActiveTimeslotException from aura_engine.base.utils import SimpleUtil as SU from aura_engine.channels import ChannelType, EntryPlayState, TransitionType from aura_engine.control import EngineExecutor -from aura_engine.engine import Engine +from aura_engine.engine import Engine, LoadSourceException from aura_engine.resources import ResourceClass, ResourceUtil from aura_engine.scheduling.models import AuraDatabaseModel from aura_engine.scheduling.programme import ProgrammeService from aura_engine.scheduling.utils import TimeslotRenderer +class NoActiveTimeslotException(Exception): + """ + Exception thrown when there is no timeslot active. + """ + + pass + + class AuraScheduler(threading.Thread): """The programme scheduler. @@ -542,7 +549,7 @@ class PlayCommand(EngineExecutor): if entries[-1].status != EntryPlayState.READY: msg = f"Entries didn't reach 'ready' state during preloading (Entries: {entries_str})" - self.logger.critical(SU.red(msg)) + self.logger.warning(SU.red(msg)) def do_play(self, entries): """