Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Show changes
Showing
with 5811 additions and 0 deletions
#!/usr/bin/env python3
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Entrypoint to run the Engine.
"""
import logging
import signal
import sys
import threading
import confuse
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
from aura_engine.engine import Engine
class EngineRunner:
"""
EngineRunner is in charge of starting the engine.
"""
logger: logging.Logger
config: confuse.Configuration
engine: Engine
def __init__(self):
"""
Constructor.
"""
self.config = AuraConfig.instance.config
AuraLogger(self.config)
self.logger = logging.getLogger("engine")
self.engine = Engine()
def run(self):
"""
Start Engine Core.
"""
self.engine.start()
def exit_gracefully(self, signum, frame):
"""
Shutdown of the engine. Also terminates the Liquidsoap thread.
"""
for thread in threading.enumerate():
self.logger.info(thread.name)
if self.engine:
self.engine.terminate()
self.logger.info(f"Gracefully terminated Aura Engine! (signum:{signum}, frame:{frame})")
sys.exit(0)
#
# START THE ENGINE
#
if __name__ == "__main__":
runner = EngineRunner()
signal.signal(signal.SIGINT, runner.exit_gracefully)
signal.signal(signal.SIGTERM, runner.exit_gracefully)
runner.run()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Simple API library.
~~~~~~~~~~~~~~~~~~~~~
Simple API is an HTTP library, for implicit error handling. This is meant for situations
where you are mostly interested in results and keep going independently if there is an
error or not.
By default the library is meant for digesting REST endpoints.
The responses are wrapped in a dictionary containing additial fields which are used at a
frequent basis:
```py
{
"response" (Response): Actual Python response object
"error" (String): In case an error occurred
"exception" (Exception): In case an exception occurred
"json" (Dictionary): In case of an json request this is the deserialized data
}
```
Basic GET usage:
>>> from api import SimpleRestApi
>>> r = api.get("https://aura.radio/foo.json")
>>> r.response.status_code
200
>>> r.response.content
<raw content>
>>> r.json
<unmarshalled json>
"""
import json
import logging
import os
from pathlib import Path
from urllib.parse import urlparse
import requests
from aura_engine.base.lang import DotDict
from aura_engine.base.utils import SimpleUtil as SU
class SimpleRestApi:
"""
Simple wrapper on `requests` to deal with REST APIs.
Use it for services which do not want to deal with exception
handling but with results only.
SimpleRestApi has implicit logging of invalid states at logs
to the `engine` logger by default.
"""
CONTENT_JSON = "application/json"
default_headers = {"content-type": "application/json"}
logger = None
def __init__(self, logger_name="engine"):
self.logger = logging.getLogger(logger_name)
def exception_handler(func):
"""
Decorate functions with `@exception_handler` to handle API exceptions in a simple way.
Args:
func (_type_): The decorated function
"""
def handle_response(*args, **kwargs) -> dict:
"""Process the decorator response.
Returns:
dict: {
"response": requests.Response object
"error": String with error message
"exception": The actual exception
}
"""
fn = func.__name__.upper()
msg_template = f"during {fn} at '{args[1]}'"
error = None
exc = None
response_dict = None
response = requests.Response()
response.status_code = 400
try:
response = func(*args, **kwargs)
if isinstance(response, dict):
response_dict = response
response = response_dict.response
if int(response.status_code) >= 300:
reason = "-"
if hasattr(response, "reason"):
reason = response.reason
error = f"{response.status_code} | Error {msg_template}: {reason}"
args[0].logger.error(SU.red(error))
except requests.exceptions.ConnectionError as e:
exc = e
error = f"Bad Request {msg_template}"
args[0].logger.error(SU.red(error))
except requests.exceptions.Timeout as e:
exc = e
error = f"Timeout {msg_template}"
args[0].logger.error(SU.red(error))
except requests.exceptions.RequestException as e:
exc = e
error = f"Unknown Error {msg_template}"
args[0].logger.error(SU.red(error))
except Exception as e:
exc = e
error = f"Unknown Exception {msg_template}"
args[0].logger.error(SU.red(error), e)
finally:
result_dict = {"response": response, "error": error, "exception": exc}
if response_dict:
result_dict = result_dict | response_dict
return DotDict(result_dict)
return handle_response
def clean_dictionary(self, data: dict) -> dict:
"""
Delete keys with the value `None` in a dictionary, recursively.
Args:
data (dict): The dictionary
Returns:
(dict): The cleaned dictionary
"""
data = data.copy()
for key, value in list(data.items()):
if value is None:
del data[key]
elif isinstance(value, dict):
SimpleRestApi.clean_dictionary(self, value)
return data
def serialize_json(self, data: dict, clean_data=True) -> str:
"""
Marshall a dictionary as JSON String.
Args:
data (dict): Dictionary holding the data
Returns:
str: JSON String
"""
if clean_data:
data = self.clean_dictionary(data)
json_data = json.dumps(data, indent=4, sort_keys=True, default=str)
self.logger.info("Built JSON: " + json_data)
return json_data
def deserialize_json(self, response: str) -> str:
"""
Unmarshall a JSON String to a dictionary.
Args:
response (dict): Response object
Returns:
dict: JSON as dictionary
"""
json_data = None
try:
json_data = response.json()
except Exception:
self.logger.error(f"Invalid JSON: {response.content}")
return None
return json_data
@exception_handler
def get(
self, url: str, headers: dict = None, params: dict = None, timeout: int = 5
) -> requests.Response:
"""
GET from an URL.
Args:
url (str): The URL of the request
Returns:
{
"response": requests.Response,
"error": str,
"exception": Exception
}
"""
json_data = None
if not headers:
headers = SimpleRestApi.default_headers
response = requests.get(url, headers=headers, params=params, timeout=timeout)
if headers.get("content-type") == SimpleRestApi.CONTENT_JSON:
json_data = self.deserialize_json(response)
return DotDict({"response": response, "json": json_data})
@exception_handler
def post(self, url: str, data: dict, headers: dict = None, timeout: int = 5):
"""
POST to an URL.
Args:
url (str): The URL of the request
data (dict): Data payload for request body
headers (dict, optional): Optional headers, defaults to `SimpleRestApi.default_headers`
Returns:
{
"response": requests.Response,
"error": str,
"exception": Exception
}
"""
if not headers:
headers = SimpleRestApi.default_headers
body: str = self.serialize_json(data)
return requests.post(url, data=body, headers=headers, timeout=timeout)
@exception_handler
def put(
self, url: str, data: dict, headers: dict = None, timeout: int = 5
) -> requests.Response:
"""
PUT to an URL.
Args:
url (str): The URL of the request
data (dict): Data payload for request body
headers (dict, optional): Optional headers, defaults to `SimpleRestApi.default_headers`
Returns:
{
"response": requests.Response,
"error": str,
"exception": Exception
}
"""
if not headers:
headers = SimpleRestApi.default_headers
body: str = self.serialize_json(data)
return requests.put(url, data=body, headers=headers, timeout=timeout)
class SimpleCachedRestApi:
"""
Wrapper to cache GET responses based on the simple REST API.
It uses a network-first strategy:
1. Query the requested API endpoint
2. Store the result in a JSON file
3. Return the result as a JSON object
If the API endpoint is not available at step 1.) the cached JSON from the
most recent, previously successful request is returned.
"""
cache_location: str
simple_api: SimpleRestApi
logger = None
def __init__(self, simple_api: SimpleRestApi, cache_location: str, logger_name="engine"):
if cache_location[-1] != "/":
cache_location += "/"
cache_location += "api/"
os.makedirs(cache_location, exist_ok=True)
self.simple_api = simple_api
self.cache_location = cache_location
self.logger = logging.getLogger(logger_name)
def get(self, url: str, headers: dict = None, params: dict = None) -> requests.Response:
"""
GET from an URL while also storing the result in the local cache.
Args:
url (str): The URL of the request
Returns:
{
"response": requests.Response,
"error": str,
"exception": Exception
}
"""
filename = self.build_filename(url)
cache_filepath = self.cache_location + filename
result = self.simple_api.get(url, headers, params)
if result and result.json and result.response.status_code == 200:
with open(cache_filepath, "w") as file:
json.dump(result.json, file)
file.close()
else:
json_data = None
try:
file = open(cache_filepath, "r")
json_data = json.load(file)
file.close()
except FileNotFoundError:
pass
if json_data:
result = {
"response": DotDict({"status_code": 304, "error": "Not Modified"}),
"json": json_data,
}
else:
result = {
"response": DotDict({"status_code": 404, "error": "Not Found in local cache"}),
"json": None,
}
return DotDict(result)
def build_filename(self, url: str) -> str:
"""
Build a valid file name based on the URI parts of an URL.
Args:
url (str): The URL to build the filename from
Returns:
str: File name representing an URL
"""
parts = urlparse(url)
dirs = parts.path.strip("/").split("/")
return "-".join(dirs) + ".json"
def prune_cache_dir(self):
"""
Delete everything in the API cache directory.
"""
[f.unlink() for f in Path(self.cache_location).iterdir() if f.is_file()]
class LiquidsoapUtil:
"""
Utilities specific to Liquidsoap.
"""
@staticmethod
def json_to_dict(data: str) -> dict:
"""
Convert a Liquidsoap JSON String to dictionary.
"""
data = data.replace("+", " ")
data = data.replace("-", " ")
data = requests.utils.unquote(data)
return json.loads(data)
@staticmethod
def annotate_uri(uri: str, annotations: dict) -> str:
"""
Wrap the given URI with the passed annotation dictionary.
"""
metadata = ""
for k, v in annotations.items():
metadata += f'{k}="{v}",'
uri = f"annotate:{metadata[:-1]}:{uri}"
return uri
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Dealing with configuration data.
"""
import logging
import os
import os.path
import re
import sys
from pathlib import Path
import confuse
import yaml
template = {
"general": {
"socket_dir": str,
"cache_dir": str,
},
"log": {
"directory": str,
"level": confuse.OneOf(["debug", "info", "warning", "error", "critical"]),
},
"monitoring": {
"heartbeat": {"host": str, "port": int, "frequency": int},
},
"api": {
"steering": {"status": str, "calendar": str},
"tank": {"session": str, "secret": str, "status": str, "playlist": str},
"engine": {
"number": int,
"status": str,
"store_playlog": str,
"store_clock": str,
"store_health": str,
},
},
"scheduler": {
"audio": {
"source_folder": str,
"source_extension": str,
"playlist_folder": str,
"engine_latency_offset": float,
},
"fetching_frequency": int,
"scheduling_window_start": int,
"scheduling_window_end": int,
"preload_offset": int,
"input_stream": {"buffer": float},
"fade_in_time": float,
"fade_out_time": float,
},
}
class AuraConfig:
"""
Creates config by reading yaml file according to template above.
"""
_instance = None
config_file_path = ""
confuse_config: confuse.Configuration
config: confuse.Configuration | None = (
None # TODO points to a validated config (hopefully later)
)
logger: logging.Logger | None = None
# FIXME: Class properties are deprecated in Python 3.11
# and will not be supported in Python 3.13
@classmethod
@property
def instance(cls):
"""Create and return singleton instance."""
if cls._instance is None:
cls._instance = AuraConfig()
return cls._instance
def __init__(self, config_file_path="/etc/aura/engine.yaml"):
"""
Initialize the configuration, defaults to `/etc/aura/engine.yaml`.
If this file doesn't exist it uses `./config/engine.yaml` from
the project directory.
Args:
config_file_path(String): The path to the configuration file `engine.yaml`
"""
self.logger = logging.getLogger("engine")
config_file = Path(config_file_path)
project_root = Path(__file__).parent.parent.parent.parent.absolute()
if not config_file.is_file():
config_file_path = f"{project_root}/config/engine.yaml"
self.config_file_path = config_file_path
print(f"Using configuration at: {config_file_path}")
envar_matcher = re.compile(r"\$\{([^}^{]+)\}")
def envar_constructor(loader, node):
value = os.path.expandvars(node.value)
# workaround not to parse numerics as strings
try:
value = int(value)
except ValueError:
pass
try:
value = float(value)
except ValueError:
pass
return value
envar_loader = yaml.SafeLoader
envar_loader.add_implicit_resolver("!envar", envar_matcher, None)
envar_loader.add_constructor("!envar", envar_constructor)
self.confuse_config = confuse.Configuration("engine", loader=envar_loader)
self.confuse_config.set_file(config_file_path)
self.load_config()
# custom overrides and defaults
self.confuse_config["install_dir"].set(os.path.realpath(project_root))
self.confuse_config["config_dir"].set(os.path.dirname(config_file_path))
AuraConfig.instance = self
def init_version(self, version: dict):
"""
Read and set the component version from VERSION file in project root.
"""
self.confuse_config["version_control"].set(version.get("control"))
self.confuse_config["version_core"].set(version.get("core"))
self.confuse_config["version_liquidsoap"].set(version.get("liquidsoap"))
def load_config(self):
"""
Set config defaults and load settings from file.
"""
if not os.path.isfile(self.config_file_path):
self.logger.critical(self.config_file_path + " not found :(")
sys.exit(1)
self.config = self.confuse_config.get(template)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A collection of meta-programming and language utilities.
"""
import inspect
from functools import wraps
from multiprocessing import Lock
def synchronized(member):
"""
@synchronized decorator.
Lock a method for synchronized access only. The lock is stored to the function or class
instance, depending on what is available.
"""
@wraps(member)
def wrapper(*args, **kwargs):
lock = vars(member).get("_synchronized_lock", None)
result = ""
try:
if lock is None:
lock = vars(member).setdefault("_synchronized_lock", Lock())
lock.acquire()
result = member(*args, **kwargs)
lock.release()
except Exception as e:
lock.release()
raise e
return result
return wrapper
def private(member):
"""
@private decorator.
Use this to annotate your methods for private-visibility.
This is an more expressive alternative to the pythonic underscore visibility.
"""
@wraps(member)
def wrapper(*args, **kwargs):
me = member.__name__
stack = inspect.stack()
calling_class = stack[1][0].f_locals["self"].__class__.__name__
calling_method = stack[1][0].f_code.co_name
if calling_method not in dir(args[0]) and calling_method is not me:
msg = f'"{me}(..)" called by "{calling_class}.{calling_method}(..)" is private'
print(msg)
raise Exception(msg)
return member(*args, **kwargs)
return wrapper
class DotDict(dict):
"""
Wrap a dictionary with `DotDict()` to allow property access using the dot.notation.
"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Logging all the noise.
"""
import logging
import confuse
class AuraLogger:
"""
Logger for all Aura Engine components.
The default logger is `AuraEngine`. Other loggers are defined by passing a custom name on
instantiation. The logger respects the log-level as defined in the engine's configuration
file.
"""
config: confuse.Configuration
logger: logging.Logger
def __init__(self, config: confuse.Configuration, name="engine"):
"""
Initialize the logger.
Args:
config (AuraConfig): The configuration file
name (String): The name of the logger
"""
self.config = config
lvl = self.get_log_level()
self.create_logger(name, lvl)
def get_log_level(self):
"""
Retrieve the configured log level (default=INFO).
"""
lvl = self.config.log.level
mapping = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
log_level = mapping.get(lvl)
if not log_level:
print("No log level configured. Using INFO.")
log_level = logging.INFO
print(f"Setting log level {log_level} ({lvl})")
return log_level
def create_logger(self, name, lvl):
"""
Create the logger instance for the given name.
Args:
name (String): The name of the logger
lvl (Enum): The logging level
"""
self.logger = logging.getLogger(name)
self.logger.setLevel(lvl)
if not self.logger.hasHandlers():
# create file handler for logger
file_handler = logging.FileHandler(self.config.log.directory + "/" + name + ".log")
file_handler.setLevel(lvl)
# create stream handler for logger
stream_handler = logging.StreamHandler()
stream_handler.setLevel(lvl)
# set format of log
datepart = "%(asctime)s:%(name)s:%(levelname)s"
message = " - %(message)s - "
filepart = "[%(filename)s:%(lineno)s-%(funcName)s()]"
formatter = logging.Formatter(datepart + message + filepart)
# set log of handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# add handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(stream_handler)
self.logger.debug("ADDED HANDLERS")
else:
self.logger.debug("REUSED LOGGER")
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A collection of all kinds of simplifications.
"""
import datetime
import json
import time
from enum import Enum
from logging import Logger
class SimpleUtil:
"""
A container class for simple utility methods.
"""
@staticmethod
def string_to_datetime(datetime_str: str):
"""
Convert a ISO 8601 date-time string into `datetime`.
"""
if datetime_str:
return datetime.datetime.fromisoformat(datetime_str)
return None
@staticmethod
def timestamp_to_datetime(timestamp: float) -> datetime:
"""
Convert a timestamp to datetime.
Args:
timestamp (float): The timestamp to convert.
Returns:
(datetime): The `datetime` object.
"""
if timestamp:
return datetime.datetime.fromtimestamp(timestamp)
return None
@staticmethod
def fmt_time(timestamp: int):
"""
Format a UNIX timestamp to a String displaying time in the format '%H:%M:%S'.
Args:
(Integer) timestamp: Unix epoch
Returns:
(String): Displaying the time
"""
return datetime.datetime.fromtimestamp(timestamp).strftime("%H:%M:%S")
@staticmethod
def round_seconds(dt: datetime) -> datetime:
"""
Rounds date/time to the nearest second.
Args:
dt (datetime): the date/time object to round.
Returns:
datetime: the rounded version.
"""
rounded_dt = dt + datetime.timedelta(seconds=0.5)
return rounded_dt.replace(microsecond=0)
@staticmethod
def nano_to_seconds(nanoseconds) -> float:
"""
Convert nano-seconds to seconds.
Args:
(Integer) nanoseconds
Returns:
(Float): seconds
@deprecated since Tank moves to seconds as float.
"""
return float(nanoseconds / 1000000000)
@staticmethod
def timestamp(date_and_time=None):
"""
Transform the given `datetime` into a UNIX epoch timestamp.
If no parameter is passed, the current timestamp is returned.
Args:
(Datetime) date_and_time: The date and time to transform.
Returns:
(Integer): timestamp in seconds.
"""
if not date_and_time:
date_and_time = datetime.datetime.now()
return time.mktime(date_and_time.timetuple())
@staticmethod
def strike(text):
"""
Create a strikethrough version of the given text.
Args:
(String) text: The text to strike.
Returns:
(String): the striked text.
"""
result = ""
for c in str(text):
result += c + TerminalColors.STRIKE.value
return result
@staticmethod
def bold(text):
"""
Create a bold version of the given text.
"""
return TerminalColors.BOLD.value + text + TerminalColors.ENDC.value
@staticmethod
def underline(text):
"""
Create a underlined version of the given text.
"""
return TerminalColors.UNDERLINE.value + text + TerminalColors.ENDC.value
@staticmethod
def blue(text):
"""
Create a blue version of the given text.
"""
return TerminalColors.BLUE.value + text + TerminalColors.ENDC.value
@staticmethod
def red(text):
"""
Create a red version of the given text.
"""
return TerminalColors.RED.value + text + TerminalColors.ENDC.value
@staticmethod
def pink(text):
"""
Create a red version of the given text.
"""
return TerminalColors.PINK.value + text + TerminalColors.ENDC.value
@staticmethod
def yellow(text):
"""
Create a yellow version of the given text.
"""
return TerminalColors.YELLOW.value + text + TerminalColors.ENDC.value
@staticmethod
def green(text):
"""
Create a red version of the given text.
"""
return TerminalColors.GREEN.value + text + TerminalColors.ENDC.value
@staticmethod
def cyan(text):
"""
Create a cyan version of the given text.
"""
return TerminalColors.CYAN.value + text + TerminalColors.ENDC.value
@staticmethod
def log_json(logger: Logger, json_data: dict):
"""
Write formatted JSON to the debug logger.
Args:
logger (Logger): The logger.
json_data (dict): The json object.
"""
json_str = json.dumps(json_data, sort_keys=True, indent=2, separators=(",", ": "))
logger.debug(SimpleUtil.cyan(json_str))
class TerminalColors(Enum):
"""
Colors for formatting terminal output.
"""
HEADER = "\033[95m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
PINK = "\033[35m"
CYAN = "\033[36m"
WARNING = "\033[31m"
FAIL = "\033[41m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
STRIKE = "\u0336"
ENDC = "\033[0m"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Remote-control the Engine with these services.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from threading import Lock, Thread, Timer
from aura_engine.base.utils import SimpleUtil as SU
class EngineExecutor(Timer):
"""
Base class for timed or threaded execution of Engine commands.
Primarily used for automations performed by the scheduler.
"""
timer_store: dict = {}
logger = logging.getLogger("engine")
_lock = None
direct_exec: bool = None
is_aborted: bool = None
parent_timer: Timer = None
child_timer: Timer = None
timer_id: str = None
timer_type: str = None
update_count: int = None
func = None
param = None
diff = None
dt = None
def __init__(
self,
timer_type: str = "BASE",
parent_timer: Timer = None,
due_time=None,
func=None,
param=None,
):
"""
Constructor.
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
parent_timer (EngineExeuctor): Parent action which is a prerequisite for this timer
due_time (Float): When timer should be executed.
For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passed to the function
"""
self._lock = Lock()
from aura_engine.engine import Engine
now_unix = Engine.engine_time()
# Init parent-child relation
self.parent_timer = parent_timer
if self.parent_timer:
self.parent_timer.child_timer = self
# Init meta data
self.direct_exec = False
self.is_aborted = False
self.timer_type = timer_type
self.update_count = 0
diff = 0
if due_time:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
dt_str = SU.round_seconds(self.dt).strftime("%Y-%m-%d_%H:%M:%S")
self.timer_id = f"{timer_type}:{func.__name__}:{dt_str}"
self.func = func
self.param = param
is_stored = self.update_store()
if not is_stored:
msg = f"Timer '{self.timer_id}' omitted cuz it already exists but is dead"
self.logger.info(SU.yellow(msg))
self.is_aborted = True
else:
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.warning(SU.yellow(msg))
self.exec_now()
elif diff == 0:
self.logger.debug(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
msg = f"Timer '{self.timer_id}' to be executed in default manner"
self.logger.debug(msg)
self.exec_timed()
def wait_for_parent(self):
"""
Child timers are dependent on their parents.
So let's wait until parents are done with their stuff => finished execution.
Checks the parent state to befinished every 0.2 seconds.
@private
"""
if self.parent_timer and self.parent_timer.is_alive():
tid = self.timer_id
ptid = self.parent_timer.timer_id
self.logger.info(SU.yellow(f"Timer '{tid}' is waiting for parent '{ptid}'..."))
self.parent_timer.join()
self.logger.info(SU.yellow(f"Timer '{tid}' wait done: '{ptid}' finished"))
def exec_now(self):
"""
Immediate execution within a thread. It is not stored in the timer store.
It also assigns the `timer_id` as the thread name.
@private
"""
self.direct_exec = True
self.wait_for_parent()
thread = Thread(name=self.timer_id, target=self.func, args=(self.param,))
thread.start()
def exec_timed(self):
"""
Do timed execution in a thread.
This method introduces a slight delay to ensure the thread is properly initialized before
starting it.
It also assigns the `timer_id` as the thread name.
@private
"""
def wrapper_func(param=None):
self.wait_for_parent()
if param:
self.func(
param,
)
else:
self.func()
super().__init__(self.diff, wrapper_func, (self.param,))
self._name = self.timer_id
self.start()
def update_store(self):
"""
Add the instance to the store and cancels any previously existing commands.
If a timer with the given ID is already existing but also already executed,
then it is not added to the store. In such case the method returns `False`.
Returns:
(Boolean): True if the timer has been added to the store. False if the
timer is already existing but dead.
@private
"""
with self._lock:
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
# Check if existing timer has been executed already -> don't update
if not existing_command.is_alive():
msg = f"Existing dead timer (ID: {self.timer_id}) - no update."
self.logger.debug(msg)
return False
# Only update living timer when there's no completed parent
elif self.parent_timer and not self.parent_timer.is_alive():
self.logger.debug("Parent finished, leave the existing child alone.")
return False
# Parent and child are still waiting for execution -> update
else:
msg = f"Cancelling existing timer with ID: {self.timer_id}"
self.logger.debug(msg)
existing_command.cancel()
self.update_count = existing_command.update_count + 1
EngineExecutor.timer_store[self.timer_id] = self
self.logger.debug(f"Stored command timer with ID: {self.timer_id}")
return True
def is_alive(self):
"""
Return true if the command is still due to be executed.
@private
"""
if self.direct_exec:
return False
if self.is_aborted:
return False
return super().is_alive()
def __str__(self):
"""
Make a String representation of the timer.
"""
return f"[{self.timer_id}] exec at {str(self.dt)} \
(alive: {self.is_alive()}, updates: {self.update_count})"
@staticmethod
def remove_stale_timers():
"""
Remove timers from store which have been executed and are older than 3 hours.
"""
timers = EngineExecutor.timer_store.values()
del_keys = []
for timer in timers:
if timer.dt < datetime.now() - timedelta(hours=3):
if not timer.child_timer or (
timer.child_timer and not timer.child_timer.is_alive()
):
msg = f"Removing already executed timer with ID: {timer.timer_id}"
timer.logger.debug(msg)
del_keys.append(timer.timer_id)
for timer_id in del_keys:
del EngineExecutor.timer_store[timer_id]
@staticmethod
def command_history():
"""
Return a list of recent active and inactive timers to the logger.
"""
return EngineExecutor.timer_store.values()
@staticmethod
def log_commands():
"""
Print a list of recent active and inactive timers to the logger.
"""
msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
EngineExecutor.remove_stale_timers()
timers = EngineExecutor.timer_store.values()
if not timers:
msg += "\nNone available!\n"
else:
for timer in timers:
if not timer.parent_timer:
line = f" => {str(timer)}\n"
if timer.is_alive():
line = SU.green(line)
msg += line
if timer.child_timer:
line = f" => {str(timer.child_timer)}\n"
if timer.child_timer.is_alive():
line = SU.green(line)
msg += line
EngineExecutor.logger.info(msg + "\n")
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Channels Module.
Base types for channels:
- ChannelName: Valid channel names as defined in core.
- ChannelType: Holds mappings to concrete channel names.
Channel definitions:
- GenericChannel: All other channels inherit from this one.
- QueueChannel: Handles queues such as filesystem items.
- StreamChannel: Handles stream connections.
- LineChannel: Handles line audio input.
"""
from __future__ import annotations
import json
import logging
import time
from enum import Enum, StrEnum
from threading import Thread
import confuse
from aura_engine.base.api import LiquidsoapUtil as LU
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import DotDict, private
from aura_engine.base.utils import SimpleUtil as SU
class ChannelName(str, Enum):
"""
Allowed channel names.
These are name mappings to the Liqidsoap channel IDs.
"""
QUEUE_A = "in_queue_0"
QUEUE_B = "in_queue_1"
HTTP_A = "in_stream_0"
HTTP_B = "in_stream_1"
LIVE_0 = "aura_engine_line_in_0"
LIVE_1 = "aura_engine_line_in_1"
LIVE_2 = "aura_engine_line_in_2"
LIVE_3 = "aura_engine_line_in_3"
LIVE_4 = "aura_engine_line_in_4"
FALLBACK_FOLDER = "fallback_folder"
FALLBACK_PLAYLIST = "fallback_playlist"
def __str__(self):
return str(self.value)
class ChannelType(dict, Enum):
"""
Available channel types.
Engine channel types mapped to type of `PlaylistItem.source`.
"""
QUEUE = {"id": "fs", "numeric": 0, "channels": [ChannelName.QUEUE_A, ChannelName.QUEUE_B]}
HTTP = {"id": "http", "numeric": 1, "channels": [ChannelName.HTTP_A, ChannelName.HTTP_B]}
LIVE = {
"id": "live",
"numeric": 3,
"channels": [
ChannelName.LIVE_0,
ChannelName.LIVE_1,
ChannelName.LIVE_2,
ChannelName.LIVE_3,
ChannelName.LIVE_4,
],
}
FALLBACK_POOL = {
"id": "fallback_pool",
"numeric": 5,
"channels": [ChannelName.FALLBACK_FOLDER, ChannelName.FALLBACK_PLAYLIST],
}
@property
def channels(self):
"""
Retrieve all channels for given type.
"""
return self.value["channels"]
@property
def numeric(self):
"""
Retrieve numeric representation of channel type for given type.
"""
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class GenericChannel:
"""
Base class for channel implementations.
Attributes:
type (ChannelType): Type of channel such as queue, stream or line
name (ChannelName): Name of the channel as defined in Liquidsoap
index (int): Position of the channel on the mixer
ready (bool): Indicates if the channel is ready to be used
selected (bool): Indicates if the channel is selected
single (bool): ?
volume (int): Volume from -1 to 100, where -1 indicates an error
remaining (float): Seconds remaining to be played
"""
logger: logging.Logger
config: confuse.Configuration
type: ChannelType = None
name: ChannelName = None
index: int = None
ready: bool = None
selected: bool = None
single: bool = None
volume: int = None
remaining: float = None
def __init__(self, channel_index: int, channel_name: int, mixer):
"""
Initialize the channel instance.
Args:
channel_index (int): Index of the channel on the mixer
channel_name (ChannelName): Name of the channel
mixer (Mixer): The mixer instance
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
self.mixer = mixer
self.name = channel_name
self.index = channel_index
def get_index(self) -> int:
"""
Retrieve the channel index.
Returns:
int: The index
"""
return self.index
def get_type(self) -> ChannelType:
"""
Retrieve the `ChannelType`.
"""
return self.type
def get_status(self) -> dict:
"""
Get channel status information.
Returns:
dict: {
ready (bool): Indicates if the channel is ready to be used
selected (bool): Indicates if the channel is selected
single (bool): ?
volume (int): Volume from -1 to 100, where -1 indicates an error
remaining (float): Seconds remaining to be played
}
"""
return DotDict(
{
"ready": self.ready,
"selected": self.selected,
"single": self.single,
"volume": self.volume,
"remaining": self.remaining,
}
)
def set_status(self, ready: bool, selected: bool, single: bool, volume: int, remain: float):
"""
Set channel status information.
Args:
ready (bool): Indicates if the channel is ready to be used
selected (bool): Indicates if the channel is selected
single (bool): ?
volume (int): Volume from -1 to 100, where -1 indicates an error
remaining (float): Seconds remaining to be played
"""
self.ready = ready
self.selected = selected
self.single = single
self.volume = volume
self.remaining = remain
def load(self, metadata: dict = None) -> bool:
"""
Interface definition for loading a channel track.
Args:
uri (str): The URI to load
metadata (dict): Metadata to assign to the channel, when playing (optional)
Returns:
(bool): True if track loaded successfully
"""
if metadata:
json_meta = json.dumps(metadata, ensure_ascii=False)
return self.set_track_metadata(json_meta)
return True
def fade_in(self, volume: int, instant=False):
"""
Perform a fade-in for the given channel.
Args:
volume (int): Volume from -1 to 100, where -1 indicates an error
instant(bool): If true the fade instantly jumps to target volume
Returns:
(bool): True if fade successful
"""
if instant:
self.logger.info(SU.pink(f"Activate channel {self}"))
return self.mixer.activate_channel(self, True)
else:
self.logger.info(SU.pink(f"Fade in channel {self}"))
faded = self.mixer.select_channel(self, True)
selected = self.mixer.fade_in(self, volume)
self.mixer.set_active_channel(self)
return faded and selected
def fade_out(self, instant=False):
"""
Perform a fade-out for the given channel starting at its current volume.
Args:
instant(bool): If true the fade instantly jumps to zero volume
Returns:
(bool): True if fade successful
"""
if instant:
self.logger.info(SU.pink(f"Activate channel {self}"))
return self.mixer.activate_channel(self, False)
else:
self.logger.info(SU.pink(f"Fade out channel {self}"))
faded = self.mixer.fade_out(self)
selected = self.mixer.select_channel(self, False)
self.mixer.set_active_channel(None)
return faded and selected
def roll(self, seconds_to_roll):
"""
Fast-forward to a position in time within the queue track.
Most channels do not support this feature.
"""
return True
def __str__(self):
"""
String representation of the Channel.
"""
return f"[{self.index} : {self.name}]"
@private
def set_track_metadata(self, json_metadata: str) -> bool:
"""
Set the metadata as current track metadata on the given channel.
This is only needed for non-queue channels. They pass the metadata inline with their
request URIs
Args:
json_metadata(str): String containing metadata as JSON
Returns:
(bool): True if metadata successfully set
@private
"""
response = self.mixer.client.exec(self.name, "set_track_metadata", json_metadata)
msg = f"Response for '{self.name}.set_track_metadata': {response}"
self.logger.info(SU.pink(msg))
if PlayoutStatusResponse(response) != PlayoutStatusResponse.SUCCESS:
msg = f"Error while setting metadata on {self.name} to:\n{json_metadata}"
self.logger.error(SU.red(msg))
return False
return True
class QueueChannel(GenericChannel):
"""
Channel for queues such as a collection of filesystem URIs.
"""
def __init__(self, channel_index, channel_name, mixer):
"""
Initialize the queue channel instance.
Args:
mixer (Mixer): The mixer instance
channel_index (int): Channel index on the mixer
"""
self.type = ChannelType.QUEUE
super().__init__(channel_index, channel_name, mixer)
def load(self, uri: str = None, metadata: dict = None):
"""
Load the provided URI and pass metadata.
Does not load the `super` implementation, as queues have their individual approach to
pass metadata in the URI.
Args:
uri (str): The URI to load
metadata (dict): Metadata to assign to the channel, when playing (optional)
Returns:
(bool): True if track loaded successfully
"""
self.logger.info(SU.pink(f"{self.name}.push('{uri}')"))
if metadata:
uri = LU.annotate_uri(uri, metadata)
# this relies on the name in_queue_0, or in_queue_1
response = self.mixer.client.exec(self.name, "push", uri)
self.logger.debug(SU.pink(f"{self.name}.push result: {response}"))
# If successful, Liquidsoap returns a resource ID of the queued track
resource_id = -1
try:
resource_id = int(response)
except ValueError:
msg = SU.red(f"Got invalid resource ID: '{response}'")
self.logger.error(msg)
return False
return resource_id >= 0
def roll(self, seconds):
"""
Fast-forward to a position in time within the queue track.
Args:
seconds(int): How many seconds the FFWD should performed
Returns:
(bool): True after successful roll
"""
response = self.mixer.client.exec(self.name, "roll", str(seconds))
if response == "OK":
return True
return False
def fade_out(self, instant=False):
"""
Fade out channel and flush the queue.
Args:
instant (bool, optional): Instant volume change instead of fade. Defaults to False
Returns:
(bool): True after successful fade out
"""
response = super().fade_out(instant)
self.flush()
return response
@private
def flush(self):
"""
Remove all items from queue.
@private
"""
def flush_queue():
# Wait some moments, if there is some long fade-out. Note, this also
# means, this channel should not be used for at least some seconds
# (including clearing time).
clear_timeout = 5
msg = f"Clearing channel {self} in {clear_timeout} seconds"
self.logger.info(SU.pink(msg))
time.sleep(clear_timeout)
# Deactivate channel
response = self.mixer.client.exec(self.name, "clear")
msg = f"Cleared queue channel '{self.name}' with result '{response}'"
self.logger.info(SU.pink(msg))
Thread(target=flush_queue).start()
class StreamChannel(GenericChannel):
"""
Channel for audio stream input.
"""
def __init__(self, channel_index, channel_name, mixer):
"""
Initialize the queue channel instance.
Args:
mixer (Mixer): The mixer instance
channel_index (int): Channel index on the mixer
"""
self.type = ChannelType.HTTP
super().__init__(channel_index, channel_name, mixer)
def load(self, uri: str = None, metadata: dict = None):
"""
Load the given stream item and updates the playlist items's status codes.
Args:
uri (str): The URI to load
metadata (dict): Metadata to assign to the channel, when playing (optional)
Returns:
(bool): True if track loaded successfully
"""
self.logger.debug(SU.pink(f"Loading stream '{uri}'"))
self.stop()
self.set_url(uri)
self.start()
timeout = 10 # hardcoded: block related timers as short as possible
timeout_time = SU.timestamp() + timeout
while not self.is_ready(uri):
if SU.timestamp() > timeout_time:
msg = f"Load stream {uri} timed out"
raise LoadSourceException(msg)
time.sleep(1)
response = super().load(metadata)
return response
@private
def is_ready(self, url):
"""
Check if the stream on the given channel is ready to play.
Note this method is blocking some serious amount of time even when successful; hence it is
worth being called asynchronously.
Args:
channel (ChannelName): The stream channel
url (String): The stream URL
Returns:
(Boolean): `True` if successful
@private
"""
is_ready = True
response = self.mixer.client.exec(self.name, "status")
msg = f"{self.name}.status result: {response}"
self.logger.info(SU.pink(msg))
if PlayoutStatusResponse(response) != PlayoutStatusResponse.STREAM_STATUS_CONNECTED:
return False
lqs_url = response.split(" ")[1]
if not url == lqs_url:
msg = f"Wrong URL '{lqs_url}' set for channel '{self.name}', expected: '{url}'."
self.logger.error(msg)
is_ready = False
if is_ready:
stream_buffer = self.config.scheduler.input_stream.buffer
msg = f"Ready to play stream, but wait {stream_buffer} seconds to fill buffer..."
self.logger.info(SU.pink(msg))
time.sleep(round(float(stream_buffer)))
return is_ready
@private
def stop(self):
"""
Stop the stream.
"""
response = self.mixer.client.exec(self.name, "stop")
if PlayoutStatusResponse(response) != PlayoutStatusResponse.SUCCESS:
self.logger.error(SU.red(f"{self.name}.stop result: {response}"))
raise LoadSourceException("Error while stopping stream!")
return response
@private
def set_url(self, url):
"""
Set the stream URL.
"""
response = self.mixer.client.exec(self.name, "url", url)
if PlayoutStatusResponse(response) != PlayoutStatusResponse.SUCCESS:
self.logger.error(SU.red(f"{self.name}.url result: {response}"))
raise LoadSourceException("Error while setting stream URL!")
return response
@private
def start(self):
"""
Start the stream URL.
"""
response = self.mixer.client.exec(self.name, "start")
self.logger.info(SU.pink(f"{self.name}.start result: {response}"))
return response
class LineChannel(GenericChannel):
"""
Channel for line audio input.
"""
def __init__(self, channel_index, channel_name, mixer):
"""
Initialize the queue channel instance.
Args:
mixer (Mixer): The mixer instance
channel_index (int): Channel index on the mixer
"""
self.type = ChannelType.LIVE
super().__init__(channel_index, channel_name, mixer)
def load(self, uri: str = None, timeout: int = None, metadata: dict = None):
"""
Load the line channel.
Args:
uri (str): For line source the URI is always null
metadata (dict): Metadata to assign to the channel, when playing (optional)
Returns:
(bool): True if track loaded successfully
"""
response = super().load(metadata)
return response
class ChannelFactory:
"""
A factory to construct channels based on a given channel name.
"""
logger: logging.Logger
config: confuse.Configuration
def __init__(self, mixer):
"""
Initialize the channel factory.
Args:
mixer (Mixer): The mixer instance
"""
self.config = AuraConfig.instance
self.logger = logging.getLogger("engine")
self.mixer = mixer
def create_channel(self, channel_index, channel_name: ChannelName, mixer) -> GenericChannel:
"""
Create a channel with the provided details.
Depending on the given channel name, a different channel is instantiated.
Args:
channel_index (int): The index of the channel on the mixer
channel_name (ChannelName): The channel name as defined in Liquidsoap
mixer (Mixer): The mixer instance
Returns:
(GenericChannel): A concrete implementation of the generic channel
Raises:
ValueError: If no matching channel type is found
"""
if channel_name in ChannelType.QUEUE.channels:
self.logger.debug(f"Create new QUEUE channel '{channel_name}'")
return QueueChannel(channel_index, channel_name, mixer)
if channel_name in ChannelType.HTTP.channels:
self.logger.debug(f"Create new STREAM channel '{channel_name}'")
return StreamChannel(channel_index, channel_name, mixer)
if channel_name in ChannelType.LIVE.channels:
self.logger.debug(f"Create new LINE channel '{channel_name}'")
return LineChannel(channel_index, channel_name, mixer)
error_message = (
f"No valid ChannelType found for channel '{channel_name}'. Not working as intended."
)
self.logger.error(error_message)
raise ValueError(error_message)
class PlayoutStatusResponse(StrEnum):
"""
Response values indicating some status.
"""
SUCCESS = "OK"
STREAM_STATUS_POLLING = "polling"
STREAM_STATUS_STOPPED = "stopped"
STREAM_STATUS_CONNECTED = "connected"
INVALID = "invalid"
@classmethod
def _missing_(cls, value):
if value in ["Done", "Done!", "Donee!"]:
return cls.SUCCESS
if value.startswith(cls.STREAM_STATUS_CONNECTED):
return cls.STREAM_STATUS_CONNECTED
return cls.INVALID
class LoadSourceException(Exception):
"""
Exception thrown when some source could not be loaded or updated.
"""
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Message and connection handling to Engine Core (Liquidsoap).
"""
from __future__ import annotations
import logging
import socket
import urllib.parse
import confuse
import aura_engine.events as events
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import private, synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.mixer import Mixer
from aura_engine.resources import ResourceUtil
class CoreClient:
"""
Client managing communication with Engine Core (Liquidsoap).
"""
skip_log_commands = ("aura_engine.status", "mixer.volume")
instance: CoreClient | None = None
logger: logging.Logger
config: confuse.Configuration
connection = None
event_dispatcher = None
conn: CoreConnection
def __init__(self, event_dispatcher):
"""
Initialize the client.
"""
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
self.event_dispatcher = event_dispatcher
self.conn = CoreConnection()
@staticmethod
def get_instance(event_dispatcher) -> CoreClient:
"""
Get an instance of the client singleton.
"""
if not CoreClient.instance:
CoreClient.instance = CoreClient(event_dispatcher)
return CoreClient.instance
@synchronized
def connect(self):
"""
Open connection.
@synchronized
"""
try:
if not self.conn.is_connected():
self.conn.open()
except CoreConnectionError as e:
self.logger.critical(SU.red(str(e)))
@synchronized
def disconnect(self):
"""
Close the connection.
@synchronized
"""
if not self.conn.is_connected():
self.conn.close()
@synchronized
def exec(self, namespace: str, action: str, args: str = "") -> str:
"""
Execute a command.
Args:
namespace (str): The namespace for the command to execute.
action (str): The action to execute.
args (str, optional): Arguments passed with the action. Defaults to "".
Raises:
CoreConnectionError: Raised when there is a connection or communication error.
Returns:
str: result of the command (optional).
@synchronized
"""
response = None
if not self.conn.is_connected():
self.conn.open()
try:
command = self.build_command(namespace, action, args)
self.log_debug(command, f"[>>] {command}")
response = self.conn.send(command)
if response:
self.log_debug(command, f"[<<] {response}")
except CoreConnectionError as e:
msg = "Error while issuing command to Liquidsoap"
raise CoreConnectionError(msg, e)
return response
@private
def build_command(self, namespace: str, action: str, args: str) -> str:
"""
Construct a command string for sending to Liquidsoap.
Args:
namespace (str): The namespace for the command to execute.
action (str): The action to execute.
args (str, optional): Arguments passed with the action. Defaults to "".
Returns:
str: The command string
@private
"""
args = str(args).strip()
args = " " + urllib.parse.unquote(args) if args != "" else ""
namespace = str(namespace) + "." if namespace else ""
command = f"{namespace}{action}{args}"
return command
@private
def log_debug(self, command: str, log_message: str):
"""
Check if the command is excluded from debug logging.
This is meant to avoid log-pollution by status and fade commands.
@private
"""
if self.config.log.level == "debug":
cmds = CoreClient.skip_log_commands
base_cmd = command.split(" ")[0]
if not base_cmd.startswith(cmds):
self.logger.debug(log_message)
class PlayoutClient(CoreClient):
"""
Client managing communication with Engine Core (Liquidsoap).
"""
mixer: Mixer
def __init__(self, event_dispatcher: events.EngineEventDispatcher):
"""
Initialize the client.
"""
super().__init__(event_dispatcher)
self.mixer = Mixer("mixer", self)
@staticmethod
def get_instance(event_dispatcher: events.EngineEventDispatcher) -> CoreClient:
"""
Get an instance of the client singleton.
"""
if not PlayoutClient.instance:
PlayoutClient.instance = PlayoutClient(event_dispatcher)
return PlayoutClient.instance
# def get_mixer(self):
# """
# Get the mixer instance.
# """
# return self.mixer
#
# ns:*
def get_uptime(self) -> str:
"""
Get info on how long core is running already.
"""
return self.exec("", "uptime")
# ns:aura_engine
def get_version(self) -> str:
"""
Get JSON with version information.
"""
return self.exec("aura_engine", "version")
def get_status(self) -> str:
"""
Get engine status such as uptime and fallback mode.
"""
return self.exec("aura_engine", "status")
def set_config(self, json_config: str) -> str:
"""
Send JSON with configuration options to core.
"""
return self.exec("aura_engine", "update_config", json_config)
class CoreConnection:
"""
Handles connections and sends commands to Engine Core (Liquidsoap).
"""
ENCODING = "UTF-8"
logger: logging.Logger
config: confuse.Configuration
socket_path: str
socket: socket.socket
connected: bool
message: str
def __init__(self):
"""
Initialize the connection.
"""
self.logger = logging.getLogger("engine")
config = AuraConfig.instance
socket_path = config.config.general.socket_dir + "/engine.sock"
self.socket_path = ResourceUtil.to_abs_path(socket_path)
self.logger.debug(f"Using socket at '{self.socket_path}'")
self.connected = False
self.message = ""
def is_connected(self):
"""
Return `True` if a connection is established.
"""
return self.connected
def open(self):
"""
Connect to Liquidsoap socket.
Raises:
CoreConnectionError: Raised when there is a connection or communication
Error.
"""
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.connect(self.socket_path)
except FileNotFoundError as e:
msg = f"Socket file at '{self.socket_path}' not found. Is Liquidsoap running?"
self.connected = False
raise CoreConnectionError(msg, e)
except socket.error as e:
msg = f"Cannot connect to socket at '{self.socket_path}'"
self.connected = False
raise CoreConnectionError(msg, e)
except Exception as e:
msg = f"Unknown error while connecting to socket at '{self.socket_path}'"
self.connected = False
raise CoreConnectionError(msg, e)
else:
self.connection_attempts = 0
self.connected = True
def close(self):
"""
Send quit command and close connection.
"""
if self.connected:
message = "quit\r"
self.socket.sendall(message.encode(CoreConnection.ENCODING))
self.socket.close()
self.connected = False
def send(self, command: str) -> str:
"""
Send command to Liquidsoap.
Args:
command (str): The command string to be executed
Raises:
CoreConnectionError: Thrown when not connected
Returns:
str: Result of the command
"""
result = None
command += "\n"
try:
self.socket.sendall(command.encode())
result = self.read()
except BrokenPipeError as broken_pipe:
msg = "Broken Pipe while sending command"
self.logger.error(SU.red(msg), broken_pipe)
self.connected = False
raise CoreConnectionError(msg)
except TimeoutError as timeout_error:
msg = "Socket timeout while reading"
self.logger.error(SU.red(msg), timeout_error)
self.connected = False
raise CoreConnectionError(msg)
except Exception as e:
msg = "Unknown Error while sending command"
self.logger.error(SU.red(msg), e)
self.connected = False
raise CoreConnectionError(msg)
return str(result)
@private
def read_all(self, timeout: int = 2) -> str:
"""
Read data from the socket until `END` signal is received.
Args:
timeout (int, optional): Reading timeout in seconds. Defaults to 2.
Returns:
str: The response
@private
"""
data = ""
self.socket.settimeout(timeout)
while True:
data += self.socket.recv(1).decode(CoreConnection.ENCODING)
if data.find("END\r\n") != -1 or data.find("Bye!\r\n") != -1:
data.replace("END\r\n", "")
break
return data
@private
def read(self) -> str:
"""
Read from socket and store return value in `self.message` and return it.
Returns:
str: message read from socket
@private
"""
ret = self.read_all().splitlines()
last = ret.pop()
if last != "Bye!":
if len(ret) > 1:
self.message = str.join(" - ", ret)
elif len(ret) == 1:
self.message = ret[0]
else:
self.message = last
return self.message
class CoreConnectionError(Exception):
"""
Exception thrown when there is a connection problem with Liquidsoap.
"""
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The virtual mixer as it is realized using Liquidsoap.
A mixer consists of general methods to adjust volume, enable/disable channels etc. It also has
set of channels with their own control options.
"""
from __future__ import annotations
import logging
import re
import time
from typing import TYPE_CHECKING
import confuse
from aura_engine.base.api import LiquidsoapUtil as LU
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import private
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import (
ChannelFactory,
ChannelName,
ChannelType,
GenericChannel,
)
if TYPE_CHECKING:
from aura_engine.core.client import PlayoutClient
class Mixer:
"""
A virtual mixer.
"""
logger: logging.Logger
config: confuse.Configuration
client: PlayoutClient
mixer_id: str
channels: dict
channel_names: list
active_channel_name: ChannelName | None = None
def __init__(self, mixer_id: str, client: PlayoutClient):
"""
Initialize the mixer.
It loads all the available channels from Liquidsoap and pulls the faders down to zero.
Args:
mixer_id (str): The ID of the mixer in Liquidsoap
client (PlayoutClient): The client for controlling playout
"""
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
self.mixer_id = mixer_id
self.client = client
self.channel_names = []
self.channels = {}
self.refresh_channels()
# TODO Graceful reboot: At some point the current track playing could
# resume inside Liquidsoap in case only Engine restarted (See #77).
for n in self.channel_names:
self.set_channel_volume(self.channels.get(n), 0)
def get_inputs(self) -> dict:
"""
Return the state of all mixer input channels.
"""
self.refresh_channels()
inputs = {}
for idx, channel in enumerate(self.channel_names):
inputs[channel] = self.get_channel_status(idx)
return inputs
def get_outputs(self) -> dict:
"""
Retrieve the state of all mixer outputs.
"""
outputs = self.client.exec(self.mixer_id, "outputs")
outputs = LU.json_to_dict(outputs)
return outputs
def get_channel(self, channel_name: ChannelName) -> GenericChannel:
"""
Retrieve a channel identified by name.
"""
if channel_name:
return self.channels.get(channel_name)
return None
def get_active_channel(self) -> GenericChannel | None:
"""
Retrieve the currently active channel.
"""
if self.active_channel_name:
return self.get_channel(self.active_channel_name)
return None
def set_active_channel(self, channel: GenericChannel):
"""
Set the currently active channel.
"""
if channel:
self.active_channel_name = channel.name
self.logger.info(SU.pink(f"Set active channel to '{channel}'"))
else:
self.active_channel_name = None
self.logger.info(SU.pink("Reset active channel"))
def get_free_channel(self, channel_type: ChannelType) -> GenericChannel:
"""
Return any _free_ channel of the given type.
A channel which is not currently active is seen as _free_.
Args:
channel_type (ChannelType): The type of channel to be retrieved
Returns:
(ChannelName, ChannelName): The active and next free channel of the requested type
"""
free_channel = []
active_channel = self.get_active_channel()
if active_channel and active_channel.type == channel_type:
free_channels = [c for c in channel_type.channels if c != self.active_channel_name]
if len(free_channels) < 1:
msg = f"Requesting channel of type '{channel_type}' but none free. \
Active channel: '{active_channel}'"
self.logger.critical(SU.red(msg))
else:
free_channel = free_channels[0]
else:
free_channel = channel_type.channels[0]
self.logger.info(SU.pink(f"Got free '{channel_type}' channel '{free_channel}'"))
return self.channels.get(free_channel)
def select_channel(self, channel: GenericChannel, select: bool) -> bool:
"""
Select or deselect some mixer channel.
Args:
channel (ChannelName): The channel number
select (Boolean): Select or deselect
Returns:
(String): Liquidsoap server response
"""
self.refresh_channels()
try:
if not self.channels:
self.logger.critical(SU.red("Cannot select channel cuz there are no channels"))
return False
index = channel.get_index()
select_liquidsoap = "true" if select else "false"
response = self.client.exec(self.mixer_id, "select", f"{index} {select_liquidsoap}")
self.update_channel_status(index, response)
return True
except Exception as e:
self.logger.critical(SU.red("Ran into exception when selecting channel"), e)
return False
def activate_channel(self, channel: GenericChannel, activate: bool) -> bool:
"""
Activate a channel.
Combined call of following to save execution time:
- Select some mixer channel
- Increase the volume to 100,
Args:
channel (ChannelName): The channel number
activate (bool): Activate or deactivate
Returns:
(str): Liquidsoap server response
"""
self.refresh_channels()
try:
if not self.channels:
self.logger.critical(SU.red("Cannot activate channel cuz there are no channels"))
else:
index = channel.get_index()
activate_liquidsoap = "true" if activate else "false"
response = self.client.exec(
self.mixer_id, "activate", f"{index} {activate_liquidsoap}"
)
if response == "OK":
return True
except Exception as e:
self.logger.critical(SU.red("Ran into exception when activating channel."), e)
return False
def fade_in(self, channel: GenericChannel, volume: int = 100) -> bool:
"""
Perform a fade-in for the given channel.
Args:
channel (GenericChannel): The channel to fade
volume (Integer): The target volume
Returns:
(bool): `True` if successful
TODO Think about using native Liquidsoap fading. This should bring better performance
and wanted separation of concerns.
"""
try:
current_volume = self.get_channel_volume(channel)
if current_volume == volume:
msg = f"Skip fade in of {channel}: Already at target volume of {volume}%"
self.logger.info(msg)
return True
elif current_volume > volume:
msg = f"Skip fade in of {channel}: Current volume of {current_volume}% exceeds \
target volume of {volume}%"
self.logger.info(msg)
return True
fade_in_time = self.config.scheduler.fade_in_time
if fade_in_time > 0:
self.fade_in_active = True
target_volume = volume
step = fade_in_time / target_volume
msg = f"Fade in of {channel} to {target_volume}% ({step}s steps)"
self.logger.debug(SU.pink(msg))
for i in range(target_volume):
self.set_channel_volume(channel, i + 1)
time.sleep(step)
msg = f"Fade in of {channel} done"
self.logger.debug(SU.pink(msg))
except Exception as e:
self.logger.critical(SU.red(e.message), e)
return False
return True
def fade_out(self, channel: GenericChannel) -> bool:
"""
Perform a fade-out for the given channel starting at its current volume.
Args:
channel (GenericChannel): The channel to fade
Returns:
(Boolean): `True` if successful
TODO Think about using native Liquidsoap fading. This should bring better performance
and wanted separation of concerns.
"""
try:
current_volume = self.get_channel_volume(channel)
if current_volume == 0:
msg = f"Channel {channel} already at target volume of 0%. SKIPPING..."
self.logger.info(msg)
return True
fade_out_time = self.config.scheduler.fade_out_time
if fade_out_time > 0:
step = abs(fade_out_time) / current_volume
msg = f"Start to fade out {channel} ({step}s step)"
self.logger.debug(SU.pink(msg))
for i in range(current_volume):
self.set_channel_volume(channel, current_volume - i - 1)
time.sleep(step)
msg = f"Finished fade out of {channel}"
self.logger.debug(SU.pink(msg))
except Exception as e:
self.logger.critical(SU.red(e.message), e)
return False
return True
#
# Private Methods
#
@private
def refresh_channels(self):
"""
Retrieve all mixer channel names and create channel instances, if not available.
@private
"""
def create_channel(name) -> int:
self.logger.debug(f"Set new channel name '{name}'")
self.channel_names.append(name)
idx = self.channel_names.index(name)
channel = ChannelFactory.create_channel(self, idx, name, self)
self.channels[name] = channel
return idx
if not self.channel_names:
# Get channel names
# here we expect something like "in_queue_0, in_queue_1, ..."
channel_names = self.client.exec(self.mixer_id, "inputs")
channel_names = channel_names.split(" ")
channel_names = [channel.split(".")[0] for channel in channel_names]
# Create channels objects if not yet available
for name in channel_names:
try:
self.channel_names.index(name)
except ValueError:
idx = create_channel(name)
status = self.get_channel_status(idx)
msg = f"Channel {self.get_channel(name)} status: {status}"
self.logger.info(SU.pink(msg))
@private
def get_channel_number(self, channel_name: ChannelName) -> int:
"""
Return the channel number for the given channel name.
Args:
channel (ChannelName): The channel
Returns:
(Integer): The channel number
@private
"""
self.refresh_channels()
index = self.channel_names.index(channel_name)
if index < 0:
msg = f"There's no valid channel number for channel ID '{channel_name}'"
self.logger.critical(SU.red(msg))
return -1
return index
@private
def get_channel_status(self, channel_number: int) -> dict:
"""
Retrieve the status of a channel identified by the channel number.
Args:
channel_number (int): The channel number
Returns:
(dict): Channel status dictionary
@private
"""
response = self.client.exec(self.mixer_id, "status", str(channel_number))
return self.update_channel_status(channel_number, response)
@private
def update_channel_status(self, channel_number: int, status_string: str):
"""
Update channel status.
Args:
channel_number (int): the index of the channel on the mixer
status_string (dict): channel status fields as single string
@private
"""
status = {}
pairs = status_string.split(" ")
for pair in pairs:
kv = pair.split("=")
status[kv[0]] = kv[1]
channel_name = self.channel_names[channel_number]
channel: GenericChannel = self.channels.get(channel_name)
channel.set_status(
bool(status.get("ready")),
bool(status.get("selected")),
bool(status.get("single")),
int(status.get("volume").split("%")[0]),
float(status.get("remaining")),
)
return channel.get_status()
@private
def get_channel_volume(self, channel: GenericChannel) -> int:
"""
Retrieve the current volume of the channel.
Args:
channel (GenericChannel): The channel
Returns:
(int): Volume between 0 and 100 or -1 in case of an error
@private
"""
status = self.get_channel_status(channel.get_index())
return status.volume
@private
def set_channel_volume(self, channel: GenericChannel, volume: int):
"""
Set volume of a channel.
Args:
channel (GenericChannel): The channel
volume (int): Volume between 0 and 100
@private
"""
self.refresh_channels()
playout_volume = str(int(volume) / 100)
args = f"{channel.get_index()} {playout_volume}"
message = self.client.exec(self.mixer_id, "volume", args)
search = re.search(r"volume=(\d+)", message).group(1)
actual_volume = int(search)
if actual_volume != volume:
msg = f"Error setting volume of channel {channel} to {volume}%: {message}"
self.logger.error(SU.red(msg))
else:
self.update_channel_status(channel.index, message)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The Engine.
"""
from __future__ import annotations
import json
import logging
import time
from contextlib import suppress
from enum import Enum
import confuse
import tomli
import aura_engine.events as events
import aura_engine.scheduling.scheduler as scheduler # noqa
import aura_engine.scheduling.timetable as timetable # noqa
from aura_engine.base.api import LiquidsoapUtil as LU
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import DotDict
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import ChannelType, QueueChannel
from aura_engine.core.client import CoreConnectionError, PlayoutClient
from aura_engine.core.mixer import Mixer
from aura_engine.resources import (
ResourceClass,
ResourceMapping,
ResourceType,
ResourceUtil,
)
from aura_engine.scheduling.domain import PlaylistItem
class Engine:
"""
The Engine.
"""
instance: Engine = None
logger: logging.Logger
config: confuse.Configuration
engine_time_offset = 0.0
scheduler: scheduler.AuraScheduler = None # noqa: F811
event_dispatcher: events.EngineEventDispatcher = None
playout: PlayoutClient | None = None
playout_state = None
def __init__(self):
"""
Initialize the engine.
"""
if Engine.instance:
raise Exception("Engine is already running!")
Engine.instance = self
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
Engine.engine_time_offset = self.config.scheduler.audio.engine_latency_offset
def start(self):
"""
Start the engine.
Called when the connection to the sound-system implementation has been established.
"""
from aura_engine.plugins.clock import ClockInfoHandler
from aura_engine.plugins.monitor import AuraMonitor
self.event_dispatcher = events.EngineEventDispatcher(self)
# Subscribe event handlers to event dispatcher events
binding = self.event_dispatcher.attach(AuraMonitor)
binding.subscribe("on_boot")
binding.subscribe("on_sick")
binding.subscribe("on_resurrect")
binding = self.event_dispatcher.attach(ClockInfoHandler)
binding.subscribe("on_play")
binding.subscribe("on_fallback_active")
from aura_engine.scheduling.scheduler import AuraScheduler
scheduler = AuraScheduler(self)
self.event_dispatcher.initialize(scheduler)
while not self.is_connected():
self.logger.info(SU.yellow("Waiting for Liquidsoap to be running ..."))
time.sleep(2)
self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap"))
self.event_dispatcher.on_boot()
self.logger.info(EngineSplash.splash_screen())
self.event_dispatcher.on_ready()
#
# Basic Methods
#
def is_connected(self):
"""
Check if there's a valid connection to Liquidsoap.
"""
has_connection = False
try:
if not self.playout:
self.playout = PlayoutClient.get_instance(self.event_dispatcher)
self.playout.get_uptime()
self.logger.info(SU.green("Initialize Player..."))
self.player = Player(self.playout, self.event_dispatcher)
has_connection = True
except CoreConnectionError:
self.logger.debug("Liquidsoap is not running so far")
except Exception as e:
self.logger.error(f"Cannot check if Liquidsoap is running. \nReason: {e}", e)
return has_connection
def update_playout_state(self):
"""
Retrieve the state of all inputs and outputs.
"""
state = self.playout.get_status()
state = DotDict(LU.json_to_dict(state))
def dispatch_fallback_event():
timetable: timetable.TimetableService = self.scheduler.get_timetable() # noqa: F811
timeslot = timetable.get_current_timeslot()
self.event_dispatcher.on_fallback_active(timeslot)
# Initialize state
if not self.playout_state:
if state.is_fallback:
self.logger.info(SU.yellow("Set initial playout state to FALLBACK"))
dispatch_fallback_event()
else:
self.logger.info(SU.green("Set initial playout state"))
elif state and self.playout_state.is_fallback != state.is_fallback:
if state.is_fallback:
self.logger.info(SU.yellow("Playout turned into FALLBACK state"))
else:
self.logger.info(SU.green("Playout turned back into normal state"))
self.playout_state = state
return self.playout_state
def get_playout_state(self):
"""
Retrieve the state of Engine Core.
"""
return self.playout_state
def init_version(self) -> dict:
"""
Get the versions of Engine components and store in configuration.
Returns:
dict: {
"control": Engine Control version
"core": Engine Core version
"liquidsoap": Liquidsoap version
}
"""
with open("pyproject.toml", mode="rb") as config:
toml_file = tomli.load(config)
ctrl_version = toml_file["tool"]["poetry"]["version"]
versions = self.playout.get_version()
versions = DotDict(json.loads(versions))
versions.control = ctrl_version
AuraConfig.instance.init_version(versions)
@staticmethod
def engine_time():
"""
Get the engine perspective on time.
Liquidsoap is slow in executing commands, therefore it's needed to schedule
actions by (n) seconds in advance, as defined in the configuration file by
the property `engine_latency_offset`. it's important to note that this method
requires the class variable `EngineUtil.engine_time_offset` to be set on
Engine initialization.
Returns:
(Integer): the Unix epoch timestamp including the offset
"""
return SU.timestamp() + Engine.engine_time_offset
@staticmethod
def get_instance() -> Engine:
"""
Return the one and only engine.
Create an instance, if it doesn't exist yet.
Returns:
(Engine): The engine instance.
"""
if not Engine.instance:
Engine.instance = Engine()
return Engine.instance
def terminate(self):
"""
Terminate the engine and all related processes.
"""
if self.scheduler:
self.scheduler.terminate()
# TODO terminate core connection
#
# PLAYER
#
class Player:
"""
Engine Player.
"""
class TransitionType(str, Enum):
"""
Types for instant and fade transitions.
"""
INSTANT = "instant"
FADE = "fade"
logger: logging.Logger
config: confuse.Configuration
channels = None
resource_map: ResourceMapping
event_dispatcher: events.EngineEventDispatcher
mixer: Mixer
def __init__(self, playout: PlayoutClient, event_dispatcher: events.EngineEventDispatcher):
"""
Initialize the player.
Args:
playout (PlayoutClient): Client for connecting to Engine Core (Liquidsoap)
event_dispatcher (EventDispatcher): Dispatcher for issuing events
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
self.event_dispatcher = event_dispatcher
self.resource_map = ResourceMapping()
# self.mixer = playout.get_mixer()
self.mixer = Mixer("mixer", playout)
def preload(self, item: PlaylistItem):
"""
Pre-load the item. This is required before the actual `play(..)` can happen.
Be aware when using this method in order to queue a very short item this may
result in situations with incorrect timing. In this case bundle multiple short
items as one queue using `preload_playlist(self, items)`.
It's important to note, that his method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
the `item.play` state object.
Args:
item (PlaylistItem): An array holding filesystem items
"""
is_queue = item.get_content_type() in ResourceClass.FILE.types
metadata = ResourceUtil.generate_track_metadata(item, not is_queue)
self.logger.debug(SU.pink(f"Loading item '{item}'"))
uri = None
is_ready = False
# LINE
if item.get_content_type() in ResourceClass.LIVE.types:
channel_name = self.resource_map.live_channel_for_resource(item.source)
chosen_channel = self.mixer.get_channel(channel_name)
else:
channel_type = self.resource_map.type_for_resource(item.get_content_type())
chosen_channel = self.mixer.get_free_channel(channel_type)
# QUEUE
if is_queue:
uri = ResourceUtil.source_to_filepath(item.source)
# STREAM
elif item.get_content_type() in ResourceClass.STREAM.types:
uri = item.source
if not chosen_channel:
self.logger.critical(SU.red("No channel for '{item.source}' source found"))
else:
msg = f"Assign channel {chosen_channel} to item"
item.play.set_loading(chosen_channel)
self.logger.info(SU.pink(msg))
is_ready = item.play.channel.load(uri, metadata=metadata)
if is_ready:
item.play.set_ready()
self.event_dispatcher.on_queue([item])
def preload_group(self, items: list):
"""
Preload multiple filesystem/queue items at once.
This call is required before the actual `play(..)` can happen. Due to their nature,
non-filesystem items cannot be queued using this method. In this case use
`preload(self, item)` instead. This method also allows queuing of very short files, such
as jingles.
It's important to note, that his method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
`item.state`.
Args:
items ([PlaylistItem]): list of playlist items.
"""
free_channel = self.mixer.get_free_channel(ChannelType.QUEUE)
# Validate item type
for item in items:
if item.get_content_type() != ResourceType.FILE:
raise InvalidChannelException
# Determine channel & queue items
item: PlaylistItem
for item in items:
item.play.set_loading(free_channel)
self.logger.debug(SU.pink(f"Loading item '{item}'"))
# Choose and save the input channel
metadata = ResourceUtil.generate_track_metadata(item)
file_path = ResourceUtil.source_to_filepath(item.source)
if item.play.channel.load(file_path, metadata):
item.play.set_ready()
self.event_dispatcher.on_queue(items)
def play(self, item: PlaylistItem, transition: TransitionType):
"""
Play a new `PlaylistItem`.
In case of a new timeslot (or some intended, immediate transition), a prepared channel is
selected and transitions between old and new channel is performed.
This method expects that the item is pre-loaded using `preload(..)` or
`preload_group(self, items)` before being played. In case the pre-loading has happened
for a group of items, only the first item of the group needs to be passed.
Args:
item (PlaylistItem): The audio source to be played
transition (Player.TransitionType): The type of transition to use e.g. fade-in or
instant volume level.
queue (Boolean): If `True` the item is queued if the `ChannelType` does allow so;
otherwise a new channel of the same type is activated
TODO The command `item.play.set_playing()` sets the playlist item's status to _playing_
and assigns the current play start time. This is required for resource like line-in.
Other source like audio files have their start marker natively assigned by Liquidsoap.
These timestamp represent the actual real start time and should somehow be retrieved
from Liquidsoap, in order to also have the _real deal_ in Engine.
"""
with suppress(CoreConnectionError):
# Stop any active channel
self.stop(Player.TransitionType.FADE)
# Start the new channel
item.play.set_playing()
instant = not (transition == Player.TransitionType.FADE)
item.play.channel.fade_in(item.volume, instant=instant)
# Dispatch event
self.event_dispatcher.on_play(item)
def stop(self, transition: TransitionType):
"""
Stop the currently active channel either instantly or by fading out.
Args:
transition (Player.TransitionType): The type of transition to use e.g. fade-out
"""
with suppress(CoreConnectionError):
channel = self.mixer.get_active_channel()
if not channel:
self.logger.info(SU.pink("Nothing to stop, no active channel"))
return
instant = not (transition == Player.TransitionType.FADE)
channel.fade_out(instant=instant)
self.event_dispatcher.on_stop(channel)
def roll(self, channel: QueueChannel, seconds: int) -> bool:
"""
Pre-roll the player of the given `ChannelType.QUEUE` channel by (n) seconds.
Args:
channel (ChannelName): The channel to push the file to
seconds (Float): The seconds to roll
Returns:
(bool): True after successful rolling
"""
return channel.roll(seconds)
class InvalidChannelException(Exception):
"""
Exception thrown when the given channel is invalid.
"""
pass
class EngineSplash:
"""Print the splash and version information on boot."""
@staticmethod
def splash_screen():
"""
Print the engine logo and version info.
"""
version = AuraConfig.instance.confuse_config["version_control"].get()
core_version = AuraConfig.instance.confuse_config["version_core"].get()
liq_version = AuraConfig.instance.confuse_config["version_liquidsoap"].get()
return f"""\n
█████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗
██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝
███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗
██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝
██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗
╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝
control v{version}, core v{core_version}, liquidsoap v{liq_version} - Ready to play!
\n"""
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Event binding and propagation.
"""
from __future__ import annotations
import logging
from threading import Thread
from typing import TYPE_CHECKING, Type
import confuse
from aura_engine.base.config import AuraConfig
from aura_engine.scheduling.domain import PlaylistItem
from aura_engine.scheduling.scheduler import AuraScheduler
if TYPE_CHECKING:
from aura_engine.core.channels import GenericChannel
from aura_engine.engine import Engine
from aura_engine.scheduling.domain import Timeslot
class EventBinding:
"""
A binding between the event dispatcher and some event handler.
It allows you to subscribe to events in a chained way:
```
binding = dispatcher.attach(AuraMonitor)
binding.subscribe("on_boot").subscribe("on_play")
```
"""
dispatcher: EngineEventDispatcher
instance = None
def __init__(self, dispatcher: EngineEventDispatcher, instance):
"""
Initialize the dispatcher.
"""
self.dispatcher = dispatcher
self.instance = instance
def subscribe(self, event_type: str):
"""
Subscribe the the instance to some event identified by the `event_type` string.
"""
self.dispatcher.subscribe(self.instance, event_type)
return self
def get_instance(self):
"""
Return the object within that binding.
"""
return self.instance
class EngineEventDispatcher:
"""
Execute handlers for engine events.
"""
logger: logging.Logger
config: confuse.Configuration
subscriber_registry: dict
engine: Engine | None = None
scheduler: AuraScheduler
# FIXME: is monitor even used?
# monitor = None
def __init__(self, engine: Engine):
"""
Initialize the event dispatcher.
Args:
engine (Engine): The engine instance.
"""
self.subscriber_registry = dict()
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
self.engine = engine
#
# Methods
#
def attach(self, clazz: Type) -> EventBinding:
"""
Create an instance of given class and bind it to the dispatcher.
Args:
clazz (class): The class to bind with.
Returns:
(EventBinding): The binding.
"""
instance = clazz(self.engine)
return EventBinding(self, instance)
def subscribe(self, instance, event_type: str):
"""
Subscribe to some event type.
Preferably use it via `EventBinding.subscribe(..)`.
"""
if event_type not in self.subscriber_registry:
self.subscriber_registry[event_type] = []
self.subscriber_registry[event_type].append(instance)
def call_event(self, event_type, *args):
"""
Call all subscribers for the given event type.
"""
if event_type not in self.subscriber_registry:
return
listeners = self.subscriber_registry[event_type]
if not listeners:
return
for listener in listeners:
method = getattr(listener, event_type)
if method:
if args and len(args) > 0:
method(*args)
else:
method()
def initialize(self, scheduler: AuraScheduler):
"""
Call when the engine is initialized, just before it is ready.
Important: Subsequent events are called synchronously, hence blocking.
Args:
scheduler (AuraScheduler): The scheduler to be started
"""
self.logger.debug("on_initialized(..)")
self.scheduler = scheduler
self.scheduler.boot()
self.call_event("on_initialized", None)
#
# Events
#
def on_boot(self):
"""
Call when the engine is starting up.
This happens after the initialization step. Connection to Liquidsoap should be available
here.
Important: Subsequent events are called synchronously, hence blocking.
"""
self.logger.debug("on_boot(..)")
self.call_event("on_boot")
def on_ready(self):
"""
Call when the engine has finished booting and is ready to play.
"""
def func(self: EngineEventDispatcher, param):
self.logger.debug("on_ready(..)")
self.scheduler.on_ready()
self.call_event("on_ready", param)
thread = Thread(target=func, args=(self, None))
thread.start()
def on_timeslot_start(self, timeslot: Timeslot):
"""
Call when a timeslot starts.
"""
def func(self: EngineEventDispatcher, timeslot: Timeslot):
self.logger.debug("on_timeslot_start(..)")
self.call_event("on_timeslot_start", timeslot)
thread = Thread(target=func, args=(self, timeslot))
thread.start()
def on_timeslot_end(self, timeslot: Timeslot):
"""
Call when a timeslot ends.
"""
def func(self: EngineEventDispatcher, timeslot: Timeslot):
self.logger.debug("on_timeslot_end(..)")
self.call_event("on_timeslot_end", timeslot)
thread = Thread(target=func, args=(self, timeslot))
thread.start()
def on_play(self, item: PlaylistItem):
"""
Call by the engine when some play command to Liquidsoap is issued.
This does not indicate that Liquidsoap started playing actually, only that the command has
been issued. To get the metadata update issued by Liquidsoap use `on_metadata` instead.
This event is not issued when media is played by Liquidsoap in fallback scenarios.
Args:
item (PlaylistItem): The item to play
"""
def func(self: EngineEventDispatcher, item: PlaylistItem):
self.logger.debug("on_play(..)")
self.call_event("on_play", item)
thread = Thread(target=func, args=(self, item))
thread.start()
def on_stop(self, channel: GenericChannel):
"""
Call when the passed channel has stopped playing.
"""
def func(self: EngineEventDispatcher, channel: GenericChannel):
self.logger.debug("on_stop(..)")
self.call_event("on_stop", channel)
thread = Thread(target=func, args=(self, channel))
thread.start()
def on_fallback_active(self, timeslot: Timeslot):
"""
Call when a fallback is activated.
Fallback means the station fallback audio source is played when:
1. No timeslot is available. Due to "virtual timeslots" this case can actually never
happen.
2. The current timeframe of the active timeslot provides no audio source/playlists.
This is always the case with virtual timeslots, but can also happen with ordinary
timeslots.
Args:
timeslot (Timeslot): The active timeslot, if available. Can be `None`.
"""
def func(self: EngineEventDispatcher, timeslot: Timeslot):
self.logger.debug("on_fallback_active(..)")
self.call_event("on_fallback_active", timeslot)
thread = Thread(target=func, args=(self, timeslot))
thread.start()
def on_queue(self, items: list):
"""
Call when one or more playlist items have been queued and are currently being pre-loaded.
Args:
items (list): List of playlist items.
"""
def func(self: EngineEventDispatcher, items: list):
self.logger.debug("on_queue(..)")
self.call_event("on_queue", items)
thread = Thread(target=func, args=(self, items))
thread.start()
def on_sick(self, data: dict):
"""
Call when the engine is in some unhealthy state.
"""
def func(self: EngineEventDispatcher, data: dict):
self.logger.debug("on_sick(..)")
self.call_event("on_sick", data)
thread = Thread(target=func, args=(self, data))
thread.start()
def on_resurrect(self, data: dict):
"""
Call when the engine turned healthy again after being sick.
"""
def func(self: EngineEventDispatcher, data: dict):
self.logger.debug("on_resurrect(..)")
self.call_event("on_resurrect", data)
thread = Thread(target=func, args=(self, data))
thread.start()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2022 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Update the clock information stored in Engine API.
TODO Refactor: This whole class is subject to refactoring, since we are thinking about
delegating clock information to steering as a provider. This makes especially sense,
after Steering is providing "virtual timeslots", which currently need to be interpolated
at multiple places. @see aura#312
TODO Refactor: It should be reviewed if it's worth storing all the current, additional
metadata to Engine API.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import confuse
import aura_engine.scheduling.domain as domain
import aura_engine.scheduling.timetable as timetable
from aura_engine.base.api import SimpleRestApi
from aura_engine.base.config import AuraConfig
from aura_engine.resources import ResourceUtil
if TYPE_CHECKING:
from aura_engine.engine import Engine
class ClockInfoHandler:
"""
Send current studio clock information to the Engine API endpoint.
"""
logger: logging.Logger
config: confuse.Configuration
api = None
engine: Engine | None = None
def __init__(self, engine: Engine):
"""
Initialize.
"""
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
self.api = SimpleRestApi()
self.engine = engine
if not self.engine:
self.logger.critical("ClockInfoHandler | Engine instance not available!")
return
def on_scheduler_cycle(self, timetable: timetable.TimetableService):
"""Call when a new scheduling cycle has been started."""
# TODO clock updates should also be triggered on scheduler update cycles
# This will be part of the coming "default station playlist" logic
pass
def on_fallback_active(self, timeslot: domain.Timeslot):
"""
Call when a fallback is activated.
Args:
timeslot (Timeslot): The active timeslot.
"""
if self.engine is None:
return
scheduler = self.engine.scheduler
upcoming_timeslots: list[domain.Timeslot] = scheduler.timetable.get_next_timeslots()
self.logger.info(f"Fallback activated within timeslot '{timeslot}'")
self.post_clock_info(timeslot, None, upcoming_timeslots)
def on_play(self, item: domain.PlaylistItem):
"""
Event Handler which is called by the engine when some play command to Liquidsoap is issued.
This does not indicate that Liquidsoap started playing actually, only that the command has
been issued. Naturally this happens slightly before single tracks play audio.
This event is not issued when media is played by Liquidsoap in fallback scenarios.
Args:
item (PlaylistItem):
"""
if self.engine is None:
return
timetable: timetable.TimetableService = self.engine.scheduler.timetable
active_timeslot: domain.Timeslot = item.get_playlist().get_timeslot()
active_playlist: domain.Playlist = active_timeslot.get_current_playlist()
# TODO Check if there could be a timing issue between active timeslot and upcoming ones
upcoming_timeslots = timetable.get_next_timeslots()
self.post_clock_info(active_timeslot, active_playlist, upcoming_timeslots)
def post_clock_info(
self,
active_timeslot: domain.Timeslot,
active_playlist: domain.Playlist | None,
upcoming_timeslots: list[domain.Timeslot],
):
"""
Post current information on timeslots and playlist to the Engine API clock endpoint.
"""
if len(upcoming_timeslots) >= 2:
upcoming_timeslots = upcoming_timeslots[0:2]
built_upcoming = []
for upcoming_timeslot in upcoming_timeslots:
built_upcoming.append(self.build_timeslot(upcoming_timeslot, None))
data = {
"engineSource": self.config.api.engine.number,
"currentTimeslot": self.build_timeslot(active_timeslot, active_playlist),
"upcomingTimeslots": built_upcoming,
"plannedPlaylist": self.build_playlist(active_playlist),
}
url = self.config.api.engine.store_clock
self.logger.info(f"PUT clock info to '{url}': \n{data}")
self.api.put(url, data=data)
def build_timeslot(
self, timeslot: domain.Timeslot, playlist: domain.Playlist | None = None
) -> dict:
"""
Transform a `Timeslot` object to a dictionary digestable by the clock API endpoint.
The posted `playlistType` integer indicates if the playlist was assigned on
(-1) fallback level (no playlist scheduled)
(0) timeslot level (default)
(1) schedule level
(2) show level
(3) station level
Args:
timeslot (Timeslot): The timeslot used for object building.
playlist (Playlist): The optional playlist used for object building.
Returns:
timeslot (dict): JSON representation of the timeslot
"""
if not timeslot:
return
playlist_id = -1
playlist_type = -1
if playlist:
playlist_id = playlist.get_id()
type = playlist.get_type()
if type == domain.PlaylistType.TIMESLOT:
playlist_type = 0
elif type == domain.PlaylistType.SCHEDULE:
playlist_type = 1
elif type == domain.PlaylistType.SHOW:
playlist_type = 2
else:
playlist_type = 3
show_id = -1
show_name = None
if timeslot.show:
show_id = timeslot.show.id
show_name = timeslot.show.name
return {
"timeslotId": timeslot.get_id(),
"timeslotStart": timeslot.get_start(),
"timeslotEnd": timeslot.get_end(),
"showId": show_id,
"showName": show_name,
"playlistId": playlist_id,
"playlistType": playlist_type,
}
def build_playlist(self, playlist: domain.Playlist) -> dict:
"""
Transform a `Playlist` object to a dictionary digestable by the clock API endpoint.
Args:
playlist (Playlist): The playlist domain object.
Returns:
({}): The playlist as dict object for JSON representation.
"""
if not playlist:
return None
items = []
item: domain.PlaylistItem
for item in playlist.items:
content_class = ResourceUtil.get_content_class(item.get_content_type())
track_type = int(content_class.numeric)
dict_item = {
"trackType": track_type,
"trackStart": item.get_start(),
"trackNum": item.get_position(),
"trackDuration": item.duration,
}
if item.metadata:
dict_item |= {
"trackArtist": item.metadata.artist,
"trackAlbum": item.metadata.album,
"trackTitle": item.metadata.title,
}
items.append(dict_item)
pls_dict = {"playlistId": playlist.get_id(), "entries": items}
return pls_dict
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Monitor the scheduling and playout status.
In case of irregularities:
- Send heartbeats
- Send health updates to Engine API
- Log health updates
"""
import datetime
import json
import logging
import os
import platform
import threading
from enum import Enum
from socket import AF_INET, SO_BROADCAST, SOCK_DGRAM, SOL_SOCKET, socket
import confuse
import requests
from aura_engine.base.api import SimpleRestApi
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.resources import ResourceUtil
class EngineMalfunctionException(Exception):
"""
Exception thrown when Engine turns into an status invalid for proper playout.
"""
pass
class MonitorResponseCode(Enum):
"""
Enumeration with status types.
"""
OK = "OK"
INVALID_STATE = "INVALID-STATE"
class AuraMonitor:
"""Monitoring of the Engine.
It is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
api: SimpleRestApi
logger: logging.Logger
config: confuse.Configuration
engine = None
status = None
already_invalid = None
engine_id = None
heartbeat_server = None
heartbeat_port = None
heartbeat_frequency = None
heartbeat_socket = None
heartbeat_running = None
def __init__(self, engine):
"""
Initialize Monitoring.
"""
self.api = SimpleRestApi()
self.logger = logging.getLogger("engine")
self.config = AuraConfig.instance.config
self.engine = engine
self.status = dict()
self.status["engine"] = dict()
self.status["lqs"] = dict()
self.status["api"] = dict()
self.status["api"]["steering"] = dict()
self.status["api"]["tank"] = dict()
self.status["api"]["engine"] = dict()
self.already_invalid = False
# Heartbeat settings
self.heartbeat_running = False
self.heartbeat_server = self.config.monitoring.heartbeat.host
self.heartbeat_port = self.config.monitoring.heartbeat.port
self.heartbeat_frequency = self.config.monitoring.heartbeat.frequency
self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
self.engine_id = self.get_engine_id()
def __del__(self):
self.heartbeat_socket.close()
#
# EVENTS
#
def on_boot(self):
"""
Call when the engine is booting.
"""
# Start Monitoring
is_valid = self.has_valid_status(False)
status = self.get_status()
self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
if not is_valid:
self.logger.info("Engine Status: " + SU.red(status["engine"]["status"]))
self.post_health(status, False)
raise EngineMalfunctionException
else:
self.logger.info("Engine Status: " + SU.green("[OK]"))
self.post_health(status, True)
def on_sick(self, data: dict):
"""
Call when the engine is in some unhealthy state.
"""
self.post_health(data, False)
def on_resurrect(self, data: dict):
"""
Call when the engine turned healthy again after being sick.
"""
self.post_health(data, True)
#
# PUBLIC METHODS
#
def get_status(self):
"""
Retrieve the current monitoring status.
"""
return self.status
def has_valid_status(self, update_vitality_only):
"""
Check if the current status is valid to run engine.
By default it does not request new status information, rather using the cached one.
To request new data either call `get_status()` before or use the `update_vital` parameter.
Args:
update_vitality_only (bool): Refreshes only the vital parts required for the heartbeat
"""
is_valid = False
if update_vitality_only:
self.update_vitality_status()
else:
self.update_status()
try:
# liquidsoap mixer is using in_queue_0
if (
self.status["lqs"]["available"]
and self.status["lqs"]["mixer"]["in_queue_0"]
and self.status["audio_source"]["exists"]
):
self.status["engine"]["status"] = MonitorResponseCode.OK.value
is_valid = True
else:
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return is_valid
#
# PRIVATE METHODS
#
def post_health(self, data, is_healthy):
"""
Post unhealthy state info to Engine API.
"""
body = dict()
body["logTime"] = datetime.datetime.now()
body["isHealthy"] = is_healthy
body["details"] = json.dumps(data, default=str)
json_data = json.dumps(body, default=str)
timeout = 5
url = self.config.api.engine.store_health
url = url.replace("${ENGINE_NUMBER}", str(self.config.api.engine.number))
headers = {"content-type": "application/json"}
response = requests.Response()
response.status_code = 404
try:
response = requests.post(url, data=json_data, headers=headers, timeout=timeout)
if response.status_code == 204:
self.logger.info(
"Successfully posted healthy=%s state to Engine API!" % is_healthy
)
else:
msg = SU.red(
f"HTTP {response.status_code} | Error while pushing health state to Engine"
f" API: {response.json()}"
)
self.logger.error(msg)
except requests.exceptions.ConnectionError:
self.logger.error(SU.red(f"Bad Request when posting health-status to {url}"))
return response
except requests.exceptions.Timeout:
self.logger.error(SU.red(f"Timeout when posting health-status to {url}"))
return response
except requests.exceptions.RequestException:
self.logger.error(SU.red(f"Unknown Exception when posting health-status to {url}"))
return response
def update_status(self):
"""
Request the current status of all components.
"""
self.engine.init_version()
ctrl_version = AuraConfig.instance.confuse_config["version_control"].get()
core_version = AuraConfig.instance.confuse_config["version_core"].get()
liq_version = AuraConfig.instance.confuse_config["version_liquidsoap"].get()
self.status["engine"]["version"] = ctrl_version
self.status["lqs"]["version"] = {"core": core_version, "liquidsoap": liq_version}
self.status["lqs"]["outputs"] = self.engine.player.mixer.get_outputs()
self.status["lqs"]["mixer"] = self.engine.player.mixer.get_inputs()
self.status["api"]["steering"]["url"] = self.config.api.steering.status
self.status["api"]["steering"]["available"] = self.validate_url_connection(
self.config.api.steering.status
)
self.status["api"]["tank"]["url"] = self.config.api.tank.status
self.status["api"]["tank"]["available"] = self.validate_url_connection(
self.config.api.tank.status
)
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.api.tank.status)
self.status["api"]["engine"]["url"] = self.config.api.engine.status
self.status["api"]["engine"]["available"] = self.validate_url_connection(
self.config.api.engine.status
)
self.update_vitality_status()
def update_vitality_status(self):
"""
Refresh the vital status info which are required for the engine to survive.
"""
self.status["lqs"]["status"] = self.engine.update_playout_state()
self.status["lqs"]["available"] = self.status["lqs"]["status"] is not None
self.status["audio_source"] = self.validate_directory(ResourceUtil.audio_store_path())
# After first update start the Heartbeat Monitor
if not self.heartbeat_running:
self.heartbeat_running = True
self.heartbeat()
def heartbeat(self):
"""
Send heartbeat tick.
Every `heartbeat_frequency` seconds the current vitality status is checked. If it is okay,
a heartbeat is sent to the configured server.
"""
if self.has_valid_status(True):
# Always check status, but only send heartbeat if wanted so
if self.config.monitoring.heartbeat.host != "":
self.heartbeat_socket.sendto(
str.encode("OK"), (self.heartbeat_server, self.heartbeat_port)
)
# Engine resurrected into normal state
if self.already_invalid:
self.already_invalid = False
status = json.dumps(self.get_status())
msg = SU.green("OK - Engine turned back into some healthy state!")
self.logger.info(msg + "\n" + str(status))
# Route call of event via event dispatcher to provide ability for additional hooks
self.engine.event_dispatcher.on_resurrect(
{"engine_id": self.engine_id, "status": status}
)
else:
# Engine turned into invalid state
if not self.already_invalid:
self.already_invalid = True
status = json.dumps(self.get_status())
self.logger.critical(
SU.red("Engine turned into some INVALID STATE!") + "\n" + str(status)
)
# Route call of event via event dispatcher to provide ability for additional hooks
self.engine.event_dispatcher.on_sick(
{"engine_id": self.engine_id, "status": status}
)
heartbeat_frq = self.config.monitoring.heartbeat.frequency # default: 1, disable: 0
if heartbeat_frq > 1:
threading.Timer(heartbeat_frq, self.heartbeat).start()
def validate_url_connection(self, url):
"""
Check if connection to passed URL is successful.
# FIXME This should be refactored to a simple ping
"""
try:
requests.get(url, timeout=5)
except requests.ConnectionError:
return False
return True
def validate_directory(self, dir_path):
"""
Check if a given directory is existing and holds content.
"""
status = dict()
status["path"] = dir_path
status["exists"] = os.path.exists(dir_path) and os.path.isdir(dir_path)
status["has_content"] = False
if status["exists"]:
status["has_content"] = any([True for _ in os.scandir(dir_path)])
if not status["has_content"]:
msg = f"Directory '{dir_path}' has no contents!"
self.logger.warning(SU.red(msg))
else:
msg = f"Directory '{dir_path}' doesn't exist!"
self.logger.error(SU.red(msg))
return status
def get_url_response(self, url):
"""
Fetch JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
try:
result = self.api.get(url)
if 200 == result.response.status_code:
return result.json
except requests.ConnectionError as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
return MonitorResponseCode.INVALID_STATE.value
def get_engine_id(self):
"""
Retrieve a String identifier consisting of IP and Hostname.
This is used to identify engine in status broadcasts.
"""
host = platform.node()
return "%s (%s)" % (self.get_ip(), host)
def get_ip(self):
"""
Return the IP of the Engine instance.
"""
try:
s = socket(AF_INET, SOCK_DGRAM)
s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
s.connect(("<broadcast>", 0))
return s.getsockname()[0]
except OSError:
self.logger.critical(SU.red("Error while accessing network via <broadcast>!"))
return "<UNKNOWN NETWORK>"
finally:
s.close()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Utilities and mappings for media resources.
"""
from __future__ import annotations
import datetime
from enum import Enum
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import ChannelName, ChannelType
from aura_engine.scheduling import domain
class ResourceType(Enum):
"""
Media content types.
"""
FILE = "file:"
STREAM_HTTP = "http"
LINE = "line:"
M3U = "m3u:"
POOL = "pool:"
class ResourceMapping:
"""
Wires source types with channel-types.
"""
resource_mapping = None
def __init__(self):
"""
Initialize the resource mapping.
"""
self.resource_mapping = {
ResourceType.FILE: ChannelType.QUEUE,
ResourceType.STREAM_HTTP: ChannelType.HTTP,
ResourceType.LINE: ChannelType.LIVE,
ResourceType.M3U: ChannelType.QUEUE,
ResourceType.POOL: ChannelType.QUEUE,
}
def type_for_resource(self, resource_type: ResourceType) -> ChannelType:
"""
Retrieve a `ChannelType` for the given `ResourceType`.
Only default mappings can be evaluated. Custom variations
like fallback channels are not respected.
"""
return self.resource_mapping.get(resource_type)
def live_channel_for_resource(self, channel_uri: str) -> ChannelName:
"""
Return the channel enum for a given live channel string from Tank.
Channel URIs from Tank are typically in the format `line://1`, while
channel names in Liquidsoap have names in the format `aura_engine_line_in_1`.
If no valid channel URI is provided, `None` is returned.
Args:
channel (str): Channel URI as provided from Tank.
Returns:
(ChannelName): Liquidsoap name of the channel.
"""
if not channel_uri:
return None
channel_name = "aura_engine_line_in_" + channel_uri.split("line://")[1]
for cn in ChannelName:
if cn.value == channel_name:
return cn
return None
class ResourceClass(Enum):
"""
Media content classes.
"""
FILE = {"id": "fs", "numeric": 0, "types": [ResourceType.FILE]}
STREAM = {
"id": "http",
"numeric": 1,
"types": [ResourceType.STREAM_HTTP],
}
LIVE = {"id": "live", "numeric": 2, "types": [ResourceType.LINE]}
PLAYLIST = {
"id": "playlist",
"numeric": 3,
"types": [ResourceType.M3U, ResourceType.POOL],
}
@property
def types(self):
"""Retrieve allowed types of the resource class."""
return self.value["types"]
@property
def numeric(self):
"""Retrieve a numeric representation of the resource class."""
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class ResourceUtil(Enum):
"""
Utilities for different resource types.
"""
@staticmethod
def get_content_type(uri: str) -> ResourceType:
"""
Return the content type identified by the passed URI.
Args:
uri (str): The URI of the source.
Returns:
(ResourceType): The identified resource type.
"""
if not uri:
return None
if uri.startswith(ResourceType.STREAM_HTTP.value):
return ResourceType.STREAM_HTTP
if uri.startswith(ResourceType.POOL.value):
return ResourceType.POOL
if uri.startswith(ResourceType.M3U.value):
return ResourceType.M3U
if uri.startswith(ResourceType.FILE.value):
return ResourceType.FILE
if uri.startswith(ResourceType.LINE.value):
return ResourceType.LINE
@staticmethod
def get_content_class(content_type: ResourceType) -> ResourceClass:
"""
Return the content class identified by the passed type.
Args:
content_type (ContentType): The resource type.
Returns:
(ResourceClass): The class this type falls in.
"""
if content_type in ResourceClass.FILE.types:
return ResourceClass.FILE
if content_type in ResourceClass.STREAM.types:
return ResourceClass.STREAM
if content_type in ResourceClass.LIVE.types:
return ResourceClass.LIVE
if content_type in ResourceClass.PLAYLIST.types:
return ResourceClass.PLAYLIST
@staticmethod
def generate_m3u_file(target_file_path: str, items: list):
"""
Write a M3U file based on the given playlist object.
Args:
target_file_path (str): The M3U playlist to write.
items ([PlaylistItem]): Items of the playlist.
config (AuraConfig): The config file wrapper.
"""
file = open(target_file_path, "w")
fb = ["#EXTM3U\n"]
for item in items:
if ResourceUtil.get_content_type(item.source) == ResourceType.FILE:
path = ResourceUtil.source_to_filepath(item.source)
fb.append(
f"#EXTINF:{item.duration},{item.metadata.artist} - {item.metadata.title}"
)
fb.append(path + "\n")
file.writelines(fb)
file.close()
@staticmethod
def to_abs_path(path):
"""
Transform any given (relative) path to an absolute path.
Starting at the project root.
"""
if path.startswith("/"):
return path
else:
ac = AuraConfig.instance
return ac.confuse_config["install_dir"].get() + "/" + path
@staticmethod
def audio_store_path() -> str:
"""
Get audio store location.
Returns:
(str): Absolute path to audio store folder.
"""
ac = AuraConfig.instance
return ResourceUtil.to_abs_path(ac.config.scheduler.audio.source_folder)
@staticmethod
def playlist_folder_path() -> str:
"""
Get playlist folder location.
Returns:
(str): Return the absolute path to the playlist folder.
"""
ac = AuraConfig.instance
return ResourceUtil.to_abs_path(ac.config.scheduler.audio.playlist_folder)
@staticmethod
def source_to_filepath(source_uri: str) -> str:
"""
Create path from URI and extension.
Convert a file-system URI starting with "file://" to an actual, absolute path to the file,
appending the extension as provided in "source_extension".
If the path starts with an "/", it indicates that it is already an absolute path including
a valid extension.
Args:
source_uri (str): The URI of the file
Returns:
(str): Absolute file path.
"""
path = source_uri[7:]
if path.startswith("/"):
return path
else:
ac = AuraConfig.instance
base_dir = ResourceUtil.audio_store_path()
extension = ac.config.scheduler.audio.source_extension
return base_dir + "/" + path + extension
@staticmethod
def get_items_string(items: list) -> str:
"""
Return a list of playlist items as String for logging purposes.
Args:
items ([PlaylistItem]): List of playlist items.
Returns:
(str): Stringified version of the items.
"""
s = ""
if isinstance(items, list):
for item in items:
s += str(item)
if item != items[-1]:
s += ", "
else:
s = str(items)
return s
@staticmethod
def generate_track_metadata(
item: domain.PlaylistItem,
assign_track_start: bool = False,
) -> dict:
"""
Generate Liquidsoap track metadata based on an playlist item.
Args:
item (PlaylistItem): The playlist item.
assign_track_start (bool): If true, the track start time metadata is passed through
to Liquidsoap. This is recommended for audio sourced where Liquidsoap does not
generate its own start time (e.g line from live studio).
Returns:
({}): dictionary with structured metadata.
"""
content_type = ResourceUtil.get_content_type(item.source)
content_class = ResourceUtil.get_content_class(content_type)
playlist: domain.Playlist = item.get_playlist()
timeslot: domain.Timeslot = playlist.get_timeslot()
try:
annotations = {
"show_name": str(timeslot.get_show().name),
"show_id": int(timeslot.get_show().id),
"timeslot_id": timeslot.id,
"playlist_id": int(playlist.id),
"playlist_item": str(float(item.get_position())),
"track_type": int(content_class.numeric),
"track_start": "",
"track_duration": -1,
"track_title": "",
"track_album": "",
"track_artist": "",
}
except ValueError as ve:
raise ve
if assign_track_start:
# Convert to current Liquidsoap date/time format
track_start: datetime.datetime = SU.timestamp_to_datetime(item.get_start())
annotations["track_start"] = track_start.strftime("%Y/%m/%d %H:%M:%S")
if item.duration and item.duration > 0:
annotations["track_duration"] = item.duration
if item.metadata:
annotations["track_title"] = str(item.metadata.title)
annotations["track_album"] = str(item.metadata.album)
annotations["track_artist"] = str(item.metadata.artist)
return annotations
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Client classes for scheduling API operations.
- ApiFetcher: Poll data from Steering and Tank API.
- ApiResult: Holding the results from the API, including error codes and message.
"""
from __future__ import annotations
import logging
import queue
import threading
import traceback
from datetime import datetime
from typing import NamedTuple
import confuse
from aura_engine.base.api import SimpleCachedRestApi, SimpleRestApi
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import private
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import (
Episode,
Playlist,
PlaylistItem,
Show,
Timeslot,
)
from aura_engine.scheduling.utils import M3UPlaylistProcessor
from aura_steering_api.models.playout_program_entry import (
PlayoutProgramEntry as API_PLAYOUT_ENTRY,
)
from aura_tank_api.models.playlist import Playlist as API_PLAYLIST
from aura_tank_api.models.playlist_entry import PlaylistEntry as API_PLAYLIST_ENTRY
from aura_tank_api.types import Unset
ApiResult = NamedTuple(
"ApiResult",
[("timeslots", list), ("code", int), ("message", str), ("exception", Exception)],
)
class ApiFetcher(threading.Thread):
"""
Get data from AURA REST APIs.
Retrieve timeslots, playlists and playlist items and other metadata as JSON.
Calls the API endpoints of Steering and Tank.
"""
logging: logging.Logger
config: confuse.Configuration
queue = None
has_already_fetched = False
stop_event = None
api = None
m3u_processor = None
# Config for API Endpoints
url_api_timeslots = None
url_api_playlist = None
tank_headers = None
def __init__(self):
"""
Initialize the API Fetcher.
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
cache_location = self.config.general.cache_dir
self.api = SimpleCachedRestApi(SimpleRestApi(), cache_location)
self.url_api_timeslots = self.config.api.steering.calendar
self.url_api_playlist = self.config.api.tank.playlist
self.queue = queue.Queue()
self.stop_event = threading.Event()
tank_session = self.config.api.tank.session
tank_secret = self.config.api.tank.secret
self.tank_headers = {
"Authorization": f"Bearer {tank_session}:{tank_secret}",
"content-type": "application/json",
}
self.m3u_processor = M3UPlaylistProcessor()
threading.Thread.__init__(self)
def run(self):
"""
Fetch timeslot data from the Steering and Tank API.
"""
try:
timeslots = self.fetch_timeslots()
# If nothing is fetched, return an empty list
if not timeslots:
self.queue.put(ApiResult([], 0, "Nothing fetched", None))
# Release the mutex
self.queue.put(ApiResult(timeslots, 0, "Success", None))
except Exception as e:
# Release the mutex
self.logger.error("Error while fetching new data from API", e)
self.queue.put(
ApiResult([], 1, f"Error while fetching: {str(e)}", traceback.format_exc())
)
# Terminate the thread
return
def fetch(self):
"""
Retrieve fetched data from the queue.
"""
return self.queue.get()
def terminate(self):
"""
Terminate the thread.
"""
self.logger.info(SU.yellow("[ApiFetcher] Shutting down..."))
self.stop_event.set()
#
# private
#
@private
def fetch_timeslots(self) -> list[Timeslot]:
"""
Fetch timeslot data from Steering.
This method also:
- Filters invalid and unnecessary timeslots.
- Remaps any API fields to there local modal representation.
Returns:
([Timeslot]): A list of timeslots
@private
"""
self.logger.debug("Fetch timeslots from Steering API...")
url = self.url_api_timeslots
result = self.api.get(url, params={"includeVirtual": "true"})
api_timeslots = result.json
if not api_timeslots:
return
# Transform API timeslots to engine timeslots
timeslots: list[Timeslot] = []
for api_ts in api_timeslots:
api_ts = API_PLAYOUT_ENTRY.from_dict(api_ts)
show = Show(
id=api_ts.show.id,
name=api_ts.show.name,
)
episode = None
memo = None
if api_ts.timeslot:
memo = api_ts.timeslot.memo
if (ep := api_ts.episode) and ep:
episode = Episode(id=ep.id, title=ep.title, memo=memo)
# TODO resolve timeslot referenced by `repetition_id`
timeslot = Timeslot(
id=api_ts.id,
virtual=False if api_ts.timeslot_id else True,
repetition_id=api_ts.timeslot.repetition_of_id if api_ts.timeslot else None,
start=api_ts.start.timestamp(),
end=api_ts.end.timestamp(),
show=show,
episode=episode,
)
# Virtual timeslot do not have media-sources assigned,
# therefore no playlists need to be fetched.
if not timeslot.is_virtual():
# Fetch playlists for timeslot
playlist_timeslot = None
playlist_schedule = None
playlist_show = None
if (ts := api_ts.timeslot) and ts and (plid := ts.playlist_id) and plid:
playlist_timeslot = self.fetch_playlist(plid)
if (sc := api_ts.schedule) and sc and (plid := sc.default_playlist_id) and plid:
playlist_schedule = self.fetch_playlist(plid)
if (sh := api_ts.show) and sh and (plid := sh.default_playlist_id):
playlist_show = self.fetch_playlist(plid)
timeslot.set_playlists(playlist_timeslot, playlist_schedule, playlist_show)
self.expand_item_duration(playlist_timeslot)
self.expand_item_duration(playlist_schedule)
self.expand_item_duration(playlist_show)
# NOTE: we ignore all virtual timeslots since there are some issues
# regarding merging virtual timeslots into the local timetable
timeslots.append(timeslot)
return timeslots
@private
def fetch_playlist(self, playlist_id: int) -> Playlist:
"""
Fetch a playlist from Tank.
Args:
playlist_id (int): The ID of the playlist.
Returns:
(Playlist): The requested playlist or `None` when `playlist_id` is missing.
@private
"""
if not playlist_id:
return None
playlist: Playlist = None
url = self.url_api_playlist.replace("${ID}", str(playlist_id))
self.logger.debug(f"Fetch playlist '{playlist_id}' from Tank API...")
result = self.api.get(url, headers=self.tank_headers)
SU.log_json(self.logger, result.json)
try:
api_playlist = API_PLAYLIST.from_dict(result.json)
playlist = Playlist(
api_playlist.id, api_playlist.description, api_playlist.playout_mode != "linear"
)
entry: API_PLAYLIST_ENTRY
for entry in api_playlist.entries:
metadata = None
# Files do have a metadata object
if entry.file and entry.file.metadata:
metadata = PlaylistItem.Metadata(
artist=(
""
if isinstance(entry.file.metadata.artist, Unset)
else entry.file.metadata.artist
),
album=(
""
if isinstance(entry.file.metadata.album, Unset)
else entry.file.metadata.album
),
title=(
""
if isinstance(entry.file.metadata.title, Unset)
else entry.file.metadata.title
),
)
item = PlaylistItem(entry.uri, entry.duration, 100, metadata)
# Medida sources with the scheme `m3u://...` point to M3U playlists provided by the
# Audio Store. We need to extract the actual playlist items and feed them like any
# other audio file playlist to Engine Core.
#
# TODO In the future this is to be replaced by a generic music pool feature.
items = self.m3u_processor.spread(item)
if len(items) > 1:
self.logger.info(f"M3U playlist spread to {len(items)} items")
for i in items:
playlist.add(i)
except Exception as e:
self.logger.error(SU.red(f"Decode API_PLAYLIST failed: {e}"))
return playlist
@private
def expand_item_duration(self, playlist: Playlist):
"""
Expand item to the timeslot gap left.
If some playlist item doesn't have a duration assigned, its duration is expanded to the
remaining duration of the playlist (= timeslot duration minus playlist items with
duration).
If there is more than one item without duration, such items are removed from the
playlist.
Args:
playlist (Playlist): the playlist with items to expand.
@private
"""
if not playlist:
return
timeslot_duration: float = playlist.get_timeslot().get_duration()
playlist_duration: float = 0.0
items_wo_duration: list[PlaylistItem] = []
for item in playlist.get_items():
if not item.duration:
items_wo_duration.append(item)
else:
playlist_duration += item.duration
if len(items_wo_duration) == 1:
items_wo_duration[0].duration = timeslot_duration - playlist_duration
self.logger.info(f"Expand duration for playlist item #{items_wo_duration[0]}")
elif len(items_wo_duration) > 1:
# This case should actually never happen, as Tank does not allow more than one item w/o
# duration anymore
for item in reversed(items_wo_duration[1:-1]):
msg = f"Delete playlist item without duration: {item}"
self.logger.error(SU.red(msg))
item.remove()
@private
def is_valid_timeslot(self, timeslot: API_PLAYOUT_ENTRY) -> bool:
"""
Check if timeslot is relevant for further processing.
The result contains all timeslots starting with the one currently playing until the first
timeslot starting after 24 hours (excluded). Note, this might influence resuming, like in
case of a crash single timeslots which are longer than 12 hours long. Think e.g. live
broadcasts.
Args:
timeslots (dict): The timeslots to be filtered
Returns:
(bool): `True` if the timeslot should be used for further processing
@private
"""
if "start" not in timeslot:
return False
now = SU.timestamp()
now_plus_24hours = now + (12 * 60 * 60)
start_time = self.str_to_timestamp(timeslot["start"])
end_time = self.str_to_timestamp(timeslot["end"])
if start_time <= now_plus_24hours and (start_time >= now) or (start_time < now < end_time):
return True
return False
@private
def str_to_timestamp(self, dt_string: str) -> int:
"""
Convert a datetime string to a timestamp.
Args:
dt_string (str): datetime string.
Returns:
int: UNIX epoch timestamp.
@private
"""
dt = datetime.strptime(dt_string, "%Y-%m-%dT%H:%M:%S")
return SU.timestamp(dt)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Domain models for scheduling.
These domain models are defined:
- Timeslot: A timetable item.
- Show: Information on the show related to a timeslot.
- Episode: Information on the episode related to a timeslot
- Playlists: Container for all types of playlists assignable to a timeslot.
- Playlist: Definition of what should be broadcasted for one timeslot.
- PlaylistType: The kind of playlist it is; either assigned to timeslot, schedule or show.
- PlaylistItem: An element of the playlist, referencing media sources and their play state.
- PlayState: State information for a given playlist item.
"""
from __future__ import annotations
import enum
from typing import NamedTuple
from aura_engine import resources
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import GenericChannel
class PlaylistType(enum.Enum):
"""
Playlist type defines on which level the playlist is assigned to the timeslot.
Attributes:
TIMESLOT (str): The playlist is assigned to the timeslot itself.
SCHEDULE (str): The playlist is assigned to the schedule.
SHOW (str): The playlist is assigned to the show.
"""
TIMESLOT = "timeslot"
SCHEDULE = "schedule"
SHOW = "show"
class Playlist:
"""
The playlist assigned to a timeslot.
Attributes:
id (int): Playlist ID as used in Steering and Tank.
timeslot (Timeslot): Back reference to the timeslot.
type (PlaylistType): Indicating type of timeslot playlist.
items (PlaylistItem): List of items.
"""
id: int
timeslot: Timeslot
type: PlaylistType
desc: str
shuffle: bool
items: list[PlaylistItem]
def __init__(self, id: int, desc: str, shuffle: bool = False):
"""
Initialize.
Args:
id (int): The playlist ID.
desc: (str): Description for the playlist.
shuffle (bool): If the playlist should be played in random order.
"""
self.id = id
self.desc = desc
self.shuffle = shuffle
self.items = []
self.timeslot = None
def get_id(self) -> int:
"""
Get the playlist ID.
Returns:
int: ID of the playlist.
"""
return self.id
def get_timeslot(self) -> Timeslot:
"""
Set the timeslot.
Returns:
(Timeslot): The timeslot of the playlist.
"""
return self.timeslot
def set_timeslot(self, timeslot: Timeslot):
"""
Set the timeslot.
Args:
Timeslot: The timeslot of the playlist.
"""
self.timeslot = timeslot
def get_type(self) -> PlaylistType:
"""
Get the playlist type.
Returns:
PlaylistType: Type of the playlist.
"""
return self.type
def get_items(self) -> list[PlaylistItem]:
"""
Get the items of the playlist.
Returns:
list[PlaylistItem]: All playlist items.
"""
return self.items
def get_duration(self) -> float:
"""
Return the total length of the playlist in seconds.
Returns:
float: Length in seconds.
"""
return sum(item.duration for item in self.items)
def get_description(self) -> str:
"""
Get playlist description.
Returns:
str: The description.
"""
return self.desc
def do_shuffle(self) -> bool:
"""
Returns true if the playlist should be played in random order.
Returns:
bool: `False` if playout mode is linear.
"""
return self.shuffle
@synchronized
def add(self, item: PlaylistItem) -> None:
"""
Add an item to the playlist.
It also adds the back-reference to the playlist and performs linking to the previous
playlist item.
Args:
item (PlaylistItem): The playlist item to append.
"""
prev = None
if self.items:
prev = self.items[-1]
item.prev = prev
item.prev.next = item
item.set_playlist(self)
self.items.append(item)
@synchronized
def update_playlist(self, playlist: Playlist):
"""
Update the playlist data with the one passed.
It is important to avoid overwriting any local references, as they might be already in use
of scheduling threads.
Args:
playlist (Playlist): Playlist with fresh data.
"""
self.id = playlist.id
self.desc = playlist.desc
self.shuffle = playlist.shuffle
self.items = playlist.get_items()
item: PlaylistItem
for item in self.get_items():
item.set_playlist(self)
def __str__(self) -> str:
"""
String representation.
Returns:
str: Playlist as String.
"""
duration = "{:.2f}".format(self.get_duration())
s = f"ID#{self.id} | {duration} seconds"
if self.desc:
s += f" ({self.desc})"
return s
class Playlists:
"""
Container to hold all timeslot assignable types of playlists.
Attributes:
timeslot (Playlist): The playlist assigned on timeslot level.
schedule (Playlist): The playlist assigned on schedule level.
show (Playlist): The playlist assigned on show level.
"""
timeslot: Playlist
schedule: Playlist
show: Playlist
def __init__(self, timeslot: Playlist, schedule: Playlist, show: Playlist):
"""
Initialize.
Args:
timeslot (Playlist): The playlist assigned on timeslot level.
schedule (Playlist): The playlist assigned on schedule level.
show (Playlist): The playlist assigned on show level.
"""
self.timeslot = timeslot
self.schedule = schedule
self.show = show
def get(self, type: PlaylistType) -> Playlist:
"""
Retrieve a playlist by type.
Args:
type (PlaylistType): The type of playlist.
Returns:
Playlist: The playlist.
"""
return getattr(self, type.value)
def set(self, type: PlaylistType, playlist: Playlist):
"""
Retrieve a playlist by type.
Args:
type (PlaylistType): The type of playlist.
playlist (Playlist): The playlist to update.
"""
setattr(self, type.value, playlist)
if playlist:
playlist.type = type
Show = NamedTuple(
"Show",
[
("id", int),
("name", str),
],
)
Episode = NamedTuple(
"Episode",
[
("id", int),
("title", str),
("memo", str),
],
)
class Timeslot:
"""
A timeslot in the program calendar.
Beside references to playlists, it holds show and episode metadata.
Note, some episodes values are inherited from the show,
some others are overridden by the episode.
Attributes:
id (int): Timeslot ID as used in Steering and Tank
repetition_id (int): In case of a re-broadcast it holds a reference to another timeslot ID.
start: Beginning of timeslot as UNIX epoch.
end: Ending of timeslot as UNIX epoch.
virtual: True in case of virtual timeslots, triggering the fallback program.
show.id (int): Show ID as used in Steering and Tank.
show.name (str): Name of the show.
episode.id (int): ID of the episode.
episode.title (str): Title of the episode.
episode.memo: Some notes for the episode (internal, non-public use only).
playlists.timeslot (Playlist): Playlist assigned directly to the timeslot.
playlists.schedule (Playlist): Playlist assigned to the schedule.
paylists.show (Playlist): Playlist assigned to the show.
"""
id: str
repetition_id = int
start: int
end: int
virtual: bool
show: Show
episode: Episode
playlists: Playlists
def __init__(
self,
id: str,
repetition_id: int,
start: float,
end: float,
show: Show,
episode: Episode,
virtual: bool = False,
):
"""
Initialize.
Args:
id (int): ID of the timeslot, in case of virtual timeslots it holds the hash of
`end_date + start_date` string.
repetition_id (int): ID of the timeslot this timeslot is a repetition of. None if n/a.
start (int): Start of the timeslot as UNIX timestamp in seconds.
end (int): Start of the timeslot as UNIX timestamp in seconds.
show (Show): Show assigned to the timeslot
episode (Episode): Episode information representing this broadcast.
virtual (bool): True in case of virtual timeslots
"""
self.id = id
self.repetition_id = repetition_id
self.start = start
self.end = end
self.show = show
self.episode = episode
self.playlists = Playlists(None, None, None)
self.virtual = virtual
def get_id(self) -> str:
"""
Get ID.
Returns:
int: The ID.
"""
return self.id
def is_virtual(self) -> bool:
"""
Check for virtual timeslot.
A virtual timeslot is a timetable slot which is non-actively scheduled.
Returns:
bool: True in case of a virtual timeslot.
"""
return self.virtual
def get_repetition_id(self) -> int:
"""
Get ID of the timeslot this timeslot is a repetition of.
Returns:
int: Repetition timeslot ID. None if it is not a repeating timeslot.
"""
return self.repetition_id
def set_playlists(self, pl_timeslot: Playlist, pl_schedule: Playlist, pl_show: Playlist):
"""
Set all playlists and their respective types at once.
Args:
pl_timeslot (Playlist): The timeslot playlist.
pl_schedule (Playlist): The schedule playlist.
pl_show (Playlist): The show playlist.
"""
if pl_timeslot:
pl_timeslot.type = PlaylistType.TIMESLOT
pl_timeslot.set_timeslot(self)
if pl_schedule:
pl_schedule.type = PlaylistType.SCHEDULE
pl_schedule.set_timeslot(self)
if pl_show:
pl_show.type = PlaylistType.SHOW
pl_show.set_timeslot(self)
self.playlists = Playlists(timeslot=pl_timeslot, schedule=pl_schedule, show=pl_show)
@synchronized
def update(self, timeslot: Timeslot):
"""
Update the timeslot with a newer version.
Args:
timeslot (Timeslot): The new timeslot.
"""
self.show = timeslot.show
self.episode = timeslot.episode
self.id = timeslot.id
self.repetition_id = timeslot.repetition_id
self.update_playlist(PlaylistType.TIMESLOT, timeslot.playlists.timeslot)
self.update_playlist(PlaylistType.SCHEDULE, timeslot.playlists.schedule)
self.update_playlist(PlaylistType.SHOW, timeslot.playlists.show)
@synchronized
def update_playlist(self, type: PlaylistType, playlist: Playlist):
"""
Update the playlist data with the one passed.
It is important to avoid overwriting any local references, as they might be already in use
of scheduling threads.
Args:
playlist (Playlist): Playlist with fresh data.
"""
if self.playlists.get(type) and playlist:
pl: Playlist = self.playlists.get(type)
pl.update_playlist(playlist)
else:
self.playlists.set(type, playlist)
def get_current_playlist(self) -> Playlist | None:
"""
Retrieve the playlist to be scheduled.
The playlist to be retrieved is checked in this order:
1. Timeslot playlist
2. Schedule playlist
3. Show playlist
In case of virtual timeslots, `None` is returned, since they don't have playlists assigned.
Here the scheduling is performed by the silence detector in Engine Core.
Returns:
(Playlist | None): The playlist currently valid for being scheduled or None if it is a
virtual timeslot.
"""
# In case of virtual timeslot, do nothing. There is some timeout until the silence
# detector kicks in.
#
# TODO We need to assess leads to issues. To improve the fallback program
# can be actively scheduled in the future. @see aura#392
if self.is_virtual():
return None
playlist = self.playlists.timeslot
if not playlist:
playlist = self.playlists.schedule
if not playlist:
playlist = self.playlists.show
return playlist
def get_start(self) -> float:
"""
Get start of timeslot.
Returns:
float: start as UNIX epoch timestamp.
"""
return self.start
def get_end(self) -> float:
"""
Get end of timeslot.
Returns:
float: end as UNIX epoch timestamp.
"""
return self.end
def get_duration(self) -> float:
"""
Get duration of timeslot in seconds.
Note, this is not the duration of some actual playlist.
Returns:
float: timeslot length in seconds.
"""
return self.get_end() - self.get_start()
def get_show(self) -> Show:
"""
Get show associated with the timeslot.
Returns:
Show: The show.
"""
return self.show
def get_episode(self) -> Episode:
"""
Get episode associated with the timeslot.
Returns:
Episode: The episode.
"""
return self.episode
def __str__(self) -> str:
"""
String representation.
Returns:
str: Timeslot as String.
"""
start = SU.fmt_time(self.start)
end = SU.fmt_time(self.end)
s = f"ID#{self.id}: {start} - {end}"
if self.show:
s += f" [Show#{self.show.id}: {self.show.name}]"
return s
class PlayState:
"""
Play-out state information on a queued or an already playing playlist item.
It holds info on play-state, the chosen channel and the actual start time.
"""
class PlayStateType(str, enum.Enum):
"""
Play-state of a playlist item.
"""
UNKNOWN = "unknown"
LOADING = "loading"
READY = "ready"
PLAYING = "playing"
DONE = "done"
TIMEOUT = "timeout"
state: PlayStateType
play_start: float
channel: GenericChannel
def __init__(self):
"""
Initialize the play info.
"""
# set state to UNKNOWN
self.state = self.PlayStateType.UNKNOWN
self.play_start = None
self.channel = None
def is_loading(self) -> bool:
"""
Check if item is loading.
"""
return self.state == self.PlayStateType.LOADING
def set_loading(self, channel: GenericChannel):
"""
Set item state to loading and stores the used channel.
Args:
channel (GenericChannel): The channel where the item is loaded into.
"""
self.state = self.PlayStateType.LOADING
self.channel = channel
def is_ready(self) -> bool:
"""
Check if item is ready to play.
"""
return self.state == self.PlayStateType.READY
def set_ready(self):
"""
Set item status as ready to play.
"""
self.state = self.PlayStateType.READY
def is_playing(self) -> bool:
"""
Check if item is ready playing.
"""
return self.state == self.PlayStateType.PLAYING
def set_playing(self):
"""
Set item status as playing and assigns current timestamp as start time.
Note the start time is not guaranteed to match the exact play-out time. The actual play-out
start time can only be gathered on the site of Liquidsoap.
"""
self.state = self.PlayStateType.PLAYING
self.play_start = SU.timestamp()
def get_play_start(self) -> float:
"""
Get the time where the item actually started playing.
Important things to note:
- Do not mix this up with the planned start time (See `PlaylistItem.get_start_time()`).
- Some of the play time is set in Engine (line sources), some other natively by
Liquidsoap (audio files).
Returns:
float: Play start time as timestamp.
"""
return self.play_start
def is_done(self) -> bool:
"""
Check if item is done playing.
"""
return self.state == self.PlayStateType.DONE
def set_done(self):
"""
Set item status as done playing.
"""
self.state = self.PlayStateType.DONE
class PlaylistItem:
"""
An item of some playlist.
It holds an reference to the playlist and a double linked list to the previous and next item.
Attributes:
prev (PlaylistItem): The previous playlist item.
next (PlaylistItem): The next playlist item.
playlist (str): Back reference to playlist.
source (str): URI referencing the audio source.
duration (float): Duration in seconds.
volume (int): Value from 0-100% indicating loudness.
play (PlayState): Information on play-out state, start time and used channel.
play_queue_state (QueueState): Info how the item behaves within the timeslot.
"""
prev: PlaylistItem
next: PlaylistItem
playlist: Playlist
source: str
duration: float
volume: int
play: PlayState
Metadata = NamedTuple("Metadata", [("artist", str), ("album", str), ("title", str)])
metadata: Metadata
class QueueState(enum.Enum):
"""
Types of playlist item behaviours.
Items are either fully played, cut or not played at all, because they are past the
timeslot.
It is only used for displaying in the list of queued media in the logs, compare
the logic of `TimetableRenderer`.
TODO Think about refactoring this out of the core business logic.
"""
OKAY = "ok"
CUT = "cut"
PAST_TIMESLOT = "pt"
queue_state: QueueState
def __init__(self, source: str, duration: float, volume: int, metadata: Metadata):
"""
Initialize the playlist item.
Args:
source (str): URI referencing the media source.
duration (float): Duration in seconds.
volume (int): Volume of the media source.
metadata (Metadata): Metadata on the media.
"""
self.prev = None
self.next = None
self.playlist = None
self.source = source
self.duration = duration
self.volume = volume
self.metadata = metadata
self.play = PlayState()
self.play_queue_state = PlaylistItem.QueueState.OKAY
def get_playlist(self) -> Playlist:
"""
Get the playlist of this item.
Returns:
Playlist: The associated playlist.
"""
return self.playlist
def set_playlist(self, playlist: Playlist):
"""
Set the playlist of this item.
Args:
Playlist: The associated playlist.
"""
self.playlist = playlist
def get_position(self) -> int:
"""
Calculate the position in the playlist.
Returns:
int: number in the list of items.
"""
if self.prev:
return 1 + self.prev.get_position()
return 1
def get_start(self) -> float:
"""
Calculate the planned start time of an playlist item as an UNIX epoch timestamp.
The start is the sum of the timeslot start plus all durations of previous items.
Attention: This should not be mixed up with `self.play.get_play_start()` which represents
the actual start time. The two are not guaranteed not match!
Returns:
float: Timestamp when item is expected to start.
"""
if self.prev:
return self.get_prev().get_end()
else:
return self.get_playlist().get_timeslot().get_start()
def get_end(self) -> float:
"""
Calculate the end of the item as UNIX epoch timestamp.
Returns:
float: Timestamp when item is expected to end.
"""
return self.get_start() + self.duration
def get_content_type(self) -> resources.ResourceType:
"""
Get content type for the given source URI.
Returns:
ResourceType: Type of the content.
"""
return resources.ResourceUtil.get_content_type(self.source)
def get_prev(self) -> PlaylistItem:
"""
Get previous playlist item.
Returns:
PlaylistItem: The previous item.
"""
return self.prev
def get_next(self) -> PlaylistItem:
"""
Get next playlist item.
Returns:
PlaylistItem: The next item.
"""
return self.next
def get_all_prev(self) -> list[PlaylistItem]:
"""
Retrieve all previous items as part of the current item's playlist.
Returns:
list[PlaylistItem]: All previous items.
"""
prev = self.get_prev()
if prev:
all_prev = prev.get_all_prev()
all_prev.append(prev)
return all_prev
return []
def get_all_next(self, timeslot_sensitive: bool = True) -> list[PlaylistItem]:
"""
Retrieve all following items as part of the current item's playlist.
Args:
timeslot_sensitive (Boolean): If `True` items which start after \
the end of the timeslot are excluded.
Returns:
list[PlaylistItem]: List of PlaylistItem.
"""
next = self.get_next()
if next:
if timeslot_sensitive:
if next.get_start() > self.get_playlist().get_timeslot().get_end():
return []
all_next = next.get_all_next(timeslot_sensitive)
all_next.insert(0, next)
return all_next
else:
return []
def remove(self):
"""
Delete the item from the playlist.
"""
prev = self.prev
if self.prev:
self.prev.next = self.next
if self.next:
self.next.prev = prev
self.get_playlist().items.remove(self)
self.set_playlist(None)
self.prev = None
self.next = None
def __str__(self):
"""
String representation.
Returns:
str: Playlist item as String.
"""
start = SU.fmt_time(self.get_start())
end = SU.fmt_time(self.get_end())
track = self.source[-25:]
duration = "{:.2f}".format(self.duration)
return f"PlaylistItem [{start} - {end} " + f"| {duration}sec | Source: ...{track}]"