From 560daecd3ed720dc004c4f93b27ae585cc1772c1 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Fri, 23 Oct 2020 14:57:37 +0200 Subject: [PATCH] Extracted connection handling. #44 --- modules/core/liquidsoap/connector.py | 220 +++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 modules/core/liquidsoap/connector.py diff --git a/modules/core/liquidsoap/connector.py b/modules/core/liquidsoap/connector.py new file mode 100644 index 00000000..2ed4b344 --- /dev/null +++ b/modules/core/liquidsoap/connector.py @@ -0,0 +1,220 @@ +# +# Aura Engine (https://gitlab.servus.at/aura/engine) +# +# Copyright (C) 2017-2020 - The Aura Engine Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import time + +from modules.base.utils import TerminalColors, SimpleUtil as SU +from modules.base.exceptions import LQConnectionError +from modules.core.liquidsoap.playerclient import LiquidSoapPlayerClient + + + +class PlayerConnector(): + """ + Establishes a Socket connection to Liquidsoap. + """ + client = None + logger = None + transaction = 0 + connection_attempts = 0 + disable_logging = False + event_dispatcher = None + + + + def __init__(self, config, event_dispatcher): + """ + Constructor + + Args: + config (AuraConfig): The configuration + """ + self.config = config + self.logger = logging.getLogger("AuraEngine") + self.client = LiquidSoapPlayerClient(config, "engine.sock") + self.event_dispatcher = event_dispatcher + + + + def send_lqc_command(self, namespace, command, *args): + """ + Ein Kommando an Liquidsoap senden + @type lqs_instance: object + @param lqs_instance: Instance of LiquidSoap Client + @type namespace: string + @param namespace: Namespace of function + @type command: string + @param command: Function name + @type args: list + @param args: List of parameters + @rtype: string + @return: Response from LiquidSoap + """ + lqs_instance = self.client + try: + if not self.disable_logging: + if namespace == "recorder": + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args)) + else: + if command == "": + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + str(args)) + else: + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args)) + + # call wanted function ... + + # FIXME REFACTOR all calls in a common way + if command in [ + "queue_push", + "queue_seek", + "queue_clear", + "playlist_uri_set", + "playlist_uri_clear", + "stream_set_ur", + "stream_start", + "stream_stop", + "stream_status", + ]: + + func = getattr(lqs_instance, command) + result = func(str(namespace), *args) + + elif namespace == "mixer" or namespace == "mixer_fallback": + func = getattr(lqs_instance, command) + result = func(str(namespace), *args) + else: + func = getattr(lqs_instance, namespace) + result = func(command, *args) + + + if not self.disable_logging: + self.logger.debug("LiquidSoapCommunicator got response " + str(result)) + + self.connection_attempts = 0 + + return result + + except LQConnectionError as e: + self.logger.error("Connection Error when sending " + str(namespace) + "." + str(command) + str(args)) + if self.try_to_reconnect(): + time.sleep(0.2) + self.connection_attempts += 1 + if self.connection_attempts < 5: + # reconnect + self.__open_conn(self.client) + self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args)) + # grab return value + retval = self.send_lqc_command(namespace, command, *args) + # disconnect + self.__close_conn(self.client) + # return the val + return retval + else: + if command == "": + msg = "Rethrowing Exception while trying to send " + str(namespace) + str(args) + else: + msg = "Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args) + + self.logger.info(msg) + self.disable_transaction(socket=self.client, force=True) + raise e + else: + self.event_dispatcher.on_critical("Criticial Liquidsoap connection issue", \ + "Could not connect to Liquidsoap (Multiple attempts)", e) + raise e + + + # ------------------------------------------------------------------------------------------ # + def try_to_reconnect(self): + self.enable_transaction() + return self.transaction > 0 + + # ------------------------------------------------------------------------------------------ # + def enable_transaction(self, socket=None): + # set socket to playout if nothing else is given + if socket is None: + socket = self.client + + self.transaction = self.transaction + 1 + + self.logger.debug(TerminalColors.WARNING.value + "Enabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value) + + if self.transaction > 1: + return + + try: + self.__open_conn(socket) + except FileNotFoundError: + self.disable_transaction(socket=socket, force=True) + subject = "CRITICAL Exception when connecting to Liquidsoap" + msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?" + self.logger.critical(SU.red(msg)) + self.event_dispatcher.on_critical(subject, msg, None) + + + # ------------------------------------------------------------------------------------------ # + def disable_transaction(self, socket=None, force=False): + if not force: + # nothing to disable + if self.transaction == 0: + return + + # decrease transaction counter + self.transaction = self.transaction - 1 + + # debug msg + self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value) + + # return if connection is still needed + if self.transaction > 0: + return + else: + self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value) + + # close conn and set transactioncounter to 0 + self.__close_conn(socket) + self.transaction = 0 + + # ------------------------------------------------------------------------------------------ # + def __open_conn(self, socket): + # already connected + if self.transaction > 1: + return + + self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value) + + # try to connect + socket.connect() + + # ------------------------------------------------------------------------------------------ # + def __close_conn(self, socket): + # set socket to playout + if socket is None: + socket = self.client + + # do not disconnect if a transaction is going on + if self.transaction > 0: + return + + # say bye + socket.byebye() + + # debug msg + self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value) -- GitLab