Skip to content
Snippets Groups Projects
Commit 1afbe7b7 authored by David Trattnig's avatar David Trattnig
Browse files

Dedicated mixer class. #44

parent 55725a8a
No related branches found
No related tags found
Loading
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from enum import Enum
from modules.base.exceptions import LQConnectionError
from modules.base.utils import SimpleUtil as SU
class MixerType(Enum):
"""
Types of mixers mapped to the Liquidsoap mixer ids.
"""
MAIN = "mixer"
FALLBACK = "mixer_fallback"
class Mixer():
"""
A virtual mixer.
"""
config = None
logger = None
connector = None
mixer_id = None
channels = None
fade_in_active = None
fade_out_active = None
def __init__(self, config, mixer_id, connector):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.mixer_id = mixer_id
self.fade_in_active = None
self.fade_out_active = None
self.connector = connector
self.mixer_initialize()
#
# Mixer
#
def mixer_initialize(self):
"""
- Pull all faders down to volume 0.
- Initialize default channels per type
"""
self.connector.enable_transaction()
time.sleep(1) # TODO Check is this is still required
channels = self.mixer_channels_reload()
for channel in channels:
self.channel_volume(channel, "0")
self.connector.disable_transaction()
def mixer_status(self):
"""
Returns the state of all mixer channels
"""
cnt = 0
inputstate = {}
self.connector.enable_transaction()
inputs = self.mixer_channels()
for channel in inputs:
inputstate[channel] = self.channel_status(cnt)
cnt = cnt + 1
self.connector.disable_transaction()
return inputstate
def mixer_channels(self):
"""
Retrieves all mixer channels
"""
if self.channels is None or len(self.channels) == 0:
self.channels = self.connector.send_lqc_command(self.mixer_id.value, "mixer_inputs")
return self.channels
def mixer_channels_selected(self):
"""
Retrieves all selected channels of the mixer.
"""
cnt = 0
activeinputs = []
self.connector.enable_transaction()
inputs = self.mixer_channels()
for channel in inputs:
status = self.channel_status(cnt)
if "selected=true" in status:
activeinputs.append(channel)
cnt = cnt + 1
self.connector.disable_transaction()
return activeinputs
def mixer_channels_except(self, input_type):
"""
Retrieves all mixer channels except the ones of the given type.
"""
try:
activemixer_copy = self.mixer_channels().copy()
activemixer_copy.remove(input_type)
except ValueError as e:
self.logger.error("Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e)))
except AttributeError:
self.logger.critical("Empty channel list")
return activemixer_copy
def mixer_channels_reload(self):
"""
Reloads all mixer channels.
"""
self.channels = None
return self.mixer_channels()
def channel_status(self, channel_number):
"""
Retrieves the status of a channel identified by the channel number.
"""
return self.connector.send_lqc_command(self.mixer_id.value, "mixer_status", channel_number)
#
# Channel
#
def channel_select(self, channel, select):
"""
Selects/deselects some mixer channel
Args:
pos (Integer): The channel number
select (Boolean): Select or deselect
Returns:
(String): Liquidsoap server response
"""
channels = self.mixer_channels()
try:
index = channels.index(channel)
if len(channel) < 1:
self.logger.critical("Cannot select channel. There are no channels!")
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_select", index, select)
return message
except Exception as e:
self.logger.critical("Ran into exception when selecting channel. Reason: " + str(e))
def channel_activate(self, channel, activate):
"""
Combined call of following to save execution time:
- Select some mixer channel
- Increase the volume to 100,
Args:
pos (Integer): The channel number
activate (Boolean): Activate or deactivate
Returns:
(String): Liquidsoap server response
"""
channels = self.mixer_channels()
try:
index = channels.index(channel)
if len(channel) < 1:
self.logger.critical("Cannot activate channel. There are no channels!")
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_activate", index, activate)
return message
except Exception as e:
self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))
def channel_volume(self, channel, volume):
"""
Set volume of a channel
Args:
channel (Channel): The channel
volume (Integer) Volume between 0 and 100
"""
channel = str(channel)
try:
if str(volume) == "100":
channels = self.mixer_channels()
index = channels.index(channel)
else:
channels = self.mixer_channels()
index = channels.index(channel)
except ValueError as e:
msg = f"Cannot set volume of channel '{channel}' to {str(volume)}. Reason: {str(e)}"
self.logger.error(SU.red(msg))
return
try:
if len(channel) < 1:
msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.")
self.logger.warning(msg)
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_volume", str(index), str(int(volume)))
if not self.connector.disable_logging:
if message.find('volume=' + str(volume) + '%'):
self.logger.info(SU.pink("Set volume of channel '%s' to %s" % (channel, str(volume))))
else:
msg = SU.red("Setting volume of channel " + channel + " has gone wrong! Liquidsoap message: " + message)
self.logger.warning(msg)
return message
except AttributeError as e: #(LQConnectionError, AttributeError):
self.connector.disable_transaction(force=True)
msg = SU.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
self.logger.error(msg)
#
# Fading
#
def fade_in(self, channel, volume):
"""
Performs a fade-in for the given channel.
Args:
channel (Channel): The channel to fade
volume (Integer): The target volume
Returns:
(Boolean): `True` if successful
"""
try:
fade_in_time = float(self.config.get("fade_in_time"))
if fade_in_time > 0:
self.fade_in_active = True
target_volume = volume
step = fade_in_time / target_volume
msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \
(channel, str(step), str(target_volume))
self.logger.info(SU.pink(msg))
# Enable logging, which might have been disabled in a previous fade-out
self.connector.disable_logging = True
self.connector.client.disable_logging = True
for i in range(target_volume):
self.channel_volume(channel.value, i + 1)
time.sleep(step)
msg = "Finished with fading-in '%s'." % channel
self.logger.info(SU.pink(msg))
self.fade_in_active = False
if not self.fade_out_active:
self.connector.disable_logging = False
self.connector.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return False
return True
def fade_out(self, channel, volume):
"""
Performs a fade-out for the given channel.
Args:
channel (Channel): The channel to fade
volume (Integer): The start volume
Returns:
(Boolean): `True` if successful
"""
try:
fade_out_time = float(self.config.get("fade_out_time"))
if fade_out_time > 0:
step = abs(fade_out_time) / volume
msg = "Starting to fading-out '%s'. Step is %ss." % (channel, str(step))
self.logger.info(SU.pink(msg))
# Disable logging... it is going to be enabled again after fadein and -out is finished
self.connector.disable_logging = True
self.connector.client.disable_logging = True
for i in range(volume):
self.channel_volume(channel.value, volume-i-1)
time.sleep(step)
msg = "Finished with fading-out '%s'" % channel
self.logger.info(SU.pink(msg))
# Enable logging again
self.fade_out_active = False
if not self.fade_in_active:
self.connector.disable_logging = False
self.connector.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return False
return True
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment