#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import time

from enum import Enum

from src.base.exceptions import LQConnectionError
from src.base.utils import SimpleUtil as SU


class MixerType(Enum):
    """
    Types of mixers mapped to the Liquidsoap mixer ids.
    """
    MAIN = "mixer"
    # FALLBACK = "mixer_fallback"


class MixerUtil:
    """
    Little helpers for the mixer.
    """

    @staticmethod
    def channel_status_dict(status):
        """
        Transforms a channel status string to a dictionary.

        The status is expected to be a whitespace separated list of
        `key=value` tokens. Tokens without a `=` are skipped instead of
        raising an `IndexError`.

        Args:
            status (String): The channel status string

        Returns:
            (Dict): Mapping of status keys to their (String) values. Empty if
                the status is empty or `None`.
        """
        result = {}
        if not status:
            return result

        for pair in status.split():
            key, separator, value = pair.partition("=")
            if separator:
                result[key] = value
        return result


class Mixer:
    """
    A virtual mixer.
    """
    config = None
    logger = None
    connector = None
    mixer_id = None
    channels = None
    fade_in_active = None
    fade_out_active = None

    def __init__(self, config, mixer_id, connector):
        """
        Constructor

        Args:
            config (AuraConfig): The configuration
            mixer_id (MixerType): The mixer type; its `value` is used as the mixer namespace
            connector: The connector used to send commands to Liquidsoap
        """
        self.config = config
        self.logger = logging.getLogger("AuraEngine")
        self.mixer_id = mixer_id
        self.fade_in_active = None
        self.fade_out_active = None
        self.connector = connector
        self.mixer_initialize()

    #
    #   Mixer
    #

    def mixer_initialize(self):
        """
        - Pull all faders down to volume 0.
        - Initialize default channels per type
        """
        self.connector.enable_transaction()
        try:
            time.sleep(1)  # TODO Check if this is still required
            for channel in self.mixer_channels_reload() or []:
                self.channel_volume(channel, "0")
        finally:
            # Never leave the transaction open, even if a command failed
            self.connector.disable_transaction()

    def mixer_status(self):
        """
        Returns the state of all mixer channels.

        Returns:
            (Dict): Mapping of channel name to the channel status string
        """
        inputstate = {}
        self.connector.enable_transaction()
        try:
            for channel_number, channel in enumerate(self.mixer_channels()):
                inputstate[channel] = self.channel_status(channel_number)
        finally:
            self.connector.disable_transaction()
        return inputstate

    def mixer_channels(self):
        """
        Retrieves all mixer channels.

        The channel list is requested from the connector once and cached in
        `self.channels` until `mixer_channels_reload()` is called.
        """
        if not self.channels:
            self.channels = self.connector.send_lqc_command(self.mixer_id.value, "mixer_inputs")
        return self.channels

    def mixer_channels_selected(self):
        """
        Retrieves all selected channels of the mixer.

        Returns:
            (List): Channels whose status contains `selected=true`
        """
        activeinputs = []
        self.connector.enable_transaction()
        try:
            for channel_number, channel in enumerate(self.mixer_channels()):
                if "selected=true" in self.channel_status(channel_number):
                    activeinputs.append(channel)
        finally:
            self.connector.disable_transaction()
        return activeinputs

    def mixer_channels_except(self, input_type):
        """
        Retrieves all mixer channels except the ones of the given type.

        Args:
            input_type: The channel to exclude

        Returns:
            (List): Copy of the channel list without `input_type`. The
                complete copy if `input_type` is not part of the list, and an
                empty list if no channel list is available.
        """
        # Pre-initialised so the return below never hits an unbound name
        remaining = []
        try:
            remaining = self.mixer_channels().copy()
            remaining.remove(input_type)
        except ValueError as e:
            self.logger.error("Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e)))
        except AttributeError:
            self.logger.critical("Empty channel list")
        return remaining

    def mixer_channels_reload(self):
        """
        Reloads all mixer channels.
        """
        self.channels = None
        return self.mixer_channels()

    #
    #   Channel
    #

    def channel_number(self, channel):
        """
        Returns the channel number for the given channel ID.

        Args:
            channel: The channel as stored in the channel list

        Returns:
            (Integer): The channel number, or `None` if the channel is unknown
        """
        try:
            return self.mixer_channels().index(channel)
        except (ValueError, AttributeError):
            # `list.index` raises instead of returning a negative value.
            # `channel` may be an enum member or its plain value.
            channel_id = getattr(channel, "value", channel)
            self.logger.critical(f"There's no valid channel number for channel ID '{channel_id}'")
            return None

    def channel_status(self, channel_number):
        """
        Retrieves the status of a channel identified by the channel number.

        Args:
            channel_number (Integer): The channel number

        Returns:
            (String): Channel status info as a String
        """
        return self.connector.send_lqc_command(self.mixer_id.value, "mixer_status", channel_number)

    def _channel_switch(self, command, channel, state, verb, gerund):
        """
        Sends a per-channel on/off style command and logs failures.

        Args:
            command (String): The connector command name
            channel: The channel as stored in the channel list
            state (Boolean): The state passed along with the command
            verb (String): Verb used in the "no channels" log message
            gerund (String): Verb form used in the exception log message

        Returns:
            (String): Liquidsoap server response, or `None` on failure
        """
        channels = self.mixer_channels()
        try:
            if not channels:
                self.logger.critical(f"Cannot {verb} channel. There are no channels!")
                return None
            index = channels.index(channel)
            return self.connector.send_lqc_command(self.mixer_id.value, command, index, state)
        except Exception as e:
            # Best-effort by design: log and carry on
            self.logger.critical(f"Ran into exception when {gerund} channel. Reason: " + str(e))
            return None

    def channel_select(self, channel, select):
        """
        Selects/deselects some mixer channel

        Args:
            channel: The channel as stored in the channel list
            select (Boolean): Select or deselect

        Returns:
            (String): Liquidsoap server response, or `None` on failure
        """
        return self._channel_switch("mixer_select", channel, select, "select", "selecting")

    def channel_activate(self, channel, activate):
        """
        Combined call of following to save execution time:
          - Select some mixer channel
          - Increase the volume to 100,

        Args:
            channel: The channel as stored in the channel list
            activate (Boolean): Activate or deactivate

        Returns:
            (String): Liquidsoap server response, or `None` on failure
        """
        return self._channel_switch("mixer_activate", channel, activate, "activate", "activating")

    def channel_current_volume(self, channel):
        """
        Retrieves the current volume of the channel.

        Args:
            channel (Channel): The channel; its `value` is looked up in the channel list

        Returns:
            (Integer): The volume in percent, or `0` if it cannot be determined
        """
        channel_number = self.channel_number(channel.value)
        if channel_number is None:
            self.logger.error(f"Cannot retrieve volume of unknown channel {channel.value}")
            return 0

        status = self.channel_status(channel_number)
        volume = MixerUtil.channel_status_dict(status).get("volume")
        if volume:
            try:
                return int(volume.split("%")[0])
            except ValueError:
                pass  # Fall through to the common error handling below

        self.logger.error(f"Invalid volume for channel {channel.value} (status: '{status}')")
        return 0

    def channel_volume(self, channel, volume):
        """
        Set volume of a channel

        Args:
            channel (Channel): The channel
            volume (Integer): Volume between 0 and 100

        Returns:
            (String): Liquidsoap server response, or `None` if the volume
                could not be set
        """
        channel = str(channel)
        channels = self.mixer_channels()

        if not channels:
            msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.")
            self.logger.warning(msg)
            return None

        try:
            index = channels.index(channel)
        except ValueError as e:
            msg = f"Cannot set volume of channel '{channel}' to {str(volume)}. Reason: {str(e)}"
            self.logger.error(SU.red(msg))
            return None

        try:
            target = str(int(volume))
            message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_volume", str(index), target)

            if not self.connector.disable_logging:
                # `str.find` returns -1 when missing, hence the explicit comparison
                if message.find("volume=" + target + "%") >= 0:
                    self.logger.info(SU.pink("Set volume of channel '%s' to %s" % (channel, str(volume))))
                else:
                    msg = SU.red("Setting volume of channel " + channel + " has gone wrong! Liquidsoap message: " + message)
                    self.logger.warning(msg)

            return message
        except AttributeError as e:  # (LQConnectionError, AttributeError):
            self.connector.disable_transaction(force=True)
            msg = SU.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
            self.logger.error(msg)
            return None

    #
    #   Fading
    #

    def _set_connector_logging(self, disabled):
        """
        Sets the `disable_logging` flag on the connector and its client.
        """
        self.connector.disable_logging = disabled
        self.connector.client.disable_logging = disabled

    def fade_in(self, channel, volume):
        """
        Performs a fade-in for the given channel.

        Args:
            channel (Channel): The channel to fade
            volume (Integer): The target volume

        Returns:
            (Boolean): `True` if successful, `False` on a connection error,
                `None` if the fade was skipped because the channel is already
                at or above the target volume
        """
        try:
            current_volume = self.channel_current_volume(channel)

            if current_volume == volume:
                self.logger.warning(f"Current volume for channel {channel.value} is already at target volume of {volume}% SKIPPING...")
                return
            elif current_volume > volume:
                self.logger.warning(f"Current volume {current_volume}% of channel {channel.value} exceeds target volume of {volume}% SKIPPING...")
                return

            fade_in_time = float(self.config.get("fade_in_time"))

            # NOTE(review): with a fade time of 0 the volume is not touched at all
            if fade_in_time > 0:
                self.fade_in_active = True
                try:
                    target_volume = volume
                    step = fade_in_time / target_volume

                    msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \
                        (channel, str(step), str(target_volume))
                    self.logger.info(SU.pink(msg))

                    # Disable logging while fading; re-enabled once no fade is active anymore
                    self._set_connector_logging(True)

                    # NOTE(review): the fade always starts at 1%, regardless of `current_volume`
                    for i in range(target_volume):
                        self.channel_volume(channel.value, i + 1)
                        time.sleep(step)

                    msg = "Finished with fading-in '%s'." % channel
                    self.logger.info(SU.pink(msg))
                finally:
                    # Reset the state even if the fade got interrupted
                    self.fade_in_active = False
                    if not self.fade_out_active:
                        self._set_connector_logging(False)

        except LQConnectionError as e:
            self.logger.critical(str(e))
            return False
        return True

    def fade_out(self, channel, volume=None):
        """
        Performs a fade-out for the given channel starting at it's current volume.

        Args:
            channel (Channel): The channel to fade
            volume (Integer): The start volume; defaults to the current volume

        Returns:
            (Boolean): `True` if successful, `False` on a connection error,
                `None` if the fade was skipped because the channel is already
                at 0%
        """
        try:
            current_volume = self.channel_current_volume(channel)
            if not volume:
                volume = current_volume

            if current_volume == 0:
                self.logger.warning(f"Current volume for channel {channel.value} is already at target volume of 0%. SKIPPING...")
                return

            fade_out_time = float(self.config.get("fade_out_time"))

            # NOTE(review): with a fade time of 0 the volume is not touched at all
            if fade_out_time > 0:
                # Lets a fade-in that finishes meanwhile keep logging disabled
                self.fade_out_active = True
                try:
                    # One step per loop iteration, so the whole fade takes `fade_out_time`
                    step = abs(fade_out_time) / volume

                    msg = "Starting to fading-out '%s'. Step is %ss." % (channel, str(step))
                    self.logger.info(SU.pink(msg))

                    # Disable logging... it is going to be enabled again after fadein and -out is finished
                    self._set_connector_logging(True)

                    for i in range(volume):
                        self.channel_volume(channel.value, volume - i - 1)
                        time.sleep(step)

                    msg = "Finished with fading-out '%s'" % channel
                    self.logger.info(SU.pink(msg))
                finally:
                    # Enable logging again, even if the fade got interrupted
                    self.fade_out_active = False
                    if not self.fade_in_active:
                        self._set_connector_logging(False)

        except LQConnectionError as e:
            self.logger.critical(str(e))
            return False
        return True