#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
import logging

from contextlib import suppress
from threading import Thread

import meta

from modules.base.utils import TerminalColors, SimpleUtil as SU, EngineUtil
from modules.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, LoadSourceException
from modules.core.channels import ChannelType, Channel, TransitionType, LiquidsoapResponse, EntryPlayState
from modules.core.startup import StartupThread
from modules.core.events import EngineEventDispatcher
from modules.core.liquidsoap.playerclient import LiquidSoapPlayerClient
# from modules.core.liquidsoap.recorderclient import LiquidSoapRecorderClient


class SoundSystem():
    """
    The Soundsystem Mixer Control.

    This class represents a virtual mixer as an abstraction layer to the actual audio hardware.
    It uses LiquidSoapClient, but introduces more complex commands, transactions and error handling.

    From one layer above it is used by `AuraScheduler` as if a virtual DJ is remote controlling the mixer.
    """
    # Liquidsoap player client used for all commands
    client = None
    logger = None
    # Nesting counter of currently open transactions (0 means no connection is held)
    transaction = 0
    # Cached list of mixer channel names as returned by Liquidsoap
    channels = None
    # Not assigned within this class; read in `start()`
    scheduler = None
    # Created in `start()`; it is `None` before that
    event_dispatcher = None
    is_liquidsoap_running = False
    # Number of consecutive re-send attempts after a connection error
    connection_attempts = 0
    # When `True`, per-command logging is suppressed (used while fading)
    disable_logging = False
    fade_in_active = False
    fade_out_active = False
    # Maps each `ChannelType` to the channel most recently used for it
    active_channel = None
    plugins=None


    def __init__(self, config):
        """
        Initializes the sound-system by establishing a Socket connection
        to Liquidsoap.

        Args:
            config (AuraConfig):    The configuration
        """
        self.config = config
        self.logger = logging.getLogger("AuraEngine")
        self.client = LiquidSoapPlayerClient(config, "engine.sock")
        # self.lqcr = LiquidSoapRecorderClient(config, "record.sock")
        self.is_active() # TODO Check if it makes sense to move it to the boot-phase
        self.plugins = dict()


    def start(self):
        """
        Starts the engine. Called when the connection to the sound-system implementation
        has been established.
        """
        self.event_dispatcher = EngineEventDispatcher(self, self.scheduler)

        # Sleep needed, because the socket is created too slowly by Liquidsoap
        time.sleep(1)
        self.mixer_initialize()
        self.is_liquidsoap_running = True
        self.event_dispatcher.on_initialized()
        self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap"))
        self.event_dispatcher.on_boot()
        self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__))
        self.event_dispatcher.on_ready()


    #
    #  MIXER : GENERAL
    #


    def mixer_initialize(self):
        """
        - Pull all faders down to volume 0.
        - Initialize default channels per type
        """
        self.enable_transaction()
        time.sleep(1) # TODO Check if this is still required
        channels = self.mixer_channels_reload()
        for channel in channels:
            self.channel_volume(channel, "0")
        self.disable_transaction()

        self.active_channel = {
            ChannelType.FILESYSTEM: Channel.FILESYSTEM_A,
            ChannelType.HTTP: Channel.HTTP_A,
            ChannelType.HTTPS: Channel.HTTPS_A,
            ChannelType.LIVE: Channel.LIVE_0
        }


    def mixer_status(self):
        """
        Returns the state of all mixer channels.

        Returns:
            (dict): Maps each channel name to the status reported for its index
        """
        inputstate = {}

        self.enable_transaction()
        for index, channel in enumerate(self.mixer_channels()):
            inputstate[channel] = self.channel_status(index)
        self.disable_transaction()

        return inputstate


    def mixer_channels(self):
        """
        Retrieves all mixer channels. The list is requested from Liquidsoap only
        if no (non-empty) list has been cached yet.
        """
        if not self.channels:
            self.channels = self.__send_lqc_command__(self.client, "mixer", "inputs")
        return self.channels


    def mixer_channels_selected(self):
        """
        Retrieves all selected channels of the mixer.

        Returns:
            (list): Names of the channels whose status contains "selected=true"
        """
        activeinputs = []

        self.enable_transaction()
        for index, channel in enumerate(self.mixer_channels()):
            status = self.channel_status(index)
            if "selected=true" in status:
                activeinputs.append(channel)
        self.disable_transaction()

        return activeinputs


    def mixer_channels_except(self, input_type):
        """
        Retrieves all mixer channels except the given one.

        Args:
            input_type: The channel entry to be excluded from the result

        Returns:
            (list): A copy of the channel list without `input_type`. If the entry is not part
                    of the list, the full copy is returned. If no channel list is available,
                    an empty list is returned.
        """
        # Pre-initialize, so the `return` below never hits an unbound name
        activemixer_copy = []
        try:
            activemixer_copy = self.mixer_channels().copy()
            activemixer_copy.remove(input_type)
        except ValueError as e:
            self.logger.error("Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e)))
        except AttributeError:
            self.logger.critical("Empty channel list")

        return activemixer_copy


    def mixer_channels_reload(self):
        """
        Reloads all mixer channels by dropping the cached list first.
        """
        self.channels = None
        return self.mixer_channels()


    #
    #  MIXER : CONTROL SECTION
    #


    def preroll(self, entry):
        """
        Pre-Rolls/Pre-Loads the entry. This is required before the actual `play(..)` can happen.

        Be aware when using this method to queue a very short entry this may result in situations
        with incorrect timing. In this case bundle multiple short entries as one queue using
        `preroll_group(self, entries)`.

        It's important to note, that this method is blocking until loading has finished. If this
        method is called asynchronously, the progress on the preloading state can be looked up in
        `entry.status`.

        Args:
            entry (Entry):    The entry to be pre-loaded
        """
        entry.status = EntryPlayState.LOADING
        self.logger.info("Loading entry '%s'" % entry)
        is_ready = False

        # LIVE
        if entry.get_type() == ChannelType.LIVE:
            entry.channel = "linein_" + entry.source.split("line://")[1]
            is_ready = True
        else:
            # Choose and save the input channel
            entry.previous_channel, entry.channel = self.channel_swap(entry.get_type())

        # PLAYLIST
        if entry.get_type() == ChannelType.FILESYSTEM:
            is_ready = self.playlist_push(entry.channel, entry.source)

        # STREAM
        elif entry.get_type() == ChannelType.HTTP or entry.get_type() == ChannelType.HTTPS:
            is_ready = self.stream_load_entry(entry)

        if is_ready:
            entry.status = EntryPlayState.READY

        self.event_dispatcher.on_queue([entry])


    def preroll_group(self, entries):
        """
        Pre-Rolls/Pre-Loads multiple filesystem entries at once. This call is required before the
        actual `play(..)` can happen. Due to their nature, non-filesystem entries cannot be queued
        using this method. In this case use `preroll(self, entry)` instead. This method also allows
        queuing of very short files, such as jingles.

        It's important to note, that this method is blocking until loading has finished. If this
        method is called asynchronously, the progress on the preloading state can be looked up in
        `entry.status`.

        Args:
            entries ([Entry]):    An array holding filesystem entries

        Raises:
            InvalidChannelException: If any of the entries is not of type `ChannelType.FILESYSTEM`
        """
        # Nothing to queue; also avoids indexing into an empty list below
        if not entries:
            return

        # Validate entry type
        for entry in entries:
            if entry.get_type() != ChannelType.FILESYSTEM:
                raise InvalidChannelException

        # Determine channel: a tuple of (previous channel, new channel)
        channel = self.channel_swap(entries[0].get_type())

        # Queue entries
        for entry in entries:
            entry.status = EntryPlayState.LOADING
            self.logger.info("Loading entry '%s'" % entry)

            # Choose and save the input channel
            entry.previous_channel, entry.channel = channel

            if self.playlist_push(entry.channel, entry.source):
                entry.status = EntryPlayState.READY

        self.event_dispatcher.on_queue(entries)


    def play(self, entry, transition):
        """
        Plays a new `Entry`. In case of a new schedule (or some intended, immediate transition),
        a clean channel is selected and transitions between old and new channel is performed.

        This method expects that the entry is pre-loaded using `preroll(..)` or
        `preroll_group(self, entries)` before being played. In case the pre-roll has happened for
        a group of entries, only the first entry of the group needs to be passed.

        Connection errors (`LQConnectionError`) raised while playing are suppressed.

        Args:
            entry (PlaylistEntry):          The audio source to be played
            transition (TransitionType):    The type of transition to use e.g. fade-in or instant
                                            volume level.
        """
        with suppress(LQConnectionError):

            # Instant activation or fade-in
            self.enable_transaction()
            if transition == TransitionType.FADE:
                self.channel_select(entry.channel.value, True)
                self.fade_in(entry)
            else:
                self.channel_activate(entry.channel.value, True)
            self.disable_transaction()

            # Update active channel and type
            self.active_channel[entry.get_type()] = entry.channel

            # Dear filesystem channels, please leave the room as you would like to find it!
            if entry.previous_channel and entry.previous_channel in ChannelType.FILESYSTEM.channels:
                def clean_up():
                    # Wait a little, if there is some long fade-out. Note, this also means,
                    # this channel should not be used for at least some seconds (including clearing time).
                    time.sleep(2)
                    self.enable_transaction()
                    self.channel_activate(entry.previous_channel.value, False)
                    res = self.playlist_clear(entry.previous_channel)
                    self.logger.info("Clear Queue Response: " + str(res))
                    self.disable_transaction()
                Thread(target=clean_up).start()


    def on_play(self, source):
        """
        Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
        when some entry is actually playing.

        Args:
            source (String):    The URI of the media source currently being played
        """
        self.event_dispatcher.on_play(source)


    def stop(self, entry, transition):
        """
        Stops the currently playing entry.

        Connection errors (`LQConnectionError`) raised while stopping are suppressed.

        Args:
            entry (Entry):                  The entry to stop playing
            transition (TransitionType):    The type of transition to use e.g. fade-out.
        """
        with suppress(LQConnectionError):
            # Check before opening a transaction, so the early return cannot leave
            # the transaction counter incremented.
            if not entry.channel:
                self.logger.warning(SU.red("Trying to stop entry %s, but it has no channel assigned" % entry))
                return

            self.enable_transaction()

            if transition == TransitionType.FADE:
                self.fade_out(entry)
            else:
                self.channel_volume(entry.channel, 0)

            self.logger.info(SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry)))
            self.disable_transaction()
            self.event_dispatcher.on_stop(entry)


    #
    #  MIXER : CHANNEL
    #


    def channel_swap(self, channel_type):
        """
        Returns the currently in-active channel for a given type. For example if currently some
        file on channel FILESYSTEM_A is playing, the channel FILESYSTEM_B is returned for being
        used to queue new entries.

        Args:
            channel_type (ChannelType): The channel type such as filesystem, stream or live channel

        Returns:
            (tuple): `(previous_channel, new_channel)`. For channel types other than filesystem,
                     HTTP and HTTPS the new channel is `None`.
        """
        previous_channel = self.active_channel[channel_type]
        new_channel = None
        msg = None

        if channel_type == ChannelType.FILESYSTEM:
            if previous_channel == Channel.FILESYSTEM_A:
                new_channel = Channel.FILESYSTEM_B
                msg = "Swapped filesystem channel from A > B"
            else:
                new_channel = Channel.FILESYSTEM_A
                msg = "Swapped filesystem channel from B > A"

        elif channel_type == ChannelType.HTTP:
            if previous_channel == Channel.HTTP_A:
                new_channel = Channel.HTTP_B
                msg = "Swapped HTTP Stream channel from A > B"
            else:
                new_channel = Channel.HTTP_A
                msg = "Swapped HTTP Stream channel from B > A"

        elif channel_type == ChannelType.HTTPS:
            if previous_channel == Channel.HTTPS_A:
                new_channel = Channel.HTTPS_B
                msg = "Swapped HTTPS Stream channel from A > B"
            else:
                new_channel = Channel.HTTPS_A
                msg = "Swapped HTTPS Stream channel from B > A"

        if msg:
            self.logger.info(SU.pink(msg))

        return (previous_channel, new_channel)


    def channel_status(self, channel_number):
        """
        Retrieves the status of a channel identified by the channel number.
        """
        return self.__send_lqc_command__(self.client, "mixer", "status", channel_number)


    def channel_select(self, channel, select):
        """
        Selects/deselects some mixer channel

        Args:
            channel (String):   The name of the channel as listed by `mixer_channels()`
            select (Boolean):   Select or deselect

        Returns:
            (String):   Liquidsoap server response; `None` if the channel could not be selected
        """
        channels = self.mixer_channels()

        try:
            # Check the channel list itself (not the channel name) before looking up the index
            if not channels:
                self.logger.critical("Cannot select channel. There are no channels!")
            else:
                index = channels.index(channel)
                message = self.__send_lqc_command__(self.client, "mixer", "select", index, select)
                return message
        except Exception as e:
            self.logger.critical("Ran into exception when selecting channel. Reason: " + str(e))


    def channel_activate(self, channel, activate):
        """
        Combined call of following to save execution time:
          - Select some mixer channel
          - Increase the volume to 100,

        Args:
            channel (String):   The name of the channel as listed by `mixer_channels()`
            activate (Boolean): Activate or deactivate

        Returns:
            (String):   Liquidsoap server response; `None` if the channel could not be activated
        """
        channels = self.mixer_channels()

        try:
            # Check the channel list itself (not the channel name) before looking up the index
            if not channels:
                self.logger.critical("Cannot activate channel. There are no channels!")
            else:
                index = channels.index(channel)
                message = self.__send_lqc_command__(self.client, "mixer", "activate", index, activate)
                return message
        except Exception as e:
            self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))


    def channel_volume(self, channel, volume):
        """
        Set volume of a channel

        Args:
            channel (Channel):      The channel
            volume  (Integer)       Volume between 0 and 100

        Returns:
            (String):   Liquidsoap server response; `None` if the channel is unknown
                        or the response could not be evaluated
        """
        channel = str(channel)
        channels = self.mixer_channels()

        try:
            index = channels.index(channel)
        except ValueError as e:
            msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e))
            self.logger.error(msg)
            self.logger.info("Available channels: %s" % str(channels))
            return

        try:
            message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume)))

            if not self.disable_logging:
                # `str.find` returns -1 when the text is missing, so compare explicitly
                if message.find('volume=' + str(volume) + '%') != -1:
                    self.logger.info(SU.pink("Set volume of channel '%s' to %s" % (channel, str(volume))))
                else:
                    msg = SU.red("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message)
                    self.logger.warning(msg)

            return message
        except AttributeError as e: #(LQConnectionError, AttributeError):
            self.disable_transaction(force=True)
            msg = SU.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
            self.logger.error(msg)


    #
    #   Channel Type - Stream
    #


    def stream_load_entry(self, entry):
        """
        Loads the given stream entry and polls until the stream is reported as ready.

        Args:
            entry (Entry):  The entry to be pre-loaded

        Returns:
            (Boolean):  `True` if successful

        Raises:
            LoadSourceException: If the stream is not ready after the configured number of retries
        """
        self.stream_load(entry.channel, entry.source)
        time.sleep(1)

        retry_delay = self.config.get("input_stream_retry_delay")
        max_retries = self.config.get("input_stream_max_retries")
        retries = 0

        while not self.stream_is_ready(entry.channel, entry.source):
            self.logger.info("Loading Stream ...")
            if retries >= max_retries:
                # Parentheses are required: `%` would otherwise be applied before `*`
                raise LoadSourceException("Could not connect to stream while waiting for %s seconds!" % (retries * retry_delay))
            time.sleep(retry_delay)
            retries += 1

        return True


    def stream_load(self, channel, url):
        """
        Preloads the stream URL on the given channel. Note this method is blocking
        some serious amount of time; hence it's worth being called asynchronously.

        Args:
            channel (Channel): The stream channel
            url     (String):  The stream URL

        Returns:
            The Liquidsoap response of the `stream_start` command

        Raises:
            LQStreamException: If stopping the stream or setting the URL fails
        """
        result = None

        self.enable_transaction()
        try:
            result = self.__send_lqc_command__(self.client, channel, "stream_stop")

            if result != LiquidsoapResponse.SUCCESS.value:
                self.logger.error("%s.stop result: %s" % (channel, result))
                raise LQStreamException("Error while stopping stream!")

            result = self.__send_lqc_command__(self.client, channel, "stream_set_url", url)

            if result != LiquidsoapResponse.SUCCESS.value:
                self.logger.error("%s.set_url result: %s" % (channel, result))
                raise LQStreamException("Error while setting stream URL!")

            # Liquidsoap ignores commands sent without a certain timeout
            time.sleep(2)

            result = self.__send_lqc_command__(self.client, channel, "stream_start")
            self.logger.info("%s.start result: %s" % (channel, result))
        finally:
            # Release the transaction on the error paths too
            self.disable_transaction()

        return result


    def stream_is_ready(self, channel, url):
        """
        Checks if the stream on the given channel is ready to play. Note this method is blocking
        some serious amount of time even when successful; hence it's worth being called
        asynchronously.

        Args:
            channel (Channel): The stream channel
            url     (String):  The stream URL

        Returns:
            (Boolean):  `True` if the stream is connected and uses the expected URL
        """
        result = None

        self.enable_transaction()
        try:
            result = self.__send_lqc_command__(self.client, channel, "stream_status")
            self.logger.info("%s.status result: %s" % (channel, result))

            if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value):
                return False

            lqs_url = result.split(" ")[1]
            if not url == lqs_url:
                self.logger.error("Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url))
                return False
        finally:
            # Release the transaction on the `return False` paths too; this method is
            # polled in a loop by `stream_load_entry`.
            self.disable_transaction()

        stream_buffer = self.config.get("input_stream_buffer")
        self.logger.info("Ready to play stream, but wait %s seconds until the buffer is filled..." % str(stream_buffer))
        time.sleep(round(float(stream_buffer)))
        return True


    #
    #   Channel Type - Filesystem
    #


    def playlist_push(self, channel, uri):
        """
        Adds a filesystem URI to the given `ChannelType.FILESYSTEM` channel.

        Args:
            channel (Channel): The channel to push the file to
            uri     (String):  The URI of the file

        Returns:
            (Boolean):  `True` if successful

        Raises:
            InvalidChannelException: If the channel is not a filesystem channel
        """
        if channel not in ChannelType.FILESYSTEM.channels:
            raise InvalidChannelException
        self.logger.info(SU.pink("playlist.push('%s', '%s'" % (channel, uri)))

        self.enable_transaction()
        audio_store = self.config.get("audiofolder")
        filepath = EngineUtil.uri_to_filepath(audio_store, uri)
        result = self.__send_lqc_command__(self.client, channel, "playlist_push", filepath)
        self.logger.info("%s.playlist_push result: %s" % (channel, result))
        self.disable_transaction()

        # If successful, Liquidsoap returns a resource ID of the queued track
        return int(result) >= 0


    def playlist_seek(self, channel, seconds_to_seek):
        """
        Forwards the player of the given `ChannelType.FILESYSTEM` channel by (n) seconds.

        Args:
            channel (Channel):          The channel to seek on
            seconds_to_seek (Float):    The seconds to skip

        Returns:
            (String):   Liquidsoap response

        Raises:
            InvalidChannelException: If the channel is not a filesystem channel
        """
        if channel not in ChannelType.FILESYSTEM.channels:
            raise InvalidChannelException

        self.enable_transaction()
        result = self.__send_lqc_command__(self.client, channel, "playlist_seek", str(seconds_to_seek))
        self.logger.info("%s.playlist_seek result: %s" % (channel, result))
        self.disable_transaction()

        return result


    def playlist_clear(self, channel):
        """
        Removes all tracks currently queued in the given `ChannelType.FILESYSTEM` channel.

        Args:
            channel (Channel): The channel to clear

        Returns:
            (String):   Liquidsoap response

        Raises:
            InvalidChannelException: If the channel is not a filesystem channel
        """
        if channel not in ChannelType.FILESYSTEM.channels:
            raise InvalidChannelException

        self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel))

        self.enable_transaction()
        result = self.__send_lqc_command__(self.client, channel, "playlist_clear")
        self.logger.info("%s.playlist_clear result: %s" % (channel, result))
        self.disable_transaction()

        return result


    #
    #  Fading
    #


    def fade_in(self, entry):
        """
        Performs a fade-in for the given `entry` to the `entry.volume` loudness
        at channel `entry.channel`. Nothing happens if the configured `fade_in_time`
        or the target volume is not greater than zero.

        Args:
            entry (Entry):  The entry to fade

        Returns:
            (Boolean):  Always `True`; connection errors are logged, not raised
        """
        try:
            fade_in_time = float(self.config.get("fade_in_time"))

            if fade_in_time > 0:
                target_volume = entry.volume

                # A target volume of zero would cause a division by zero below
                if target_volume > 0:
                    self.fade_in_active = True
                    step = fade_in_time / target_volume

                    msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \
                        (entry.channel, str(step), str(target_volume))
                    self.logger.info(SU.pink(msg))

                    # Disable per-command logging while fading; it is enabled again below,
                    # unless a fade-out is still in progress
                    self.disable_logging = True
                    self.client.disable_logging = True

                    try:
                        for i in range(target_volume):
                            self.channel_volume(entry.channel.value, i + 1)
                            time.sleep(step)

                        msg = "Finished with fading-in '%s'." % entry.channel
                        self.logger.info(SU.pink(msg))
                    finally:
                        # Reset the state even if the connection broke during the fade
                        self.fade_in_active = False
                        if not self.fade_out_active:
                            self.disable_logging = False
                            self.client.disable_logging = False

        except LQConnectionError as e:
            self.logger.critical(str(e))

        return True


    def fade_out(self, entry):
        """
        Performs a fade-out for the given `entry` at channel `entry.channel`.
        Nothing happens if the configured `fade_out_time` or the entry's volume
        is not greater than zero.

        Args:
            entry (Entry):  The entry to fade

        Returns:
            (Boolean):  Always `True`; connection errors are logged, not raised
        """
        try:
            fade_out_time = float(self.config.get("fade_out_time"))

            # A volume of zero would cause a division by zero below
            if fade_out_time > 0 and entry.volume > 0:
                # Mark the fade-out as running, so a concurrently finishing fade-in
                # does not re-enable logging too early
                self.fade_out_active = True
                step = abs(fade_out_time) / entry.volume

                msg = "Starting to fading-out '%s'. Step is %ss." % (entry.channel, str(step))
                self.logger.info(SU.pink(msg))

                # Disable logging... it is going to be enabled again after fadein and -out is finished
                self.disable_logging = True
                self.client.disable_logging = True

                try:
                    for i in range(entry.volume):
                        self.channel_volume(entry.channel.value, entry.volume-i-1)
                        time.sleep(step)

                    msg = "Finished with fading-out '%s'" % entry.channel
                    self.logger.info(SU.pink(msg))
                finally:
                    # Enable logging again, even if the connection broke during the fade
                    self.fade_out_active = False
                    if not self.fade_in_active:
                        self.disable_logging = False
                        self.client.disable_logging = False

        except LQConnectionError as e:
            self.logger.critical(str(e))

        return True


    #
    #   Recording
    #


    # # ------------------------------------------------------------------------------------------ #
    # def recorder_stop(self):
    #     self.enable_transaction()

    #     for i in range(5):
    #         if self.config.get("rec_" + str(i)) == "y":
    #             self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop")

    #     self.disable_transaction()

    # # ------------------------------------------------------------------------------------------ #
    # def recorder_start(self, num=-1):
    #     if not self.is_liquidsoap_running:
    #         if num==-1:
    #             msg = "Want to start recorder, but LiquidSoap is not running"
    #         else:
    #             msg = "Want to start recorder " + str(num) + ", but LiquidSoap is not running"
    #         self.logger.warning(msg)
    #         return False

    #     self.enable_transaction()

    #     if num == -1:
    #         self.recorder_start_all()
    #     else:
    #         self.recorder_start_one(num)

    #     self.disable_transaction()

    # # ------------------------------------------------------------------------------------------ #
    # def recorder_start_all(self):
    #     if not self.is_liquidsoap_running:
    #         self.logger.warning("Want to start all recorder, but LiquidSoap is not running")
    #         return False

    #     self.enable_transaction()
    #     for i in range(5):
    #         self.recorder_start_one(i)
    #     self.disable_transaction()

    # # ------------------------------------------------------------------------------------------ #
    # def recorder_start_one(self, num):
    #     if not self.is_liquidsoap_running:
    #         return False

    #     if self.config.get("rec_" + str(num)) == "y":
    #         returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status")

    #         if returnvalue == "off":
    #             self.__send_lqc_command__(self.client, "recorder", str(num), "start")

    # # ------------------------------------------------------------------------------------------ #
    # def get_recorder_status(self):
    #     self.enable_transaction(self.client)
    #     recorder_state = self.__send_lqc_command__(self.client, "record", "status")
    #     self.disable_transaction(self.client)

    #     return recorder_state


    #
    #   Basic Methods
    #


    def init_player(self):
        """
        Initializes the LiquidSoap Player after startup of the engine
        by starting a `StartupThread`.

        Returns:
            (String):   Message that the player is started.
        """
        t = StartupThread(self)
        t.start()

        return "Engine Core startup done!"


    # ------------------------------------------------------------------------------------------ #
    def __send_lqc_command__(self, lqs_instance, namespace, command, *args):
        """
        Sends a command to Liquidsoap.

        On a connection error the command is re-sent (recursively) as long as fewer than
        five consecutive attempts have been made; otherwise the error is re-raised.

        @type    lqs_instance: object
        @param   lqs_instance: Instance of LiquidSoap Client
        @type    namespace:    string
        @param   namespace:    Namespace of function
        @type    command:      string
        @param   command:      Function name
        @type    args:         list
        @param   args:         List of parameters
        @rtype:                string
        @return:               Response from LiquidSoap
        @raise LQConnectionError: If the command could not be sent
        """
        try:
            if not self.disable_logging:
                if namespace == "recorder":
                    self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args))
                else:
                    if command == "":
                        self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + str(args))
                    else:
                        self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args))

            # call wanted function ...

            # FIXME REFACTOR all calls in a common way
            if command in [
                "playlist_push",
                "playlist_seek",
                "playlist_clear",
                "stream_set_url",
                "stream_start",
                "stream_stop",
                "stream_status"]:

                # These commands are client methods taking the namespace as first argument
                func = getattr(lqs_instance, command)
                result = func(str(namespace), *args)
            else:
                # Otherwise the namespace is the client method and the command its first argument
                func = getattr(lqs_instance, namespace)
                result = func(command, *args)

            if not self.disable_logging:
                self.logger.debug("LiquidSoapCommunicator got response " + str(result))

            self.connection_attempts = 0

            return result

        except LQConnectionError as e:
            self.logger.error("Connection Error when sending " + str(namespace) + "." + str(command) + str(args))

            if self.try_to_reconnect():
                time.sleep(0.2)
                self.connection_attempts += 1

                if self.connection_attempts < 5:
                    # reconnect
                    self.__open_conn(self.client)
                    self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args))
                    # grab return value
                    retval = self.__send_lqc_command__(lqs_instance, namespace, command, *args)
                    # disconnect
                    self.__close_conn(self.client)
                    # return the val
                    return retval
                else:
                    if command == "":
                        msg = "Rethrowing Exception while trying to send " + str(namespace) + str(args)
                    else:
                        msg = "Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args)

                    self.logger.info(msg)
                    self.disable_transaction(socket=self.client, force=True)
                    raise e
            else:
                # The dispatcher is only created in `start()`; without this guard an
                # `AttributeError` would mask the actual connection error.
                if self.event_dispatcher:
                    self.event_dispatcher.on_critical("Criticial Liquidsoap connection issue", \
                        "Could not connect to Liquidsoap (Multiple attempts)", e)
                raise e


    def is_active(self):
        """
        Checks if Liquidsoap is running by requesting its uptime.

        Returns:
            (Boolean):  `True` if the uptime could be retrieved
        """
        try:
            self.uptime()
            self.is_liquidsoap_running = True
        except LQConnectionError as e:
            self.logger.info("Liquidsoap is not running so far")
            self.is_liquidsoap_running = False
        except Exception as e:
            self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
            self.is_liquidsoap_running = False

        return self.is_liquidsoap_running


    def engine_state(self):
        """
        Retrieves the state of all inputs and outputs.
        """
        state = self.__send_lqc_command__(self.client, "engine", "state")
        return state


    def liquidsoap_help(self):
        """
        Retrieves the Liquidsoap help.
        """
        data = self.__send_lqc_command__(self.client, "help", "")
        if not data:
            self.logger.warning("Could not get Liquidsoap's help")
        else:
            self.logger.debug("Got Liquidsoap's help")
        return data


    def version(self):
        """
        Get the version of Liquidsoap.
        """
        data = self.__send_lqc_command__(self.client, "version", "")
        self.logger.debug("Got Liquidsoap's version")
        return data


    def uptime(self):
        """
        Retrieves the uptime of Liquidsoap.
        """
        data = self.__send_lqc_command__(self.client, "uptime", "")
        self.logger.debug("Got Liquidsoap's uptime")
        return data


    #
    #   Connection and Transaction Handling
    #


    # ------------------------------------------------------------------------------------------ #
    def try_to_reconnect(self):
        """
        Opens a (nested) transaction and reports whether one is active afterwards.

        Returns:
            (Boolean):  `True` if the transaction counter is greater than zero
        """
        self.enable_transaction()
        return self.transaction > 0


    # ------------------------------------------------------------------------------------------ #
    def enable_transaction(self, socket=None):
        """
        Increases the transaction counter and opens the connection when the
        first (outermost) transaction is started.

        Args:
            socket: The client to connect; defaults to `self.client`
        """
        # set socket to playout if nothing else is given
        if socket is None:
            socket = self.client

        self.transaction = self.transaction + 1

        self.logger.debug(TerminalColors.WARNING.value + "Enabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)

        # Nested transaction: the connection is already open
        if self.transaction > 1:
            return

        try:
            self.__open_conn(socket)
        except FileNotFoundError:
            self.disable_transaction(socket=socket, force=True)
            subject = "CRITICAL Exception when connecting to Liquidsoap"
            msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?"
            self.logger.critical(SU.red(msg))
            # The dispatcher is only created in `start()`; it is still `None` when
            # `__init__` checks the connection via `is_active()`.
            if self.event_dispatcher:
                self.event_dispatcher.on_critical(subject, msg, None)


    # ------------------------------------------------------------------------------------------ #
    def disable_transaction(self, socket=None, force=False):
        """
        Decreases the transaction counter and closes the connection when the
        last (outermost) transaction has ended.

        Args:
            socket:             The client to disconnect; defaults to `self.client`
            force (Boolean):    If `True` the transaction counter is reset to zero,
                                independent of its current value
        """
        if not force:
            # nothing to disable
            if self.transaction == 0:
                return

            # decrease transaction counter
            self.transaction = self.transaction - 1

            # debug msg
            self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)

            # return if connection is still needed
            if self.transaction > 0:
                return
        else:
            self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value)

        # close conn and set transactioncounter to 0
        # NOTE(review): in the forced case the counter may still be > 0 here, in which
        # case `__close_conn` returns without disconnecting - confirm this is intended.
        self.__close_conn(socket)
        self.transaction = 0


    # ------------------------------------------------------------------------------------------ #
    def __open_conn(self, socket):
        """
        Connects the given client, unless a nested transaction indicates
        that the connection is already open.
        """
        # already connected
        if self.transaction > 1:
            return

        self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value)

        # try to connect
        socket.connect()


    # ------------------------------------------------------------------------------------------ #
    def __close_conn(self, socket):
        """
        Disconnects the given client, unless a transaction is still going on.
        """
        # set socket to playout
        if socket is None:
            socket = self.client

        # do not disconnect if a transaction is going on
        if self.transaction > 0:
            return

        # say bye
        socket.byebye()

        # debug msg
        self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value)