Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Show changes
Showing
with 3927 additions and 0 deletions
#!/bin/bash
# Check if databases are already set-up
if test -f "$LOCKFILE_DB"; then
echo "Aura Engine Databases are already existing! Skipping..."
else
# Create random password
PASS_ENGINE="$(openssl rand -base64 24)"
# Create databases and users
echo "--- SETTING UP DATABASE AND USERS ---"
echo "Please enter the MySQL/MariaDB root password!"
stty -echo
printf "Password: "
read rootpasswd
stty echo
printf "\n"
echo "---"
echo "Creating database for Aura Engine..."
mysql -uroot -p${rootpasswd} -e "CREATE DATABASE aura_engine CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
mysql -uroot -p${rootpasswd} -e "CREATE USER 'aura'@'localhost' IDENTIFIED BY '${PASS_ENGINE}';"
mysql -uroot -p${rootpasswd} -e "GRANT ALL PRIVILEGES ON aura_engine.* TO 'aura'@'localhost';"
mysql -uroot -p${rootpasswd} -e "FLUSH PRIVILEGES;"
echo "Done."
echo
echo
echo "Please note your database credentials for the next configuration steps:"
echo "-----------------------------------------------------------------------"
echo " Database: 'aura_engine'"
echo " User: 'aura'"
echo " Password: '${PASS_ENGINE}'"
echo "-----------------------------------------------------------------------"
echo
fi
\ No newline at end of file
#!/bin/bash
#
# Setup Database
#
# Set LOCK file location
LOCKFILE_DB=.engine.install-db.lock
# Check if databases are already set-up
if test -f "$LOCKFILE_DB"; then
echo "Aura Engine Databases are already existing! Skipping..."
else
echo "Setting up database ..."
echo
echo "Which database system do you want to use? (Press '1' or '2')"
echo " [1] MariaDB"
echo " [2] Other / Manually"
echo
while true; do
read -rsn1 input
if [ "$input" = "1" ]; then
echo "Creating DB for MariaDB ..."
bash scripts/setup-db-mariadb.sh
break
fi
if [ "$input" = "2" ]; then
echo "Manual database setup selected."
break
fi
done
# Create lockfile to avoid accidential re-creation of the database
touch $LOCKFILE_DB
fi
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import sys
import logging
from pathlib import Path
from configparser import ConfigParser
class AuraConfig:
"""
AuraConfig Class
Holds the Engine Configuration as in the file `engine.ini`.
"""
instance = None
ini_path = ""
logger = None
def __init__(self, ini_path="/etc/aura/engine.ini"):
"""
Initializes the configuration, defaults to `/etc/aura/engine.ini`.
If this file doesn't exist it uses `./config/engine.ini` from
the project directory.
Args:
ini_path(String): The path to the configuration file `engine.ini`
"""
self.logger = logging.getLogger("AuraEngine")
config_file = Path(ini_path)
if not config_file.is_file():
ini_path = "%s/config/engine.ini" % Path(__file__).parent.parent.parent.absolute()
self.ini_path = ini_path
self.load_config()
AuraConfig.instance = self
# Defaults
self.set("config_dir", os.path.dirname(ini_path))
self.set("install_dir", os.path.realpath(__file__ + "../../../.."))
self.set("use_test_data", False) # TODO Still needed?
@staticmethod
def config():
"""
Retrieves the global instances of the configuration.
"""
return AuraConfig.instance
def set(self, key, value):
"""
Setter for some specific config property.
Args:
key (String): key
default (*): value
"""
try:
self.__dict__[key] = int(value)
except:
self.__dict__[key] = str(value)
def get(self, key, default=None):
"""
Getter for some specific config property.
Args:
key (String): key
default (*): value
"""
if key not in self.__dict__:
if default:
self.set(key, default)
else:
self.logger.warning("Key " + key + " not found in configfile " + self.ini_path + "!")
return None
if key == "loglevel":
loglvl = self.__dict__[key]
if loglvl == "debug":
return logging.DEBUG
elif loglvl == "info":
return logging.INFO
elif loglvl == "warning":
return logging.WARNING
elif loglvl == "error":
return logging.ERROR
else:
return logging.CRITICAL
if key == "debug":
return self.__dict__[key].count("y")
return self.__dict__[key]
def get_database_uri(self):
"""
Retrieves the database connection string.
"""
db_name = self.get("db_name")
db_user = self.get("db_user")
db_pass = str(self.get("db_pass"))
db_host = self.get("db_host")
db_charset = self.get("db_charset", "utf8")
return "mysql://" + db_user + ":" + db_pass + "@" + db_host + "/" + db_name + "?charset=" + db_charset
def load_config(self):
"""
Set config defaults and load settings from file
"""
if not os.path.isfile(self.ini_path):
self.logger.critical(self.ini_path + " not found :(")
sys.exit(1)
# Read the file
f = open(self.ini_path, 'r')
ini_str = f.read()
f.close()
# Parse the values
config_parser = ConfigParser()
try:
config_parser.read_string(ini_str)
except Exception as e:
self.logger.critical("Cannot read " + self.ini_path + "! Reason: " + str(e))
sys.exit(0)
for section in config_parser.sections():
for key, value in config_parser.items(section):
v = config_parser.get(section, key).replace('"', '').strip()
self.set(key, v)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Scheduler Exceptions
class NoProgrammeLoadedException(Exception):
pass
class NoActiveScheduleException(Exception):
pass
# Soundsystem and Mixer Exceptions
class LoadSourceException(Exception):
pass
class InvalidChannelException(Exception):
pass
class PlaylistException(Exception):
pass
class NoActiveEntryException(Exception):
pass
# Liquidsoap Execeptions
class LQConnectionError(Exception):
pass
class LQStreamException(Exception):
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.base.config import AuraConfig
class AuraLogger():
"""
AuraLogger Class
Logger for all Aura Engine components. The default
logger is `AuraEngine`. Other loggers are defined
by passing a custom name on instantiation.
The logger respects the log-level as defined in the
engine's configuration file.
"""
config = None
logger = None
def __init__(self, config, name="AuraEngine"):
"""
Constructor to create a new logger defined by
the passed name.
Args:
name (String): The name of the logger
"""
self.config = config
self.__create_logger(name)
def __create_logger(self, name):
"""
Creates the logger instance for the given name.
Args:
name (String): The name of the logger
"""
lvl = self.config.get("loglevel")
# create logger
self.logger = logging.getLogger(name)
self.logger.setLevel(lvl)
if not self.logger.hasHandlers():
# create file handler for logger
file_handler = logging.FileHandler(self.config.get("logdir") + "/"+name+".log")
file_handler.setLevel(lvl)
# create stream handler for logger
stream_handler = logging.StreamHandler()
stream_handler.setLevel(lvl)
# set format of log
datepart = "%(asctime)s:%(name)s:%(levelname)s"
message = " - %(message)s - "
filepart = "[%(filename)s:%(lineno)s-%(funcName)s()]"
formatter = logging.Formatter(datepart + message + filepart)
# set log of handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# add handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(stream_handler)
self.logger.critical("ADDED HANDLERS")
else:
self.logger.critical("REUSED LOGGER")
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import smtplib
from email.message import EmailMessage
class MailingException(Exception):
"""
Thrown when some mail cannot be sent.
"""
class AuraMailer():
"""
Service to send emails to Aura administrators.
"""
config = None
def __init__(self, config):
"""
Constructor to initialize service with Aura `config`.
Args:
config (AuraConfig): The configuration with the mail server details
"""
self.config = config
self.admin_mails = config.get("admin_mail")
#
# PUBLIC METHODS
#
def send_admin_mail(self, subject, body):
"""
Sends an email to the administrator as defined in the configuration.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
admin_mails = self.admin_mails.split()
for mail_to in admin_mails:
self.__send(mail_to, subject, body)
#
# PRIVATE METHODS
#
def __send(self, mail_to, subject, body):
"""
Sends an email to the given address.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
# read config
mail_server = self.config.get("mail_server")
mail_port = self.config.get("mail_server_port")
mail_user = self.config.get("mail_user")
mail_pass = self.config.get("mail_pass")
from_mail = self.config.get("from_mail")
# check settings
if mail_server == "":
raise MailingException("Mail Server not set")
if mail_port == "":
raise MailingException("Mailserver Port not set")
if mail_user == "":
raise MailingException("Mail user not set")
if mail_pass == "":
raise MailingException("No Password for mailing set")
if from_mail == "":
raise MailingException("From Mail not set")
# stuff the message together and ...
msg = EmailMessage()
msg.set_content(body)
mailsubject_prefix = self.config.get("mailsubject_prefix")
if mailsubject_prefix == "":
msg["Subject"] = subject
else:
msg["Subject"] = mailsubject_prefix + " " + subject
msg["From"] = from_mail
msg["To"] = mail_to
# ... send the mail
try:
server = smtplib.SMTP(mail_server, int(mail_port))
server.starttls()
server.login(mail_user, mail_pass)
server.send_message(msg)
server.quit()
except Exception as e:
raise MailingException(str(e))
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
from enum import Enum
class SimpleUtil:
"""
A container class for simple utility methods.
"""
@staticmethod
def clean_dictionary(data):
"""
Delete keys with the value `None` in a dictionary, recursively.
This alters the input so you may wish to `copy` the dict first.
Args:
data (dict): The dicationary
Returns:
(dict):
"""
for key, value in list(data.items()):
if value is None:
del data[key]
elif isinstance(value, dict):
SimpleUtil.clean_dictionary(value)
return data
@staticmethod
def fmt_time(timestamp):
"""
Formats a UNIX timestamp to a String displaying time in the format '%H:%M:%S'.
Args:
(Integer) timestamp: Unix epoch
Returns:
(String): Displaying the time
"""
return datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
@staticmethod
def nano_to_seconds(nanoseconds):
"""
Converts nano-seconds to senconds
Args:
(Integer) nanoseconds
Returns:
(Float): seconds
"""
return float(nanoseconds / 1000000000)
@staticmethod
def seconds_to_nano(seconds):
"""
Converts senconds to nano-seconds
Args:
(Integer) seconds
Returns:
(Float): nanoseconds
"""
return int(seconds * 1000000000)
@staticmethod
def timestamp(date_and_time=None):
"""
Transforms the given `datetime` into a UNIX epoch timestamp.
If no parameter is passed, the current timestamp is returned.
Args:
(Datetime) date_and_time: the date and time to transform.
Returns:
(Integer): timestamp in seconds.
"""
if not date_and_time:
date_and_time = datetime.datetime.now()
return time.mktime(date_and_time.timetuple())
@staticmethod
def strike(text):
"""
Creates a strikethrough version of the given text.
Args:
(String) text: the text to strike.
Returns:
(String): the striked text.
"""
result = ""
for c in str(text):
result += c + TerminalColors.STRIKE.value
return result
@staticmethod
def bold(text):
"""
Creates a bold version of the given text.
"""
return TerminalColors.BOLD.value + text + TerminalColors.ENDC.value
@staticmethod
def underline(text):
"""
Creates a underlined version of the given text.
"""
return TerminalColors.UNDERLINE.value + text + TerminalColors.ENDC.value
@staticmethod
def blue(text):
"""
Creates a blue version of the given text.
"""
return TerminalColors.BLUE.value + text + TerminalColors.ENDC.value
@staticmethod
def red(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.RED.value + text + TerminalColors.ENDC.value
@staticmethod
def pink(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.PINK.value + text + TerminalColors.ENDC.value
@staticmethod
def yellow(text):
"""
Creates a yellow version of the given text.
"""
return TerminalColors.YELLOW.value + text + TerminalColors.ENDC.value
@staticmethod
def green(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.GREEN.value + text + TerminalColors.ENDC.value
@staticmethod
def cyan(text):
"""
Creates a cyan version of the given text.
"""
return TerminalColors.CYAN.value + text + TerminalColors.ENDC.value
class TerminalColors(Enum):
"""
Colors for formatting terminal output.
"""
HEADER = "\033[95m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
PINK = "\033[35m"
CYAN = "\033[36m"
WARNING = "\033[31m"
FAIL = "\033[41m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
STRIKE = "\u0336"
ENDC = "\033[0m"
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
from src.base.utils import SimpleUtil as SU
from src.core.resources import ResourceType
class TransitionType(Enum):
"""
Types of fade-in and fade-out transition.
"""
INSTANT = "instant"
FADE = "fade"
class Channel(Enum):
"""
Channel name mappings to the Liqidsoap channel IDs.
"""
QUEUE_A = "in_filesystem_0"
QUEUE_B = "in_filesystem_1"
FALLBACK_QUEUE_A = "in_fallback_scheduled_0"
FALLBACK_QUEUE_B = "in_fallback_scheduled_1"
HTTP_A = "in_http_0"
HTTP_B = "in_http_1"
HTTPS_A = "in_https_0"
HTTPS_B = "in_https_1"
LIVE_0 = "linein_0"
LIVE_1 = "linein_1"
LIVE_2 = "linein_2"
LIVE_3 = "linein_3"
LIVE_4 = "linein_4"
def __str__(self):
return str(self.value)
class ChannelType(Enum):
"""
Engine channel types mapped to `Entry` source types.
"""
QUEUE = {
"id": "fs",
"numeric": 0,
"channels": [Channel.QUEUE_A, Channel.QUEUE_B]
}
FALLBACK_QUEUE = {
"id": "fallback_queue",
"numeric": 0,
"channels": [Channel.FALLBACK_QUEUE_A, Channel.FALLBACK_QUEUE_B]
}
HTTP = {
"id": "http",
"numeric": 1,
"channels": [Channel.HTTP_A, Channel.HTTP_B]
}
HTTPS = {
"id": "https",
"numeric": 2,
"channels": [Channel.HTTPS_A, Channel.HTTPS_B]
}
LIVE = {
"id": "live",
"numeric": 3,
"channels": [
Channel.LIVE_0,
Channel.LIVE_1,
Channel.LIVE_2,
Channel.LIVE_3,
Channel.LIVE_4
]
}
@property
def channels(self):
return self.value["channels"]
@property
def numeric(self):
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class EntryPlayState(Enum):
UNKNOWN = "unknown"
LOADING = "loading"
READY = "ready_to_play"
PLAYING = "playing"
FINISHED = "finished"
class LiquidsoapResponse(Enum):
SUCCESS = "Done"
STREAM_STATUS_POLLING = "polling"
STREAM_STATUS_STOPPED = "stopped"
STREAM_STATUS_CONNECTED = "connected"
class ChannelRouter():
"""
Wires source types with channels and channel-types.
"""
config = None
logger = None
resource_mapping = None
active_channels = None
def __init__(self, config, logger):
"""
Constructor
Args:
config (AuraConfig): The configuration
logger (Logger): The logger
"""
self.config = config
self.logger = logger
self.resource_mapping = {
ResourceType.FILE: ChannelType.QUEUE,
ResourceType.STREAM_HTTP: ChannelType.HTTP,
ResourceType.STREAM_HTTPS: ChannelType.HTTPS,
ResourceType.LINE: ChannelType.LIVE,
ResourceType.PLAYLIST: ChannelType.QUEUE,
ResourceType.POOL: ChannelType.QUEUE
}
self.active_channels = {
ChannelType.QUEUE: Channel.QUEUE_A,
ChannelType.FALLBACK_QUEUE: Channel.FALLBACK_QUEUE_A,
ChannelType.HTTP: Channel.HTTP_A,
ChannelType.HTTPS: Channel.HTTPS_A,
ChannelType.LIVE: Channel.LIVE_0
}
def set_active(self, channel_type, channel):
"""
Set the channel for the given resource type active
"""
self.active_channels[channel_type] = channel
def get_active(self, channel_type):
"""
Retrieves the active channel for the given resource type
"""
return self.active_channels[channel_type]
def type_of_channel(self, channel):
"""
Retrieves a `ChannelType` for the given `Channel`.
"""
if channel in ChannelType.QUEUE.channels:
return ChannelType.QUEUE
elif channel in ChannelType.FALLBACK_QUEUE.channels:
return ChannelType.FALLBACK_QUEUE
elif channel in ChannelType.HTTP.channels:
return ChannelType.HTTP
elif channel in ChannelType.HTTPS.channels:
return ChannelType.HTTPS
elif channel in ChannelType.LIVE.channels:
return ChannelType.LIVE
else:
return None
def type_for_resource(self, resource_type):
"""
Retrieves a `ChannelType` for the given `ResourceType`.
Only default mappings can be evaluatated. Custom variations
like fallback channels are not respected.
"""
return self.resource_mapping.get(resource_type)
def channel_swap(self, channel_type):
"""
Returns the currently inactive channel for a given type. For example if the currently some
file on channel QUEUE A is playing, the channel QUEUE B is returned for being used
to queue new entries.
Args:
entry_type (ResourceType): The resource type such es file, stream or live source
Returns:
(Channel, Channel): The previous and new channel
"""
previous_channel = self.active_channels[channel_type]
new_channel = None
msg = None
if channel_type == ChannelType.QUEUE:
if previous_channel == Channel.QUEUE_A:
new_channel = Channel.QUEUE_B
msg = "Swapped queue channel from A > B"
else:
new_channel = Channel.QUEUE_A
msg = "Swapped queue channel from B > A"
elif channel_type == ChannelType.FALLBACK_QUEUE:
if previous_channel == Channel.FALLBACK_QUEUE_A:
new_channel = Channel.FALLBACK_QUEUE_B
msg = "Swapped fallback queue channel from A > B"
else:
new_channel = Channel.FALLBACK_QUEUE_A
msg = "Swapped fallback channel from B > A"
elif channel_type == ChannelType.HTTP:
if previous_channel == Channel.HTTP_A:
new_channel = Channel.HTTP_B
msg = "Swapped HTTP Stream channel from A > B"
else:
new_channel = Channel.HTTP_A
msg = "Swapped HTTP Stream channel from B > A"
elif channel_type == ChannelType.HTTPS:
if previous_channel == Channel.HTTPS_A:
new_channel = Channel.HTTPS_B
msg = "Swapped HTTPS Stream channel from A > B"
else:
new_channel = Channel.HTTPS_A
msg = "Swapped HTTPS Stream channel from B > A"
else:
self.logger.warning(SU.red(f"No channel to swap - invalid entry_type '{channel_type}'"))
if msg: self.logger.info(SU.pink(msg))
return (previous_channel, new_channel)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import urllib.parse
import configparser
import logging
from multiprocessing import Lock
from src.base.exceptions import LQConnectionError
from src.base.utils import TerminalColors
class LiquidSoapClient:
"""
LiquidSoapClient Class
Connects to a LiquidSoap instance over a socket and sends commands to it
"""
mutex = None
logger = None
debug = False
socket_path = ""
disable_logging = True
def __init__(self, config, socket_filename):
"""
Constructor
@type socket_path: string
@param socket_path: Der Pfad zum Socket des Liquidsoap-Scripts
"""
self.logger = logging.getLogger("AuraEngine")
self.socket_path = config.get('socketdir') + '/' + socket_filename
self.logger.debug("LiquidSoapClient using socketpath: " + self.socket_path)
# init
self.mutex = Lock()
self.connected = False
self.can_connect = True
self.message = ''
self.socket = None
self.metareader = configparser.ConfigParser()
# ------------------------------------------------------------------------------------------ #
def connect(self):
"""
Verbindung herstellen
"""
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.connect(self.socket_path)
except socket.error as e:
msg = "Cannot connect to socketpath " + self.socket_path + ". Reason: "+str(e)
self.logger.critical(TerminalColors.RED.value+msg+TerminalColors.ENDC.value)
self.can_connect = False
self.connected = False
# raise e
else:
self.can_connect = True
self.connected = True
return True
# AttributeError('characters_written')
# ------------------------------------------------------------------------------------------ #
def is_connected(self):
return self.connected
# ------------------------------------------------------------------------------------------ #
def write(self, data):
"""
Auf den Socket schreiben
@type data: string
@param data: Der String der gesendet wird
"""
if self.connected:
self.socket.sendall(data.decode("UTF-8"))
# ------------------------------------------------------------------------------------------ #
def read_all(self, timeout=2):
"""
Vom Socket lesen, bis dieser "END" sendet
@type timeout: int
@param timeout: Ein optionales Timeout
@rtype: string
@return: Die Antwort des Liquidsoap-Servers
"""
# make socket non blocking
# self.client.setblocking(0)
data = ''
try:
# set timeout
self.socket.settimeout(timeout)
# acquire the lock
self.mutex.acquire()
while True:
data += self.socket.recv(1).decode("utf-8")
# receive as long as we are not at the END or recv a Bye! from liquidsoap
if data.find("END\r\n") != -1 or data.find("Bye!\r\n") != -1:
data.replace("END\r\n", "")
break
# release the lock
self.mutex.release()
except Exception as e:
self.logger.error(TerminalColors.RED.value+str(e)+TerminalColors.ENDC.value)
self.mutex.release()
return data
# ------------------------------------------------------------------------------------------ #
def read(self):
"""
read from socket and store return value in self.message
@rtype: string
@return: The answer of liquidsoap server
"""
if self.connected:
ret = self.read_all().splitlines()
try:
last = ret.pop() # pop out end
if len(ret) > 1:
self.message = str.join(" - ", ret)
elif len(ret) == 1:
self.message = ret[0]
if last == "Bye!":
self.message = last
except Exception as e:
self.logger.error(str(e))
return self.message
# ------------------------------------------------------------------------------------------ #
def close(self):
"""
Quit senden und Verbindung schließen
"""
if self.connected:
message = "quit\r"
self.socket.sendall(message.decode("UTF-8"))
self.socket.close()
self.connected = False
# ------------------------------------------------------------------------------------------ #
def command(self, namespace, command, param=""):
"""
Kommando an Liquidosap senden
@type command: string
@param command: Kommando
@type namespace: string
@param namespace: Namespace/Kanal der angesprochen wird
@type param: mixed
@param param: ein optionaler Parameter
@rtype: string
@return: Die Antwort des Liquidsoap-Servers
"""
param = (param.strip() if param.strip() == "" else " " + urllib.parse.unquote(param.strip()))
if self.connected:
# print namespace + '.' + command + param + "\n"
if namespace is "":
message = str(command) + str(param) + str("\n")
else:
message = str(namespace) + str(".") + str(command) + str(param) + str("\n")
try:
if not self.disable_logging:
self.logger.debug("LiquidSoapClient sending to LiquidSoap Server: " + message[0:len(message)-1])
# send all the stuff over the socket to liquidsoap server
self.socket.sendall(message.encode())
if not self.disable_logging:
self.logger.debug("LiquidSoapClient waiting for reply from LiquidSoap Server")
# wait for reply
self.read()
if not self.disable_logging:
self.logger.debug("LiquidSoapClient got reply: " + self.message)
except BrokenPipeError as e:
self.logger.error(TerminalColors.RED.value+"Detected a problem with liquidsoap connection while sending: " + message + ". Reason: " + str(e) + "! Trying to reconnect."+TerminalColors.RED.value)
self.connect()
raise
except Exception as e:
self.logger.error("Unexpected error: " + str(e))
raise
return self.message
else:
msg = "LiquidsoapClient not connected to LiquidSoap Server"
self.logger.error(msg)
raise LQConnectionError(msg)
# ------------------------------------------------------------------------------------------ #
def help(self):
"""
get liquidsoap server help
@rtype: string
@return: the response of the liquidsoap server
"""
if self.connected:
self.command('help', '')
return self.message
# ------------------------------------------------------------------------------------------ #
def version(self):
"""
Liquidsoap get version
@rtype: string
@return: the response of the liquidsoap server
"""
if self.connected:
message = 'version'
self.command(message, '')
return self.message
# ------------------------------------------------------------------------------------------ #
def uptime(self):
"""
Liquidsoap get uptime
@rtype: string
@return: Die Antwort des Liquidsoap-Servers
"""
if self.connected:
self.command('uptime', '')
return self.message
# ------------------------------------------------------------------------------------------ #
def byebye(self):
"""
Liquidsoap say byebye
@rtype: string
@return: Die Antwort des Liquidsoap-Servers
"""
if self.connected:
self.command("", "quit")
return self.message
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from src.base.config import AuraConfig
from src.base.utils import TerminalColors, SimpleUtil as SU
from src.base.exceptions import LQConnectionError
from src.core.client.playerclient import LiquidSoapPlayerClient
class PlayerConnector():
"""
Establishes a Socket connection to Liquidsoap.
"""
client = None
logger = None
transaction = 0
connection_attempts = 0
disable_logging = False
event_dispatcher = None
def __init__(self, event_dispatcher):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.client = LiquidSoapPlayerClient(self.config, "engine.sock")
self.event_dispatcher = event_dispatcher
def send_lqc_command(self, namespace, command, *args):
"""
Ein Kommando an Liquidsoap senden
@type lqs_instance: object
@param lqs_instance: Instance of LiquidSoap Client
@type namespace: string
@param namespace: Namespace of function
@type command: string
@param command: Function name
@type args: list
@param args: List of parameters
@rtype: string
@return: Response from LiquidSoap
"""
lqs_instance = self.client
try:
if not self.disable_logging:
if namespace == "recorder":
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args))
else:
if command == "":
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + str(args))
else:
self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args))
# call wanted function ...
# FIXME REFACTOR all calls in a common way
if command in [
"queue_push",
"queue_seek",
"queue_clear",
"playlist_uri_set",
"playlist_uri_clear",
"stream_set_ur",
"stream_start",
"stream_stop",
"stream_status",
]:
func = getattr(lqs_instance, command)
result = func(str(namespace), *args)
elif namespace == "mixer" or namespace == "mixer_fallback":
func = getattr(lqs_instance, command)
result = func(str(namespace), *args)
else:
func = getattr(lqs_instance, namespace)
result = func(command, *args)
if not self.disable_logging:
self.logger.debug("LiquidSoapCommunicator got response " + str(result))
self.connection_attempts = 0
return result
except LQConnectionError as e:
self.logger.error("Connection Error when sending " + str(namespace) + "." + str(command) + str(args))
if self.try_to_reconnect():
time.sleep(0.2)
self.connection_attempts += 1
if self.connection_attempts < 5:
# reconnect
self.__open_conn(self.client)
self.logger.info("Trying to resend " + str(namespace) + "." + str(command) + str(args))
# grab return value
retval = self.send_lqc_command(namespace, command, *args)
# disconnect
self.__close_conn(self.client)
# return the val
return retval
else:
if command == "":
msg = "Rethrowing Exception while trying to send " + str(namespace) + str(args)
else:
msg = "Rethrowing Exception while trying to send " + str(namespace) + "." + str(command) + str(args)
self.logger.info(msg)
self.disable_transaction(socket=self.client, force=True)
raise e
else:
self.event_dispatcher.on_critical("Criticial Liquidsoap connection issue", \
"Could not connect to Liquidsoap (Multiple attempts)", e)
raise e
# ------------------------------------------------------------------------------------------ #
def try_to_reconnect(self):
self.enable_transaction()
return self.transaction > 0
# ------------------------------------------------------------------------------------------ #
def enable_transaction(self, socket=None):
# set socket to playout if nothing else is given
if socket is None:
socket = self.client
self.transaction = self.transaction + 1
self.logger.debug(TerminalColors.WARNING.value + "Enabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)
if self.transaction > 1:
return
try:
self.__open_conn(socket)
except FileNotFoundError:
self.disable_transaction(socket=socket, force=True)
subject = "CRITICAL Exception when connecting to Liquidsoap"
msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?"
self.logger.critical(SU.red(msg))
self.event_dispatcher.on_critical(subject, msg, None)
# ------------------------------------------------------------------------------------------ #
def disable_transaction(self, socket=None, force=False):
if not force:
# nothing to disable
if self.transaction == 0:
return
# decrease transaction counter
self.transaction = self.transaction - 1
# debug msg
self.logger.debug(TerminalColors.WARNING.value + "DISabling transaction! cnt: " + str(self.transaction) + TerminalColors.ENDC.value)
# return if connection is still needed
if self.transaction > 0:
return
else:
self.logger.debug(TerminalColors.WARNING.value + "Forcefully DISabling transaction! " + TerminalColors.ENDC.value)
# close conn and set transactioncounter to 0
self.__close_conn(socket)
self.transaction = 0
# ------------------------------------------------------------------------------------------ #
def __open_conn(self, socket):
# already connected
if self.transaction > 1:
return
self.logger.debug(TerminalColors.GREEN.value + "LiquidSoapCommunicator opening conn" + TerminalColors.ENDC.value)
# try to connect
socket.connect()
# ------------------------------------------------------------------------------------------ #
def __close_conn(self, socket):
# set socket to playout
if socket is None:
socket = self.client
# do not disconnect if a transaction is going on
if self.transaction > 0:
return
# say bye
socket.byebye()
# debug msg
self.logger.debug(TerminalColors.BLUE.value + "LiquidSoapCommunicator closed conn" + TerminalColors.ENDC.value)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from src.core.client.client import LiquidSoapClient
class LiquidSoapPlayerClient(LiquidSoapClient):
#
# Mixer
#
# def mixer(self, mixer_id, command, *args):
# if command == "status":
# return self.mixer_status(mixer_id, *args)
# if command == "inputs":
# return self.mixer_inputs(mixer_id)
# if command == "volume":
# return self.mixer_volume(mixer_id, *args)
# if command == "select":
# if len(args) == 2:
# return self.mixer_select(mixer_id, args[0], args[1])
# if command == "activate":
# if len(args) == 2:
# return self.mixer_activate(mixer_id, args[0], args[1])
# return "LiquidSoapPlayerClient does not understand mixer."+command+str(args)
# ------------------------------------------------------------------------------------------ #
def mixer_inputs(self, mixer_id):
# send command
self.command(mixer_id, "inputs")
# convert to list and return it
return self.message.strip().split(' ')
# ------------------------------------------------------------------------------------------ #
def mixer_status(self, mixer_id, pos=""):
"""
Get state of a source in the mixer
@type pos: string
@param pos: Mixerposition
@rtype: string
@return: Response from LiquidSoap
"""
self.command(mixer_id, "status", str(pos))
return self.message
def mixer_volume(self, mixer_id, pos, volume):
"""
Sets some mixer channel to the given volume
Args:
pos (Integer): The channel number
volume (Integer): The volume
Returns:
(String): Liquidsoap server response
"""
self.command(mixer_id, "volume", str(pos) + " " + str(volume))
return self.message
def mixer_select(self, mixer_id, pos, select):
"""
Selects some mixer channel or vice versa.
Args:
pos (Integer): The channel number
select (Boolean): Select or deselect
Returns:
(String): Liquidsoap server response
"""
self.command(mixer_id, "select", str(pos) + " " + str(select).lower())
return self.message
def mixer_activate(self, mixer_id, pos, activate):
"""
Selects some mixer channel and increases the volume to 100 or vice versa.
Args:
pos (Integer): The channel number
activate (Boolean): Activate or deactivate
Returns:
(String): Liquidsoap server response
"""
self.command(mixer_id, "activate", str(pos) + " " + str(activate).lower())
return self.message
#
# Queues
#
def queue_push(self, channel, uri):
"""
Pushes the passed file URI to the `equeue` playlist channel.
Args:
channel (String): Liquidsoap Source ID
uri (String): Path to the file
"""
self.command(channel, 'push', uri)
return self.message
def queue_seek(self, channel, duration):
"""
Forward the playing `equeue` track/playlist of the given channel.
Args:
channel (String): Liquidsoap Source ID
duration (Integer): Seek duration ins seconds
Returns:
Liquidsoap server response
"""
self.command(channel, 'seek', str(duration))
return self.message
def queue_clear(self, channel):
"""
Clears all `equeue` playlist entries of the given channel.
Args:
channel (String): Liquidsoap Source ID
duration (Integer): Seek duration ins seconds
Returns:
Liquidsoap server response
"""
self.command(channel, 'clear')
return self.message
#
# Playlist
#
def playlist_uri_set(self, channel, uri):
"""
Sets the URI of a playlist source.
Args:
channel (String): Liquidsoap Source ID
uri (String): URI to the playlist file
Returns:
Liquidsoap server response
"""
self.command(channel, 'uri', uri)
return self.message
def playlist_uri_clear(self, channel):
"""
Clears the URI of a playlist source.
Args:
channel (String): Liquidsoap Source ID
uri (String): URI to the playlist file
Returns:
Liquidsoap server response
"""
self.command(channel, 'clear')
return self.message
#
# Stream
#
def stream_set_url(self, channel, url):
"""
Sets the URL on the given HTTP channel.
"""
self.command(channel, 'url', url)
return self.message
def stream_start(self, channel):
"""
Starts the HTTP stream set with `stream_set_url` on the given channel.
"""
self.command(channel, 'start')
return self.message
def stream_stop(self, channel):
"""
Stops the HTTP stream on the given channel.
"""
self.command(channel, 'stop')
return self.message
def stream_status(self, channel):
"""
Returns the status of the HTTP stream on the given channel.
"""
self.command(channel, 'status')
return self.message
#
# General Entries
#
def entry_status(self, rid):
"""
Retrieves the status of a given entry.
Args:
rid (String): Resource ID (RID)
Returns:
Liquidsoap server response
"""
self.command("request", "status", str(rid))
return self.message
#
# Other
#
def uptime(self, command=""): # no command will come
"""
Retrieves how long the engine is running already.
"""
return self.command("", "uptime")
def version(self, command=""): # no command will come
"""
Retrieves the Liquidsoap version.
"""
return self.command("", "version")
def engine(self, command, *args):
"""
Retrieves the state of all input and outputs.
"""
if command == "state":
return self.engine_state()
return "LiquidSoapPlayerClient does not understand engine." + command + str(args)
def engine_state(self):
"""
Retrieves the state of all input and outputs.
"""
self.command('auraengine', 'state')
return self.message
# ------------------------------------------------------------------------------------------ #
# def recorder(self, num, command, *args):
# if command == "status":
# return self.recorderstatus(num)
# if command == "start":
# return self.recorderstart(num)
# if command == "stop":
# return self.recorderstop(num)
# return "LiquidSoapPlayerClient does not understand mixer." + command + str(args)
# ------------------------------------------------------------------------------------------ #
# def recorderstatus(self, num):
# """
# get status of a recorder
# :return:
# """
# self.command("recorder_" + str(num), "status")
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def recorderstart(self, num):
# """
# get status of a recorder
# :return:
# """
# self.command("recorder_" + str(num), "start")
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def recorderstop(self, num):
# """
# get status of a recorder
# :return:
# """
# self.command("recorder_" + str(num), "stop")
# return self.message
# ------------------------------------------------------------------------------------------ #
# def skip(self, namespace="playlist", pos=""):
# """
# Source skippen
# @type namespace: string
# @param namespace: Namespace der Source
# @type pos: string
# @param pos: Die Position - optional - Position des Channels vom Mixer benötigt
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('skip', namespace, pos)
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def remove(self, pos, namespace="playlist"):
# """
# Track aus der secondary_queue oder der Playlist entfernen
# @type pos: string
# @param pos: Die Position
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('remove', namespace, str(pos))
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def insert(self, uri, pos='0', namespace="playlist"):
# """
# Track einfügen
# @type uri: string
# @param uri: Uri einer Audiodatei
# @type pos: string
# @param pos: Die Position
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('insert', namespace, str(pos) + ' ' + uri)
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def move(self, fromPos, toPos, namespace="playlist"):
# """
# Track von Position fromPos nach Position toPos verschieben
# @type fromPos: string/int
# @param fromPos: Position des zu verschiebenden Tracks
# @type toPos: string
# @param toPos: Die Position zu der verschoben werden soll
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('move', namespace, str(fromPos) + ' ' + str(toPos))
# return self.message
# ------------------------------------------------------------------------------------------ #
# def play(self, namespace="playlist"):
# """
# Source abspielen - funktioniert nur bei Playlist
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('play', namespace)
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def pause(self, namespace="playlist"):
# """
# Source pausieren/stoppen - funktioniert nur bei Playlist
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('pause', namespace)
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def flush(self, namespace="playlist"):
# """
# Playlist leeren
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('flush', namespace)
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def playlistData(self):
# """
# Metadaten der Playlist ausgeben
# @rtype: string
# @return: Ein Json-String
# """
# self.command('data', 'playlist')
# return self.message
# # ------------------------------------------------------------------------------------------ #
# def get_queue(self, namespace="ch1", queue='queue'):
# """
# Queue eines Kanals ausgeben
# @type namespace: string
# @param namespace: Namespace der Source
# @type queue: string
# @param queue: Name des queues (queue, primary_queue, secondary_queue)
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command(queue, namespace)
# return self.message
# ------------------------------------------------------------------------------------------ #
# def loadPlaylist(self, uri, params="", namespace="playlist"):
# """
# Playlist laden
# @type uri: string
# @param uri: Uri einer Playlist im XSPF-Format
# @type params: string
# @param params: obsolete
# @type namespace: string
# @param namespace: Namespace der Source - hier nur playlist
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('load', namespace, uri + params)
# return self.message
# ------------------------------------------------------------------------------------------ #
# def currentTrack(self, namespace="request"):
# """
# Das oder die ID(s) der gerade abgespielten requests erhalten
# @type namespace: string
# @param namespace: Namespace der Source
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers (als String)
# """
# self.command('on_air', namespace)
# return self.message
# ------------------------------------------------------------------------------------------ #
def volume(self, pos, volume, namespace="mixer"):
"""
Lautstärke eines Kanals setzen
@type pos: int/string
@param pos: Die Position/ Nummer des Kanals (playlist=0)
@type volume: int/string
@param volume: Zahl von 1 -100
@type namespace: string
@param namespace: Namespace der Source (immer mixer)
@rtype: string
@return: Die Antwort des Liquidsoap-Servers
"""
self.command('volume', namespace, str(pos) + ' ' + str(volume))
return self.message
# ------------------------------------------------------------------------------------------ #
# def playlist_remaining(self):
# """
# Wie lange läuft der aktuelle Track der Playlist noch
# @rtype: string
# @return: Die Antwort des Liquidsoap-Servers
# """
# self.command('remaining', 'playlist')
# return self.message
# ------------------------------------------------------------------------------------------ #
# def list_channels(self):
# """
# Channels auflisten (Simple JSON)
# """
# # Liquidsoap Kommando
# channels = self.sendLqcCommand(self.lqc, 'mixer', 'inputs')
# if not isinstance(channels, list):
# self.error('02')
# elif len(channels) < 1:
# self.warning('01')
# else:
# self.success('00', channels)
# self.notifyClient()
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import socket
import time
import json
from threading import Thread, Timer
from datetime import datetime, timedelta
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
class EngineControlInterface:
"""
Provides ability to control the engine in various ways.
"""
config = None
logger = None
engine = None
event_dispatcher = None
sci = None
def __init__(self, engine, event_dispatcher):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
self.engine = engine
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.logger.info(SU.yellow(f"[ECI] Engine Control Interface starting ..."))
self.sci = SocketControlInterface.get_instance(event_dispatcher)
def terminate(self):
"""
Terminates the instance and all related objects.
"""
if self.sci: self.sci.terminate()
class SocketControlInterface:
"""
Network socket server to control a running Engine from Liquidsoap.
Note this server only allows a single connection at once. This
service is primarly utilized to store new playlogs.
"""
PORT = 1337
ACTION_ON_METADATA = "on_metadata"
instance = None
config = None
logger = None
server = None
event_dispatcher = None
def __init__(self, event_dispatcher):
"""
Constructor
Args:
config (AuraConfig): Engine configuration
logger (AuraLogger): The logger
"""
if SocketControlInterface.instance:
raise Exception(SU.red("[ECI] Socket server is already running!"))
SocketControlInterface.instance = self
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher
host = "127.0.0.1"
thread = Thread(target = self.run, args = (self.logger, host))
thread.start()
@staticmethod
def get_instance(event_dispatcher):
"""
Returns the Singleton.
"""
if not SocketControlInterface.instance:
SocketControlInterface.instance = SocketControlInterface(event_dispatcher)
return SocketControlInterface.instance
def run(self, logger, host):
"""
Starts the socket server
"""
while(True):
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, SocketControlInterface.PORT))
break
except OSError as e:
wait_time = 2
self.logger.error(SU.red(f"Cannot bind to Socket. Retrying in {wait_time} seconds..."))
time.sleep(wait_time)
logger.info(SU.yellow(f'[ECI] Listening at {host}:{SocketControlInterface.PORT}'))
self.server.listen()
while(True):
(conn, client) = self.server.accept()
while(True):
r = SocketReader(conn)
p = HttpStream(r)
data = p.body_file().read()
logger.debug(SU.yellow(f'[ECI] Received socket data from {str(client)}: {str(data)}'))
try:
self.process(logger, json.loads(data))
conn.sendall(b'\n[ECI] processing done.\n')
except Exception as e:
logger.error(SU.red(f'[ECI] Error while processing request: {data}'), e)
conn.close()
break
def process(self, logger, data):
"""
Process incoming actions.
"""
if "action" in data:
if data["action"] == SocketControlInterface.ACTION_ON_METADATA:
meta_data = data["data"]
meta_data["duration"] = data["track_duration"]
logger.debug(SU.yellow(f"[ECI] Executing action: "+SocketControlInterface.ACTION_ON_METADATA))
self.event_dispatcher.on_metadata(data["data"])
logger.info(SU.yellow(f"[ECI] Event '{SocketControlInterface.ACTION_ON_METADATA}' issued successfully"))
else:
logger.error(SU.red("[ECI] Unknown action: " + data["action"]))
else:
logger.error(SU.red(f'[ECI] Missing action in request: {data}'))
def terminate(self):
SocketControlInterface.instance = None
self.server.close()
self.logger.info(SU.yellow("[ECI] Shutting down..."))
class EngineExecutor(Timer):
"""
Base class for timed or threaded execution of Engine commands.
Primarily used for automations performed by the scheduler.
"""
logger = logging.getLogger("AuraEngine")
timer_store = {}
child_timer = None
direct_exec = None
timer_id = None
timer_type = None
param = None
diff = None
dt = None
def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
"""
Constructor
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
child_timer (EngineExeuctor): Child action which is bound to this timer
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
from src.core.engine import Engine
now_unix = Engine.engine_time()
self.child_timer = child_timer
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time:
diff = 0
else:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
self.exec_timed()
self.start()
self.update_store()
def exec_now(self):
"""
Immediate execution within a thread. It's not stored in the timer store.
"""
self.direct_exec = True
thread = Thread(target = self.func, args = (self.param,))
thread.start()
def exec_timed(self):
"""
Timed execution in a thread.
"""
def wrapper_func(param=None):
# Remove from store
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineExecutor.timer_store[self.timer_id]
# Call actual function
if param: self.func(param,)
else: self.func()
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
def update_store(self):
"""
Adds the instance to the store and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
if existing_command.child_timer:
self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))
EngineExecutor.timer_store[self.timer_id] = self
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
def is_alive(self):
"""
Returns true if the command is still due to be executed.
"""
if self.direct_exec == True:
return False
return super().is_alive()
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"
@staticmethod
def log_commands():
"""
Prints a list of active timers to the log.
"""
timers = EngineExecutor.timer_store.values()
msg = "\n [ ENGINE COMMAND QUEUE ]\n"
if not timers:
msg += "None available!\n"
else:
for timer in timers:
msg += f" => {str(timer)}\n"
if timer.child_timer:
msg += f" => {str(timer.child_timer)}\n"
EngineExecutor.logger.info(msg + "\n")
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
from contextlib import suppress
from threading import Thread
import meta
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, LoadSourceException
from src.core.resources import ResourceClass, ResourceUtil
from src.core.channels import ChannelType, TransitionType, LiquidsoapResponse, EntryPlayState, ResourceType, ChannelRouter
from src.core.events import EngineEventDispatcher
from src.core.control import EngineControlInterface
from src.core.mixer import Mixer, MixerType
from src.core.client.connector import PlayerConnector
class Engine():
"""
The Engine.
"""
instance = None
engine_time_offset = 0.0
logger = None
eci = None
channels = None
channel_router = None
scheduler = None
event_dispatcher = None
plugins = None
connector = None
def __init__(self):
"""
Constructor
"""
if Engine.instance:
raise Exception("Engine is already running!")
Engine.instance = self
self.logger = logging.getLogger("AuraEngine")
self.config = AuraConfig.config()
Engine.engine_time_offset = self.config.get("lqs_delay_offset")
self.plugins = dict()
self.channel_router = ChannelRouter(self.config, self.logger)
self.start()
def start(self):
"""
Starts the engine. Called when the connection to the sound-system implementation
has been established.
"""
self.event_dispatcher = EngineEventDispatcher(self)
self.eci = EngineControlInterface(self, self.event_dispatcher)
self.connector = PlayerConnector(self.event_dispatcher)
self.event_dispatcher.on_initialized()
while not self.is_connected():
self.logger.info(SU.yellow("Waiting for Liquidsoap to be running ..."))
time.sleep(2)
self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap"))
self.player = Player(self.connector, self.event_dispatcher)
self.event_dispatcher.on_boot()
self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__))
self.event_dispatcher.on_ready()
#
# Basic Methods
#
def is_connected(self):
"""
Checks if there's a valid connection to Liquidsoap.
"""
has_connection = False
try:
self.uptime()
has_connection = True
except LQConnectionError as e:
self.logger.info("Liquidsoap is not running so far")
except Exception as e:
self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
return has_connection
def engine_state(self):
"""
Retrieves the state of all inputs and outputs.
"""
state = self.connector.send_lqc_command("engine", "state")
return state
def version(self):
"""
Get the version of Liquidsoap.
"""
data = self.connector.send_lqc_command("version", "")
return data
def uptime(self):
"""
Retrieves the uptime of Liquidsoap.
"""
self.connector.enable_transaction()
data = self.connector.send_lqc_command("uptime", "")
self.connector.disable_transaction()
return data
@staticmethod
def engine_time():
"""
Liquidsoap is slow in executing commands, therefore it's needed to schedule
actions by (n) seconds in advance, as defined in the configuration file by
the property `lqs_delay_offset`. it's important to note that this method
requires the class variable `EngineUtil.engine_time_offset` to be set on
Engine initialization.
Returns:
(Integer): the Unix epoch timestamp including the offset
"""
return SU.timestamp() + Engine.engine_time_offset
@staticmethod
def get_instance():
"""
Returns the one and only engine.
"""
return Engine.instance
def terminate(self):
"""
Terminates the engine and all related processes.
"""
if self.eci: self.eci.terminate()
#
# PLAYER
#
class Player:
"""
Engine Player.
"""
config = None
logger = None
connector = None
channels = None
channel_router = None
event_dispatcher = None
mixer = None
mixer_fallback = None
def __init__(self, connector, event_dispatcher):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher
self.connector = connector
self.channel_router = ChannelRouter(self.config, self.logger)
self.mixer = Mixer(self.config, MixerType.MAIN, self.connector)
self.mixer_fallback = Mixer(self.config, MixerType.FALLBACK, self.connector)
def preload(self, entry):
"""
Pre-Load the entry. This is required before the actual `play(..)` can happen.
Be aware when using this method to queue a very short entry (shorter than ``) this may
result in sitations with incorrect timing. In this case bundle multiple short entries as
one queue using `preload_playlist(self, entries)`.
It's important to note, that his method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
`entry.state`.
Args:
entries ([Entry]): An array holding filesystem entries
"""
entry.status = EntryPlayState.LOADING
self.logger.info("Loading entry '%s'" % entry)
is_ready = False
# LIVE
if entry.get_content_type() in ResourceClass.LIVE.types:
entry.channel = "linein_" + entry.source.split("line://")[1]
is_ready = True
else:
channel_type = self.channel_router.type_for_resource(entry.get_content_type())
entry.previous_channel, entry.channel = self.channel_router.channel_swap(channel_type)
# QUEUE
if entry.get_content_type() in ResourceClass.FILE.types:
is_ready = self.queue_push(entry.channel, entry.source)
# STREAM
elif entry.get_content_type() in ResourceClass.STREAM.types:
is_ready = self.stream_load_entry(entry)
if is_ready:
entry.status = EntryPlayState.READY
self.event_dispatcher.on_queue([entry])
def preload_group(self, entries, channel_type=ChannelType.QUEUE):
"""
Pre-Load multiple filesystem entries at once. This call is required before the
actual `play(..)` can happen. Due to their nature, non-filesystem entries cannot be queued
using this method. In this case use `preload(self, entry)` instead. This method also allows
queuing of very short files, such as jingles.
It's important to note, that his method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
`entry.state`.
Args:
entries ([Entry]): An array holding filesystem entries
channel_type (ChannelType): The type of channel where it should be queued (optional)
"""
channels = None
# Validate entry type
for entry in entries:
if entry.get_content_type() != ResourceType.FILE:
raise InvalidChannelException
# Determine channel
channels = self.channel_router.channel_swap(channel_type)
# Queue entries
for entry in entries:
entry.status = EntryPlayState.LOADING
self.logger.info("Loading entry '%s'" % entry)
# Choose and save the input channel
entry.previous_channel, entry.channel = channels
if self.queue_push(entry.channel, entry.source) == True:
entry.status = EntryPlayState.READY
self.event_dispatcher.on_queue(entries)
return channels
def play(self, entry, transition):
"""
Plays a new `Entry`. In case of a new schedule (or some intented, immediate transition),
a clean channel is selected and transitions between old and new channel is performed.
This method expects that the entry is pre-loaded using `preload(..)` or `preload_group(self, entries)`
before being played. In case the pre-roll has happened for a group of entries, only the
first entry of the group needs to be passed.
Args:
entry (PlaylistEntry): The audio source to be played
transition (TransitionType): The type of transition to use e.g. fade-in or instant volume level.
queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so;
otherwise a new channel of the same type is activated
"""
with suppress(LQConnectionError):
channel_type = self.channel_router.type_of_channel(entry.channel)
mixer = self.mixer
if channel_type == ChannelType.FALLBACK_QUEUE:
mixer = self.mixer_fallback
# Instant activation or fade-in
self.connector.enable_transaction()
if transition == TransitionType.FADE:
mixer.channel_select(entry.channel.value, True)
mixer.fade_in(entry.channel, entry.volume)
else:
mixer.channel_activate(entry.channel.value, True)
self.connector.disable_transaction()
# Update active channel for the current channel type
self.channel_router.set_active(channel_type, entry.channel)
# Dear filesystem channels, please leave the room as you would like to find it!
if entry.previous_channel and \
entry.previous_channel in ChannelType.QUEUE.channels and \
entry.previous_channel in ChannelType.FALLBACK_QUEUE.channels:
def clean_up():
# Wait a little, if there is some long fade-out. Note, this also means,
# this channel should not be used for at least some seconds (including clearing time).
time.sleep(2)
self.connector.enable_transaction()
mixer.channel_activate(entry.previous_channel.value, False)
res = self.queue_clear(entry.previous_channel)
self.logger.info("Clear Queue Response: " + res)
self.connector.disable_transaction()
Thread(target=clean_up).start()
# Filesystem meta-changes trigger the event via Liquidsoap, so only
# issue event for LIVE and STREAM:
if not entry.channel in ChannelType.QUEUE.channels:
self.event_dispatcher.on_play(entry)
def stop(self, entry, transition):
"""
Stops the currently playing entry.
Args:
entry (Entry): The entry to stop playing
transition (TransitionType): The type of transition to use e.g. fade-out.
"""
with suppress(LQConnectionError):
self.connector.enable_transaction()
if not entry.channel:
self.logger.warn(SU.red("Trying to stop entry %s, but it has no channel assigned" % entry))
return
if transition == TransitionType.FADE:
self.mixer.fade_out(entry.channel)
else:
self.mixer.channel_volume(entry.channel, 0)
self.logger.info(SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry)))
self.connector.disable_transaction()
self.event_dispatcher.on_stop(entry)
def start_fallback_playlist(self, entries):
"""
Sets any scheduled fallback playlist and performs a fade-in.
Args:
entries ([Entry]): The playlist entries
"""
self.preload_group(entries, ChannelType.FALLBACK_QUEUE)
self.play(entries[0], TransitionType.FADE)
def stop_fallback_playlist(self):
"""
Performs a fade-out and clears any scheduled fallback playlist.
"""
dirty_channel = self.channel_router.get_active(ChannelType.FALLBACK_QUEUE)
self.logger.info(f"Fading out channel '{dirty_channel}'")
self.connector.enable_transaction()
self.mixer_fallback.fade_out(dirty_channel)
self.connector.disable_transaction()
def clean_up():
# Wait a little, if there is some long fade-out. Note, this also means,
# this channel should not be used for at least some seconds (including clearing time).
time.sleep(2)
self.connector.enable_transaction()
self.mixer_fallback.channel_activate(dirty_channel.value, False)
res = self.queue_clear(dirty_channel)
self.logger.info("Clear Fallback Queue Response: " + res)
self.connector.disable_transaction()
self.event_dispatcher.on_fallback_cleaned(dirty_channel)
Thread(target=clean_up).start()
#
# Channel Type - Stream
#
def stream_load_entry(self, entry):
"""
Loads the given stream entry and updates the entries's status codes.
Args:
entry (Entry): The entry to be pre-loaded
Returns:
(Boolean): `True` if successfull
"""
self.stream_load(entry.channel, entry.source)
time.sleep(1)
retry_delay = self.config.get("input_stream_retry_delay")
max_retries = self.config.get("input_stream_max_retries")
retries = 0
while not self.stream_is_ready(entry.channel, entry.source):
self.logger.info("Loading Stream ...")
if retries >= max_retries:
raise LoadSourceException("Could not connect to stream while waiting for %s seconds!" % str(retries*retry_delay))
time.sleep(retry_delay)
retries += 1
return True
def stream_load(self, channel, url):
"""
Preloads the stream URL on the given channel. Note this method is blocking
some serious amount of time; hence it's worth being called asynchroneously.
Args:
channel (Channel): The stream channel
uri (String): The stream URL
Returns:
(Boolean): `True` if successful
"""
result = None
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "stream_stop")
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("%s.stop result: %s" % (channel, result))
raise LQStreamException("Error while stopping stream!")
result = self.connector.send_lqc_command(channel, "stream_set_url", url)
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("%s.set_url result: %s" % (channel, result))
raise LQStreamException("Error while setting stream URL!")
# Liquidsoap ignores commands sent without a certain timeout
time.sleep(2)
result = self.connector.send_lqc_command(channel, "stream_start")
self.logger.info("%s.start result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def stream_is_ready(self, channel, url):
"""
Checks if the stream on the given channel is ready to play. Note this method is blocking
some serious amount of time even when successfull; hence it's worth being called asynchroneously.
Args:
channel (Channel): The stream channel
uri (String): The stream URL
Returns:
(Boolean): `True` if successful
"""
result = None
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "stream_status")
self.logger.info("%s.status result: %s" % (channel, result))
if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value):
return False
lqs_url = result.split(" ")[1]
if not url == lqs_url:
self.logger.error("Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url))
return False
self.connector.disable_transaction()
stream_buffer = self.config.get("input_stream_buffer")
self.logger.info("Ready to play stream, but wait %s seconds until the buffer is filled..." % str(stream_buffer))
time.sleep(round(float(stream_buffer)))
return True
#
# Channel Type - Queue
#
def queue_push(self, channel, uri):
"""
Adds an filesystem URI to the given `ChannelType.QUEUE` channel.
Args:
channel (Channel): The channel to push the file to
uri (String): The URI of the file
Returns:
(Boolean): `True` if successful
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.logger.info(SU.pink("queue.push('%s', '%s'" % (channel, uri)))
self.connector.enable_transaction()
audio_store = self.config.get("audio_source_folder")
extension = self.config.get("audio_source_extension")
filepath = ResourceUtil.uri_to_filepath(audio_store, uri, extension)
result = self.connector.send_lqc_command(channel, "queue_push", filepath)
self.logger.info("%s.queue_push result: %s" % (channel, result))
self.connector.disable_transaction()
# If successful, Liquidsoap returns a resource ID of the queued track
return int(result) >= 0
def queue_seek(self, channel, seconds_to_seek):
"""
Forwards the player of the given `ChannelType.QUEUE` channel by (n) seconds.
Args:
channel (Channel): The channel to push the file to
seconds_to_seeks (Float): The seconds to skip
Returns:
(String): Liquidsoap response
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "queue_seek", str(seconds_to_seek))
self.logger.info("%s.seek result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def queue_clear(self, channel):
"""
Removes all tracks currently queued in the given `ChannelType.QUEUE` channel.
Args:
channel (Channel): The channel to push the file to
Returns:
(String): Liquidsoap response
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "queue_clear")
self.logger.info("%s.clear result: %s" % (channel, result))
self.connector.disable_transaction()
return result
#
# Channel Type - Playlist
#
def playlist_set_uri(self, channel, playlist_uri):
"""
Sets the URI of a playlist.
Args:
channel (Channel): The channel to push the file to
playlist_uri (String): The path to the playlist
Returns:
(String): Liquidsoap response
"""
self.logger.info(SU.pink("Setting URI of playlist '%s' to '%s'" % (channel, playlist_uri)))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "playlist_uri_set", playlist_uri)
self.logger.info("%s.playlist_uri result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def playlist_clear_uri(self, channel):
"""
Clears the URI of a playlist.
Args:
channel (Channel): The channel to push the file to
Returns:
(String): Liquidsoap response
"""
self.logger.info(SU.pink("Clearing URI of playlist '%s'" % (channel)))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "playlist_uri_clear")
self.logger.info("%s.playlist_uri_clear result: %s" % (channel, result))
self.connector.disable_transaction()
return result
class EngineSplash:
@staticmethod
def splash_screen(component, version):
"""
Prints the engine logo and version info.
"""
return """\n
█████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗
██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝
███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗
██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝
██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗
╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝
%s v%s - Ready to play!
\n""" % (component, version)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import datetime
from threading import Thread
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.base.mail import AuraMailer
from src.plugins.monitor import AuraMonitor
from src.plugins.trackservice import TrackServiceHandler
class EventBinding():
"""
A binding between the event dispatcher and some event handler.
It allows you to subscribe to events in a chained way:
```
binding = dispatcher.attach(AuraMonitor)
binding.subscribe("on_boot").subscribe("on_play")
```
"""
dispatcher = None
instance = None
def __init__(self, dispatcher, instance):
"""
Constructor
"""
self.dispatcher = dispatcher
self.instance = instance
def subscribe(self, event_type):
"""
Subscribes the instance to some event identified by the `event_type` string.
"""
self.dispatcher.subscribe(self.instance, event_type)
return self
def get_instance(self):
"""
Returns the object within that binding.
"""
return self.instance
class EngineEventDispatcher():
"""
Executes handlers for engine events.
"""
logger = None
config = None
subscriber_registry = None
mailer = None
engine = None
scheduler = None
monitor = None
def __init__(self, engine):
"""
Constructor
"""
self.subscriber_registry = dict()
self.logger = logging.getLogger("AuraEngine")
self.config = AuraConfig.config()
self.mailer = AuraMailer(self.config)
self.engine = engine
binding = self.attach(AuraMonitor)
binding.subscribe("on_boot")
binding.subscribe("on_sick")
binding.subscribe("on_resurrect")
binding = self.attach(TrackServiceHandler)
binding.subscribe("on_timeslot")
binding.subscribe("on_play")
binding.subscribe("on_metadata")
binding.subscribe("on_queue")
def attach(self, clazz):
"""
Creates an instance of the given Class.
"""
instance = clazz(self.engine)
return EventBinding(self, instance)
def subscribe(self, instance, event_type):
"""
Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.
"""
if not event_type in self.subscriber_registry:
self.subscriber_registry[event_type] = []
self.subscriber_registry[event_type].append(instance)
def call_event(self, event_type, args):
"""
Calls all subscribers for the given event type.
"""
if not event_type in self.subscriber_registry:
return
listeners = self.subscriber_registry[event_type]
if not listeners:
return
for listener in listeners:
method = getattr(listener, event_type)
if method:
if args:
method(args)
else:
method()
#
# Events
#
def on_initialized(self):
"""
Called when the engine is initialized, just before
"""
self.logger.debug("on_initialized(..)")
from src.scheduling.scheduler import AuraScheduler
self.scheduler = AuraScheduler(self.engine)
self.call_event("on_initialized", None)
def on_boot(self):
"""
Called when the engine is starting up. This happens after the initialization step.
Connection to Liquidsoap should be available here.
"""
self.logger.debug("on_boot(..)")
self.call_event("on_boot", None)
def on_ready(self):
"""
Called when the engine is booted and ready to play.
"""
self.logger.debug("on_ready(..)")
self.scheduler.on_ready()
def on_timeslot(self, timeslot):
"""
Event Handler which is called by the scheduler when the current timeslot is refreshed.
Args:
source (String): The `PlaylistEntry` object
"""
def func(self, timeslot):
self.logger.debug("on_timeslot(..)")
self.call_event("on_timeslot", timeslot)
thread = Thread(target = func, args = (self, timeslot))
thread.start()
def on_play(self, entry):
"""
Event Handler which is called by the engine when some entry is actually playing.
Args:
source (String): The `PlaylistEntry` object
"""
def func(self, entry):
self.logger.debug("on_play(..)")
# Assign timestamp for play time
entry.entry_start_actual = datetime.datetime.now()
self.call_event("on_play", entry)
thread = Thread(target = func, args = (self, entry))
thread.start()
def on_metadata(self, data):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
Args:
data (dict): A collection of metadata related to the current track
"""
def func(self, data):
self.logger.debug("on_metadata(..)")
self.call_event("on_metadata", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_stop(self, entry):
"""
The entry on the assigned channel has been stopped playing.
"""
def func(self, entry):
self.logger.debug("on_stop(..)")
self.call_event("on_stop", entry)
thread = Thread(target = func, args = (self, entry))
thread.start()
def on_fallback_updated(self, playlist_uri):
"""
Called when the scheduled fallback playlist has been updated.
"""
self.logger.debug("on_fallback_updated(..)")
self.call_event("on_fallback_updated", playlist_uri)
def on_fallback_cleaned(self, cleaned_channel):
"""
Called when the scheduled fallback queue has been cleaned up.
"""
self.logger.debug("on_fallback_cleaned(..)")
self.call_event("on_fallback_cleaned", cleaned_channel)
def on_idle(self):
"""
Callend when no entry is playing
"""
def func(self):
self.logger.debug("on_idle(..)")
self.logger.error(SU.red("Currently there's nothing playing!"))
self.call_event("on_idle", None)
thread = Thread(target = func, args = (self, ))
thread.start()
def on_schedule_change(self, schedule):
"""
Called when the playlist or entries of the current schedule have changed.
"""
def func(self, schedule):
self.logger.debug("on_schedule_change(..)")
self.call_event("on_schedule_change", schedule)
thread = Thread(target = func, args = (self, schedule))
thread.start()
def on_queue(self, entries):
"""
One or more entries have been queued and are currently pre-loaded.
"""
def func(self, entries):
self.logger.debug("on_queue(..)")
self.call_event("on_queue", entries)
thread = Thread(target = func, args = (self, entries))
thread.start()
def on_sick(self, data):
"""
Called when the engine is in some unhealthy state.
"""
def func(self, data):
self.logger.debug("on_sick(..)")
self.call_event("on_sick", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_resurrect(self, data):
"""
Called when the engine turned healthy again after being sick.
"""
def func(self, data):
self.logger.debug("on_resurrect(..)")
self.call_event("on_resurrect", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_critical(self, subject, message, data=None):
"""
Callend when some critical event occurs
"""
def func(self, subject, message, data):
self.logger.debug("on_critical(..)")
if not data: data = ""
self.mailer.send_admin_mail(subject, message + "\n\n" + str(data))
self.call_event("on_critical", (subject, message, data))
thread = Thread(target = func, args = (self, subject, message, data))
thread.start()
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from enum import Enum
from src.base.exceptions import LQConnectionError
from src.base.utils import SimpleUtil as SU
class MixerType(Enum):
"""
Types of mixers mapped to the Liquidsoap mixer ids.
"""
MAIN = "mixer"
FALLBACK = "mixer_fallback"
class MixerUtil:
"""
Little helpers for the mixer.
"""
@staticmethod
def channel_status_dict(status):
"""
Transforms a channel status string to a dictionary.
"""
s = {}
pairs = status.split(" ")
for pair in pairs:
kv = pair.split("=")
s[kv[0]] = kv[1]
return s
class Mixer():
"""
A virtual mixer.
"""
config = None
logger = None
connector = None
mixer_id = None
channels = None
fade_in_active = None
fade_out_active = None
def __init__(self, config, mixer_id, connector):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.mixer_id = mixer_id
self.fade_in_active = None
self.fade_out_active = None
self.connector = connector
self.mixer_initialize()
#
# Mixer
#
def mixer_initialize(self):
"""
- Pull all faders down to volume 0.
- Initialize default channels per type
"""
self.connector.enable_transaction()
time.sleep(1) # TODO Check is this is still required
channels = self.mixer_channels_reload()
for channel in channels:
self.channel_volume(channel, "0")
self.connector.disable_transaction()
def mixer_status(self):
"""
Returns the state of all mixer channels
"""
cnt = 0
inputstate = {}
self.connector.enable_transaction()
inputs = self.mixer_channels()
for channel in inputs:
inputstate[channel] = self.channel_status(cnt)
cnt = cnt + 1
self.connector.disable_transaction()
return inputstate
def mixer_channels(self):
"""
Retrieves all mixer channels
"""
if self.channels is None or len(self.channels) == 0:
self.channels = self.connector.send_lqc_command(self.mixer_id.value, "mixer_inputs")
return self.channels
def mixer_channels_selected(self):
"""
Retrieves all selected channels of the mixer.
"""
cnt = 0
activeinputs = []
self.connector.enable_transaction()
inputs = self.mixer_channels()
for channel in inputs:
status = self.channel_status(cnt)
if "selected=true" in status:
activeinputs.append(channel)
cnt = cnt + 1
self.connector.disable_transaction()
return activeinputs
def mixer_channels_except(self, input_type):
"""
Retrieves all mixer channels except the ones of the given type.
"""
try:
activemixer_copy = self.mixer_channels().copy()
activemixer_copy.remove(input_type)
except ValueError as e:
self.logger.error("Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e)))
except AttributeError:
self.logger.critical("Empty channel list")
return activemixer_copy
def mixer_channels_reload(self):
"""
Reloads all mixer channels.
"""
self.channels = None
return self.mixer_channels()
#
# Channel
#
def channel_number(self, channel):
"""
Returns the channel number for the given channel ID.
Args:
channel (Channel): The channel
Returns:
(Integer): The channel number
"""
channels = self.mixer_channels()
index = channels.index(channel)
if index < 0:
self.logger.critical(f"There's no valid channel number for channel ID '{channel.value}'")
return None
return index
def channel_status(self, channel_number):
"""
Retrieves the status of a channel identified by the channel number.
Args:
channel_number (Integer): The channel number
Returns:
(String): Channel status info as a String
"""
return self.connector.send_lqc_command(self.mixer_id.value, "mixer_status", channel_number)
def channel_select(self, channel, select):
"""
Selects/deselects some mixer channel
Args:
pos (Integer): The channel number
select (Boolean): Select or deselect
Returns:
(String): Liquidsoap server response
"""
channels = self.mixer_channels()
try:
index = channels.index(channel)
if len(channel) < 1:
self.logger.critical("Cannot select channel. There are no channels!")
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_select", index, select)
return message
except Exception as e:
self.logger.critical("Ran into exception when selecting channel. Reason: " + str(e))
def channel_activate(self, channel, activate):
"""
Combined call of following to save execution time:
- Select some mixer channel
- Increase the volume to 100,
Args:
pos (Integer): The channel number
activate (Boolean): Activate or deactivate
Returns:
(String): Liquidsoap server response
"""
channels = self.mixer_channels()
try:
index = channels.index(channel)
if len(channel) < 1:
self.logger.critical("Cannot activate channel. There are no channels!")
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_activate", index, activate)
return message
except Exception as e:
self.logger.critical("Ran into exception when activating channel. Reason: " + str(e))
def channel_current_volume(self, channel):
"""
Retrieves the current volume of the channel.
"""
channel_number = self.channel_number(channel.value)
status = self.channel_status(channel_number)
channel_status = MixerUtil.channel_status_dict(status)
volume = channel_status.get("volume")
if volume:
return int(volume.split("%")[0])
else:
self.logger.error(f"Invalid volume for channel {channel.value} (status: '{status}'")
return 0
def channel_volume(self, channel, volume):
"""
Set volume of a channel
Args:
channel (Channel): The channel
volume (Integer) Volume between 0 and 100
"""
channel = str(channel)
try:
if str(volume) == "100":
channels = self.mixer_channels()
index = channels.index(channel)
else:
channels = self.mixer_channels()
index = channels.index(channel)
except ValueError as e:
msg = f"Cannot set volume of channel '{channel}' to {str(volume)}. Reason: {str(e)}"
self.logger.error(SU.red(msg))
return
try:
if len(channel) < 1:
msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.")
self.logger.warning(msg)
else:
message = self.connector.send_lqc_command(self.mixer_id.value, "mixer_volume", str(index), str(int(volume)))
if not self.connector.disable_logging:
if message.find('volume=' + str(volume) + '%'):
self.logger.info(SU.pink("Set volume of channel '%s' to %s" % (channel, str(volume))))
else:
msg = SU.red("Setting volume of channel " + channel + " has gone wrong! Liquidsoap message: " + message)
self.logger.warning(msg)
return message
except AttributeError as e: #(LQConnectionError, AttributeError):
self.connector.disable_transaction(force=True)
msg = SU.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e))
self.logger.error(msg)
#
# Fading
#
def fade_in(self, channel, volume):
"""
Performs a fade-in for the given channel.
Args:
channel (Channel): The channel to fade
volume (Integer): The target volume
Returns:
(Boolean): `True` if successful
"""
try:
current_volume = self.channel_current_volume(channel)
if current_volume == volume:
self.logger.warning(f"Current volume for channel {channel.value} is already at target volume of {volume}% SKIPPING...")
return
elif current_volume > volume:
self.logger.warning(f"Current volume {current_volume}% of channel {channel.value} exceeds target volume of {volume}% SKIPPING...")
return
fade_in_time = float(self.config.get("fade_in_time"))
if fade_in_time > 0:
self.fade_in_active = True
target_volume = volume
step = fade_in_time / target_volume
msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \
(channel, str(step), str(target_volume))
self.logger.info(SU.pink(msg))
# Enable logging, which might have been disabled in a previous fade-out
self.connector.disable_logging = True
self.connector.client.disable_logging = True
for i in range(target_volume):
self.channel_volume(channel.value, i + 1)
time.sleep(step)
msg = "Finished with fading-in '%s'." % channel
self.logger.info(SU.pink(msg))
self.fade_in_active = False
if not self.fade_out_active:
self.connector.disable_logging = False
self.connector.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return False
return True
def fade_out(self, channel, volume=None):
"""
Performs a fade-out for the given channel starting at it's current volume.
Args:
channel (Channel): The channel to fade
volume (Integer): The start volume
Returns:
(Boolean): `True` if successful
"""
try:
current_volume = self.channel_current_volume(channel)
if not volume:
volume = current_volume
if current_volume == 0:
self.logger.warning(f"Current volume for channel {channel.value} is already at target volume of 0%. SKIPPING...")
return
fade_out_time = float(self.config.get("fade_out_time"))
if fade_out_time > 0:
step = abs(fade_out_time) / current_volume
msg = "Starting to fading-out '%s'. Step is %ss." % (channel, str(step))
self.logger.info(SU.pink(msg))
# Disable logging... it is going to be enabled again after fadein and -out is finished
self.connector.disable_logging = True
self.connector.client.disable_logging = True
for i in range(volume):
self.channel_volume(channel.value, volume-i-1)
time.sleep(step)
msg = "Finished with fading-out '%s'" % channel
self.logger.info(SU.pink(msg))
# Enable logging again
self.fade_out_active = False
if not self.fade_in_active:
self.connector.disable_logging = False
self.connector.client.disable_logging = False
except LQConnectionError as e:
self.logger.critical(str(e))
return False
return True
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
class ResourceType(Enum):
"""
Media content types.
"""
FILE = "file:"
STREAM_HTTP = "http:"
STREAM_HTTPS = "https:"
LINE = "line:"
PLAYLIST = "playlist:"
POOL = "pool:"
class ResourceClass(Enum):
"""
Media content classes.
"""
FILE = {
"id": "fs",
"numeric": 0,
"types": [ResourceType.FILE]
}
STREAM = {
"id": "fs",
"numeric": 0,
"types": [ResourceType.STREAM_HTTP, ResourceType.STREAM_HTTPS]
}
LIVE = {
"id": "http",
"numeric": 1,
"types": [ResourceType.LINE]
}
PLAYLIST = {
"id": "https",
"numeric": 2,
"types": [ResourceType.PLAYLIST, ResourceType.POOL]
}
@property
def types(self):
return self.value["types"]
@property
def numeric(self):
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class ResourceUtil(Enum):
"""
Utilities for different resource types.
"""
@staticmethod
def get_content_type(uri):
"""
Returns the content type identified by the passed URI.
Args:
uri (String): The URI of the source
Returns:
(ResourceType)
"""
if uri.startswith(ResourceType.STREAM_HTTPS.value):
return ResourceType.STREAM_HTTPS
if uri.startswith(ResourceType.STREAM_HTTP.value):
return ResourceType.STREAM_HTTP
if uri.startswith(ResourceType.POOL.value):
return ResourceType.POOL
if uri.startswith(ResourceType.FILE.value):
return ResourceType.FILE
if uri.startswith(ResourceType.LINE.value):
return ResourceType.LINE
@staticmethod
def get_content_class(content_type):
"""
Returns the content class identified by the passed type.
Args:
content_type (ContentType): The content type
Returns:
(ResourceType)
"""
if content_type in ResourceClass.FILE.types:
return ResourceClass.FILE
if content_type in ResourceClass.STREAM.types:
return ResourceClass.STREAM
if content_type in ResourceClass.LIVE.types:
return ResourceClass.LIVE
if content_type in ResourceClass.PLAYLIST.types:
return ResourceClass.PLAYLIST
@staticmethod
def generate_m3u_file(target_file, audio_store_path, entries, entry_extension):
"""
Writes a M3U file based on the given playlist object.
Args:
target_file (File): The M3U playlist to write
audio_store_path (String): Folder containing the source files
entries (PlaylistEntry): Entries of the playlist
entry_extension (String): The file extension of the playlist entries
"""
file = open(target_file, "w")
fb = [ "#EXTM3U" ]
for entry in entries:
if ResourceUtil.get_content_type(entry.source) == ResourceType.FILE:
path = ResourceUtil.uri_to_filepath(audio_store_path, entry.source, entry_extension)
fb.append(f"#EXTINF:{entry.duration},{entry.meta_data.artist} - {entry.meta_data.title}")
fb.append(path)
file.writelines(fb)
file.close()
@staticmethod
def uri_to_filepath(base_dir, uri, source_extension):
"""
Converts a file-system URI to an actual, absolute path to the file.
Args:
basi_dir (String): The location of the audio store.
uri (String): The URI of the file
source_extension (String): The file extension of audio sources
Returns:
path (String): Absolute file path
"""
return base_dir + "/" + uri[7:] + source_extension
@staticmethod
def get_entries_string(entries):
"""
Returns a list of entries as String for logging purposes.
"""
s = ""
if isinstance(entries, list):
for entry in entries:
s += str(entry)
if entry != entries[-1]: s += ", "
else:
s = str(entries)
return s
@staticmethod
def lqs_annotate_cuein(uri, cue_in):
"""
Wraps the given URI with a Liquidsoap Cue In annotation.
Args:
uri (String): The path to the audio source
cue_in (Float): The value in seconds wher the cue in should start
Returns:
(String): The annotated URI
"""
if cue_in > 0.0:
uri = "annotate:liq_cue_in=\"%s\":%s" % (str(cue_in), uri)
return uri
\ No newline at end of file