# Running Engine with Docker
Docker provides a simple way to get Engine running with all its dependencies.
You can find the official AURA Engine Docker images here:
https://hub.docker.com/repository/docker/autoradio/engine
> Note: The Engine Docker image is in *POC* state and waiting to be fully implemented.
It's not yet ready to be officially used. If you want to try AURA Engine in the meantime,
use the [Standard Installation](docs/installation-development).
<!-- TOC -->
- [Running Engine with Docker](#running-engine-with-docker)
  - [Basic configuration](#basic-configuration)
  - [Start an image](#start-an-image)
  - [Configure an image](#configure-an-image)
  - [Making Docker Releases](#making-docker-releases)
    - [Build your own local Docker image](#build-your-own-local-docker-image)
    - [Releasing a new version to DockerHub](#releasing-a-new-version-to-dockerhub)
  - [Read more](#read-more)
<!-- /TOC -->
## Basic configuration
Create a default configuration and edit it according to your settings:
```shell
cp configuration/sample-docker.engine.ini configuration/docker/engine.ini
```
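Which settings you need to touch depends on your setup; typically these are the database credentials and the audio source directory. The following is a minimal, hedged sketch of such an `engine.ini` excerpt. The actual keys and section names in `sample-docker.engine.ini` may differ, so use it for orientation only:
```ini
# Hypothetical excerpt - verify against sample-docker.engine.ini
[database]
db_name="aura_engine"
db_user="aura"
db_pass="secret"
db_host="localhost"

[audiosource]
# Directory shared with Tank (see "Setup the Audio Store")
audio_source_folder="/var/audio/source"
```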
Create a symlink in `./audio` pointing to the audio source of Tank:
```shell
ln -s /path/to/tank/audio-store ./audio/source
```
## Start an image
```shell
./run.sh docker:engine
```
*To be extended ...*
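Until this is fully documented, a direct Docker invocation could look roughly like the sketch below. The mount paths and the location of `engine.ini` inside the container are assumptions and may not match the actual image layout:
```shell
# Hypothetical invocation; the mount paths are assumptions
docker run -d \
  --name aura-engine \
  -v "$(pwd)/configuration/docker/engine.ini:/etc/aura/engine.ini" \
  -v "$(pwd)/audio:/var/audio" \
  autoradio/engine
```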
## Configure an image
*To be extended ...*
## Making Docker Releases
This section is only relevant if you are an Engine Developer.
### Build your own local Docker image
```shell
./run.sh docker:build
```
### Releasing a new version to DockerHub
```shell
./run.sh docker:push
```
## Read more
- [Overview](/README.md)
- [Installation for Development](installation-development.md)
- [Installation for Production](installation-production.md)
- [Running with Docker](running-docker.md)
- [Setup the Audio Store](docs/setup-audio-store.md)
- [Developer Guide](developer-guide.md)
- [Engine Features](engine-features.md)
- [Frequently Asked Questions (FAQ)](docs/frequently-asked-questions.md)
# Setting up the Audio Store
The *Audio Store* is a folder used by AURA Tank and Engine to exchange audio files.
If AURA Engine and Tank are hosted on different machines, the `audio_source_folder` must be shared
via some network share.
In case you are hosting Engine and Tank on the same machine (e.g. in development), you can skip
this documentation. Just make sure both point to the same directory.
<!-- TOC -->
- [Setting up the Audio Store](#setting-up-the-audio-store)
  - [Share Location](#share-location)
  - [Share Type](#share-type)
  - [Setting up SSHFS](#setting-up-sshfs)
    - [Configuring Engine](#configuring-engine)
    - [Configuring Tank](#configuring-tank)
  - [Read more](#read-more)
<!-- /TOC -->
By default, Engine expects audio files shared by Tank in `/var/audio/source`.
This can be configured in `engine.ini`:
```ini
[audiosource]
audio_source_folder="/var/audio/source"
```
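On the Tank side, the corresponding setting is `TANK_STORE_PATH` in `/etc/aura/tank.env` (see [Configuring Tank](#configuring-tank) below). In a single-machine setup both can simply point to the same directory, for example:
```shell
# /etc/aura/tank.env
TANK_STORE_PATH=/var/audio/source
```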
This folder must be writable by Tank.
## Share Location
You have the following options for where the share can be located:
1. **Engine and all other AURA components (Tank, Dashboard, Steering) are running on the same instance.** This is the simplest solution,
as Engine and Tank can share the same directory locally. However, this scenario requires more careful tuning of system resources,
so that e.g. the load of multiple simultaneous uploads in Tank does not affect the performance of Engine. You can mitigate this risk by setting CPU and
memory limits for Steering, Dashboard and Tank using Docker or `systemd-cgroups`. A disadvantage is maintenance or a system
reboot: all components are offline at once.
2. **Physical directory where the Engine lives, mounted to Tank.** This may cause an issue with the mount when the network connection to Engine
is unavailable or the instance is rebooting.
3. **Physical directory where the Tank lives, mounted to Engine.** This may cause an issue with the mount when the network connection to Tank
is unavailable or the instance is rebooting.
4. **Central Data Store or *Storage Box***, which is mounted on both Engine and Tank. In this case a downtime of the store makes both Engine and Tank
dysfunctional.
5. **Replicated storage solution using [Gluster](https://www.gluster.org/), where both Engine and Tank have their virtual audio directory mounted.**
This is the ideal approach, because if either instance goes down, the other still has all the data available.
In any case, you should set up a backup solution for this directory.
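As a starting point for such a backup, a simple nightly `rsync` to a separate host is often enough. The backup host and target path below are placeholders:
```shell
# Hypothetical crontab entry: nightly sync of the audio store to a backup host
0 3 * * * rsync -a --delete /var/audio/source/ backup-host:/backup/aura/audio-store/
```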
## Share Type
Then there's the question of how the share is managed. Among other options, you can
use [NFS](https://en.wikipedia.org/wiki/Network_File_System), [SSHFS](https://en.wikipedia.org/wiki/SSHFS) or even something like [Gluster](https://www.gluster.org/).
For our initial setup we have chosen *SSHFS*.
Please share your experience with other share types, and we will include it in future releases of
this documentation.
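If you go with NFS instead of SSHFS, the export on the host holding the physical directory might look roughly like the following sketch; the client network and mount options are placeholders you will want to adjust:
```conf
# /etc/exports - hypothetical NFS export of the audio store
/var/audio/source 192.168.0.0/24(rw,sync,no_subtree_check)
```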
## Setting up SSHFS
SSHFS allows you to access the filesystem of a remote computer via SSH. Interaction with files and folders behaves similarly to local data.
This example sets up the `audio_source_folder` on the Engine instance.
### Configuring Engine
First, you'll need to create a user which allows Tank to access the `audio_source_folder` on Engine:
```shell
adduser tankuser
chown tankuser:engineuser /var/audio/source
```
Ensure that `engineuser` has no write permission on the directory:
```shell
chmod u=+rwx,go=+rx-w /var/audio/source
```
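You can verify the resulting ownership and permissions with a quick check like this (users and path as created above):
```shell
ls -ld /var/audio/source
# Expected output similar to: drwxr-xr-x ... tankuser engineuser ... /var/audio/source

# Writing as engineuser should now fail with "Permission denied"
sudo -u engineuser touch /var/audio/source/write-test
```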
### Configuring Tank
On the Tank side you need to install `sshfs`:
```shell
sudo apt-get install sshfs
```
Then create an `audio-store` folder inside the AURA home:
```shell
:/opt/aura/$ mkdir audio-store
```
Check whether you can connect to the Engine over SSH using your `tankuser`:
```shell
ssh tankuser@192.168.0.111 -p22
```
Replace `-p22` with the actual port your SSH service is running on. For security reasons it's recommended
not to run SSH on the default port 22.
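Changing the port is done in the SSH daemon configuration on the Engine host. A minimal sketch, assuming a custom port of 2022:
```conf
# /etc/ssh/sshd_config (on the Engine host)
Port 2022
```
After editing, restart the SSH service (e.g. `sudo systemctl restart ssh` on Debian-based systems) and use `-p2022` instead of `-p22` in the commands below.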
Uncomment the following setting in `/etc/fuse.conf` to allow the tank user to access the share with write permissions:
```conf
# Allow non-root users to specify the allow_other or allow_root mount options.
user_allow_other
```
Now create the mount:
```shell
sudo sshfs -o allow_other -o IdentityFile=~/.ssh/id_rsa tankuser@192.168.0.111:/var/audio /opt/aura/audio-store -p22
```
Replace `192.168.0.111` with the actual IP of your Engine and `-p22` with the actual port your Engine's SSH service
is running on.
To make this mount persistent, i.e. keep it alive even after a system reboot, add the following entry
at the end of `/etc/fstab`:
```conf
# Audio Store @ AURA Engine
sshfs#tankuser@192.168.0.111:/var/audio /opt/aura/audio-store fuse auto,port=22,identityfile=~/.ssh/id_rsa,allow_other,_netdev 0 0
```
Again, check for the correct port number in the line above.
For the change to take effect, remount the filesystem with `sudo mount -a` or reboot the machine. When mounting,
you'll need to authenticate with the `tankuser` password once.
Then check whether your Tank's Docker configuration mounts the exact same volume (`/opt/aura/audio-store`).
If not, edit the Tank configuration `/etc/aura/tank.env` and set the following property:
```shell
TANK_STORE_PATH=/opt/aura/audio-store
```
Finally, test whether the directory is writable from Tank's system (`touch some-file`) and whether the Engine side can read
this file; a sketch of this check follows below. Once that works, restart your Tank Docker container and you should be good to go.
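A minimal sketch of that round-trip test, assuming the paths used above:
```shell
# On the Tank host: write a test file through the SSHFS mount
touch /opt/aura/audio-store/source/write-test

# On the Engine host: the file should be visible and readable
ls -l /var/audio/source/write-test

# Clean up afterwards (on the Tank host)
rm /opt/aura/audio-store/source/write-test
```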
## Read more
- [Overview](/README.md)
- [Installation for Development](installation-development.md)
- [Installation for Production](installation-production.md)
- [Running with Docker](running-docker.md)
- [Setup the Audio Store](docs/setup-audio-store.md)
- [Developer Guide](developer-guide.md)
- [Engine Features](engine-features.md)
- [Frequently Asked Questions (FAQ)](docs/frequently-asked-questions.md)
#!/bin/sh
''''which python3.8 >/dev/null 2>&1 && exec python3.8 "$0" "$@" # '''
''''which python3.7 >/dev/null 2>&1 && exec python3.7 "$0" "$@" # '''
''''exec echo "Error: Snaaakey Python, where are you?" # '''
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import signal
import logging
import subprocess
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from modules.base.logger import AuraLogger
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil
config = AuraConfig()
def configure_flask():
app.config["SQLALCHEMY_DATABASE_URI"] = config.get_database_uri()
app.config['BABEL_DEFAULT_LOCALE'] = 'de'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# FIXME Instantiate SQLAlchemy without the need for Flask
app = Flask(__name__)
configure_flask()
DB = SQLAlchemy(app)
class AuraEngine:
"""
AuraEngine does the following:
1. Initialize the engine and scheduler
2. Initialize Redis
3. Start Liquidsoap in a separate thread which connects to the engine
"""
logger = None
config = None
server = None
messenger = None
controller = None
engine = None
scheduler = None
lqs = None
lqs_startup = None
def __init__(self):
"""
Initializes Engine Core.
"""
self.config = config
AuraLogger(self.config)
self.logger = logging.getLogger("AuraEngine")
def startup(self, lqs_startup):
"""
Starts Engine Core.
"""
from modules.scheduling.scheduler import AuraScheduler
from modules.core.engine import Engine
from modules.cli.redis.adapter import ServerRedisAdapter
# If Liquidsoap should be started automatically
self.lqs_startup = lqs_startup
# Check if the database has to be re-created
if self.config.get("recreate_db") is not None:
AuraScheduler(self.config, None, None)
# Create scheduler and Liquidsoap communicator
self.engine = Engine(self.config)
self.scheduler = AuraScheduler(self.config, self.engine, self.on_initialized)
# Create the Redis adapter
self.messenger = ServerRedisAdapter(self.config)
self.messenger.scheduler = self.scheduler
self.messenger.engine = self.engine
# And finally wait for redis message / start listener thread
self.messenger.start()
def on_initialized(self):
"""
Called when the engine is initialized, before the Liquidsoap connection is established.
"""
self.logger.info(SimpleUtil.green("Engine Core initialized - Waiting for Liquidsoap connection ..."))
if self.lqs_startup:
self.start_lqs(False, False)
else:
self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually."))
def start_lqs(self, debug_output, verbose_output):
"""
Starts Liquidsoap.
"""
lqs_path = self.config.get("liquidsoap_path")
lqs_cwd = os.getcwd() + "/" + self.config.get("liquidsoap_working_dir")
lqs_output = ""
lqs_output = self.get_debug_flags(debug_output, verbose_output)
self.lqs = subprocess.Popen([lqs_path, lqs_output, "engine.liq"], \
cwd=lqs_cwd, \
stdout=subprocess.PIPE, \
shell=False)
def get_lqs_cmd(self, debug_output, verbose_output):
"""
Returns a shell command string to start Liquidsoap
"""
lqs_path = self.config.get("liquidsoap_path")
lqs_cwd = os.getcwd() + "/" + self.config.get("liquidsoap_working_dir")
lqs_output = self.get_debug_flags(debug_output, verbose_output)
return "(cd %s && %s %s ./engine.liq)" % (lqs_cwd, lqs_path, lqs_output)
def get_debug_flags(self, debug_output, verbose_output):
"""
Build Liquidsoap debug parameters.
"""
output = ""
if debug_output:
output += "--debug "
if verbose_output:
output += "--verbose "
return output
def exit_gracefully(self, signum, frame):
"""
Shutdown of the engine. Also terminates the Liquidsoap thread.
"""
if self.lqs:
self.lqs.terminate()
self.logger.info("Terminated Liquidsoap")
if self.engine:
self.engine.terminate()
if self.messenger:
self.messenger.terminate()
self.logger.info("Gracefully terminated Aura Engine! (signum:%s, frame:%s)" % (signum, frame))
sys.exit(0)
#
# START THE ENGINE
#
if __name__ == "__main__":
engine = AuraEngine()
start_lqs = True
lqs_cmd = False
signal.signal(signal.SIGINT, engine.exit_gracefully)
signal.signal(signal.SIGTERM, engine.exit_gracefully)
if len(sys.argv) >= 2:
if "--without-lqs" in sys.argv:
start_lqs = False
if "--get-lqs-command" in sys.argv:
lqs_cmd = True
if "--use-test-data" in sys.argv:
engine.config.set("use_test_data", True)
if "--recreate-database" in sys.argv:
engine.config.set("recreate_db", True)
if lqs_cmd:
print(engine.get_lqs_cmd(True, True))
else:
engine.startup(start_lqs)
#!/bin/sh
''''which python3.8 >/dev/null 2>&1 && exec python3.8 "$0" "$@" # '''
''''which python3.7 >/dev/null 2>&1 && exec python3.7 "$0" "$@" # '''
''''exec echo "Error: Snaaakey Python, where are you?" # '''
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import sys
import redis
from argparse import ArgumentParser
from modules.cli.padavan import Padavan
from modules.base.exceptions import PlaylistException
from modules.base.config import AuraConfig
class Guru():
"""
Command Line Interface (CLI) for Aura Engine.
"""
# config_path = "%s/configuration/engine.ini" % Path(__file__).parent.absolute()
config = AuraConfig()
parser = None
args = None
# ------------------------------------------------------------------------------------------ #
def __init__(self):
self.init_argument_parser()
self.handle_arguments()
def handle_arguments(self):
if self.args.stoptime:
start = time.time()
if not self.args.quiet:
print("Guru thinking...")
try:
p = Padavan(self.args, self.config)
p.meditate()
except PlaylistException as pe:
# typically there is no next file found
if not self.args.quiet:
print(pe)
else:
print("")
exit(4)
except redis.exceptions.TimeoutError:
print("Timeout when waiting for redis message. Is AURA daemon running? Exiting...")
exit(3)
if not self.args.quiet:
print("...result: ")
if p.stringreply != "":
#print(p.stringreply)
if p.stringreply[len(p.stringreply)-1] == "\n":
print(p.stringreply[0:len(p.stringreply) - 1])
else:
print(p.stringreply[0:len(p.stringreply)])
if self.args.stoptime:
end = time.time()
exectime = end-start
print("execution time: "+str(exectime)+"s")
def init_argument_parser(self):
try:
self.create_parser()
self.args = self.parser.parse_args()
except (ValueError, TypeError) as e:
if self.parser is not None:
self.parser.print_help()
print()
print(e)
exit(1)
def create_parser(self):
self.parser = ArgumentParser()
# options
self.parser.add_argument("-sep", "--stop-execution-time", action="store_true", dest="stoptime", default=False, help="Prints the execution time at the end of the skript")
self.parser.add_argument("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Just the result will outputed to stout")
self.parser.add_argument("-rd", "--recreate-database", action="store_true", dest="recreatedb", default=False, help="Do you want to recreate the database?")
# getter
self.parser.add_argument("-pcs", "--print-connection-status", action="store_true", dest="get_connection_status", default=False, help="Prints the status of the connection to liquidsoap, pv and tank")
self.parser.add_argument("-gam", "--get-active-mixer", action="store_true", dest="mixer_channels_selected",default=False, help="Which mixer channels are selected?")
self.parser.add_argument("-pms", "--print-mixer-status", action="store_true", dest="mixer_status", default=False, help="Prints all mixer sources and their states")
self.parser.add_argument("-pap", "--print-act-programme", action="store_true", dest="get_act_programme", default=False, help="Prints the actual Programme, the controller holds")
self.parser.add_argument("-s", "--status", action="store_true", dest="get_status", default=False, help="Returns the Engine Status as JSON")
# liquid manipulation
self.parser.add_argument("-am", "--select-mixer", action="store", dest="select_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-dm", "--de-select-mixer", action="store", dest="deselect_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-vm", "--volume", action="store", dest="set_volume", default=0, metavar=("MIXERNUM", "VOLUME"), nargs=2, help="Set volume of a mixer source", type=int)
# shutdown server
self.parser.add_argument("-sd", "--shutdown", action="store_true", dest="shutdown", default=False, help="Shutting down aura server")
# playlist in/output
self.parser.add_argument("-fnp", "--fetch-new-programmes", action="store_true", dest="fetch_new_programme", default=False, help="Fetch new programmes from api_steering_calendar in engine.ini")
self.parser.add_argument("-pmq", "--print-message-queue", action="store_true", dest="print_message_queue", default=False, help="Prints message queue")
# send a redis message
self.parser.add_argument("-rm", "--redis-message", action="store", dest="redis_message", default=False, metavar=("CHANNEL", "MESSAGE"), nargs=2, help="Send a redis message to the Listeners")
# calls from liquidsoap
self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?")
self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?")
self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing")
self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?")
self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing")
if len(sys.argv) == 1:
raise ValueError("No Argument passed!")
def valid_playlist_entry(argument):
from datetime import datetime
try:
index = int(argument[0])
fromtime = datetime.strptime(argument[1], "%Y-%m-%d")
source = argument[2]
return index, fromtime, source
except:
msg = "Not a valid date: '{0}'.".format(argument[0])
print(msg)
raise
# # ## ## ## ## ## # #
# # ENTRY FUNCTION # #
# # ## ## ## ## ## # #
def main():
Guru()
# # ## ## ## ## ## ## # #
# # End ENTRY FUNCTION # #
# # ## ## ## ## ## ## # #
if __name__ == "__main__":
main()
#!/bin/bash
mode="dev"
if [[ $* =~ ^(prod)$ ]]; then
mode="prod"
fi
if [ $mode == "dev" ]; then
echo "[Installing AURA ENGINE for Development]"
fi
if [ $mode == "prod" ]; then
echo "[Installing AURA ENGINE for Production]"
fi
# Find the correct Python version (3.7 or 3.8)
if hash python3.8 2>/dev/null; then
PYTHON_EXEC="python3.8"
echo "[ Using Python 3.8 ]"
else
PYTHON_EXEC="python3.7"
echo "[ Using Python 3.7 ]"
fi
# Development and Production
echo "Installing OPAM Packages ..."
bash script/install-opam-packages.sh
echo "Installing Python Requirements ..."
$PYTHON_EXEC $(which pip3) install -r requirements.txt
# Development
if [ $mode == "dev" ]; then
echo "Create local 'logs' Folder ..."
mkdir -p logs
echo "Copy configuration to './configuration/engine.ini'"
cp -n configuration/sample-development.engine.ini configuration/engine.ini
fi
# Production
if [ $mode == "prod" ]; then
echo "Create local 'tmp' Folder ..."
mkdir -p tmp
echo "Copy default Engine configuration to '/etc/aura/engine.ini'"
cp -n configuration/sample-production.engine.ini /etc/aura/engine.ini
fi
echo
echo "+++ Installation of AURA Engine finished! +++"
echo
# Meta
__author__ = "David Trattnig and Gottfried Gaisbauer"
__copyright__ = "Copyright 2017-2020, Aura Engine Team"
__credits__ = ["David Trattnig", "Gottfried Gaisbauer", "Michael Liebler"]
__license__ = "GNU Affero General Public License (AGPL) Version 3"
__version__ = "0.8.2"
__version_info__ = (0, 8, 2)
__maintainer__ = "David Trattnig"
__email__ = "david.trattnig@subsquare.at"
__status__ = "Development"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import sys
import logging
from pathlib import Path
from configparser import ConfigParser
class AuraConfig:
"""
AuraConfig Class
Holds the Engine Configuration as in the file `engine.ini`.
"""
ini_path = ""
logger = None
def __init__(self, ini_path="/etc/aura/engine.ini"):
"""
Initializes the configuration, defaults to `/etc/aura/engine.ini`.
If this file doesn't exist it uses `./configuration/engine.ini` from
the project directory.
Args:
ini_path(String): The path to the configuration file `engine.ini`
"""
self.logger = logging.getLogger("AuraEngine")
config_file = Path(ini_path)
if not config_file.is_file():
ini_path = "%s/configuration/engine.ini" % Path(__file__).parent.parent.parent.absolute()
self.ini_path = ini_path
self.load_config()
# Defaults
self.set("config_dir", os.path.dirname(ini_path))
self.set("install_dir", os.path.realpath(__file__ + "../../../.."))
self.set("use_test_data", False) # TODO Still needed?
def set(self, key, value):
"""
Setter for some specific config property.
Args:
key (String): key
value (*): value
"""
try:
self.__dict__[key] = int(value)
except:
self.__dict__[key] = str(value)
def get(self, key, default=None):
"""
Getter for some specific config property.
Args:
key (String): key
default (*): value
"""
if key not in self.__dict__:
if default:
self.set(key, default)
else:
self.logger.warning("Key " + key + " not found in configfile " + self.ini_path + "!")
return None
if key == "loglevel":
loglvl = self.__dict__[key]
if loglvl == "debug":
return logging.DEBUG
elif loglvl == "info":
return logging.INFO
elif loglvl == "warning":
return logging.WARNING
elif loglvl == "error":
return logging.ERROR
else:
return logging.CRITICAL
if key == "debug":
return self.__dict__[key].count("y")
return self.__dict__[key]
def load_config(self):
"""
Set config defaults and load settings from file
"""
if not os.path.isfile(self.ini_path):
self.logger.critical(self.ini_path + " not found :(")
sys.exit(1)
# Read the file
f = open(self.ini_path, 'r')
ini_str = f.read()
f.close()
# Parse the values
config_parser = ConfigParser()
try:
config_parser.read_string(ini_str)
except Exception as e:
self.logger.critical("Cannot read " + self.ini_path + "! Reason: " + str(e))
sys.exit(1)
for section in config_parser.sections():
for key, value in config_parser.items(section):
v = config_parser.get(section, key).replace('"', '').strip()
self.set(key, v)
def get_database_uri(self):
"""
Retrieves the database connection string.
"""
db_name = self.get("db_name")
db_user = self.get("db_user")
db_pass = str(self.get("db_pass"))
db_host = self.get("db_host")
db_charset = self.get("db_charset", "utf8")
return "mysql://" + db_user + ":" + db_pass + "@" + db_host + "/" + db_name + "?charset=" + db_charset
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Scheduler Exceptions
class NoProgrammeLoadedException(Exception):
pass
class NoActiveScheduleException(Exception):
pass
# Soundsystem and Mixer Exceptions
class LoadSourceException(Exception):
pass
class InvalidChannelException(Exception):
pass
class PlaylistException(Exception):
pass
class NoActiveEntryException(Exception):
pass
# Liquidsoap Exceptions
class LQConnectionError(Exception):
pass
class LQStreamException(Exception):
pass
class RedisConnectionException(Exception):
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.base.config import AuraConfig
class AuraLogger():
"""
AuraLogger Class
Logger for all Aura Engine components. The default
logger is `AuraEngine`. Other loggers are defined
by passing a custom name on instantiation.
The logger respects the log-level as defined in the
engine's configuration file.
"""
config = None
logger = None
def __init__(self, config, name="AuraEngine"):
"""
Constructor to create a new logger defined by
the passed name.
Args:
name (String): The name of the logger
"""
self.config = config
self.__create_logger(name)
def __create_logger(self, name):
"""
Creates the logger instance for the given name.
Args:
name (String): The name of the logger
"""
lvl = self.config.get("loglevel")
# create logger
self.logger = logging.getLogger(name)
self.logger.setLevel(lvl)
if not self.logger.hasHandlers():
# create file handler for logger
file_handler = logging.FileHandler(self.config.get("logdir") + "/"+name+".log")
file_handler.setLevel(lvl)
# create stream handler for logger
stream_handler = logging.StreamHandler()
stream_handler.setLevel(lvl)
# set format of log
datepart = "%(asctime)s:%(name)s:%(levelname)s"
message = " - %(message)s - "
filepart = "[%(filename)s:%(lineno)s-%(funcName)s()]"
formatter = logging.Formatter(datepart + message + filepart)
# set log of handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# add handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(stream_handler)
self.logger.critical("ADDED HANDLERS")
else:
self.logger.critical("REUSED LOGGER")
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import smtplib
from email.message import EmailMessage
class MailingException(Exception):
"""
Thrown when some mail cannot be sent.
"""
class AuraMailer():
"""
Service to send emails to Aura administrators.
"""
config = None
def __init__(self, config):
"""
Constructor to initialize service with Aura `config`.
Args:
config (AuraConfig): The configuration with the mail server details
"""
self.config = config
self.admin_mails = config.get("admin_mail")
#
# PUBLIC METHODS
#
def send_admin_mail(self, subject, body):
"""
Sends an email to the administrator as defined in the configuration.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
admin_mails = self.admin_mails.split()
for mail_to in admin_mails:
self.__send(mail_to, subject, body)
#
# PRIVATE METHODS
#
def __send(self, mail_to, subject, body):
"""
Sends an email to the given address.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
# read config
mail_server = self.config.get("mail_server")
mail_port = self.config.get("mail_server_port")
mail_user = self.config.get("mail_user")
mail_pass = self.config.get("mail_pass")
from_mail = self.config.get("from_mail")
# check settings
if mail_server == "":
raise MailingException("Mail Server not set")
if mail_port == "":
raise MailingException("Mailserver Port not set")
if mail_user == "":
raise MailingException("Mail user not set")
if mail_pass == "":
raise MailingException("No Password for mailing set")
if from_mail == "":
raise MailingException("From Mail not set")
# stuff the message together and ...
msg = EmailMessage()
msg.set_content(body)
mailsubject_prefix = self.config.get("mailsubject_prefix")
if mailsubject_prefix == "":
msg["Subject"] = subject
else:
msg["Subject"] = mailsubject_prefix + " " + subject
msg["From"] = from_mail
msg["To"] = mail_to
# ... send the mail
try:
server = smtplib.SMTP(mail_server, int(mail_port))
server.starttls()
server.login(mail_user, mail_pass)
server.send_message(msg)
server.quit()
except Exception as e:
raise MailingException(str(e))
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import logging
import datetime
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm
from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from modules.scheduling.types import PlaylistType
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil, EngineUtil
# Init Config
config = AuraConfig()
# Initialize DB Model and session
engine = sa.create_engine(config.get_database_uri())
Base = declarative_base()
Base.metadata.bind = engine
class DB():
session = orm.scoped_session(orm.sessionmaker())(bind=engine)
Model = Base
class AuraDatabaseModel():
"""
AuraDataBaseModel.
Holding all tables and relationships for the engine.
"""
logger = None
def __init__(self):
"""
Constructor.
"""
self.logger = logging.getLogger("AuraEngine")
def store(self, add=False, commit=False):
"""
Store to the database
"""
if add:
DB.session.add(self)
else:
DB.session.merge(self)
if commit:
DB.session.commit()
def delete(self, commit=False):
"""
Delete from the database
"""
DB.session.delete(self)
if commit:
DB.session.commit()
def refresh(self):
"""
Refreshes the current record
"""
DB.session.expire(self)
DB.session.refresh(self)
def _asdict(self):
return self.__dict__
@staticmethod
def recreate_db(systemexit = False):
"""
Re-creates the database for development purposes.
"""
manualschedule = Schedule()
manualschedule.schedule_id = 0
manualschedule.show_name = "Manual Show"
Base.metadata.drop_all()
Base.metadata.create_all()
# self.logger.debug("inserting manual scheduling possibility and fallback trackservice schedule")
# DB.session.add(manualschedule)
# db.session.add(fallback_trackservice_schedule)
# self.logger.debug("all created. commiting...")
DB.session.commit()
if systemexit:
sys.exit(0)
#
# SCHEDULES & PLAYLISTS
#
class Schedule(DB.Model, AuraDatabaseModel):
"""
One specific Schedule for a show on a timeslot.
Holding references to playlists and fallback-playlists.
"""
__tablename__ = 'schedule'
# Primary keys
id = Column(Integer, primary_key=True, autoincrement=True)
schedule_start = Column(DateTime, unique=True, index=True)
schedule_end = Column(DateTime, unique=True, index=True)
schedule_id = Column(Integer, unique=True)
show_id = Column(Integer)
show_name = Column(String(256))
show_hosts = Column(String(256))
funding_category = Column(String(256))
comment = Column(String(512))
languages = Column(String(256))
type = Column(String(256))
category = Column(String(256))
topic = Column(String(256))
musicfocus = Column(String(256))
is_repetition = Column(Boolean())
playlist_id = Column(Integer) #, ForeignKey("playlist.playlist_id"))
schedule_fallback_id = Column(Integer)
show_fallback_id = Column(Integer)
station_fallback_id = Column(Integer)
fallback_state = PlaylistType.DEFAULT
fadeouttimer = None # Used to fade-out the schedule, even when entries are longer
playlist = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.playlist_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
schedule_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.schedule_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
show_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.show_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
station_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.station_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
@staticmethod
def select_show_on_datetime(date_time):
return DB.session.query(Schedule).filter(Schedule.schedule_start == date_time).first()
@staticmethod
def select_programme(date_from=datetime.date.today()):
"""
Select all schedules starting from `date_from` or from today if no
parameter is passed.
Args:
date_from (datetime): Select schedules from this date and time on
Returns:
([Schedule]): List of schedules
"""
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start >= date_from).\
order_by(Schedule.schedule_start).all()
return schedules
@staticmethod
def select_upcoming(n):
"""
Selects the (`n`) upcoming schedules.
"""
now = datetime.datetime.now()
DB.session.commit() # Required since an independent session is used.
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start > str(now)).\
order_by(Schedule.schedule_start.asc()).limit(n).all()
return schedules
@hybrid_property
def start_unix(self):
"""
Start time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_end.timetuple())
def as_dict(self):
"""
Returns the schedule as a dictionary for serialization.
"""
playlist = self.playlist
return {
"schedule_id": self.schedule_id,
"schedule_start": self.schedule_start.isoformat(),
"schedule_end": self.schedule_end.isoformat(),
"topic": self.topic,
"musicfocus": self.musicfocus,
"funding_category": self.funding_category,
"is_repetition": self.is_repetition,
"category": self.category,
"languages": self.languages,
"comment": self.comment,
"playlist_id": self.playlist_id,
"schedule_fallback_id": self.schedule_fallback_id,
"show_fallback_id": self.show_fallback_id,
"station_fallback_id": self.station_fallback_id,
"show": {
"name": self.show_name,
"type": self.get_type(),
"host": self.show_hosts
},
"playlist": playlist
}
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [Show: %s, ShowID: %s | %s - %s ]" % (str(self.schedule_id), self.show_name, str(self.show_id), time_start, time_end)
class Playlist(DB.Model, AuraDatabaseModel):
"""
The playlist containing playlist entries.
"""
__tablename__ = 'playlist'
# pk,fk
artificial_id = Column(Integer, primary_key=True)
schedule_start = Column(DateTime, ForeignKey("schedule.schedule_start"))
# relationships
schedule = relationship("Schedule", uselist=False, back_populates="playlist")
entries = relationship("PlaylistEntry", back_populates="playlist")
# data
playlist_id = Column(Integer, autoincrement=False) # , ForeignKey("schedule.playlist_id"))
show_name = Column(String(256))
fallback_type = Column(Integer)
entry_count = Column(Integer)
@staticmethod
def select_all():
"""
Fetches all entries
"""
all_entries = DB.session.query(Playlist).filter(Playlist.fallback_type == 0).all()
cnt = 0
for entry in all_entries:
entry.programme_index = cnt
cnt = cnt + 1
return all_entries
@staticmethod
def select_playlist_for_schedule(start_date, playlist_id):
"""
Retrieves the playlist for the given schedule identified by `start_date` and `playlist_id`
Args:
start_date (datetime): Date and time when the playlist is scheduled
playlist_id (Integer): The ID of the playlist
Returns:
(Playlist): The playlist, if existing for schedule
Raises:
Exception: In case of an inconsistent database state, such as multiple playlists for the given date/time.
"""
playlist = None
playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == start_date).all()
# FIXME There are unknown issues with the native SQL query by ID
# playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == datetime and Playlist.playlist_id == playlist_id).all()
for p in playlists:
if p.playlist_id == playlist_id:
playlist = p
return playlist
@staticmethod
def select_playlist(playlist_id):
"""
Retrieves all playlists for the given playlist ID.
Args:
playlist_id (Integer): The ID of the playlist
Returns:
(Array<Playlist>): An array holding the playlists
"""
return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.schedule_start).all()
@staticmethod
def is_empty():
"""
Checks if the playlist table is empty.
"""
try:
return not DB.session.query(Playlist).one_or_none()
except sa.orm.exc.MultipleResultsFound:
return False
@hybrid_property
def start_unix(self):
"""
Start time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple()) + self.duration
@hybrid_property
def duration(self):
"""
Returns the total length of the playlist in seconds.
Returns:
(Integer): Length in seconds
"""
total = 0
for entry in self.entries:
total += entry.duration
return total
def as_dict(self):
"""
Returns the playlist as a dictionary for serialization.
"""
entries = []
for e in self.entries:
entries.append(e.as_dict())
playlist = {
"playlist_id": self.playlist_id,
"fallback_type": self.fallback_type,
"entry_count": self.entry_count,
"entries": entries
}
return playlist
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [items: %s | %s - %s]" % (str(self.playlist_id), str(self.entry_count), str(time_start), str(time_end))
class PlaylistEntry(DB.Model, AuraDatabaseModel):
"""
Playlist entries are the individual items of a playlist such as audio files.
"""
__tablename__ = 'playlist_entry'
# primary keys
artificial_id = Column(Integer, primary_key=True)
# foreign keys
artificial_playlist_id = Column(Integer, ForeignKey("playlist.artificial_id"))
entry_num = Column(Integer) # , primary_key=True)
uri = Column(String(1024))
duration = Column(BigInteger)
source = Column(String(1024))
entry_start = Column(DateTime)
entry_start_actual = None # Assigned when the entry is actually played
channel = None # Assigned when entry is actually played
queue_state = None # Assigned when entry is about to be queued
status = None # Assigned when state changes
switchtimer = None
loadtimer = None
fadeouttimer = None
# relationships
playlist = relationship("Playlist", uselist=False, back_populates="entries")
meta_data = relationship("PlaylistEntryMetaData", uselist=False, back_populates="entry")
@staticmethod
def select_playlistentry_for_playlist(artificial_playlist_id, entry_num):
"""
Selects one entry identified by `playlist_id` and `entry_num`.
"""
return DB.session.query(PlaylistEntry).filter(PlaylistEntry.artificial_playlist_id == artificial_playlist_id, PlaylistEntry.entry_num == entry_num).first()
@staticmethod
def delete_entry(artificial_playlist_id, entry_num):
"""
Deletes the playlist entry and associated metadata.
"""
entry = PlaylistEntry.select_playlistentry_for_playlist(artificial_playlist_id, entry_num)
metadata = PlaylistEntryMetaData.select_metadata_for_entry(entry.artificial_id)
metadata.delete()
entry.delete()
DB.session.commit()
@staticmethod
def count_entries(artificial_playlist_id):
"""
Returns the count of all entries.
"""
result = DB.session.query(PlaylistEntry).filter(PlaylistEntry.artificial_playlist_id == artificial_playlist_id).count()
return result
@hybrid_property
def entry_end(self):
return self.entry_start + datetime.timedelta(seconds=self.duration)
@hybrid_property
def start_unix(self):
return time.mktime(self.entry_start.timetuple())
@hybrid_property
def end_unix(self):
return time.mktime(self.entry_end.timetuple())
@hybrid_property
def volume(self):
return 100 # FIXME Make DB Column
def get_type(self):
return EngineUtil.get_channel_type(self.uri)
def get_prev_entries(self):
"""
Retrieves all previous entries as part of the current entry's playlist.
Returns:
(List): List of PlaylistEntry
"""
prev_entries = []
for entry in self.playlist.entries:
if entry.entry_start < self.entry_start:
prev_entries.append(entry)
return prev_entries
def get_next_entries(self, schedule_sensitive=True):
"""
Retrieves all following entries as part of the current entry's playlist.
Args:
schedule_sensitive (Boolean): If `True` entries which start after \
the end of the schedule are excluded
Returns:
(List): List of PlaylistEntry
"""
next_entries = []
for entry in self.playlist.entries:
if entry.entry_start > self.entry_start:
if schedule_sensitive:
if entry.entry_start < self.playlist.schedule.schedule_end:
next_entries.append(entry)
else:
next_entries.append(entry)
return next_entries
def as_dict(self):
"""
Returns the entry as a dictionary for serialization.
"""
if self.meta_data:
return {
"id": self.artificial_id,
"duration": self.duration,
"artist": self.meta_data.artist,
"album": self.meta_data.album,
"title": self.meta_data.title
}
return None
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.source[-25:]
return "PlaylistEntry #%s [%s - %s | %ssec | Source: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track)
class PlaylistEntryMetaData(DB.Model, AuraDatabaseModel):
"""
Metadata for a playlist entry such as the artist and track name.
"""
__tablename__ = "playlist_entry_metadata"
artificial_id = Column(Integer, primary_key=True)
artificial_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"))
artist = Column(String(256))
title = Column(String(256))
album = Column(String(256))
entry = relationship("PlaylistEntry", uselist=False, back_populates="meta_data")
@staticmethod
def select_metadata_for_entry(artificial_playlistentry_id):
return DB.session.query(PlaylistEntryMetaData).filter(PlaylistEntryMetaData.artificial_entry_id == artificial_playlistentry_id).first()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
from enum import Enum
from modules.core.channels import ChannelType
from modules.scheduling.types import PlaylistType
class EngineUtil:
"""
A class for Engine utilities.
"""
@staticmethod
def get_channel_type(uri):
"""
Returns the channel type, depending on the passed URI and source.
Args:
uri (String): The URI of the source
"""
if uri.startswith("https"):
return ChannelType.HTTPS
if uri.startswith("http"):
return ChannelType.HTTP
if uri.startswith("pool") or uri.startswith("playlist") or uri.startswith("file"):
return ChannelType.FILESYSTEM
if uri.startswith("line://"):
return ChannelType.LIVE
@staticmethod
def uri_to_filepath(base_dir, uri):
"""
Converts a file-system URI to an actual, absolute path to the file.
Args:
base_dir (String): The location of the audio store.
uri (String): The URI of the file
Returns:
path (String): Absolute file path
"""
return base_dir + "/" + uri[7:] + ".flac"
@staticmethod
def get_playlist_type(fallback_id):
"""
Converts a playlist type ID to the playlist type object.
Args:
id (String): playlist type ID
Returns:
type (PlaylistType): The type
"""
if fallback_id == 0:
return PlaylistType.DEFAULT
elif fallback_id == 1:
return PlaylistType.SHOW
elif fallback_id == 2:
return PlaylistType.TIMESLOT
else:
return PlaylistType.STATION
@staticmethod
def get_entries_string(entries):
"""
Returns a list of entries as String for logging purposes.
"""
s = ""
if isinstance(entries, list):
for entry in entries:
s += str(entry)
if entry != entries[-1]: s += ", "
else:
s = str(entries)
return s
@staticmethod
def lqs_annotate_cuein(uri, cue_in):
"""
Wraps the given URI with a Liquidsoap Cue In annotation.
Args:
uri (String): The path to the audio source
cue_in (Float): The value in seconds where the cue in should start
Returns:
(String): The annotated URI
"""
if cue_in > 0.0:
uri = "annotate:liq_cue_in=\"%s\":%s" % (str(cue_in), uri)
return uri
@staticmethod
def engine_info(component, version):
"""
Prints the engine logo and version info.
"""
return """\n
█████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗
██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝
███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗
██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝
██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗
╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝
%s v%s - Ready to play!
\n""" % (component, version)
class SimpleUtil:
"""
A container class for simple utility methods.
"""
@staticmethod
def clean_dictionary(data):
"""
Delete keys with the value `None` in a dictionary, recursively.
This alters the input so you may wish to `copy` the dict first.
Args:
data (dict): The dictionary
Returns:
(dict):
"""
for key, value in list(data.items()):
if value is None:
del data[key]
elif isinstance(value, dict):
SimpleUtil.clean_dictionary(value)
return data
@staticmethod
def fmt_time(timestamp):
"""
Formats a UNIX timestamp to a String displaying time in the format '%H:%M:%S'.
Args:
(Integer) timestamp: Unix epoch
Returns:
(String): Displaying the time
"""
return datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
@staticmethod
def nano_to_seconds(nanoseconds):
"""
Converts nanoseconds to seconds
Args:
(Integer) nanoseconds
Returns:
(Float): seconds
"""
return float(nanoseconds / 1000000000)
@staticmethod
def seconds_to_nano(seconds):
"""
Converts seconds to nanoseconds
Args:
(Integer) seconds
Returns:
(Integer): nanoseconds
"""
return int(seconds * 1000000000)
@staticmethod
def timestamp(date_and_time=None):
"""
Transforms the given `datetime` into a UNIX epoch timestamp.
If no parameter is passed, the current timestamp is returned.
Args:
(Datetime) date_and_time: the date and time to transform.
Returns:
(Integer): timestamp in seconds.
"""
if not date_and_time:
date_and_time = datetime.datetime.now()
return time.mktime(date_and_time.timetuple())
@staticmethod
def strike(text):
"""
Creates a strikethrough version of the given text.
Args:
(String) text: the text to strike.
Returns:
(String): the struck-through text.
"""
result = ""
for c in str(text):
result += c + TerminalColors.STRIKE.value
return result
@staticmethod
def bold(text):
"""
Creates a bold version of the given text.
"""
return TerminalColors.BOLD.value + text + TerminalColors.ENDC.value
@staticmethod
def underline(text):
"""
Creates a underlined version of the given text.
"""
return TerminalColors.UNDERLINE.value + text + TerminalColors.ENDC.value
@staticmethod
def blue(text):
"""
Creates a blue version of the given text.
"""
return TerminalColors.BLUE.value + text + TerminalColors.ENDC.value
@staticmethod
def red(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.RED.value + text + TerminalColors.ENDC.value
@staticmethod
def pink(text):
"""
Creates a pink version of the given text.
"""
return TerminalColors.PINK.value + text + TerminalColors.ENDC.value
@staticmethod
def yellow(text):
"""
Creates a yellow version of the given text.
"""
return TerminalColors.YELLOW.value + text + TerminalColors.ENDC.value
@staticmethod
def green(text):
"""
Creates a green version of the given text.
"""
return TerminalColors.GREEN.value + text + TerminalColors.ENDC.value
@staticmethod
def cyan(text):
"""
Creates a cyan version of the given text.
"""
return TerminalColors.CYAN.value + text + TerminalColors.ENDC.value
class TerminalColors(Enum):
"""
Colors for formatting terminal output.
"""
HEADER = "\033[95m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
PINK = "\033[35m"
CYAN = "\033[36m"
WARNING = "\033[31m"
FAIL = "\033[41m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
STRIKE = "\u0336"
ENDC = "\033[0m"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
from modules.cli.redis.adapter import ClientRedisAdapter, ServerRedisAdapter
from modules.base.models import AuraDatabaseModel
class Padavan:
args = None
config = None
ss = None
zmqclient = None
redisclient = None
stringreply = ""
# ------------------------------------------------------------------------------------------ #
def __init__(self, args, config):
self.args = args
self.config = config
# ------------------------------------------------------------------------------------------ #
def meditate(self):
if self.args.fetch_new_programme:
self.fetch_new_programme()
elif self.args.mixer_channels_selected:
self.mixer_channels_selected()
elif self.args.mixer_status:
self.mixer_status()
elif self.args.get_act_programme:
self.get_act_programme()
elif self.args.get_status:
self.get_status()
elif self.args.get_connection_status:
self.get_connection_status()
elif self.args.shutdown:
self.shutdown()
elif self.args.redis_message:
self.redis_message(self.args.redis_message[0], self.args.redis_message[1])
elif self.args.select_mixer != -1:
self.select_mixer(self.args.select_mixer)
elif self.args.deselect_mixer != -1:
self.select_mixer(self.args.deselect_mixer, False)
elif self.args.set_volume:
self.set_volume(self.args.set_volume[0], self.args.set_volume[1])
elif self.args.print_message_queue:
self.print_message_queue()
elif self.args.get_file_for:
self.get_next_file(self.args.get_file_for)
elif self.args.set_file_for:
self.set_next_file(self.args.set_file_for[0], self.args.set_file_for[1])
elif self.args.now_playing:
print("")
elif self.args.init_player:
self.init_player()
elif self.args.on_play:
self.on_play(self.args.on_play)
elif self.args.recreatedb:
self.recreatedb()
# else:
# raise Exception("")
# Liquidsoap communication is initialized lazily: loading at runtime just what is needed gives faster execution time
# ------------------------------------------------------------------------------------------ #
def init_liquidsoap_communication(self):
# import
from modules.core.engine import SoundSystem
# init liquidsoap communication
self.ss = SoundSystem(self.config)
# enable connection
self.ss.enable_transaction()
# ------------------------------------------------------------------------------------------ #
def destroy_liquidsoap_communication(self):
# enable connection
self.ss.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def init_redis_communication(self, with_server=False):
self.redisclient = ClientRedisAdapter(self.config)
if with_server:
self.redisserver = ServerRedisAdapter(self.config)
# ------------------------------------------------------------------------------------------ #
def send_redis(self, channel, message):
self.init_redis_communication()
self.redisclient.publish(channel, message)
# ------------------------------------------------------------------------------------------ #
def send_and_wait_redis(self, channel, message, reply_channel):
self.init_redis_communication(True)
self.redisclient.publish(channel, message)
return self.redisserver.listen_for_one_message(reply_channel.value)
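# A minimal sketch of the request/reply round trip above, assuming a loaded AuraConfig
# instance named `config` (hypothetical variable) and a running engine daemon:
#
#   padavan = Padavan(args=None, config=config)
#   reply = padavan.send_and_wait_redis(
#       "aura",                       # command channel the engine listens on
#       "get_act_programme",          # command string dispatched by ServerRedisAdapter.work()
#       RedisChannel.GAP_REPLY)       # reply channel the server publishes the answer to
#   print(reply)
#
# The call blocks until ServerRedisAdapter.listen_for_one_message() receives the reply
# or its socket timeout (2 seconds by default) expires.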
# ------------------------------------------------------------------------------------------ #
def shutdown(self):
self.send_redis("aura", "shutdown")
self.stringreply = "Shutdown message sent!"
# ------------------------------------------------------------------------------------------ #
def fetch_new_programme(self):
json_reply = self.send_and_wait_redis("aura", "fetch_new_programme", RedisChannel.FNP_REPLY)
if json_reply != "":
actprogramme = json.loads(json_reply)
self.print_programme(actprogramme)
else:
print("No programme fetched")
# ------------------------------------------------------------------------------------------ #
def get_act_programme(self):
json_reply = self.send_and_wait_redis("aura", "get_act_programme", RedisChannel.GAP_REPLY)
actprogramme = json.loads(json_reply)
self.print_programme(actprogramme)
def get_status(self):
"""
Retrieves the Engine's status information.
"""
json_reply = self.send_and_wait_redis("aura", "get_status", RedisChannel.GS_REPLY)
# status = json.loads(json_reply)
self.stringreply = json_reply
# ------------------------------------------------------------------------------------------ #
def get_connection_status(self):
json_reply = self.send_and_wait_redis("aura", "get_connection_status", RedisChannel.GCS_REPLY)
connection_status = json.loads(json_reply)
self.print_connection_status(connection_status)
# ------------------------------------------------------------------------------------------ #
def print_programme(self, programme):
cnt = 1
for show in programme:
for entry in show["playlist"]:
self.stringreply += str(cnt) + \
" --- schedule id #" + str(show["schedule_id"]) + "." + str(entry["entry_num"]) + \
" - show: " + show["show_name"] + \
" - starts @ " + entry["entry_start"] + \
" - plays " + str(entry["source"]) + "\n"
cnt = cnt + 1
# ------------------------------------------------------------------------------------------ #
def print_connection_status(self, connection_status):
lines = []
for service in ("pv", "db", "lqs", "lqsr", "tank", "redis"):
is_connected = connection_status[service]
color = TerminalColors.GREEN.value if is_connected else TerminalColors.RED.value
lines.append("Connection to " + service + ": " + color + " " + str(is_connected) + TerminalColors.ENDC.value)
self.stringreply = "\n".join(lines)
# ------------------------------------------------------------------------------------------ #
def init_player(self):
"""
Initializes the player on Liquidsoap startup.
"""
self.stringreply = self.send_and_wait_redis("aura", "init_player", RedisChannel.IP_REPLY)
def on_play(self, info):
"""
Event handler to be called when some entry started playing.
"""
self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY)
# ------------------------------------------------------------------------------------------ #
def recreatedb(self):
print("YOU WILL GET PROBLEMS DUE TO DATABASE BLOCKING IF aura.py IS RUNNING! NO CHECKS IMPLEMENTED SO FAR!")
x = AuraDatabaseModel()
x.recreate_db()
self.stringreply = "Database recreated!"
# ------------------------------------------------------------------------------------------ #
def redis_message(self, channel, message):
self.send_redis(channel, message)
self.stringreply = "Message '"+message+"' sent to channel '"+channel+"'"
# ------------------------------------------------------------------------------------------ #
def print_message_queue(self):
self.stringreply = self.send_and_wait_redis("aura", "print_message_queue", RedisChannel.PMQ_REPLY)
# LIQUIDSOAP #
# ------------------------------------------------------------------------------------------ #
def select_mixer(self, mixername, activate=True):
# init lqs
self.init_liquidsoap_communication()
# select mixer and return the feedback
self.stringreply = self.ss.channel_activate(mixername, activate)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def set_volume(self, mixernumber, volume):
# init lqs and enable comm
self.init_liquidsoap_communication()
self.stringreply = self.ss.channel_volume(mixernumber, volume)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_channels_selected(self):
self.init_liquidsoap_communication()
am = self.ss.mixer_channels_selected()
if len(am) == 0:
self.destroy_liquidsoap_communication()
raise Exception("Guru recognized a problem: No active source!!!")
self.stringreply = str(am)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_status(self):
self.init_liquidsoap_communication()
status = self.ss.mixer_status()
for k, v in status.items():
self.stringreply += "source: " + k + "\t status: " + v + "\n"
# disable connection
self.destroy_liquidsoap_communication()
# REDIS #
# ------------------------------------------------------------------------------------------ #
def get_next_file(self, type):
# redis = RedisMessenger()
# next_file = redis.get_next_file_for(type)
# if next_file == "":
# next_file = "/var/audio/blank.flac"
# self.stringreply = next_file
#self.send_redis("aura", "set_next_file " + type)
next_file = self.send_and_wait_redis("aura", "get_next_file " + type, RedisChannel.GNF_REPLY)
self.stringreply = next_file
# ------------------------------------------------------------------------------------------ #
def set_next_file(self, type, file):
#from modules.cli.redis.messenger import RedisMessenger
#redis = RedisMessenger()
#redis.set_next_file_for(type, file)
self.send_redis("aura", "set_next_file " + type + " " + file)
self.stringreply = "Set "+file+" for fallback '"+type+"'"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import json
from datetime import datetime
from threading import Event
import threading
import redis
from modules.cli.redis.messenger import RedisMessenger
from modules.cli.redis.statestore import RedisStateStore
# from modules.communication.connection_tester import ConnectionTester
from modules.base.exceptions import RedisConnectionException
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
# ------------------------------------------------------------------------------------------ #
class ServerRedisAdapter(threading.Thread, RedisMessenger):
debug = False
pubsub = None
config = None
redisdb = None
channel = ""
scheduler = None
redisclient = None
# connection_tester = None
engine = None
socket = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
threading.Thread.__init__(self)
RedisMessenger.__init__(self, config)
# init
#threading.Thread.__init__ (self)
self.config = config
self.shutdown_event = Event()
self.channel = RedisChannel.STANDARD.value
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
self.can_send = None
self.redisclient = ClientRedisAdapter(config)
# self.connection_tester = ConnectionTester()
# ------------------------------------------------------------------------------------------ #
def run(self):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"))
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(self.channel)
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
# listener loop
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item))
item["channel"] = self.decode_if_needed(item["channel"])
item["data"] = self.decode_if_needed(item["data"])
try:
self.work(item)
except RedisConnectionException as rce:
self.logger.error(str(rce))
if not self.shutdown_event.is_set():
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
self.pubsub.unsubscribe()
if not self.shutdown_event.is_set():
self.logger.warning("unsubscribed from " + self.channel + " and finished")
# ------------------------------------------------------------------------------------------ #
def decode_if_needed(self, val):
if isinstance(val, bytes):
return val.decode("utf-8")
return val
# ------------------------------------------------------------------------------------------ #
def listen_for_one_message(self, channel, socket_timeout=2):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout)
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(channel)
try:
self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds")
for item in self.pubsub.listen():
it = self.receive_message(item)
if it is not None:
break
except redis.exceptions.TimeoutError as te:
raise te
return item["data"]
# ------------------------------------------------------------------------------------------ #
def receive_message(self, item):
if item["type"] == "subscribe":
self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8"))
return None
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
self.pubsub.unsubscribe()
return item
# ------------------------------------------------------------------------------------------ #
def work(self, item):
if item["data"] == "fetch_new_programme":
#self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme)
self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "shutdown":
self.terminate()
elif item["data"] == "init_player":
self.execute(RedisChannel.IP_REPLY.value, self.engine.init_player)
elif item["data"] == "get_act_programme":
self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "get_status":
def get_status_string():
status = "No monitoring plugin available!"
if "monitor" in self.engine.plugins:
status = self.engine.plugins["monitor"].get_status()
return json.dumps(status)
self.execute(RedisChannel.GS_REPLY.value, get_status_string)
# elif item["data"] == "get_connection_status":
# self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status)
elif item["data"] == "print_message_queue":
self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue)
elif item["data"].find("set_next_file") >= 0:
playlist = item["data"].split()[1]
playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist)
elif item["data"].find("get_next_file") >= 0:
playlist = item["data"].split()[1]
#playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)
elif item["data"].find("on_play") >= 0:
source = item["data"].split("on_play ")[1]
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.engine.player.on_play, source)
elif item["data"] == "recreate_db":
self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)
elif item["data"] == "status":
return True
else:
raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"])
# ------------------------------------------------------------------------------------------ #
def execute(self, channel, f, param1=None, param2=None, param3=None):
# Collect the parameters which are actually set and call the handler with them
params = [p for p in (param1, param2, param3) if p is not None]
reply = f(*params)
if reply is None:
reply = ""
# Sometimes the sender is faster than the receiver; without a short delay Redis messages would get lost
time.sleep(0.1)
self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value)
# publish
self.redisclient.publish(channel, reply)
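# To illustrate the dispatch above: a command string received on the "aura" channel is mapped
# by work() to a handler plus a dedicated reply channel, and execute() publishes whatever the
# handler returns. A minimal sketch, assuming a ServerRedisAdapter instance `server` wired to
# a scheduler (hypothetical names):
#
#   item = {"type": "message", "channel": "aura", "data": "get_act_programme"}
#   server.work(item)
#   # -> calls scheduler.get_act_programme_as_string() and publishes the JSON result
#   #    on RedisChannel.GAP_REPLY ("get_act_programme_reply")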
# ------------------------------------------------------------------------------------------ #
def join_comm(self):
try:
while self.is_alive():
self.logger.debug(str(datetime.now())+" joining")
self.join()
self.logger.warning("join out")
except (KeyboardInterrupt, SystemExit):
# Set the shutdown event for the server
# server.shutdown_event.set()
# The server is waiting for input,
# so initiate a client which sends a message
self.halt()
sys.exit('Terminated')
# ------------------------------------------------------------------------------------------ #
def halt(self):
"""
Stop the server
"""
if self.shutdown_event.is_set():
return
self.shutdown_event.set()
try:
self.socket.unbind("tcp://"+self.ip+":"+self.port)
except:
pass
# self.socket.close()
# ------------------------------------------------------------------------------------------ #
def send(self, message):
"""
Send a message to the client
:param message: string
"""
# FIXME Review logic
if not self.can_send:
self.logger.debug("sending a "+str(len(message))+" long message via REDIS.")
self.socket.send(message.encode("utf-8"))
self.can_send = False
else:
self.logger.warning("cannot send message via REDIS: "+str(message))
def terminate(self):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.shutdown_event.set()
self.scheduler.terminate()
self.pubsub.close()
self.logger.info("Shutdown event received. Bye bye ...")
# ------------------------------------------------------------------------------------------ #
class ClientRedisAdapter(RedisMessenger):
def __init__(self, config):
RedisMessenger.__init__(self, config)
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
if isinstance(channel, RedisChannel):
channel = channel.value
self.rstore.publish(channel, message)
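# Usage sketch: publish() accepts either a RedisChannel member or a plain channel name,
# assuming `config` is a loaded AuraConfig and the engine daemon is subscribed on "aura"
# (hypothetical setup):
#
#   client = ClientRedisAdapter(config)
#   client.publish(RedisChannel.STANDARD, "get_status")   # enum is unwrapped to "aura"
#   client.publish("aura", "get_status")                   # equivalent plain-string form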
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
class RedisChannel(Enum):
STANDARD = "aura"
DPE_REPLY = "delete_playlist_entry_reply"
FNP_REPLY = "fetch_new_programme_reply"
GAP_REPLY = "get_act_programme_reply"
GS_REPLY = "get_status_reply"
GCS_REPLY = "get_connection_status_reply"
GNF_REPLY = "get_next_file_reply"
IPE_REPLY = "insert_playlist_entry_reply"
IP_REPLY = "init_player_reply"
TS_REPLY = "track_service_reply"
MPE_REPLY = "move_playlist_entry_reply"
PMQ_REPLY = "print_message_queue_reply"
RDB_REPLY = "recreate_database_reply"
SNF_REPLY = "get_next_file_reply"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.cli.redis.statestore import RedisStateStore
from modules.cli.redis.channels import RedisChannel
"""
Send and receive redis messages
"""
# ------------------------------------------------------------------------------------------ #
class RedisMessenger():
logger = None
rstore = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
"""
Constructor
"""
super(RedisMessenger, self).__init__()
self.logger = logging.getLogger("AuraEngine")
self.channel = RedisChannel.STANDARD
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Einen "Kanal" setzen - zb scheduling
@type channel: string
@param channel: Kanal/Name der Komponente
"""
self.channel = channel
if channel in self.components:
self.errnr = self.components[channel]
self.rstore.set_channel(channel)
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Set a section / scope for the message, e.g. internal
@type section: string
@param section: scope of the message
"""
self.section = section
# # ------------------------------------------------------------------------------------------ #
# def set_mail_addresses(self, fromMail, adminMails):
# """
# Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
# @type section: string
# @param section: Gültigkeitsbereich
# """
# self.fromMail = fromMail
# self.adminMails = adminMails
# # ------------------------------------------------------------------------------------------ #
# def send(self, message, code, level, job, value='', section=''):
# """
# Eine Message senden
# @type message: string
# @param message: menschenverständliche Nachricht
# @type code: string
# @param code: Fehlercode - endet mit 00 bei Erfolg
# @type level: string
# @param level: Error-Level - info, warning, error, fatal
# @type job: string
# @param job: Name der ausgeführten Funktion
# @type value: string
# @param value: Ein Wert
# @type section: string
# @param section: Globale Sektion überschreiben
# """
# section = self.section if section == '' else section
# self.time = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
# self.utime = time.time()
# state = {'message':message.strip().replace("'","\\'"), 'code':self.errnr + str(code),'job':job,'value':value}
# self.rstore.set_section(section)
# self.rstore.store(level, state)
# if level == 'info' or level == 'success':
# self.logger.info(message)
# elif level == 'warning':
# self.logger.warning(message)
# elif level == 'error':
# self.logger.error(message)
# self.send_admin_mail(level, message, state)
# elif level == 'fatal':
# self.logger.critical(message)
# self.send_admin_mail(level, message, state)
# # ------------------------------------------------------------------------------------------ #
# def say_alive(self):
# """
# Soll alle 20 Sekunden von den Komponenten ausgeführt werden,
# um zu melden, dass sie am Leben sind
# """
# self.rstore.set_alive_state()
# # ------------------------------------------------------------------------------------------ #
# def get_alive_state(self, channel):
# """
# Live State abfragen
# @type channel: string
# @param channel: Channel/Komponente
# """
# return self.rstore.get_alive_state(channel)
# # ------------------------------------------------------------------------------------------ #
# def set_state(self, name, value, expires=None, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: Name des state
# @type value: string
# @param value: Wert
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.set_state(name, value, expires, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_add_event(self, name, eventtime, value, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: der Name des Events
# @type eventtime: string|datetime.datetime
# @param eventtime: Datum und Zeit des events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# if type(eventtime) == type(str()):
# eventtime_str = datetime.datetime.strptime(eventtime[0:16].replace(' ','T'), "%Y-%m-%dT%H:%M").strftime("%Y-%m-%dT%H:%M")
# elif type(eventtime) is datetime.datetime:
# eventtime_str = eventtime.strftime("%Y-%m-%dT%H:%M")
# else:
# raise TypeError('eventtime must be a datetime.date or a string, not a %s' % type(eventtime))
# self.rstore.queue_add_event(eventtime_str, name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_remove_events(self, name, channel=None):
# """
# Löscht Events
# @type name: string
# @param name: der Name des Events
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.queue_remove_events(name, channel)
# # ------------------------------------------------------------------------------------------ #
# def fire_event(self, name, value, channel=None):
# """
# Feuert einen Event
# @type name: string
# @param name: der Name des Events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.fire_event(name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def get_event_queue(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# queue = self.rstore.get_event_queue(name, channel)
# return queue
# # ------------------------------------------------------------------------------------------ #
# def get_events(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# events = self.rstore.get_events(name, channel)
# return events
# # ------------------------------------------------------------------------------------------ #
# def get_event(self, name=None, channel=None):
# """
# Holt event eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: dict
# @return: Event
# """
# events = self.rstore.get_events(name, channel)
# result = False
# if events:
# result = events.pop(0)
# return result
# # ------------------------------------------------------------------------------------------ #
# def send_admin_mail(self, level, message, state):
# """
# Sendent mail an Admin(s),
# @type message: string
# @param message: Die Message
# @type state: dict
# @param state: Der State
# @return result
# """
# # FIXME Make Mailer functional: Invalid constructor
# if self.fromMail and self.adminMails:
# #subject = "Possible comba problem on job " + state['job'] + " - " + level
# mailmessage = "Hi Admin,\n comba reports a possible problem\n\n"
# mailmessage = mailmessage + level + "!\n"
# mailmessage = mailmessage + message + "\n\n"
# mailmessage = mailmessage + "Additional information:\n"
# mailmessage = mailmessage + "##################################################\n"
# mailmessage = mailmessage + "Job:\t" + state['job'] + "\n"
# mailmessage = mailmessage + "Code:\t" + state['code'] + "\n"
# mailmessage = mailmessage + "Value:\t" + str(state['value']) + "\n"
# #mailer = AuraMailer(self.adminMails, self.fromMail)
# #mailer.send_admin_mail(subject, mailmessage)
# else:
# return False
# # ------------------------------------------------------------------------------------------ #
# def receive(self):
# """
# Bisher wird nichts empfangen
# """
# return ""
# # ------------------------------------------------------------------------------------------ #
# def get_next_file_for(self, playlisttype):
# next = self.rstore.db.get('next_'+playlisttype+'file')
# if next is None:
# next = b""
# return next.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def on_play(self, info):
# result = self.rstore.db.get('on_play')
# if result is None:
# result = b""
# return result.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def set_next_file_for(self, playlisttype, file):
# self.rstore.db.set("next_" + playlisttype + "file", file)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import datetime
import json
import re
import uuid
import redis
class RedisStateStore(object):
"""Store and get Reports from redis"""
def __init__(self, config, **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.db = redis.Redis(host=config.get("redis_host"), port=config.get("redis_port"), db=config.get("redis_db"))
self.channel = '*'
self.section = '*'
self.separator = '_'
self.daily = False
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Set the channel
@type channel: string
@param channel: the channel
"""
self.channel = channel
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Set the section
@type section: string
@param section: the section
"""
self.section = section
# ------------------------------------------------------------------------------------------ #
def set_alive_state(self):
"""
Alive function - report every 20 seconds that the component is still alive
"""
self.set_state('alive', 'Hi', 21)
# ------------------------------------------------------------------------------------------ #
def get_alive_state(self, channel):
"""
Get the alive state of a channel
@type channel: string
@param channel: the channel
@rtype: string/None
@return: a string, or None on a negative result
"""
return self.get_state('alive', channel)
# ------------------------------------------------------------------------------------------ #
def set_state(self, name, value, expires=None, channel=None):
"""
Set a state
@type name: string
@param name: name of the state
@type value: string
@param value: the value
@type channel: string
@param channel: channel (optional)
"""
if not channel:
channel = self.channel
key = self.__create_key__(channel + 'State', name)
if value == "":
self.db.delete(key)
else:
# publish on channel
message = json.dumps({'eventname':name, 'value': value})
self.db.publish(channel + 'Publish', message)
# store in database
self.db.set(key, value)
if expires:
# Honor the requested expiry instead of a hard-coded 21 seconds
self.db.expire(key, expires)
# ------------------------------------------------------------------------------------------ #
def get_state(self, name, channel):
"""
Get a state
@type name: string
@param name: name of the state
@type channel: string
@param channel: channel (optional)
"""
key = self.__create_key__(channel + 'State', name)
return self.db.get(key)
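# Key layout sketch for the state store: set_state()/get_state() build keys of the form
# "<channel>State_<name>" via __create_key__ and additionally publish a JSON message on
# "<channel>Publish". For example, assuming a RedisStateStore instance `store`
# (hypothetical variable):
#
#   store.set_state("alive", "Hi", expires=21, channel="scheduling")
#   # -> SET schedulingState_alive "Hi"          (expires after 21 seconds)
#   # -> PUBLISH schedulingPublish '{"eventname": "alive", "value": "Hi"}'
#   store.get_state("alive", "scheduling")       # -> b"Hi" (raw Redis bytes)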
# ------------------------------------------------------------------------------------------ #
def queue_add_event(self, eventtime, name, value, channel=None):
"""
Announces an event
@type eventtime: string
@param eventtime: date and time of the event
@type name: string
@param name: name of the event
@type value: dict
@param value: the values
@type channel: string
@param channel: channel (optional)
"""
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)
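# Worked example for the expiry computed above: if the event is scheduled for 10 minutes
# from now, `expire` evaluates to roughly 600 + 60 = 660 seconds, so the queued event key
# outlives its scheduled time by one minute before Redis drops it automatically.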
# ------------------------------------------------------------------------------------------ #
def queue_remove_events(self, name=None, channel=None):
"""
Deletes events
@type name: string
@param name: name of the event
@type channel: string
@param channel: channel (optional)
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
for delkey in keys:
self.db.delete(delkey)
# ------------------------------------------------------------------------------------------ #
def fire_event(self, name, value, channel=None):
"""
Fires an event
@type name: string
@param name: name of the event
@type value: dict
@param value: the values
@type channel: string
@param channel: channel (optional)
"""
eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)
# ------------------------------------------------------------------------------------------ #
def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
"""
Stores an event (used by both the event queue and fired events)
@type eventtime: string
@param eventtime: date and time of the event
@type value: dict
@param value: the values
@type channel: string
@param channel: channel (optional)
"""
if not channel:
channel = self.channel
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
key = self.__create_key__(channel + type, eventtime, name)
value['starts'] = eventtime[0:16]
value['eventchannel'] = channel
value['eventname'] = name
self.db.hset(key, namespace, value)
self.db.expire(key, expire)
# ------------------------------------------------------------------------------------------ #
def get_event_queue(self, name=None, channel=None):
"""
Gets the queued events of a channel
@type channel: string
@param channel: channel (optional)
@rtype: list
@return: list of events
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'evqueue')
return entries
# ------------------------------------------------------------------------------------------ #
def get_events(self, name=None, channel=None):
"""
Gets the events of a channel
@type channel: string
@param channel: channel (optional)
@rtype: list
@return: list of events
"""
query = channel + 'Event_' if channel else '*Event_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'events')
return entries
# ------------------------------------------------------------------------------------------ #
def get_next_event(self, name=None, channel=None):
"""
Gets the most current event
@type channel: string
@param channel: channel (optional)
@rtype: dict/boolean
@return: an event or False
"""
events = self.get_event_queue(name, channel)
if len(events) > 0:
result = events.pop(0)
else:
result = False
return result
# ------------------------------------------------------------------------------------------ #
def store(self, level, value):
"""
Store a hash
@type level: string
@param level: the error level
@type value: dict
@param value: values as dict
"""
microtime = str(time.time())
value['microtime'] = microtime
value['level'] = level
key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
self.db.hset(key, self.channel, value)
self.db.expire(key, 864000)
# ------------------------------------------------------------------------------------------ #
def __get_keys__(self, level ='*'):
"""
Find Redis keys matching a search criterion
@type level: string
@param level: filter by an error level
@rtype: list
@return: the keys matching the search criterion
"""
key = self.__create_key__(self.channel, self.section, level)
microtime = str(time.time())
search = microtime[0:4] + '*' if self.daily else '*'
return self.db.keys(key + self.separator + '*')
# ------------------------------------------------------------------------------------------ #
def __create_key__(self, *args):
"""
Create a key from an arbitrary number of arguments
@rtype: string
@return: the key
"""
return self.separator.join(args)
def get_entries(self, level ='*'):
"""
Get a list of hashes matching a search criterion
@type level: string
@param level: filter by an error level
@rtype: list
@return: Redis hashes
"""
# Sort keys by their microtime field (Python 3: list.sort() takes a key function, not a comparator)
def tsort(key):
if isinstance(key, bytes):
key = key.decode("utf-8")
return float(key.split('_', 4)[3])
keys = self.__get_keys__(level)
keys.sort(key=tsort)
entries = self.__get_entries__(keys, self.channel)
entries = sorted(entries, key=lambda k: k['microtime'], reverse=True)
return entries
# ------------------------------------------------------------------------------------------ #
def __get_entries__(self, keys, channel):
entries = []
for key in keys:
entry = self.db.hget(key, channel)
if entry is None:
continue
try:
# Decode once, then coerce the legacy Python-repr style payload into valid JSON
entry = entry.decode('utf-8').replace('None','"None"')
entry = re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"")
entry = json.loads(entry)
entry['key'] = key
entries.append(entry)
except (ValueError, AttributeError):
pass
return entries
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
if channel.lower().find("reply") < 0 and subscriber_count[1] == 0:
raise Exception("No subscriber! Is Aura daemon running?")
self.db.publish(channel, message)
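# Note on the subscriber check above: `PUBSUB NUMSUB <channel>` returns a flat reply of the
# form [b"<channel>", <count>], so `subscriber_count[1]` is the number of current subscribers.
# Channels containing "reply" are exempt from the check, presumably so the engine can still
# publish a reply even if the requesting CLI client has already timed out or disconnected.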
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
from modules.base.utils import SimpleUtil as SU
from modules.core.resources import ResourceType
class TransitionType(Enum):
"""
Types of fade-in and fade-out transition.
"""
INSTANT = "instant"
FADE = "fade"
class Channel(Enum):
"""
Channel name mappings to the Liquidsoap channel IDs.
"""
QUEUE_A = "in_filesystem_0"
QUEUE_B = "in_filesystem_1"
FALLBACK_QUEUE_A = "in_fallback_scheduled_0"
FALLBACK_QUEUE_B = "in_fallback_scheduled_1"
HTTP_A = "in_http_0"
HTTP_B = "in_http_1"
HTTPS_A = "in_https_0"
HTTPS_B = "in_https_1"
LIVE_0 = "linein_0"
LIVE_1 = "linein_1"
LIVE_2 = "linein_2"
LIVE_3 = "linein_3"
LIVE_4 = "linein_4"
def __str__(self):
return str(self.value)
class ChannelType(Enum):
"""
Engine channel types mapped to `Entry` source types.
"""
QUEUE = {
"id": "fs",
"numeric": 0,
"channels": [Channel.QUEUE_A, Channel.QUEUE_B]
}
FALLBACK_QUEUE = {
"id": "fallback_queue",
"numeric": 0,
"channels": [Channel.FALLBACK_QUEUE_A, Channel.FALLBACK_QUEUE_B]
}
HTTP = {
"id": "http",
"numeric": 1,
"channels": [Channel.HTTP_A, Channel.HTTP_B]
}
HTTPS = {
"id": "https",
"numeric": 2,
"channels": [Channel.HTTPS_A, Channel.HTTPS_B]
}
LIVE = {
"id": "live",
"numeric": 3,
"channels": [
Channel.LIVE_0,
Channel.LIVE_1,
Channel.LIVE_2,
Channel.LIVE_3,
Channel.LIVE_4
]
}
@property
def channels(self):
return self.value["channels"]
@property
def numeric(self):
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class EntryPlayState(Enum):
UNKNOWN = "unknown"
LOADING = "loading"
READY = "ready_to_play"
PLAYING = "playing"
FINISHED = "finished"
class LiquidsoapResponse(Enum):
SUCCESS = "Done"
STREAM_STATUS_POLLING = "polling"
STREAM_STATUS_STOPPED = "stopped"
STREAM_STATUS_CONNECTED = "connected"
class ChannelRouter():
"""
Wires source types with channels and channel-types.
"""
config = None
logger = None
resource_mapping = None
active_channels = None
def __init__(self, config, logger):
"""
Constructor
Args:
config (AuraConfig): The configuration
logger (Logger): The logger
"""
self.config = config
self.logger = logger
self.resource_mapping = {
ResourceType.FILE: ChannelType.QUEUE,
ResourceType.STREAM_HTTP: ChannelType.HTTP,
ResourceType.STREAM_HTTPS: ChannelType.HTTPS,
ResourceType.LINE: ChannelType.LIVE,
ResourceType.PLAYLIST: ChannelType.QUEUE,
ResourceType.POOL: ChannelType.QUEUE
}
self.active_channels = {
ChannelType.QUEUE: Channel.QUEUE_A,
ChannelType.FALLBACK_QUEUE: Channel.FALLBACK_QUEUE_A,
ChannelType.HTTP: Channel.HTTP_A,
ChannelType.HTTPS: Channel.HTTPS_A,
ChannelType.LIVE: Channel.LIVE_0
}
def set_active(self, channel_type, channel):
"""
Set the channel for the given resource type active
"""
self.active_channels[channel_type] = channel
def get_active(self, channel_type):
"""
Retrieves the active channel for the given resource type
"""
return self.active_channels[channel_type]
def type_of_channel(self, channel):
"""
Retrieves a `ChannelType` for the given `Channel`.
"""
if channel in ChannelType.QUEUE.channels:
return ChannelType.QUEUE
elif channel in ChannelType.FALLBACK_QUEUE.channels:
return ChannelType.FALLBACK_QUEUE
elif channel in ChannelType.HTTP.channels:
return ChannelType.HTTP
elif channel in ChannelType.HTTPS.channels:
return ChannelType.HTTPS
elif channel in ChannelType.LIVE.channels:
return ChannelType.LIVE
else:
return None
def type_for_resource(self, resource_type):
"""
Retrieves a `ChannelType` for the given `ResourceType`.
Only default mappings can be evaluated. Custom variations
like fallback channels are not respected.
"""
return self.resource_mapping.get(resource_type)
def channel_swap(self, channel_type):
"""
Returns the currently inactive channel for a given type. For example, if a file is
currently playing on channel QUEUE A, channel QUEUE B is returned so it can be used
to queue new entries.
Args:
channel_type (ChannelType): The channel type such as queue, stream or live source
Returns:
(Channel, Channel): The previous and new channel
"""
previous_channel = self.active_channels[channel_type]
new_channel = None
msg = None
if channel_type == ChannelType.QUEUE:
if previous_channel == Channel.QUEUE_A:
new_channel = Channel.QUEUE_B
msg = "Swapped queue channel from A > B"
else:
new_channel = Channel.QUEUE_A
msg = "Swapped queue channel from B > A"
elif channel_type == ChannelType.FALLBACK_QUEUE:
if previous_channel == Channel.FALLBACK_QUEUE_A:
new_channel = Channel.FALLBACK_QUEUE_B
msg = "Swapped fallback queue channel from A > B"
else:
new_channel = Channel.FALLBACK_QUEUE_A
msg = "Swapped fallback channel from B > A"
elif channel_type == ChannelType.HTTP:
if previous_channel == Channel.HTTP_A:
new_channel = Channel.HTTP_B
msg = "Swapped HTTP Stream channel from A > B"
else:
new_channel = Channel.HTTP_A
msg = "Swapped HTTP Stream channel from B > A"
elif channel_type == ChannelType.HTTPS:
if previous_channel == Channel.HTTPS_A:
new_channel = Channel.HTTPS_B
msg = "Swapped HTTPS Stream channel from A > B"
else:
new_channel = Channel.HTTPS_A
msg = "Swapped HTTPS Stream channel from B > A"
else:
self.logger.warning(SU.red(f"No channel to swap - invalid entry_type '{channel_type}'"))
if msg: self.logger.info(SU.pink(msg))
return (previous_channel, new_channel)
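# A minimal usage sketch of the A/B swapping above, assuming `config` and `logger` objects
# are available (hypothetical variables):
#
#   router = ChannelRouter(config, logger)
#   previous, new = router.channel_swap(ChannelType.QUEUE)
#   # previous == Channel.QUEUE_A (initially active), new == Channel.QUEUE_B
#   router.set_active(ChannelType.QUEUE, new)    # done by the Player once the entry plays
#   router.channel_swap(ChannelType.QUEUE)       # now returns (Channel.QUEUE_B, Channel.QUEUE_A)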
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
from contextlib import suppress
from threading import Thread
import meta
from modules.base.utils import SimpleUtil as SU
from modules.base.exceptions import LQConnectionError, InvalidChannelException, LQStreamException, \
LoadSourceException
from modules.core.resources import ResourceClass, ResourceUtil
from modules.core.channels import ChannelType, TransitionType, LiquidsoapResponse, \
EntryPlayState, ResourceType, ChannelRouter
from modules.core.startup import StartupThread
from modules.core.events import EngineEventDispatcher
from modules.core.control import SocketControlInterface
from modules.core.mixer import Mixer, MixerType
from modules.core.liquidsoap.connector import PlayerConnector
class Engine():
"""
The Engine.
"""
engine_time_offset = 0.0
logger = None
sci = None
channels = None
channel_router = None
scheduler = None
event_dispatcher = None
is_liquidsoap_running = False
plugins = None
# Mixer
mixer = None
mixer_fallback = None
connector = None
def __init__(self, config):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = config
self.plugins = dict()
self.logger = logging.getLogger("AuraEngine")
# self.sci = SocketControlInterface.get_instance(self.config, self.logger)
# self.sci.attach(self)
self.is_active() # TODO Check if it makes sense to move it to the boot-phase
self.channel_router = ChannelRouter(self.config, self.logger)
Engine.engine_time_offset = self.config.get("lqs_delay_offset")
def start(self):
"""
Starts the engine. Called when the connection to the sound-system implementation
has been established.
"""
self.event_dispatcher = EngineEventDispatcher(self, self.scheduler)
# Sleep needed, because the socket is created too slowly by Liquidsoap
time.sleep(1)
self.player = Player(self.config, self.event_dispatcher)
# self.mixer = Mixer(self.config, MixerType.MAIN, self.connector)
# self.mixer_fallback = Mixer(self.config, MixerType.FALLBACK, self.connector)
self.is_liquidsoap_running = True
self.event_dispatcher.on_initialized()
self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap"))
self.event_dispatcher.on_boot()
self.logger.info(EngineSplash.splash_screen("Engine Core", meta.__version__))
self.event_dispatcher.on_ready()
#
# Basic Methods
#
def init_player(self):
"""
Initializes the Liquidsoap player after startup of the engine.
Returns:
(String): Message that the player is started.
"""
t = StartupThread(self)
t.start()
return "Engine Core startup done!"
def is_active(self):
"""
Checks if Liquidsoap is running
"""
try:
self.uptime()
self.is_liquidsoap_running = True
except LQConnectionError as e:
self.logger.info("Liquidsoap is not running so far")
self.is_liquidsoap_running = False
except Exception as e:
self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e))
self.is_liquidsoap_running = False
return self.is_liquidsoap_running
def engine_state(self):
"""
Retrieves the state of all inputs and outputs.
"""
state = self.player.connector.send_lqc_command("engine", "state")
return state
def version(self):
"""
Get the version of Liquidsoap.
"""
data = self.player.connector.send_lqc_command("version", "")
return data
def uptime(self):
"""
Retrieves the uptime of Liquidsoap.
"""
data = self.player.connector.send_lqc_command("uptime", "")
return data
@staticmethod
def engine_time():
"""
Liquidsoap is slow in executing commands, therefore it's needed to schedule
actions by (n) seconds in advance, as defined in the configuration file by
the property `lqs_delay_offset`. It's important to note that this method
requires the class variable `Engine.engine_time_offset` to be set on
Engine initialization.
Returns:
(Integer): the Unix epoch timestamp including the offset
"""
return SU.timestamp() + Engine.engine_time_offset
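# Example: with `lqs_delay_offset = 1.5` in the configuration (an illustrative value, not a
# recommended setting), Engine.engine_time() returns the current Unix timestamp plus 1.5
# seconds, so anything scheduled against this clock is handed to Liquidsoap early enough to
# compensate for its command latency.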
def terminate(self):
"""
Terminates the engine and all related processes.
"""
if self.sci: self.sci.terminate()
#
# PLAYER
#
class Player:
"""
Engine Player.
"""
config = None
logger = None
connector = None
channels = None
channel_router = None
event_dispatcher = None
# Mixer
mixer = None
mixer_fallback = None
def __init__(self, config, event_dispatcher):
"""
Constructor
Args:
config (AuraConfig): The configuration
"""
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.event_dispatcher = event_dispatcher
self.connector = PlayerConnector(self.config, self.event_dispatcher)
self.channel_router = ChannelRouter(self.config, self.logger)
self.mixer = Mixer(self.config, MixerType.MAIN, self.connector)
self.mixer_fallback = Mixer(self.config, MixerType.FALLBACK, self.connector)
def preroll(self, entry):
"""
Pre-Rolls/Pre-Loads the entry. This is required before the actual `play(..)` can happen.
Be aware when using this method to queue a very short entry (shorter than ``) this may
result in situations with incorrect timing. In this case bundle multiple short entries as
one queue using `preroll_group(self, entries)`.
It's important to note that this method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
`entry.state`.
Args:
entries ([Entry]): An array holding filesystem entries
"""
entry.status = EntryPlayState.LOADING
self.logger.info("Loading entry '%s'" % entry)
is_ready = False
# LIVE
if entry.get_content_type() in ResourceClass.LIVE.types:
entry.channel = "linein_" + entry.source.split("line://")[1]
is_ready = True
else:
channel_type = self.channel_router.type_for_resource(entry.get_content_type())
entry.previous_channel, entry.channel = self.channel_router.channel_swap(channel_type)
# QUEUE
if entry.get_content_type() in ResourceClass.FILE.types:
is_ready = self.queue_push(entry.channel, entry.source)
# STREAM
elif entry.get_content_type() in ResourceClass.STREAM.types:
is_ready = self.stream_load_entry(entry)
if is_ready:
entry.status = EntryPlayState.READY
self.event_dispatcher.on_queue([entry])
def preroll_group(self, entries, channel_type=ChannelType.QUEUE):
"""
Pre-Rolls/Pre-Loads multiple filesystem entries at once. This call is required before the
actual `play(..)` can happen. Due to their nature, non-filesystem entries cannot be queued
using this method. In this case use `preroll(self, entry)` instead. This method also allows
queuing of very short files, such as jingles.
It's important to note that this method is blocking until loading has finished. If this
method is called asynchronously, the progress on the preloading state can be looked up in
`entry.state`.
Args:
entries ([Entry]): An array holding filesystem entries
channel_type (ChannelType): The type of channel where it should be queued (optional)
"""
channels = None
# Validate entry type
for entry in entries:
if entry.get_content_type() != ResourceType.FILE:
raise InvalidChannelException
# Determine channel
channels = self.channel_router.channel_swap(channel_type)
# Queue entries
for entry in entries:
entry.status = EntryPlayState.LOADING
self.logger.info("Loading entry '%s'" % entry)
# Choose and save the input channel
entry.previous_channel, entry.channel = channels
if self.queue_push(entry.channel, entry.source) == True:
entry.status = EntryPlayState.READY
self.event_dispatcher.on_queue(entries)
return channels
def play(self, entry, transition):
"""
Plays a new `Entry`. In case of a new schedule (or some intended, immediate transition),
a clean channel is selected and transitions between old and new channel is performed.
This method expects that the entry is pre-loaded using `preroll(..)` or `preroll_group(self, entries)`
before being played. In case the pre-roll has happened for a group of entries, only the
first entry of the group needs to be passed.
Args:
entry (PlaylistEntry): The audio source to be played
transition (TransitionType): The type of transition to use e.g. fade-in or instant volume level.
"""
with suppress(LQConnectionError):
channel_type = self.channel_router.type_of_channel(entry.channel)
mixer = self.mixer
if channel_type == ChannelType.FALLBACK_QUEUE:
mixer = self.mixer_fallback
# Instant activation or fade-in
self.connector.enable_transaction()
if transition == TransitionType.FADE:
mixer.channel_select(entry.channel.value, True)
mixer.fade_in(entry.channel, entry.volume)
else:
mixer.channel_activate(entry.channel.value, True)
self.connector.disable_transaction()
# Update active channel for the current channel type
self.channel_router.set_active(channel_type, entry.channel)
# Dear filesystem channels, please leave the room as you would like to find it!
# A channel can only ever belong to one of the two queue types, hence `or`
if entry.previous_channel and \
(entry.previous_channel in ChannelType.QUEUE.channels or \
entry.previous_channel in ChannelType.FALLBACK_QUEUE.channels):
def clean_up():
# Wait a little, if there is some long fade-out. Note, this also means,
# this channel should not be used for at least some seconds (including clearing time).
time.sleep(2)
self.connector.enable_transaction()
mixer.channel_activate(entry.previous_channel.value, False)
res = self.queue_clear(entry.previous_channel)
self.logger.info("Clear Queue Response: " + res)
self.connector.disable_transaction()
Thread(target=clean_up).start()
# Filesystem meta-changes trigger the event via Liquidsoap
if entry.channel not in ChannelType.QUEUE.channels:
self.on_play(entry)
def on_play(self, source):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
Args:
source (String): The `Entry` or URI or of the media source currently being played
"""
self.event_dispatcher.on_play(source)
def stop(self, entry, transition):
"""
Stops the currently playing entry.
Args:
entry (Entry): The entry to stop playing
transition (TransitionType): The type of transition to use e.g. fade-out.
"""
with suppress(LQConnectionError):
self.connector.enable_transaction()
if not entry.channel:
self.logger.warning(SU.red("Trying to stop entry %s, but it has no channel assigned" % entry))
return
if transition == TransitionType.FADE:
self.mixer.fade_out(entry.channel, entry.volume)
else:
self.mixer.channel_volume(entry.channel, 0)
self.logger.info(SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry)))
self.connector.disable_transaction()
self.event_dispatcher.on_stop(entry)
def start_fallback_playlist(self, entries):
"""
Sets any scheduled fallback playlist and performs a fade-in.
Args:
entries ([Entry]): The playlist entries
"""
self.preroll_group(entries, ChannelType.FALLBACK_QUEUE)
self.play(entries[0], TransitionType.FADE)
def stop_fallback_playlist(self):
"""
Performs a fade-out and clears any scheduled fallback playlist.
"""
dirty_channel = self.channel_router.get_active(ChannelType.FALLBACK_QUEUE)
self.logger.info(f"Fading out channel '{dirty_channel}'")
self.connector.enable_transaction()
self.mixer_fallback.fade_out(dirty_channel, 100)
self.connector.disable_transaction()
def clean_up():
# Wait a little, if there is some long fade-out. Note, this also means,
# this channel should not be used for at least some seconds (including clearing time).
time.sleep(2)
self.connector.enable_transaction()
self.mixer_fallback.channel_activate(dirty_channel.value, False)
res = self.queue_clear(dirty_channel)
self.logger.info("Clear Fallback Queue Response: " + res)
self.connector.disable_transaction()
self.event_dispatcher.on_fallback_cleaned(dirty_channel)
Thread(target=clean_up).start()
#
# Channel Type - Stream
#
def stream_load_entry(self, entry):
"""
Loads the given stream entry and updates the entries's status codes.
Args:
entry (Entry): The entry to be pre-loaded
Returns:
(Boolean): `True` if successful
"""
self.stream_load(entry.channel, entry.source)
time.sleep(1)
retry_delay = self.config.get("input_stream_retry_delay")
max_retries = self.config.get("input_stream_max_retries")
retries = 0
while not self.stream_is_ready(entry.channel, entry.source):
self.logger.info("Loading Stream ...")
if retries >= max_retries:
raise LoadSourceException("Could not connect to stream while waiting for %s seconds!" % str(retries*retry_delay))
time.sleep(retry_delay)
retries += 1
return True
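# Worked example for the retry loop above: with `input_stream_retry_delay = 2` seconds and
# `input_stream_max_retries = 10` (illustrative values, not defaults), the player waits for
# up to roughly 10 * 2 = 20 seconds for the stream to report "connected" before raising
# LoadSourceException.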
def stream_load(self, channel, url):
"""
Preloads the stream URL on the given channel. Note this method is blocking
some serious amount of time; hence it's worth being called asynchronously.
Args:
channel (Channel): The stream channel
uri (String): The stream URL
Returns:
(Boolean): `True` if successful
"""
result = None
self.connector.enable_transaction()
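# Stop any stream currently playing on this channel before switching the URL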
result = self.connector.send_lqc_command(channel, "stream_stop")
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("%s.stop result: %s" % (channel, result))
raise LQStreamException("Error while stopping stream!")
result = self.connector.send_lqc_command(channel, "stream_set_url", url)
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("%s.set_url result: %s" % (channel, result))
raise LQStreamException("Error while setting stream URL!")
# Liquidsoap ignores commands that arrive too quickly after the previous one, so wait briefly
time.sleep(2)
result = self.connector.send_lqc_command(channel, "stream_start")
self.logger.info("%s.start result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def stream_is_ready(self, channel, url):
"""
Checks if the stream on the given channel is ready to play. Note that this method blocks
for a significant amount of time even when successful; hence it is worth calling it asynchronously.
Args:
channel (Channel): The stream channel
url (String): The stream URL
Returns:
(Boolean): `True` if the stream is ready to play
"""
result = None
self.connector.enable_transaction()
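# The status is expected to be reported as "connected <url>" once the stream is up
# (this is what the checks below assume)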
result = self.connector.send_lqc_command(channel, "stream_status")
self.logger.info("%s.status result: %s" % (channel, result))
if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value):
    self.connector.disable_transaction()
    return False
lqs_url = result.split(" ")[1]
if url != lqs_url:
    self.logger.error("Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url))
    self.connector.disable_transaction()
    return False
self.connector.disable_transaction()
stream_buffer = self.config.get("input_stream_buffer")
self.logger.info("Ready to play stream, but wait %s seconds until the buffer is filled..." % str(stream_buffer))
time.sleep(round(float(stream_buffer)))
return True
#
# Channel Type - Queue
#
def queue_push(self, channel, uri):
"""
Adds a filesystem URI to the given `ChannelType.QUEUE` channel.
Args:
channel (Channel): The channel to push the file to
uri (String): The URI of the file
Returns:
(Boolean): `True` if successful
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.logger.info(SU.pink("queue.push('%s', '%s'" % (channel, uri)))
self.connector.enable_transaction()
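# Translate the entry's URI into an absolute file path inside the shared audio store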
audio_store = self.config.get("audio_source_folder")
extension = self.config.get("audio_source_extension")
filepath = ResourceUtil.uri_to_filepath(audio_store, uri, extension)
result = self.connector.send_lqc_command(channel, "queue_push", filepath)
self.logger.info("%s.queue_push result: %s" % (channel, result))
self.connector.disable_transaction()
# On success, Liquidsoap returns the (non-negative) resource ID of the queued track
return int(result) >= 0
def queue_seek(self, channel, seconds_to_seek):
"""
Forwards the player of the given `ChannelType.QUEUE` channel by (n) seconds.
Args:
channel (Channel): The queue channel to seek on
seconds_to_seek (Float): The number of seconds to skip
Returns:
(String): Liquidsoap response
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "queue_seek", str(seconds_to_seek))
self.logger.info("%s.seek result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def queue_clear(self, channel):
"""
Removes all tracks currently queued in the given `ChannelType.QUEUE` channel.
Args:
channel (Channel): The queue channel to be cleared
Returns:
(String): Liquidsoap response
"""
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "queue_clear")
self.logger.info("%s.clear result: %s" % (channel, result))
self.connector.disable_transaction()
return result
#
# Channel Type - Playlist
#
def playlist_set_uri(self, channel, playlist_uri):
"""
Sets the URI of a playlist.
Args:
channel (Channel): The playlist channel to set the URI on
playlist_uri (String): The path to the playlist
Returns:
(String): Liquidsoap response
"""
self.logger.info(SU.pink("Setting URI of playlist '%s' to '%s'" % (channel, playlist_uri)))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "playlist_uri_set", playlist_uri)
self.logger.info("%s.playlist_uri result: %s" % (channel, result))
self.connector.disable_transaction()
return result
def playlist_clear_uri(self, channel):
"""
Clears the URI of a playlist.
Args:
channel (Channel): The playlist channel to clear the URI from
Returns:
(String): Liquidsoap response
"""
self.logger.info(SU.pink("Clearing URI of playlist '%s'" % (channel)))
self.connector.enable_transaction()
result = self.connector.send_lqc_command(channel, "playlist_uri_clear")
self.logger.info("%s.playlist_uri_clear result: %s" % (channel, result))
self.connector.disable_transaction()
return result
class EngineSplash:
@staticmethod
def splash_screen(component, version):
"""
Prints the engine logo and version info.
"""
return """\n
█████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗
██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝
███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗
██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝
██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗
╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝
%s v%s - Ready to play!
\n""" % (component, version)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import datetime
from threading import Thread
from modules.base.utils import SimpleUtil as SU
from modules.base.exceptions import NoActiveEntryException
from modules.base.mail import AuraMailer
from modules.plugins.monitor import AuraMonitor
from modules.core.state import PlayerStateService
from modules.plugins.trackservice import TrackServiceHandler
class EventBinding():
"""
A binding between the event dispatcher and some event handler.
It allows you to subscribe to events in a chained way:
```
binding = dispatcher.attach(AuraMonitor)
binding.subscribe("on_boot").subscribe("on_play")
```
"""
dispatcher = None
instance = None
def __init__(self, dispatcher, instance):
self.dispatcher = dispatcher
self.instance = instance
def subscribe(self, event_type):
"""
Subscribes the instance to some event identified by the `event_type` string.
"""
self.dispatcher.subscribe(self.instance, event_type)
return self
def get_instance(self):
"""
Returns the object within that binding.
"""
return self.instance
class EngineEventDispatcher():
"""
Executes handlers for engine events.
"""
logger = None
config = None
subscriber_registry = None
mailer = None
soundsystem = None
player_state = None
scheduler = None
monitor = None
def __init__(self, soundsystem, scheduler):
"""
Initialize EventDispatcher
"""
self.subscriber_registry = dict()
self.logger = logging.getLogger("AuraEngine")
self.config = soundsystem.config
self.mailer = AuraMailer(self.config)
self.soundsystem = soundsystem
self.scheduler = scheduler
self.player_state = PlayerStateService(self.config)
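# Wire up the core plugins: the monitor subscribes to boot and health events,
# the track service handler to entries being played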
binding = self.attach(AuraMonitor)
binding.subscribe("on_boot")
binding.subscribe("on_sick")
binding.subscribe("on_resurrect")
binding = self.attach(TrackServiceHandler)
binding.subscribe("on_play")
def attach(self, clazz):
"""
Creates an instance of the given class and wraps it in an `EventBinding`.
"""
instance = clazz(self.config, self.soundsystem)
return EventBinding(self, instance)
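# A minimal usage sketch (hypothetical `MyPlugin` class; its constructor must accept
# `(config, soundsystem)`, as invoked above):
#
#   binding = dispatcher.attach(MyPlugin)
#   binding.subscribe("on_play").subscribe("on_stop")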
def subscribe(self, instance, event_type):
"""
Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.
"""
if event_type not in self.subscriber_registry:
self.subscriber_registry[event_type] = []
self.subscriber_registry[event_type].append(instance)
def call_event(self, event_type, args):
"""
Calls all subscribers for the given event type.
"""
if event_type not in self.subscriber_registry:
return
listeners = self.subscriber_registry[event_type]
if not listeners:
return
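# Dispatch by name: every listener is expected to implement a method named exactly
# like the event type (e.g. "on_play")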
for listener in listeners:
method = getattr(listener, event_type, None)
if method:
if args:
method(args)
else:
method()
#
# Events
#
def on_initialized(self):
"""
Called when the engine is initialized e.g. connected to Liquidsoap.
"""
self.logger.debug("on_initialized(..)")
self.scheduler.on_initialized()
self.call_event("on_initialized", None)
def on_boot(self):
"""
Called when the engine is starting up. This happens after the initialization step.
"""
self.logger.debug("on_boot(..)")
self.call_event("on_boot", None)
def on_ready(self):
"""
Called when the engine is booted and ready to play.
"""
self.logger.debug("on_ready(..)")
self.scheduler.on_ready()
def on_play(self, source):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing. Note that this event resolves the source URI
and passes a `PlaylistEntry` to the event handlers.
Args:
source (String): The `Entry` object *or* the URI of the media source currently playing
"""
def func(self, source):
self.logger.debug("on_play(..)")
entry = None
if isinstance(source, str):
try:
self.logger.info(SU.pink("Source '%s' started playing. Resolving ..." % source))
entry = self.player_state.resolve_entry(source)
except NoActiveEntryException:
self.logger.error("Cannot resolve '%s'" % source)
else:
entry = source
# Assign timestamp for play time
entry.entry_start_actual = datetime.datetime.now()
self.call_event("on_play", entry)
thread = Thread(target = func, args = (self, source))
thread.start()
def on_stop(self, entry):
"""
Called when the entry on the assigned channel has stopped playing.
"""
def func(self, entry):
self.logger.debug("on_stop(..)")
self.call_event("on_stop", entry)
thread = Thread(target = func, args = (self, entry))
thread.start()
def on_fallback_updated(self, playlist_uri):
"""
Called when the scheduled fallback playlist has been updated.
"""
self.logger.debug("on_fallback_updated(..)")
self.call_event("on_fallback_updated", playlist_uri)
def on_fallback_cleaned(self, cleaned_channel):
"""
Called when the scheduled fallback queue has been cleaned up.
"""
self.logger.debug("on_fallback_cleaned(..)")
self.call_event("on_fallback_cleaned", cleaned_channel)
def on_idle(self):
"""
Called when no entry is playing.
"""
def func(self):
self.logger.debug("on_idle(..)")
self.logger.error(SU.red("Currently there's nothing playing!"))
self.call_event("on_idle", None)
thread = Thread(target = func, args = (self, ))
thread.start()
def on_schedule_change(self, schedule):
"""
Called when the playlist or entries of the current schedule have changed.
"""
def func(self, schedule):
self.logger.debug("on_schedule_change(..)")
self.call_event("on_schedule_change", schedule)
thread = Thread(target = func, args = (self, schedule))
thread.start()
def on_queue(self, entries):
"""
One or more entries have been queued and are currently pre-loaded.
"""
def func(self, entries):
self.logger.debug("on_queue(..)")
self.player_state.add_to_history(entries)
self.call_event("on_queue", entries)
thread = Thread(target = func, args = (self, entries))
thread.start()
def on_sick(self, data):
"""
Called when the engine is in some unhealthy state.
"""
def func(self, data):
self.logger.debug("on_sick(..)")
self.call_event("on_sick", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_resurrect(self, data):
"""
Called when the engine turned healthy again after being sick.
"""
def func(self, data):
self.logger.debug("on_resurrect(..)")
self.call_event("on_resurrect", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_critical(self, subject, message, data=None):
"""
Called when some critical event occurs.
"""
def func(self, subject, message, data):
self.logger.debug("on_critical(..)")
if not data: data = ""
self.mailer.send_admin_mail(subject, message + "\n\n" + str(data))
self.call_event("on_critical", (subject, message, data))
thread = Thread(target = func, args = (self, subject, message, data))
thread.start()