Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Select Git revision
Show changes
Showing
with 4744 additions and 0 deletions
# Install for Production
<!-- TOC -->
- [Install for Production](#install-for-production)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Configuration](#configuration)
- [Running Engine](#running-engine)
- [The API Server](#the-api-server)
- [Maintanence using Supervisor](#maintanence-using-supervisor)
- [Logging](#logging)
- [Read more](#read-more)
<!-- /TOC -->
## Prerequisites
Aura Engine runs on any modern Debian-based OS. It requires at least
- `Python 3.7`
- `git`
Additionally you'll need these system packages below.
```shell
sudo apt-get update
sudo apt-get install \
supervisor \
opam \
redis-server \
libsndfile1 \
ffmpeg \
quelcom \
python3-pip \
virtualenv \
libssl-dev
```
Depending on the database management system you gonna use, you'll also need to install those packages.
In case of MariaDB this is:
```shell
sudo apt-get install \
python-dev \
default-libmysqlclient-dev \
mariadb-server \
libmariadbclient-dev
```
**Create an user**
While previous packages need superuser rights to be installed, the following ones are installed for the user which is
executing the engine. In your development environment you can skip this step. In production you first need to create
a user called `engineuser`.
```shell
sudo adduser engineuser
sudo adduser engineuser audio sudo
```
And switch to that user
```shell
su engineuser
```
**Liquidsoap Repository**
Engine requires at least `Liquidsoap 1.4.2` or newer, installed using [OPAM (OCaml Package Manager)](https://opam.ocaml.org/).
Add the current Liquidsoap repository from [this installation guide](https://www.liquidsoap.info/doc-1.4.2/install.html).
The other steps required for the Liquidsoap installation are handled by the `install.sh` script. If you experience any
errors, carefully review them and consult the official documentation for installing Liquidsoap.
**Cloning the project**
Create the folder `/opt/aura` and clone the engine project from there:
```shell
engineuser:/opt/aura/$ git clone https://gitlab.servus.at/autoradio/engine
```
Now you should have `/opt/aura/engine/`.
Let's move inside the home of engine:
```shell
engineuser:/opt/aura/$ cd engine
```
**Setup the database**
The following installation script sets up the database. You either need to be logged in as root
or have sudo rights.
```shell
root:/opt/aura/engine/$ bash script/setup-db.sh
```
By default Aura Engine uses MariaDB for persistence. When starting this script, please
ensure you have root access to your database instance. The installation script automatically
creates a database plus an associated user with password. If you want to use your own database
system, select "Other / Manually" during the database installation step.
If you have chosen to setup your database automatically, note the relevant credentials.
**Initialize folders and permissions**
Call this script to create the required log folders and update all permissions.
```bash
root:/opt/aura/engine$ bash script/initialize.sh
```
## Installation
The following installation script also sets up the database.
By default Aura Engine uses MariaDB for persistence. When starting the installation, please
ensure you have root access to your database instance. The installation script automatically
creates a database plus an associated user with password. If you want to use your own database
system, select "Other / Manually" during the database installation step.
```shell
engineuser:/opt/aura/engine$ ./install.sh prod
```
This script does the following:
- Install Liquidsoap components using OPAM (`script/install-opam-packages`)
- Python Packages (`requirements.txt`)
- Creates a default Engine configuration file in `/etc/aura/engine.ini`
- Creates a default Gunicorn configuration file in `gunicorn.conf.py`
When this is completed, carefully check if any error occured. In case your database has been setup
automatically, note the relevant credentials for later use in your `engine.ini` configuration.
## Configuration
In your production environment edit following file to configure the engine:
```shell
engineuser:/opt/aura/engine$ nano /etc/aura/engine.ini
```
Now, specify at least following settings to get started:
```ini
[database]
db_user="aura"
db_name="aura_engine"
db_pass="---SECRET--PASSWORD---"
```
Set the URLs to the *Steering* and *Tank* API:
```ini
[api]
# STEERING
api_steering_status = "http://localhost:8000/api/v1/"
# The URL to get the Calendar via Steering
api_steering_calendar="http://localhost:8000/api/v1/playout"
# The URL to get show details via Steering
api_steering_show="http://localhost:8000/api/v1/shows/${ID}/"
# TANK
api_tank_status = "http://localhost:8040/healthz"
# The URL to get playlist details via Tank
api_tank_playlist="http://localhost:8040/api/v1/shows/${SLUG}/playlists"
```
Ensure that the Liquidsoap installation path is valid:
```ini
[lqs]
liquidsoap_path="/home/engineuser/.opam/4.08.0/bin/liquidsoap"
```
**Configuring the API Server**
Set the correct IP in `/opt/aura/engine/configuration# nano gunicorn.conf.py` and
the exposed `exposed_api_url` in `engine.ini`.
Also open the `api_port` defined in `engine.ini` in your `iptables` (Default is 3333)
```shell
iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 3333 -j ACCEPT
```
**Configuring the Audio Store**
Finally Engine needs to be able to access the audio folder, where all the tracks of the playlists
are stored via *Tank*:
```ini
[audiofolder]
audiofolder="/var/audio"
```
There is some document on how to [Setup the Audio Store](docs/setup-audio-store.md).
If the audio device desired for playback is set as `default`, the Engine now should be ready to play
sound. You can check the default audio hardware by executing `aplay -L` on the command line. If that's
not the case you can set the default device in `/etc/asound.conf`. More advanced audio device settings
can be looked up in the [Configuration Guide](docs/configuration-guide.md).
Read about all other available settings in the [Configuration Guide](docs/configuration-guide.md).
## Running Engine
In production the process of starting the engine is slightly different compared to some development environment.
This is due to the need of ensuring the engine's components are always running i.e. letting them to restart
automatically after some system restart or crash has happened.
For this we utilize [Supervisor](http://supervisord.org/).
Also note, while running the engine might also work using a `systemd` service, the
recommened option to use in combination with Gunicorn ([API server](Running the API Server), see below),
is Supervisor. Beside others pros, Supervisor has the advantage that you are able to run services without
having superuser rights.
Now, given you are in the engine's home directory `/opt/aura/engine/`, simply type following to start
the services:
```shell
supervisord
```
This picks up the supervisor configuration provided in the local `supervisord.conf` and the service configurations
located in `configuration/supervisor/*.conf`.
Experience has shown it might be helpful to reload the supervisor configuration using `sudo`:
```shell
sudo supervisorctl reload
```
Note that the supervisor daemon starts all (both) services at once. If you want more fine-grained control for
starting services individually, please check-out the next section.
**Listing available Services**
```shell
engineuser:/opt/aura/engine$ supervisorctl avail
```
You should get these two services with their actual state listed:
```c++
aura-engine in use auto 666:666
aura-engine-api in use auto 999:999
```
## The API Server
For production Engine API uses the WSGI HTTP Server [`Gunicorn`](https://gunicorn.org/).
In production Gunicorn is used in combination with some proxy server, such as Nginx.
> Although there are many HTTP proxies available, we strongly advise that you use Nginx. If you choose another proxy
server you need to make sure that it buffers slow clients when you use default Gunicorn workers. Without this buffering
Gunicorn will be easily susceptible to denial-of-service attacks. You can use Hey to check if your proxy is behaving properly.
[**Gunicorn Docs**](http://docs.gunicorn.org/en/latest/deploy.html).
## Maintanence using Supervisor
Please remember to call all `supervisorctl` commands from within your engine home directory (`/opt/aura/engine/`),
to pickup the correct `supervisord.conf`.
**Starting Services**
```shell
supervisorctl start <service-name>
```
**Stopping Services**
```shell
supervisorctl stop <service-name>
```
**Restarting Services**
```shell
supervisorctl restart <service-name>
```
**Refresh after changing configurations**
```shell
supervisorctl restart <service-name>
```
**Start the API service with Supervisor**
```shell
sudo supervisorctl update
sudo supervisorctl restart engine-api
```
In case you want to reload whole supervisor service
```shell
sudo service supervisor restart
```
## Logging
All Engine logs for production can be found under:
```shell
`/var/log/aura`
```
This includes individual logs from Engine Core, Liquidsoap and the API.
But also `stdout` outputs from supervisor services are written there.
Additionally you'll finde Supervisor specific logs under:
```
`/var/log/supervisor`
```
## Read more
- [Overview](/README.md)
- [Installation for Development](installation-development.md)
- [Installation for Production](installation-production.md)
- [Running with Docker](running-docker.md)
- [Setup the Audio Store](docs/setup-audio-store.md)
- [Configuration Guide](configuration-guide.md)
- [Developer Guide](developer-guide.md)
- [Engine Features](engine-features.md)
- [Frequently Asked Questions (FAQ)](docs/frequently-asked-questions.md)
# Running Engine with Docker
Docker provides a simple way to get your engine with all dependencies running.
Here you can find the official AURA Engine Docker images:
https://hub.docker.com/repository/docker/autoradio/engine
> Note: The Engine Docker image is in *POC* state and waiting to be fully implemented.
It's not yet ready to be offically used. If you want to try AURA Engine meanwhile
try the [Standard Installation](docs/installation-development).
<!-- TOC -->
- [Running Engine with Docker](#running-engine-with-docker)
- [Start an image](#start-an-image)
- [Configure an image](#configure-an-image)
- [Making Docker Releases](#making-docker-releases)
- [Build your own, local Docker image](#build-your-own-local-docker-image)
- [Releasing a new version to DockerHub](#releasing-a-new-version-to-dockerhub)
- [Read more](#read-more)
<!-- /TOC -->
## Start an image
*To be extended ...*
## Configure an image
*To be extended ...*
## Making Docker Releases
This section is only relevant if you are an Engine Developer.
### Build your own, local Docker image
```shell
./run.sh docker:build
```
### Releasing a new version to DockerHub
```shell
./run.sh docker:push
```
## Read more
- [Overview](/README.md)
- [Installation for Development](installation-development.md)
- [Installation for Production](installation-production.md)
- [Running with Docker](running-docker.md)
- [Setup the Audio Store](docs/setup-audio-store.md)
- [Configuration Guide](configuration-guide.md)
- [Developer Guide](developer-guide.md)
- [Engine Features](engine-features.md)
- [Frequently Asked Questions (FAQ)](docs/frequently-asked-questions.md)
# Setting up the Audio Store
The *Audio Store* is a folder which is utilized by AURA Tank and Engine to exchange audio files.
Assuming AURA Engine and Tank are hosted on different machines, the `audiofolder` must by shared
using some network share.
In case you are hosting Engine and Tank on the same machine (e.g. in development), you can skip
this documentation. Just think about pointing them to the same directory.
<!-- TOC -->
- [Setting up the Audio Store](#setting-up-the-audio-store)
- [Share Location](#share-location)
- [Share Type](#share-type)
- [Setting up SSHFS](#setting-up-sshfs)
- [Configuring Engine](#configuring-engine)
- [Configuring Tank](#configuring-tank)
- [Read more](#read-more)
<!-- /TOC -->
By default Engine expects audio files shared by Tank in `/var/audio`.
This can be configurated in `engine.ini`:
```ini
[audiofolder]
audiofolder="/var/audio"
```
Now, this folder must be somehow writable by Tank.
## Share Location
You have following options where your share can be located:
1. **Engine and all other AURA components (Tank, Dashboard, Steering) are running on the same instance.** This is the most simple solution,
as Engine and Tank can share the same directory locally. But this scenario requires some more sophisticated tuning of the system resources
to avoid e.g. some overload of multiple Uploads in Tank may affect the performance of engine. You can eliminate this risk by setting CPU and
memory limits for Steering, Dashboard and Tank using Docker or `systemd-cgroups`. A disadvantage here is the case of maintainence of system
reboot. This would mean that all components are offline at once.
2. **Physical directory where the Engine lives, mounted to Tank**. This may cause an issue with the mount, when no network connection to Engine
is unavailable or the instance is rebooting.
3. **Physical directory where the Tank lives, mounted to Engine.** This may cause an issue with the mount, when no network connection to Tank
is unavailable or the instance is rebooting.
4. **Central Data Store or *Storage Box*** which is mountet to Engine and Tank. In this case a downtime of the store make both, Engine and Tank
dysfunctional.
5. **Replicated storage solution using [Gluster](https://www.gluster.org/), both Engine and Tank have their virtual audio directory mounted.**
That's the ideal approach, because if any of the instances is down, the other has all the data available.
In any case, you should think about some backup solution involving this directory.
## Share Type
Then, there's the question how the share is managed. Beside other you have the options
to use [NFS](https://en.wikipedia.org/wiki/Network_File_System), [SSHFS](https://en.wikipedia.org/wiki/SSHFS) or even something like [Gluster](https://www.gluster.org/).
For our initial setup we have chosen to use *SSHFS*.
Please share your experience with other share types, and we will include it in further releases of
this documentation.
## Setting up SSHFS
SSHFS allows you to access the filesystem on a remote computer via SSH. Interaction with files and folders behaves similar to any local data.
This example is setting up the `audiofolder` on the Engine instance.
### Configuring Engine
First, you'll need to create an user which enables Tank to access the `audiofolder` on Engine:
```shell
adduser tankuser
chown tankuser:engineuser /var/audio
```
Ensure that `engineuser` has no permissions to write the directory:
```shell
chmod u=+rwx,go=+rx-w /var/audio
```
### Configuring Tank
On the Tank side you need to install `sshfs`:
```shell
sudo apt-get install sshfs
```
Then create an `audio-store` folder inside the AURA home:
```shell
:/opt/aura/$ mkdir audio-store
```
Try if you can connect to the engine over SSH using your `tankuser`:
```shell
ssh tankuser@192.168.0.111 -p22
```
Replace `-p22` with the actual port number your SSH service is running with. For security reasons it's recommended
to run SSH not over the default port 22.
Uncomment following setting in `/etc/fuse.conf` to allow the tank-user access the share with write permissions:
```conf
# Allow non-root users to specify the allow_other or allow_root mount options.
user_allow_other
```
Now create the mount:
```shell
sudo sshfs -o allow_other -o IdentityFile=~/.ssh/id_rsa tankuser@192.168.0.111:/var/audio /opt/aura/audio-store -p22
```
Replace `192.168.0.111` with the actual IP for your Engine and `-p22` with the actual port number your Engine's SSH service
is running with.
To make this mount persistent i.e. keep it alive even after a system reboot, you'll need to add a configuration
in the `/etc/fstab` file by adding this at the end:
```yaml
# Audio Store @ AURA Engine
sshfs#tankuser@192.168.0.111:/var/audio /opt/aura/audio-store fuse auto,port=22,identityfile=~/.ssh/id_rsa,allow_other,_netdev 0 0
```
Again, check for the correct port number in the line above.
To take this into effect you'll need to remount the filesystem with `sudo mount -a` or reboot the machine. When mounting
you'll need to authenticate with the `tankuser` password once.
Then review if your Tank's Docker configuration mounts the exact same volume (`/opt/aura/audio-store`).
If not edit the tank configuration `/etc/aura/tank.env` and set following property:
```shell
TANK_STORE_PATH=/opt/aura/audio-store
```
Finally, do some testing if the directory is writable from Tank's system (`touch some-file`) and if the Engine's side can read
this file. Then restart your Tank Docker container and you should be good to go.
## Read more
- [Overview](/README.md)
- [Installation for Development](installation-development.md)
- [Installation for Production](installation-production.md)
- [Running with Docker](running-docker.md)
- [Setup the Audio Store](docs/setup-audio-store.md)
- [Configuration Guide](configuration-guide.md)
- [Developer Guide](developer-guide.md)
- [Engine Features](engine-features.md)
- [Frequently Asked Questions (FAQ)](docs/frequently-asked-questions.md)
#!/usr/bin/env python3.7
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os, os.path
import subprocess
import json
from datetime import datetime, date, timedelta
from flask import Flask, Response
from flask_caching import Cache
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from marshmallow import Schema, fields, post_dump
from flask_restful import Api, Resource, abort
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from apispec_webframeworks.flask import FlaskPlugin
from werkzeug.exceptions import HTTPException, default_exceptions, Aborter
from modules.base.logger import AuraLogger
from modules.base.config import AuraConfig
from modules.base.models import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData
#
# Initialize the Aura Web App and API.
#
config = AuraConfig()
app = Flask(__name__,
static_url_path='',
static_folder='web/')
# static_folder='contrib/aura-player/public/')
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = config.get_database_uri()
app.config["CACHE_TYPE"] = "simple"
app.config["CACHE_DEFAULT_TIMEOUT"] = 0
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
cache = Cache(app)
cors = CORS(app, resources={r"/*": {"origins": "*"}}) # FIXME Update CORS for production use
db = SQLAlchemy(app)
ma = Marshmallow(app)
api = Api(app)
#
# Werkzeug HTTP code mappings
#
class NoDataAvailable(HTTPException):
code = 204
description = "There is currently no content available."
default_exceptions[204] = NoDataAvailable
abort = Aborter()
class EngineApi:
"""
Provides the Aura Engine API services.
"""
config = None
api = None
logger = None
# trackservice_schema = None
def __init__(self, config, api):
"""
Initializes the API.
Args:
config (AuraConfig): The Engine configuration.
api (Api): The Flask restful API object.
"""
self.config = config
self.logger = AuraLogger(self.config, "engine-api")
self.logger = logging.getLogger("engine-api")
self.api = api
# Generate HTML files
self.generate_html("web/templates/clock.html", "web/clock.html")
self.generate_html("web/templates/trackservice.html", "web/trackservice.html")
# API Spec
spec.components.schema("TrackService", schema=TrackServiceSchema)
spec.components.schema("Report", schema=ReportSchema)
spec.components.schema("Schedule", schema=ScheduleSchema)
spec.components.schema("Clock", schema=ClockDataSchema)
spec.components.schema("Status", schema=StatusSchema)
# TODO Generates HTML for specification
#self.logger.info(spec.to_yaml())
# Schema instances
EngineApi.trackservice_schema = TrackServiceSchema(many=True)
EngineApi.track_schema = TrackServiceSchema()
EngineApi.report_schema = ReportSchema(many=True)
EngineApi.schedule_schema = ScheduleSchema(many=True)
EngineApi.clockdata_schema = ClockDataSchema()
EngineApi.status_schema = StatusSchema()
# Define API routes
self.api.add_resource(TrackServiceResource, config.api_prefix + "/trackservice/")
self.api.add_resource(TrackResource, config.api_prefix + "/trackservice/<int:track_id>")
self.api.add_resource(CurrentTrackResource, config.api_prefix + "/trackservice/current")
self.api.add_resource(TracksByDayResource, config.api_prefix + "/trackservice/date/<string:date_string>")
self.api.add_resource(ReportResource, config.api_prefix + "/report/<string:year_month>")
self.api.add_resource(UpcomingSchedulesResource, config.api_prefix + "/schedule/upcoming")
self.api.add_resource(ClockDataResource, config.api_prefix + "/clock")
self.api.add_resource(StatusResource, "/status")
self.logger.info("Engine API routes successfully set!")
# Static resources
@app.route('/app/trackservice', methods=['GET'])
def trackservice():
content = open(os.path.join("web/", "trackservice.html"))
return Response(content, mimetype="text/html")
# Static resources
@app.route('/app/clock', methods=['GET'])
def clock():
content = open(os.path.join("web/", "clock.html"))
return Response(content, mimetype="text/html")
def generate_html(self, src_file, target_file):
"""
Generates HTML based on the configuration options and templates.
Args:
src_file (String): The template file
target_file (String): The HTML file to be generated
"""
src_file = open(src_file, "r")
target_file = open(target_file, "w")
content = src_file.read()
config_options = {
"CONFIG-STATION-NAME": config.get("station_name"),
"CONFIG-STATION-LOGO-URL": config.get("station_logo_url"),
"CONFIG-STATION-LOGO-SIZE": config.get("station_logo_size"),
"CONFIG-API-URL": config.get("exposed_api_url")
}
for key, value in config_options.items():
content = content.replace(":::"+key+":::", value)
target_file.write(content)
src_file.close()
target_file.close()
def run(self):
"""
Starts the API server.
"""
# Set debug=False if you want to use your native IDE debugger
self.api.app.run(port=self.config.api_port, debug=False)
#
# API SPEC
#
spec = APISpec(
title="Swagger API Specification for Aura Engine",
version="1.0.0",
openapi_version="3.0.2",
plugins=[FlaskPlugin(), MarshmallowPlugin()],
)
#
# API SCHEMA
#
class TrackServiceSchema(ma.Schema):
class Meta:
fields = (
"id",
"schedule.schedule_id",
"schedule.schedule_start",
"schedule.schedule_end",
"schedule.languages",
"schedule.type",
"schedule.category",
"schedule.topic",
"schedule.musicfocus",
"schedule.is_repetition",
"track",
"track_start",
"show"
)
class ClockDataSchema(ma.Schema):
class Meta:
fields = (
"current",
"next",
"track_id",
"track_start",
"track"
)
class ScheduleSchema(ma.Schema):
class Meta:
fields = (
"id",
"schedule_id",
"schedule_start",
"schedule",
"show_id",
"show_name",
"show_hosts",
"show_type"
)
class ReportSchema(ma.Schema):
class Meta:
fields = (
"id",
"schedule.schedule_id",
"schedule.schedule_start",
"schedule.schedule_end",
"schedule.languages",
"schedule.type",
"schedule.category",
"schedule.topic",
"schedule.musicfocus",
"schedule.is_repetition",
"schedule.show_id",
"schedule.show_name",
"schedule.show_hosts",
"schedule.show_type",
"schedule.show_funding_category",
"track",
"track_start",
"playlist_id",
"fallback_type",
"schedule_fallback_id",
"show_fallback_id",
"station_fallback_id"
)
class StatusSchema(ma.Schema):
class Meta:
fields = (
"engine",
"soundsystem",
"api",
"redis_ready",
"audio_store"
)
#
# API RESOURCES
#
class TrackServiceResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self):
today = date.today()
today = datetime(today.year, today.month, today.day)
tracks = TrackService.select_by_day(today)
return EngineApi.trackservice_schema.dump(tracks)
class TrackResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self, track_id):
track = TrackService.select_one(track_id)
return EngineApi.track_schema.dump(track)
class ClockDataResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self):
item = TrackService.select_current()
next_schedule = Schedule.select_upcoming(1)
if next_schedule:
next_schedule = next_schedule[0].as_dict()
next_schedule["playlist"] = None
else:
next_schedule = {}
clockdata = {
"track_id": item.id,
"track_start": item.track_start,
"track": item.track,
"current": {},
"next": next_schedule
}
if item.schedule:
clockdata["current"] = item.schedule.as_dict()
if item.schedule.playlist:
clockdata["current"]["playlist"] = item.schedule.playlist[0].as_dict()
clockdata["current"]["show"] = item.show
return EngineApi.clockdata_schema.dump(clockdata)
class CurrentTrackResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self):
track = TrackService.select_current()
if not track:
return abort(204) # No content available
return EngineApi.track_schema.dump(track)
class TracksByDayResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self, date_string):
date = datetime.strptime(date_string, "%Y-%m-%d")
self.logger.debug("Query track-service by day: %s" % str(date))
tracks = TrackService.select_by_day(date)
if not tracks:
return abort(204) # No content available
return EngineApi.trackservice_schema.dump(tracks)
class UpcomingSchedulesResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self):
now = datetime.now()
self.logger.debug("Query upcoming schedules after %s" % str(now))
schedules = Schedule.select_upcoming(3)
if not schedules:
return abort(204) # No content available
return EngineApi.schedule_schema.dump(schedules)
class ReportResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self, year_month):
year = int(year_month.split("-")[0])
month = int(year_month.split("-")[1])
first_day = datetime(year, month, 1)
next_month = first_day.replace(day=28) + timedelta(days=4)
next_month - timedelta(days=next_month.day)
self.logger.debug("Query report for month: %s - %s" % (str(first_day), str(next_month)))
report = TrackService.select_by_range(first_day, next_month)
if not report:
return abort(204) # No content available
return EngineApi.report_schema.dump(report)
class StatusResource(Resource):
logger = None
def __init__(self):
self.logger = logging.getLogger("engine-api")
def get(self):
status = subprocess.check_output(["python3", "guru.py", "-s", "-q"])
status = status.decode("utf-8").replace("'", '"')
status = json.loads(status, strict=False)
if not status:
return abort(204) # No content available
return EngineApi.status_schema.dump(status)
#
# Initialization calls
#
engine_api = EngineApi(config, api)
if __name__ == "__main__":
engine_api.run()
#!/usr/bin/env python3.7
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import signal
import logging
import subprocess
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from modules.base.logger import AuraLogger
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil
config = AuraConfig()
def configure_flask():
app.config["SQLALCHEMY_DATABASE_URI"] = config.get_database_uri()
app.config['BABEL_DEFAULT_LOCALE'] = 'de'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# FIXME Instatiate SQLAlchemy without the need for Flask
app = Flask(__name__)
configure_flask()
DB = SQLAlchemy(app)
class AuraEngine:
"""
AuraEngine does the following:
1. Initialize the soundsystem and scheduler
2. Initialize Redis
3. Start Liquidsoap in a separate thread which connects to the engine
"""
logger = None
config = None
server = None
messenger = None
controller = None
soundsystem = None
scheduler = None
lqs = None
lqs_startup = None
def __init__(self):
"""
Initializes Engine Core.
"""
self.config = config
def startup(self, lqs_startup):
"""
Starts Engine Core.
"""
AuraLogger(self.config)
self.logger = logging.getLogger("AuraEngine")
from modules.scheduling.scheduler import AuraScheduler
from modules.core.engine import SoundSystem
from modules.cli.redis.adapter import ServerRedisAdapter
# If Liquidsoap should be started automatically
self.lqs_startup = lqs_startup
# Check if the database has to be re-created
if self.config.get("recreate_db") is not None:
AuraScheduler(self.config, None, None)
# Create scheduler and Liquidsoap communicator
self.soundsystem = SoundSystem(self.config)
self.scheduler = AuraScheduler(self.config, self.soundsystem, self.on_initialized)
# Create the Redis adapter
self.messenger = ServerRedisAdapter(self.config)
self.messenger.scheduler = self.scheduler
self.messenger.soundsystem = self.soundsystem
# And finally wait for redis message / start listener thread
self.messenger.start()
def on_initialized(self):
"""
Called when the engine is initialized, before the Liquidsoap connection is established."
"""
self.logger.info(SimpleUtil.green("Engine Core initialized - Waiting for Liquidsoap connection ..."))
if self.lqs_startup:
self.start_lqs(False, False)
else:
self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually."))
def start_lqs(self, debug_output, verbose_output):
"""
Starts Liquidsoap.
"""
lqs_path = self.config.get("liquidsoap_path")
lqs_cwd = os.getcwd() + "/" + self.config.get("liquidsoap_working_dir")
lqs_output = ""
lqs_output = self.get_debug_flags(debug_output, verbose_output)
self.lqs = subprocess.Popen([lqs_path, lqs_output, "engine.liq"], \
cwd=lqs_cwd, \
stdout=subprocess.PIPE, \
shell=False)
def get_lqs_cmd(self, debug_output, verbose_output):
"""
Returns a shell command string to start Liquidsoap
"""
lqs_path = self.config.get("liquidsoap_path")
lqs_cwd = os.getcwd() + "/" + self.config.get("liquidsoap_working_dir")
lqs_output = self.get_debug_flags(debug_output, verbose_output)
return "(cd %s && %s %s ./engine.liq)" % (lqs_cwd, lqs_path, lqs_output)
def get_debug_flags(self, debug_output, verbose_output):
"""
Build Liquidssoap debug parameters.
"""
output = ""
if debug_output:
output += "--debug "
if verbose_output:
output += "--verbose "
return output
def exit_gracefully(self, signum, frame):
"""
Shutdown of the engine. Also terminates the Liquidsoap thread.
"""
if self.lqs:
self.lqs.terminate()
self.logger.info("Terminated Liquidsoap")
if self.messenger:
self.messenger.terminate()
self.logger.info("Gracefully terminated Aura Engine!" + str(self.lqs))
sys.exit(0)
#
# START THE ENGINE
#
if __name__ == "__main__":
engine = AuraEngine()
start_lqs = True
lqs_cmd = False
signal.signal(signal.SIGINT, engine.exit_gracefully)
signal.signal(signal.SIGTERM, engine.exit_gracefully)
if len(sys.argv) >= 2:
if "--without-lqs" in sys.argv:
start_lqs = False
if "--get-lqs-command" in sys.argv:
lqs_cmd = True
if "--use-test-data" in sys.argv:
engine.config.set("use_test_data", True)
if "--recreate-database" in sys.argv:
engine.config.set("recreate_db", True)
if lqs_cmd:
print(engine.get_lqs_cmd(True, True))
else:
engine.startup(start_lqs)
#!/usr/bin/env python3.7
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import sys
import redis
from argparse import ArgumentParser
from modules.cli.padavan import Padavan
from modules.base.exceptions import PlaylistException
from modules.base.config import AuraConfig
class Guru():
"""
Command Line Interface (CLI) for Aura Engine.
"""
# config_path = "%s/configuration/engine.ini" % Path(__file__).parent.absolute()
config = AuraConfig()
parser = None
args = None
# ------------------------------------------------------------------------------------------ #
def __init__(self):
self.init_argument_parser()
self.handle_arguments()
def handle_arguments(self):
if self.args.stoptime:
start = time.time()
if not self.args.quiet:
print("Guru thinking...")
try:
p = Padavan(self.args, self.config)
p.meditate()
except PlaylistException as pe:
# typically there is no next file found
if not self.args.quiet:
print(pe)
else:
print("")
exit(4)
except redis.exceptions.TimeoutError:
print("Timeout when waiting for redis message. Is AURA daemon running? Exiting...")
exit(3)
if not self.args.quiet:
print("...result: ")
if p.stringreply != "":
#print(p.stringreply)
if p.stringreply[len(p.stringreply)-1] == "\n":
print(p.stringreply[0:len(p.stringreply) - 1])
else:
print(p.stringreply[0:len(p.stringreply)])
if self.args.stoptime:
end = time.time()
exectime = end-start
print("execution time: "+str(exectime)+"s")
def init_argument_parser(self):
try:
self.create_parser()
self.args = self.parser.parse_args()
except (ValueError, TypeError) as e:
if self.parser is not None:
self.parser.print_help()
print()
print(e)
exit(1)
def create_parser(self):
self.parser = ArgumentParser()
# options
self.parser.add_argument("-sep", "--stop-execution-time", action="store_true", dest="stoptime", default=False, help="Prints the execution time at the end of the skript")
self.parser.add_argument("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Just the result will outputed to stout")
self.parser.add_argument("-rd", "--recreate-database", action="store_true", dest="recreatedb", default=False, help="Do you want to recreate the database?")
# getter
self.parser.add_argument("-pcs", "--print-connection-status", action="store_true", dest="get_connection_status", default=False, help="Prints the status of the connection to liquidsoap, pv and tank")
self.parser.add_argument("-gam", "--get-active-mixer", action="store_true", dest="mixer_channels_selected",default=False, help="Which mixer channels are selected?")
self.parser.add_argument("-pms", "--print-mixer-status", action="store_true", dest="mixer_status", default=False, help="Prints all mixer sources and their states")
self.parser.add_argument("-pap", "--print-act-programme", action="store_true", dest="get_act_programme", default=False, help="Prints the actual Programme, the controller holds")
self.parser.add_argument("-s", "--status", action="store_true", dest="get_status", default=False, help="Returns the Engine Status as JSON")
# liquid manipulation
self.parser.add_argument("-am", "--select-mixer", action="store", dest="select_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-dm", "--de-select-mixer", action="store", dest="deselect_mixer", default=-1, metavar="MIXERNAME", help="Which mixer should be activated?")
self.parser.add_argument("-vm", "--volume", action="store", dest="set_volume", default=0, metavar=("MIXERNUM", "VOLUME"), nargs=2, help="Set volume of a mixer source", type=int)
# shutdown server
self.parser.add_argument("-sd", "--shutdown", action="store_true", dest="shutdown", default=False, help="Shutting down aura server")
# playlist in/output
self.parser.add_argument("-fnp", "--fetch-new-programmes", action="store_true", dest="fetch_new_programme", default=False, help="Fetch new programmes from api_steering_calendar in engine.ini")
self.parser.add_argument("-pmq", "--print-message-queue", action="store_true", dest="print_message_queue", default=False, help="Prints message queue")
# send a redis message
self.parser.add_argument("-rm", "--redis-message", action="store", dest="redis_message", default=False, metavar=("CHANNEL", "MESSAGE"), nargs=2, help="Send a redis message to the Listeners")
# calls from liquidsoap
self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?")
self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?")
self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing")
self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?")
self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing")
if len(sys.argv) == 1:
raise ValueError("No Argument passed!")
def valid_playlist_entry(argument):
from datetime import datetime
try:
index = int(argument[0])
fromtime = datetime.strptime(argument[1], "%Y-%m-%d")
source = argument[2]
return index, fromtime, source
except:
msg = "Not a valid date: '{0}'.".format(argument[0])
print(msg)
raise
# # ## ## ## ## ## # #
# # ENTRY FUNCTION # #
# # ## ## ## ## ## # #
def main():
Guru()
# # ## ## ## ## ## ## # #
# # End ENTRY FUNCTION # #
# # ## ## ## ## ## ## # #
if __name__ == "__main__":
main()
#!/bin/bash
mode="dev"
if [[ $* =~ ^(prod)$ ]]; then
mode="prod"
fi
if [ $mode == "dev" ]; then
echo "[Installing AURA ENGINE for Development]"
fi
if [ $mode == "prod" ]; then
echo "[Installing AURA ENGINE for Production]"
fi
# Development and Production
echo "Installing OPAM Packages ..."
bash script/install-opam-packages.sh
echo "Installing Python Requirements ..."
python3.7 $(which pip3) install -r requirements.txt
# Development
if [ $mode == "dev" ]; then
echo "Create local 'logs' Folder ..."
mkdir -p logs
echo "Copy configuration to './configuration/engine.ini'"
cp -n configuration/sample-development.engine.ini configuration/engine.ini
echo "Installing Web Application Packages ..."
bash script/install-web.sh
fi
# Production
if [ $mode == "prod" ]; then
echo "Create local 'tmp' Folder ..."
mkdir -p tmp
echo "Copy default Engine configuration to '/etc/aura/engine.ini'"
cp -n configuration/sample-production.engine.ini /etc/aura/engine.ini
echo "Copy default Gunicorn configuration to '/etc/aura/engine.ini'"
cp -n configuration/sample-production.gunicorn.conf.py configuration/gunicorn.conf.py
echo "Create Virtual Env for Gunicorn"
virtualenv -p /usr/bin/python3.7 ../python-env
source ../python-env/bin/activate
echo "Install Requirements to Virtual Env"
pip3 install -r requirements.txt
fi
echo
echo "+++ Installation of AURA Engine finished! +++"
echo
\ No newline at end of file
# Meta
__author__ = "David Trattnig and Gottfried Gaisbauer"
__copyright__ = "Copyright 2017-2020, Aura Engine Team"
__credits__ = ["David Trattnig", "Gottfried Gaisbauer", "Michael Liebler"]
__license__ = "GNU Affero General Public License (AGPL) Version 3"
__version__ = "0.8.2"
__version_info__ = (0, 8, 2)
__maintainer__ = "David Trattnig"
__email__ = "david.trattnig@subsquare.at"
__status__ = "Development"
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import sys
import logging
from pathlib import Path
from configparser import ConfigParser
class AuraConfig:
"""
AuraConfig Class
Holds the Aura Configuration as in the file `engine.ini`.
"""
ini_path = ""
logger = None
def __init__(self, ini_path="/etc/aura/engine.ini"):
"""
Initializes the configuration, defaults to `/etc/aura/engine.ini`.
If this file doesn't exist it uses `./configuration/engine.ini` from
the project directory.
Args:
ini_path(String): The path to the configuration file `engine.ini`
"""
config_file = Path(ini_path)
if not config_file.is_file():
ini_path = "%s/configuration/engine.ini" % Path(__file__).parent.parent.parent.absolute()
self.ini_path = ini_path
self.logger = logging.getLogger("AuraEngine")
self.load_config()
def set(self, key, value):
"""
Setter for some specific config property.
Args:
key (String): key
default (*): value
"""
try:
self.__dict__[key] = int(value)
except:
self.__dict__[key] = str(value)
def get(self, key, default=None):
"""
Getter for some specific config property.
Args:
key (String): key
default (*): value
"""
if key not in self.__dict__:
if default:
self.set(key, default)
else:
self.logger.warning("Key " + key + " not found in configfile " + self.ini_path + "!")
return None
if key == "loglevel":
loglvl = self.__dict__[key]
if loglvl == "debug":
return logging.DEBUG
elif loglvl == "info":
return logging.INFO
elif loglvl == "warning":
return logging.WARNING
elif loglvl == "error":
return logging.ERROR
else:
return logging.CRITICAL
if key == "debug":
return self.__dict__[key].count("y")
return self.__dict__[key]
def load_config(self):
"""
Set config defaults and load settings from file
"""
if not os.path.isfile(self.ini_path):
self.logger.critical(self.ini_path + " not found :(")
sys.exit(1)
# Read the file
f = open(self.ini_path, 'r')
ini_str = f.read()
f.close()
# Parse the values
config_parser = ConfigParser()
try:
config_parser.read_string(ini_str)
except Exception as e:
self.logger.critical("Cannot read " + self.ini_path + "! Reason: " + str(e))
sys.exit(0)
for section in config_parser.sections():
for key, value in config_parser.items(section):
v = config_parser.get(section, key).replace('"', '').strip()
self.set(key, v)
# Custom overrides and defaults
self.set("install_dir", os.path.realpath(__file__ + "../../../.."))
self.set("use_test_data", False)
self.set("api_prefix", "/api/v1")
def get_database_uri(self):
"""
Retrieves the database connection string.
"""
db_name = self.get("db_name")
db_user = self.get("db_user")
db_pass = self.get("db_pass")
db_host = self.get("db_host")
db_charset = self.get("db_charset", "utf8")
return "mysql://" + db_user + ":" + db_pass + "@" + db_host + "/" + db_name + "?charset=" + db_charset
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Scheduler Exceptions
class NoProgrammeLoadedException(Exception):
pass
class NoActiveScheduleException(Exception):
pass
# Soundsystem and Mixer Exceptions
class LoadSourceException(Exception):
pass
class InvalidChannelException(Exception):
pass
class PlaylistException(Exception):
pass
class NoActiveEntryException(Exception):
pass
# Liquidsoap Execeptions
class LQConnectionError(Exception):
pass
class LQStreamException(Exception):
pass
class RedisConnectionException(Exception):
pass
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.base.config import AuraConfig
class AuraLogger():
"""
AuraLogger Class
Logger for all Aura Engine components. The default
logger is `AuraEngine`. Other loggers are defined
by passing a custom name on instantiation.
The logger respects the log-level as defined in the
engine's configuration file.
"""
config = None
logger = None
def __init__(self, config, name="AuraEngine"):
"""
Constructor to create a new logger defined by
the passed name.
Args:
name (String): The name of the logger
"""
self.config = config
self.__create_logger(name)
def __create_logger(self, name):
"""
Creates the logger instance for the given name.
Args:
name (String): The name of the logger
"""
lvl = self.config.get("loglevel")
# create logger
self.logger = logging.getLogger(name)
self.logger.setLevel(lvl)
if not self.logger.hasHandlers():
# create file handler for logger
file_handler = logging.FileHandler(self.config.get("logdir") + "/"+name+".log")
file_handler.setLevel(lvl)
# create stream handler for logger
stream_handler = logging.StreamHandler()
stream_handler.setLevel(lvl)
# set format of log
datepart = "%(asctime)s:%(name)s:%(levelname)s"
message = " - %(message)s - "
filepart = "[%(filename)s:%(lineno)s-%(funcName)s()]"
formatter = logging.Formatter(datepart + message + filepart)
# set log of handlers
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# add handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(stream_handler)
self.logger.critical("ADDED HANDLERS")
else:
self.logger.critical("REUSED LOGGER")
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import smtplib
from email.message import EmailMessage
class MailingException(Exception):
"""
Thrown when some mail cannot be sent.
"""
class AuraMailer():
"""
Service to send emails to Aura administrators.
"""
config = None
def __init__(self, config):
"""
Constructor to initialize service with Aura `config`.
Args:
config (AuraConfig): The configuration with the mail server details
"""
self.config = config
self.admin_mails = config.get("admin_mail")
#
# PUBLIC METHODS
#
def send_admin_mail(self, subject, body):
"""
Sends an email to the administrator as defined in the configuration.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
admin_mails = self.admin_mails.split()
for mail_to in admin_mails:
self.__send(mail_to, subject, body)
#
# PRIVATE METHODS
#
def __send(self, mail_to, subject, body):
"""
Sends an email to the given address.
Args:
subject (String): The email's subject
body (String): The email's body text
"""
# read config
mail_server = self.config.get("mail_server")
mail_port = self.config.get("mail_server_port")
mail_user = self.config.get("mail_user")
mail_pass = self.config.get("mail_pass")
from_mail = self.config.get("from_mail")
# check settings
if mail_server == "":
raise MailingException("Mail Server not set")
if mail_port == "":
raise MailingException("Mailserver Port not set")
if mail_user == "":
raise MailingException("Mail user not set")
if mail_pass == "":
raise MailingException("No Password for mailing set")
if from_mail == "":
raise MailingException("From Mail not set")
# stuff the message together and ...
msg = EmailMessage()
msg.set_content(body)
mailsubject_prefix = self.config.get("mailsubject_prefix")
if mailsubject_prefix == "":
msg["Subject"] = subject
else:
msg["Subject"] = mailsubject_prefix + " " + subject
msg["From"] = from_mail
msg["To"] = mail_to
# ... send the mail
try:
server = smtplib.SMTP(mail_server, int(mail_port))
server.starttls()
server.login(mail_user, mail_pass)
server.send_message(msg)
server.quit()
except Exception as e:
raise MailingException(str(e))
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import logging
import datetime
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm
from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from modules.scheduling.types import PlaylistType
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil, EngineUtil
# Init Config
config = AuraConfig()
# Initialize DB Model and session
engine = sa.create_engine(config.get_database_uri())
Base = declarative_base()
Base.metadata.bind = engine
class DB():
session = orm.scoped_session(orm.sessionmaker())(bind=engine)
Model = Base
class AuraDatabaseModel():
"""
AuraDataBaseModel.
Holding all tables and relationships for the engine.
"""
logger = None
def __init__(self):
"""
Constructor.
"""
self.logger = logging.getLogger("AuraEngine")
def store(self, add=False, commit=False):
"""
Store to the database
"""
if add:
DB.session.add(self)
if commit:
DB.session.commit()
def delete(self, commit=False):
"""
Delete from the database
"""
DB.session.delete(self)
if commit:
DB.session.commit()
def _asdict(self):
return self.__dict__
@staticmethod
def recreate_db(systemexit = False):
"""
Re-creates the database for developments purposes.
"""
manualschedule = Schedule()
manualschedule.schedule_id = 0
manualschedule.show_name = "Manual Show"
Base.metadata.drop_all()
Base.metadata.create_all()
# self.logger.debug("inserting manual scheduling possibility and fallback trackservice schedule")
# DB.session.add(manualschedule)
# db.session.add(fallback_trackservice_schedule)
# self.logger.debug("all created. commiting...")
DB.session.commit()
if systemexit:
sys.exit(0)
#
# SCHEDULES & PLAYLISTS
#
class Schedule(DB.Model, AuraDatabaseModel):
"""
One specific Schedule for a show on a timeslot.
Holding references to playlists and fallback-playlists.
"""
__tablename__ = 'schedule'
# Primary keys
id = Column(Integer, primary_key=True, autoincrement=True)
schedule_start = Column(DateTime, unique=True, index=True)
schedule_end = Column(DateTime, unique=True, index=True)
schedule_id = Column(Integer, unique=True)
show_id = Column(Integer)
show_name = Column(String(256))
show_hosts = Column(String(256))
funding_category = Column(String(256))
comment = Column(String(512))
languages = Column(String(256))
type = Column(String(256))
category = Column(String(256))
topic = Column(String(256))
musicfocus = Column(String(256))
is_repetition = Column(Boolean())
playlist_id = Column(Integer) #, ForeignKey("playlist.playlist_id"))
schedule_fallback_id = Column(Integer)
show_fallback_id = Column(Integer)
station_fallback_id = Column(Integer)
fallback_state = PlaylistType.DEFAULT
fadeouttimer = None # Used to fade-out the schedule, even when entries are longer
playlist = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.playlist_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
schedule_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.schedule_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
show_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.show_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
station_fallback = relationship("Playlist",
primaryjoin="and_(Schedule.schedule_start==Playlist.schedule_start, Schedule.station_fallback_id==Playlist.playlist_id, Schedule.show_name==Playlist.show_name)",
back_populates="schedule")
@staticmethod
def select_show_on_datetime(date_time):
return DB.session.query(Schedule).filter(Schedule.schedule_start == date_time).first()
@staticmethod
def select_programme(date_from=datetime.date.today()):
"""
Select all schedules starting from `date_from` or from today if no
parameter is passed.
Args:
date_from (datetime): Select schedules from this date and time on
Returns:
([Schedule]): List of schedules
"""
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start >= date_from).\
order_by(Schedule.schedule_start).all()
return schedules
@staticmethod
def select_upcoming(n):
"""
Selects the (`n`) upcoming schedules.
"""
now = datetime.datetime.now()
DB.session.commit() # Required since independend session is used.
schedules = DB.session.query(Schedule).\
filter(Schedule.schedule_start > str(now)).\
order_by(Schedule.schedule_start.asc()).limit(n).all()
return schedules
@hybrid_property
def start_unix(self):
"""
Start time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the schedule in UNIX time.
"""
return time.mktime(self.schedule_end.timetuple())
def as_dict(self):
"""
Returns the schedule as a dictionary for serialization.
"""
playlist = self.playlist
return {
"schedule_id": self.schedule_id,
"schedule_start": self.schedule_start.isoformat(),
"schedule_end": self.schedule_end.isoformat(),
"topic": self.topic,
"musicfocus": self.musicfocus,
"funding_category": self.funding_category,
"is_repetition": self.is_repetition,
"category": self.category,
"languages": self.languages,
"comment": self.comment,
"playlist_id": self.playlist_id,
"schedule_fallback_id": self.schedule_fallback_id,
"show_fallback_id": self.show_fallback_id,
"station_fallback_id": self.station_fallback_id,
"show": {
"name": self.show_name,
"type": self.get_type(),
"host": self.show_hosts
},
"playlist": playlist
}
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [Show: %s, ShowID: %s | %s - %s ]" % (str(self.schedule_id), self.show_name, str(self.show_id), time_start, time_end)
class Playlist(DB.Model, AuraDatabaseModel):
"""
The playlist containing playlist entries.
"""
__tablename__ = 'playlist'
# pk,fk
artificial_id = Column(Integer, primary_key=True)
schedule_start = Column(DateTime, ForeignKey("schedule.schedule_start"))
# relationships
schedule = relationship("Schedule", uselist=False, back_populates="playlist")
entries = relationship("PlaylistEntry", back_populates="playlist")
# data
playlist_id = Column(Integer, autoincrement=False) # , ForeignKey("schedule.playlist_id"))
show_name = Column(String(256))
fallback_type = Column(Integer)
entry_count = Column(Integer)
@staticmethod
def select_all():
"""
Fetches all entries
"""
all_entries = DB.session.query(Playlist).filter(Playlist.fallback_type == 0).all()
cnt = 0
for entry in all_entries:
entry.programme_index = cnt
cnt = cnt + 1
return all_entries
@staticmethod
def select_playlist_for_schedule(datetime, playlist_id):
"""
Retrieves the playlist for the given schedule identified by `start_date` and `playlist_id`
Args:
start_date (datetime): Date and time when the playlist is scheduled
playlist_id (Integer): The ID of the playlist
Returns:
(Playlist): The playlist, if existing for schedule
Raises:
Exception: In case there a inconsistent database state, such es multiple playlists for given date/time.
"""
playlist = None
playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == datetime).all()
# FIXME There are unknown issues with the native SQL query by ID
# playlists = DB.session.query(Playlist).filter(Playlist.schedule_start == datetime and Playlist.playlist_id == playlist_id).all()
for p in playlists:
if p.playlist_id == playlist_id:
playlist = p
# if playlists and len(playlists) > 1:
# raise Exception("Inconsistent Database State: Multiple playlists for given schedule '%s' and playlist id#%d available!" % (str(datetime), playlist_id))
# if not playlists:
# return None
# return playlists[0]
return playlist
@staticmethod
def select_playlist(playlist_id):
"""
Retrieves all paylists for that given playlist ID.
Args:
playlist_id (Integer): The ID of the playlist
Returns:
(Array<Playlist>): An array holding the playlists
"""
return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.schedule_start).all()
@staticmethod
def is_empty():
"""
Checks if the given is empty
"""
try:
return not DB.session.query(Playlist).one_or_none()
except sa.orm.exc.MultipleResultsFound:
return False
@hybrid_property
def start_unix(self):
"""
Start time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple())
@hybrid_property
def end_unix(self):
"""
End time of the playlist in UNIX time.
"""
return time.mktime(self.schedule_start.timetuple()) + self.duration
@hybrid_property
def duration(self):
"""
Returns the total length of the playlist in seconds.
Returns:
(Integer): Length in seconds
"""
total = 0
for entry in self.entries:
total += entry.duration
return total
def as_dict(self):
"""
Returns the playlist as a dictionary for serialization.
"""
entries = []
for e in self.entries:
entries.append(e.as_dict())
playlist = {
"playlist_id": self.playlist_id,
"fallback_type": self.fallback_type,
"entry_count": self.entry_count,
"entries": entries
}
return playlist
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
return "ID#%s [items: %s | %s - %s]" % (str(self.playlist_id), str(self.entry_count), str(time_start), str(time_end))
class PlaylistEntry(DB.Model, AuraDatabaseModel):
"""
Playlist entries are the individual items of a playlist such as audio files.
"""
__tablename__ = 'playlist_entry'
# primary keys
artificial_id = Column(Integer, primary_key=True)
# foreign keys
artificial_playlist_id = Column(Integer, ForeignKey("playlist.artificial_id"))
entry_num = Column(Integer) # , primary_key=True)
uri = Column(String(1024))
duration = Column(BigInteger)
source = Column(String(1024))
entry_start = Column(DateTime)
entry_start_actual = None # Assigned when the entry is actually played
channel = None # Assigned when entry is actually played
queue_state = None # Assigned when entry is about to be queued
status = None # Assigned when state changes
switchtimer = None
loadtimer = None
fadeouttimer = None
# relationships
playlist = relationship("Playlist", uselist=False, back_populates="entries")
meta_data = relationship("PlaylistEntryMetaData", uselist=False, back_populates="entry")
@staticmethod
def select_playlistentry_for_playlist(artificial_playlist_id, entry_num):
return DB.session.query(PlaylistEntry).filter(PlaylistEntry.artificial_playlist_id == artificial_playlist_id, PlaylistEntry.entry_num == entry_num).first()
@hybrid_property
def entry_end(self):
return self.entry_start + datetime.timedelta(seconds=self.duration)
@hybrid_property
def start_unix(self):
return time.mktime(self.entry_start.timetuple())
@hybrid_property
def end_unix(self):
return time.mktime(self.entry_end.timetuple())
@hybrid_property
def volume(self):
return 100 # FIXME Make DB Column
def get_type(self):
return EngineUtil.get_channel_type(self.uri)
def get_prev_entries(self):
"""
Retrieves all previous entries as part of the current entry's playlist.
Returns:
(List): List of PlaylistEntry
"""
prev_entries = []
for entry in self.playlist.entries:
if entry.entry_start < self.entry_start:
prev_entries.append(entry)
return prev_entries
def get_next_entries(self, schedule_sensitive=True):
"""
Retrieves all following entries as part of the current entry's playlist.
Args:
schedule_sensitive (Boolean): If `True` entries which start after \
the end of the schedule are excluded
Returns:
(List): List of PlaylistEntry
"""
next_entries = []
for entry in self.playlist.entries:
if entry.entry_start > self.entry_start:
if schedule_sensitive:
if entry.entry_start < self.playlist.schedule.schedule_end:
next_entries.append(entry)
else:
next_entries.append(entry)
return next_entries
def as_dict(self):
"""
Returns the entry as a dictionary for serialization.
"""
if self.meta_data:
return {
"id": self.artificial_id,
"duration": self.duration,
"artist": self.meta_data.artist,
"album": self.meta_data.album,
"title": self.meta_data.title
}
return None
def __str__(self):
"""
String representation of the object.
"""
time_start = SimpleUtil.fmt_time(self.start_unix)
time_end = SimpleUtil.fmt_time(self.end_unix)
track = self.source[-25:]
return "PlaylistEntry #%s [%s - %s | %ssec | Source: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track)
class PlaylistEntryMetaData(DB.Model, AuraDatabaseModel):
"""
Metadata for a playlist entry such as the artist and track name.
"""
__tablename__ = "playlist_entry_metadata"
artificial_id = Column(Integer, primary_key=True)
artificial_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"))
artist = Column(String(256))
title = Column(String(256))
album = Column(String(256))
entry = relationship("PlaylistEntry", uselist=False, back_populates="meta_data")
@staticmethod
def select_metadata_for_entry(artificial_playlistentry_id):
return DB.session.query(PlaylistEntry).filter(PlaylistEntryMetaData.artificial_entry_id == artificial_playlistentry_id).first()
#
# TRACK SERVICE
#
# class TrackService(DB.Model, AuraDatabaseModel):
# """
# TrackService holding track-service items consisting of
# """
# __tablename__ = 'trackservice'
# # Primary keys
# id = Column(Integer, primary_key=True, autoincrement=True)
# # Foreign keys
# track_start = Column(DateTime)
# track_end = Column(DateTime) # Currently not used, maybe later for timing checks and multi-entry avoidance.
# artificial_schedule_id = Column(Integer, ForeignKey("schedule.id"))
# artificial_playlist_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"), nullable=True)
# single_entry_id = Column(Integer, ForeignKey("single_entry.id"), nullable=True)
# # Data
# schedule = relationship("Schedule", foreign_keys=[artificial_schedule_id], lazy="joined")
# playlist_entry = relationship("PlaylistEntry", primaryjoin="and_(TrackService.artificial_playlist_entry_id==PlaylistEntry.artificial_id)", lazy="joined")
# single_entry = relationship("SingleEntry", foreign_keys=[single_entry_id], lazy="joined")
# fallback_type = Column(Integer, default=0)
# def __init__(self, entry, fallback_type=0):
# """
# Initializes a trackservice entry based on a playlist entry.
# """
# self.track_start = datetime.datetime.now()
# # if entry.duration:
# # self.track_end = self.track_start + datetime.timedelta(seconds=entry.duration)
# self.fallback_type = fallback_type
# if fallback_type < 4:
# self.schedule_start = entry.playlist.schedule_start
# self.artificial_playlist_entry_id = entry.artificial_id
# self.playlist_entry = entry
# self.schedule = entry.playlist.schedule
# else:
# self.single_entry = entry
# @hybrid_property
# def track(self):
# """
# Retrieves the track information as a dictionary.
# Depending on possible fallback scenarios either `playlist_entry` or `single_entry` is used as a basis:
# - Scenario 1: No fallback, all info is gathered via the playlist entry
# - Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry
# - Scenario 3: This type of fallback didn't get scheduled; a single entry is played
# """
# if self.playlist_entry:
# return self.playlist_entry.as_dict()
# elif self.single_entry:
# return self.single_entry.as_dict()
# else:
# return None
# @hybrid_property
# def show(self):
# """
# Retrieves show information based on the related schedule. If no schedule
# is available (e.g. when the engine is in a fallback state), then the default
# show properties from `AuraConfig` are returned.
# """
# show_info = {}
# if self.schedule:
# show_info["name"] = self.schedule.show_name
# show_info["type"] = self.schedule.type
# show_info["host"] = self.schedule.show_hosts
# elif self.fallback_type == 4:
# show_info["name"] = config.get("fallback_show_name")
# show_info["type"] = config.get("fallback_show_type")
# show_info["host"] = config.get("fallback_show_host")
# return show_info
# @staticmethod
# def select_one(id):
# """
# Select one specific track-service item by ID.
# """
# DB.session.commit() # Required since independend session is used.
# track = DB.session.query(TrackService).filter(TrackService.id == id).first()
# return track
# @staticmethod
# def select_current():
# """
# Selects the currently playing track.
# """
# now = datetime.datetime.now()
# DB.session.commit() # Required since independend session is used.
# track = DB.session.query(TrackService).\
# filter(TrackService.track_start <= str(now)).\
# order_by(TrackService.track_start.desc()).first()
# return track
# @staticmethod
# def select_last_hours(n):
# """
# Selects the tracks playing in the past (`n`) hours.
# """
# last_hours = datetime.datetime.today() - datetime.timedelta(hours=n)
# DB.session.commit() # Required since independend session is used.
# tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(last_hours)).all()
# for track in tracks:
# track = TrackService.select_one(track.id)
# return tracks
# @staticmethod
# def select_by_day(day):
# """
# Select the track-service items for a day.
# """
# day_plus_one = day + datetime.timedelta(days=1)
# DB.session.commit() # Required since independend session is used.
# tracks = DB.session.query(TrackService).\
# filter(TrackService.track_start >= str(day), TrackService.track_start < str(day_plus_one)).\
# order_by(TrackService.track_start.desc()).all()
# res = []
# for item in tracks:
# if item.track: res.append(item)
# return res
# @staticmethod
# def select_by_range(from_day, to_day):
# """
# Selects the track-service items for a day range.
# """
# DB.session.commit()
# tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(from_day),
# TrackService.track_start < str(to_day)).all()
# return tracks
# def __str__(self):
# """
# Convert to String.
# """
# return "TrackID: #%s [track_start: %s, artificial_playlist_entry_id: %s]" % (str(self.id), str(self.track_start), str(self.artificial_playlist_entry_id))
# class SingleEntry(DB.Model, AuraDatabaseModel):
# """
# An entry played in case of e.g. a local fallback or custom programming without a playlist nor schedule.
# """
# __tablename__ = 'single_entry'
# # Primary keys
# id = Column(Integer, primary_key=True)
# # Relationships
# trackservice_id = Column(Integer) #, ForeignKey("trackservice.id"))
# meta_data_id = Column(Integer) #, ForeignKey("trackservice.id"))
# trackservice = relationship("TrackService", uselist=False, back_populates="single_entry")
# meta_data = relationship("SingleEntryMetaData", uselist=False, back_populates="entry")
# # Data
# uri = Column(String(1024))
# duration = Column(BigInteger)
# source = Column(String(1024))
# entry_start = Column(DateTime)
# queue_state = None # Assigned when entry is about to be queued
# channel = None # Assigned when entry is actually played
# status = None # Assigned when state changes
# @hybrid_property
# def entry_end(self):
# return self.entry_start + datetime.timedelta(seconds=self.duration)
# @hybrid_property
# def start_unix(self):
# return time.mktime(self.entry_start.timetuple())
# @hybrid_property
# def end_unix(self):
# return time.mktime(self.entry_start.timetuple()) + self.duration
# @hybrid_property
# def volume(self):
# return 100
# @hybrid_property
# def type(self):
# return EngineUtil.get_channel_type(self.uri)
# def as_dict(self):
# """
# Returns the entry as a dictionary for serialization.
# """
# if self.meta_data:
# return {
# "duration": self.duration,
# "artist": self.meta_data.artist,
# "album": self.meta_data.album,
# "title": self.meta_data.title
# }
# return None
# def __str__(self):
# """
# String representation of the object.
# """
# time_start = SimpleUtil.fmt_time(self.start_unix)
# time_end = SimpleUtil.fmt_time(self.end_unix)
# track = self.source[-25:]
# return "SingleEntry #%s [%s - %s | %ssec | Source: ...%s]" % (str(self.id), time_start, time_end, self.duration, track)
# class SingleEntryMetaData(DB.Model, AuraDatabaseModel):
# """
# Metadata for a autonomous entry such as the artist and track name.
# """
# __tablename__ = "single_entry_metadata"
# id = Column(Integer, primary_key=True)
# single_entry_id = Column(Integer, ForeignKey("single_entry.id"))
# artist = Column(String(256))
# title = Column(String(256))
# album = Column(String(256))
# entry = relationship("SingleEntry", uselist=False, back_populates="meta_data")
# @staticmethod
# def select_metadata_for_entry(single_entry_id):
# return DB.session.query(SingleEntry).filter(SingleEntryMetaData.id == single_entry_id).first()
#
# LEGACY CLASSES
#
# ------------------------------------------------------------------------------------------ #
# class Schedule(DB.Model, AuraDatabaseModel):
# """
# One specific Schedule for a show on a timeslot
# """
# __tablename__ = 'schedule'
#
# # primary and foreign keys
# schedule_start = Column(DateTime, primary_key=True)
#
# schedule_end = Column(DateTime)
# schedule_id = Column(Integer) #, primary_key=True, autoincrement=False)
# show_id = Column(Integer) # well, in fact not needed..
# show_name = Column(String(256))
# show_hosts = Column(String(256))
# funding_category = Column(String(256))
# comment = Column(String(512))
# languages = Column(String(256))
# type = Column(String(256))
# category = Column(String(256))
# topic = Column(String(256))
# musicfocus = Column(String(256))
#
# is_repetition = Column(Boolean())
#
# playlist_id = Column(Integer, ForeignKey("playlist.playlist_id"))
# timeslot_fallback_id = Column(Integer)
# show_fallback_id = Column(Integer)
# station_fallback_id = Column(Integer)
#
# playlist = relationship("Playlist", foreign_keys=[playlist_id], lazy="joined")
# # timeslot_fallback = relationship("Playlist", foreign_keys=[timeslot_fallback_id], lazy="joined")
# # show_fallback = relationship("Playlist", foreign_keys=[show_fallback_id], lazy="joined")
# # station_fallback = relationship("Playlist", foreign_keys=[station_fallback_id], lazy="joined")
#
# @staticmethod
# def select_all():
# # fetching all entries
# all_entries = DB.session.query(Schedule).filter().order_by(Schedule.schedule_start).all()
# return all_entries
#
# @staticmethod
# def select_by_id(id):
# entry = DB.session.query(Schedule).filter(Schedule.schedule_id == id).first()
# return entry
# @staticmethod
# def select_act_programme():
# #DB.session.query(Schedule).filter
# # fetching all from today to ..
# today = datetime.date.today()
# all_entries = DB.session.query(Schedule).filter(Schedule.schedule_start >= today).order_by(Schedule.schedule_start).all()
#
# return all_entries
#
#
# @staticmethod
# def drop_the_future(timedelta):
# then = datetime.datetime.now() + timedelta
#
# # is this really necessary?
# future_entries = DB.session.query(Schedule).filter(Schedule.schedule_start > then)
# for e in future_entries:
# e.delete()
# DB.session.commit()
#
# def get_length(self):
# sec1 = int(datetime.datetime.strptime(self.start[0:16].replace(" ", "T"), "%Y-%m-%dT%H:%M").strftime("%s"))
# sec2 = int(datetime.datetime.strptime(self.end[0:16].replace(" ", "T"), "%Y-%m-%dT%H:%M").strftime("%s"))
# len = sec2 - sec1
# return len
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "ScheduleID: #" + str(self.schedule_id) + " Showname: " + self.show_name + " starts @ " + str(self.schedule_start)
# ------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------ #
#class PlaylistEntry(DB.Model, AuraDatabaseModel):
# __tablename__ = 'playlist_entry'
#
# # primary and foreign keys
# playlist_id = Column(Integer, ForeignKey("playlist.playlist_id"), primary_key=True, nullable=False, autoincrement=True)
# entry_num = Column(Integer, primary_key=True, nullable=False, autoincrement=False)
#
# uri = Column(String(1024))
#
# source = ""
# cleansource = ""
# cleanprotocol = ""
# type = None
# fadeintimer = None
# fadeouttimer = None
# switchtimer = None
#
# meta_data = relationship("PlaylistEntryMetaData", primaryjoin="and_(PlaylistEntry.playlist_id==PlaylistEntryMetaData.playlist_id, PlaylistEntry.entry_num==PlaylistEntryMetaData.entry_num)", lazy="joined")
#
# # normal constructor
# def __init__(self, **kwargs):
# super(PlaylistEntry, self).__init__(**kwargs)
# self.calc_unix_times()
# self.define_clean_source()
#
# # constructor like - called from sqlalchemy
# @orm.reconstructor
# def reconstructor(self):
# self.calc_unix_times()
# self.define_clean_source()
# self.set_entry_type()
#
# def define_clean_source(self):
# if self.uri is None:
# return None
#
# if self.uri.startswith("http"):
# self.cleanprotocol = self.uri[:7]
# self.cleansource = self.uri
#
# elif self.uri.startswith("linein"):
# self.cleanprotocol = self.uri[:9]
# self.cleansource = self.uri[9:]
#
# elif self.uri.startswith("pool") or self.uri.startswith("file") or self.uri.startswith("live"):
# self.cleanprotocol = self.uri[:7]
# self.cleansource = self.uri[7:]
#
# elif self.uri.startswith("playlist"):
# self.cleanprotocol = self.uri[:11]
# self.cleansource = self.uri[11:]
#
# else:
# self.logger.error("Unknown source protocol")
#
# def set_entry_type(self):
# if self.uri.startswith("http"):
# self.type = ScheduleEntryType.HTTP
# if self.uri.startswith("pool") or self.uri.startswith("playlist") or self.uri.startswith("file"):
# self.type = ScheduleEntryType.FILESYSTEM
# if self.uri.startswith("live") or self.uri.startswith("linein"):
# if self.cleansource == "0":
# self.type = ScheduleEntryType.LIVE_0
# elif self.cleansource == "1":
# self.type = ScheduleEntryType.LIVE_1
# elif self.cleansource == "2":
# self.type = ScheduleEntryType.LIVE_2
# elif self.cleansource == "3":
# self.type = ScheduleEntryType.LIVE_3
# elif self.cleansource == "4":
# self.type = ScheduleEntryType.LIVE_4
# def calc_unix_times(self):
# if self.entry_start is not None:
# self.entry_start_unix = time.mktime(self.entry_start.timetuple())
#
#
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_all():
# # fetching all entries
# all_entries = DB.session.query(Playlist).filter(Playlist.fallback_type == 0).order_by(Playlist.entry_start).all()
#
# cnt = 0
# for entry in all_entries:
# entry.programme_index = cnt
# cnt = cnt + 1
#
# return all_entries
#
# @staticmethod
# def select_act_programme(include_act_playing = True):
# # fetching all from today to ..
# today = datetime.date.today()
# all_entries = DB.session.query(Playlist).filter(Playlist.entry_start >= today, Playlist.fallback_type == 0).order_by(Playlist.entry_start).all()
#
# cnt = 0
# for entry in all_entries:
# entry.programme_index = cnt
# cnt = cnt + 1
#
# return all_entries
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def truncate():
# all_entries = DB.session.query(Playlist).filter().order_by(Playlist.entry_start).all()
#
# for a in all_entries:
# a.delete()
# DB.session.commit()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_next_manual_entry_num():
#
# max_manual_entry_num = DB.session.query(func.max(Playlist.entry_num)).filter(Playlist.schedule_id == 0).first()
#
# if max_manual_entry_num[0] is None:
# return 0
# else:
# return int(max_manual_entry_num[0])+1
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_upcoming(datefrom=datetime.datetime.now()):
# upcomingtracks = DB.session.query(Playlist).filter(Playlist.entry_start > datefrom).order_by(Playlist.entry_start).all()
# return upcomingtracks
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one(playlist_id, entry_num):
# return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id, Playlist.entry_num == entry_num).first()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one_playlist_entry_for_show(schedule_id, playlist_type, entry_num):
# return DB.session.query(Playlist).filter(Playlist.schedule_id == schedule_id, Playlist.fallback_type == playlist_type, Playlist.entry_num == entry_num).first()
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_playlist(playlist_id):
# return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.entry_start).all()
#
# @staticmethod
# def drop_the_future(timedelta):
# then = datetime.datetime.now() + timedelta
# #DB.session.delete(ScheduleEntry).filter(ScheduleEntry.entry_start >= then)
#
# # is this really necessary?
# future_entries = DB.session.query(Playlist).filter(Playlist.entry_start > then)
# for e in future_entries:
# e.delete()
# DB.session.commit()
#
# def getChannel(self):
# if self.type == self.type.FILESYSTEM:
# return "fs"
#
# if self.type == self.type.LIVE_0 or self.type == self.type.LIVE_1 or self.type == self.type.LIVE_2 or self.type == self.type.LIVE_3 or self.type == self.type.LIVE_4:
# return "aura_linein_"+self.cleansource # .cleanprotocol[8]
#
# if self.type == self.type.HTTP:
# return "http"
#
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "Showentry starts @ " + str(self.entry_start) + " and plays " + self.source
# class ScheduleEntryFile(DB.Model, AuraDatabaseModel):
# __tablename__ = 'schedule_entry_file'
#
# # primary and foreign keys
# file_id = Column(Integer, primary_key=True, nullable=False, autoincrement=False)
# playlist_id = Column(Integer) #, ForeignKey("schedule_entry.playlist_id")) # primary_key=True, nullable=False, autoincrement=False)
# entry_num = Column(Integer) # , ForeignKey("schedule_entry.entry_num")) # primary_key=True, nullable=False, autoincrement=False)
#
# ForeignKeyConstraint(["playlist_id", "entry_num"], ["schedule_entry.playlist_id", "schedule_entry.entry_num"])
#
# show = Column(String(512))
# size = Column(Integer)
# duration = Column(Integer)
#
# class ScheduleEntryFileMetaData(DB.Model, AuraDatabaseModel):
# __tablename__ = "schedule_entry_file_metadata"
#
# metadata_id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
# file_id = Column(Integer, ForeignKey("schedule_entry_file.file_id"))
#
# artist = Column(String(256))
# title = Column(String(256))
# album = Column(String(256))
#
# # ------------------------------------------------------------------------------------------ #
# class TrackService(DB.Model, AuraDatabaseModel):
# __tablename__ = 'trackservice'
#
# trackservice_id = Column(Integer, primary_key=True, autoincrement=True)
# schedule_entry_id = Column(Integer, ForeignKey("schedule_entry.id"))
# playlist_id = Column(Integer, nullable=False)
# entry_num = Column(Integer, nullable=False)
#
# source = Column(String(255), nullable=False)
# start = Column(DateTime, nullable=False, default=func.now())
# __table_args__ = (
# ForeignKeyConstraint(['playlist_id', 'entry_num'], ['schedule_entry.playlist_id', 'schedule_entry.entry_num']),
# )
# schedule_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackService.playlist_id==ScheduleEntry.playlist_id, TrackService.entry_num==ScheduleEntry.entry_num)", lazy="joined")
#schedule = relationship("Schedule", foreign_keys=[schedule_id], lazy="joined")
# trackservice_entry = relationship("ScheduleEntry", foreign_keys=[playlist_id, entry_num], lazy="joined")
# schedule_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackService.schedule_entry_id==ScheduleEntry.id)", lazy="joined")
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_one(trackservice_id):
# return DB.session.query(TrackService).filter(TrackService.trackservice_id == trackservice_id).first()
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_by_day(day):
# day_plus_one = day + datetime.timedelta(days=1)
# tracks = DB.session.query(TrackService).filter(TrackService.start >= str(day), TrackService.start < str(day_plus_one)).all()
# return tracks
#
# @staticmethod
# # ------------------------------------------------------------------------------------------ #
# def select_by_range(from_day, to_day):
# tracks = DB.session.query(TrackService).filter(TrackService.start >= str(from_day),
# TrackService.start < str(to_day)).all()
# return tracks
#
# # ------------------------------------------------------------------------------------------ #
# def __str__(self):
# return "TrackServiceID: #" + str(self.trackservice_id) + " playlist_id: " + str(self.playlist_id) + " started @ " + str(self.start) + " and played " + self.source
# ------------------------------------------------------------------------------------------ #
# class TrackServiceSchedule(db.Model, AuraDatabaseModel):
# """
# Trackservice is tracking every schedule.
# """
# __tablename__ = 'trackservice_schedule'
#
# # primary and foreign keys
# ts_schedule_id = Column(Integer, primary_key=True, autoincrement=True)
# schedule_id = Column(Integer, ForeignKey("schedule.schedule_id"))
#
# schedule = relationship("Schedule", foreign_keys=[schedule_id], lazy="joined")
#
# # ------------------------------------------------------------------------------------------ #
# @staticmethod
# def select_one(schedule_id):
# # damn BAND-AID
# # db.session.commit()
#
# return db.session.query(ScheduleEntry).filter(TrackServiceSchedule.schedule_id == schedule_id).first()
#
# # ------------------------------------------------------------------------------------------ #
# class TrackServiceScheduleEntry(db.Model, AuraDatabaseModel):
# """
# And a schedule can have multiple entries
# """
# __tablename__ = 'trackservice_entry'
#
# # primary and foreign keys. the foreign keys here can be null, because of fallback stuff
# ts_entry_id = Column(Integer, primary_key=True, autoincrement=True)
# ts_schedule_id = Column(Integer, ForeignKey("trackservice_schedule.ts_schedule_id"), nullable=True)
# playlist_id = Column(Integer, nullable=True)
# entry_num = Column(Integer, nullable=True)
#
# fallback = Column(Boolean, default=False)
# fallback_start = Column(DateTime, nullable=True, default=None)
# source = Column(String(256), nullable=True, default=None)
#
# # foreign key definitions
# __table_args__ = (
# ForeignKeyConstraint(['playlist_id', 'entry_num'], ['schedule_entry.playlist_id', 'schedule_entry.entry_num']),
# )
#
# trackservice_schedule = relationship("TrackServiceSchedule", foreign_keys=[ts_schedule_id], lazy="joined")
# #trackservice_entry = relationship("ScheduleEntry", foreign_keys=[playlist_id, entry_num], lazy="joined")
# trackservice_entry = relationship("ScheduleEntry", primaryjoin="and_(TrackServiceScheduleEntry.playlist_id==ScheduleEntry.playlist_id, TrackServiceScheduleEntry.entry_num==ScheduleEntry.entry_num)" , lazy="joined")
#
# @staticmethod
# def select_all():
# return db.session.query(TrackServiceScheduleEntry).filter().all()
# AuraDatabaseModel.recreate_db(systemexit=True)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import time
from enum import Enum
from modules.core.channels import ChannelType
from modules.scheduling.types import PlaylistType
class EngineUtil:
"""
A class for Engine utilities.
"""
@staticmethod
def get_channel_type(uri):
"""
Returns the channel type, depending on the passed URI and source.
Args:
uri (String): The URI of the source
"""
if uri.startswith("https"):
return ChannelType.HTTPS
if uri.startswith("http"):
return ChannelType.HTTP
if uri.startswith("pool") or uri.startswith("playlist") or uri.startswith("file"):
return ChannelType.FILESYSTEM
if uri.startswith("line://"):
return ChannelType.LIVE
@staticmethod
def uri_to_filepath(base_dir, uri):
"""
Converts a file-system URI to an actual, absolute path to the file.
Args:
basi_dir (String): The location of the audio store.
uri (String): The URI of the file
Returns:
path (String): Absolute file path
"""
return base_dir + "/" + uri[7:] + ".flac"
@staticmethod
def get_playlist_type(fallback_id):
"""
Converts an playlist type ID to the playlist type object.
Args:
id (String): playlist type ID
Returns:
type (PlaylistType): The type
"""
if fallback_id == 0:
return PlaylistType.DEFAULT
elif fallback_id == 1:
return PlaylistType.SHOW
elif fallback_id == 2:
return PlaylistType.TIMESLOT
else:
return PlaylistType.STATION
@staticmethod
def get_entries_string(entries):
"""
Returns a list of entries as String for logging purposes.
"""
s = ""
if isinstance(entries, list):
for entry in entries:
s += str(entry)
if entry != entries[-1]: s += ", "
else:
s = str(entries)
return s
@staticmethod
def lqs_annotate_cuein(uri, cue_in):
"""
Wraps the given URI with a Liquidsoap Cue In annotation.
Args:
uri (String): The path to the audio source
cue_in (Float): The value in seconds wher the cue in should start
Returns:
(String): The annotated URI
"""
if cue_in > 0.0:
uri = "annotate:liq_cue_in=\"%s\":%s" % (str(cue_in), uri)
return uri
@staticmethod
def engine_info(component, version):
"""
Prints the engine logo and version info.
"""
return """\n
█████╗ ██╗ ██╗██████╗ █████╗ ███████╗███╗ ██╗ ██████╗ ██╗███╗ ██╗███████╗
██╔══██╗██║ ██║██╔══██╗██╔══██╗ ██╔════╝████╗ ██║██╔════╝ ██║████╗ ██║██╔════╝
███████║██║ ██║██████╔╝███████║ █████╗ ██╔██╗ ██║██║ ███╗██║██╔██╗ ██║█████╗
██╔══██║██║ ██║██╔══██╗██╔══██║ ██╔══╝ ██║╚██╗██║██║ ██║██║██║╚██╗██║██╔══╝
██║ ██║╚██████╔╝██║ ██║██║ ██║ ███████╗██║ ╚████║╚██████╔╝██║██║ ╚████║███████╗
╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝╚══════╝
%s v%s - Ready to play!
\n""" % (component, version)
class SimpleUtil:
"""
A container class for simple utility methods.
"""
@staticmethod
def clean_dictionary(data):
"""
Delete keys with the value `None` in a dictionary, recursively.
This alters the input so you may wish to `copy` the dict first.
Args:
data (dict): The dicationary
Returns:
(dict):
"""
for key, value in list(data.items()):
if value is None:
del data[key]
elif isinstance(value, dict):
SimpleUtil.clean_dictionary(value)
return data
@staticmethod
def fmt_time(timestamp):
"""
Formats a UNIX timestamp to a String displaying time in the format '%H:%M:%S'.
Args:
(Integer) timestamp: Unix epoch
Returns:
(String): Displaying the time
"""
return datetime.datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')
@staticmethod
def timestamp(date_and_time=None):
"""
Transforms the given `datetime` into a UNIX epoch timestamp.
If no parameter is passed, the current timestamp is returned.
Args:
(Datetime) date_and_time: the date and time to transform.
Returns:
(Integer): timestamp in seconds.
"""
if not date_and_time:
date_and_time = datetime.datetime.now()
return time.mktime(date_and_time.timetuple())
@staticmethod
def strike(text):
"""
Creates a strikethrough version of the given text.
Args:
(String) text: the text to strike.
Returns:
(String): the striked text.
"""
result = ""
for c in str(text):
result += c + TerminalColors.STRIKE.value
return result
@staticmethod
def bold(text):
"""
Creates a bold version of the given text.
"""
return TerminalColors.BOLD.value + text + TerminalColors.ENDC.value
@staticmethod
def underline(text):
"""
Creates a underlined version of the given text.
"""
return TerminalColors.UNDERLINE.value + text + TerminalColors.ENDC.value
@staticmethod
def blue(text):
"""
Creates a blue version of the given text.
"""
return TerminalColors.BLUE.value + text + TerminalColors.ENDC.value
@staticmethod
def red(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.RED.value + text + TerminalColors.ENDC.value
@staticmethod
def pink(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.PINK.value + text + TerminalColors.ENDC.value
@staticmethod
def yellow(text):
"""
Creates a yellow version of the given text.
"""
return TerminalColors.YELLOW.value + text + TerminalColors.ENDC.value
@staticmethod
def green(text):
"""
Creates a red version of the given text.
"""
return TerminalColors.GREEN.value + text + TerminalColors.ENDC.value
@staticmethod
def cyan(text):
"""
Creates a cyan version of the given text.
"""
return TerminalColors.CYAN.value + text + TerminalColors.ENDC.value
class TerminalColors(Enum):
"""
Colors for formatting terminal output.
"""
HEADER = "\033[95m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
PINK = "\033[35m"
CYAN = "\033[36m"
WARNING = "\033[31m"
FAIL = "\033[41m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
STRIKE = "\u0336"
ENDC = "\033[0m"
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
from modules.cli.redis.adapter import ClientRedisAdapter, ServerRedisAdapter
from modules.base.models import AuraDatabaseModel
class Padavan:
args = None
config = None
ss = None
zmqclient = None
redisclient = None
stringreply = ""
# ------------------------------------------------------------------------------------------ #
def __init__(self, args, config):
self.args = args
self.config = config
# ------------------------------------------------------------------------------------------ #
def meditate(self):
if self.args.fetch_new_programme:
self.fetch_new_programme()
elif self.args.mixer_channels_selected:
self.mixer_channels_selected()
elif self.args.mixer_status:
self.mixer_status()
elif self.args.get_act_programme:
self.get_act_programme()
elif self.args.get_status:
self.get_status()
elif self.args.get_connection_status:
self.get_connection_status()
elif self.args.shutdown:
self.shutdown()
elif self.args.redis_message:
self.redis_message(self.args.redis_message[0], self.args.redis_message[1])
elif self.args.select_mixer != -1:
self.select_mixer(self.args.select_mixer)
elif self.args.deselect_mixer != -1:
self.select_mixer(self.args.deselect_mixer, False)
elif self.args.set_volume:
self.set_volume(self.args.set_volume[0], self.args.set_volume[1])
elif self.args.print_message_queue:
self.print_message_queue()
elif self.args.get_file_for:
self.get_next_file(self.args.get_file_for)
elif self.args.set_file_for:
self.set_next_file(self.args.set_file_for[0], self.args.set_file_for[1])
elif self.args.now_playing:
print("")
elif self.args.init_player:
self.init_player()
elif self.args.on_play:
self.on_play(self.args.on_play)
elif self.args.recreatedb:
self.recreatedb()
# else:
# raise Exception("")
# init liquid => faster exec time, when loading at runtime just what is needed
# ------------------------------------------------------------------------------------------ #
def init_liquidsoap_communication(self):
# import
from modules.core.engine import SoundSystem
# init liquidsoap communication
self.ss = SoundSystem(self.config)
# enable connection
self.ss.enable_transaction()
# ------------------------------------------------------------------------------------------ #
def destroy_liquidsoap_communication(self):
# enable connection
self.ss.disable_transaction()
# ------------------------------------------------------------------------------------------ #
def init_redis_communication(self, with_server=False):
self.redisclient = ClientRedisAdapter(self.config)
if with_server:
self.redisserver = ServerRedisAdapter(self.config)
# ------------------------------------------------------------------------------------------ #
def send_redis(self, channel, message):
self.init_redis_communication()
self.redisclient.publish(channel, message)
# ------------------------------------------------------------------------------------------ #
def send_and_wait_redis(self, channel, message, reply_channel):
self.init_redis_communication(True)
self.redisclient.publish(channel, message)
return self.redisserver.listen_for_one_message(reply_channel.value)
# ------------------------------------------------------------------------------------------ #
def shutdown(self):
self.send_redis("aura", "shutdown")
self.stringreply = "Shutdown message sent!"
# ------------------------------------------------------------------------------------------ #
def fetch_new_programme(self):
json_reply = self.send_and_wait_redis("aura", "fetch_new_programme", RedisChannel.FNP_REPLY)
if json_reply != "":
actprogramme = json.loads(json_reply)
self.print_programme(actprogramme)
else:
print("No programme fetched")
# ------------------------------------------------------------------------------------------ #
def get_act_programme(self):
json_reply = self.send_and_wait_redis("aura", "get_act_programme", RedisChannel.GAP_REPLY)
actprogramme = json.loads(json_reply)
self.print_programme(actprogramme)
def get_status(self):
"""
Retrieves the Engine's status information.
"""
json_reply = self.send_and_wait_redis("aura", "get_status", RedisChannel.GS_REPLY)
# status = json.loads(json_reply)
self.stringreply = json_reply
# ------------------------------------------------------------------------------------------ #
def get_connection_status(self):
json_reply = self.send_and_wait_redis("aura", "get_connection_status", RedisChannel.GCS_REPLY)
connection_status = json.loads(json_reply)
self.print_connection_status(connection_status)
# ------------------------------------------------------------------------------------------ #
def print_programme(self, programme):
cnt = 1
for show in programme:
for entry in show["playlist"]:
self.stringreply += str(cnt) + \
" --- schedule id #" + str(show["schedule_id"]) + "." + str(entry["entry_num"]) + \
" - show: " + show["show_name"] + \
" - starts @ " + entry["entry_start"] + \
" - plays " + str(entry["source"]) + "\n"
cnt = cnt + 1
# ------------------------------------------------------------------------------------------ #
def print_connection_status(self, connection_status):
if connection_status["pv"]:
self.stringreply = "Connection to pv: " + TerminalColors.GREEN.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value
else:
self.stringreply = "Connection to pv: " + TerminalColors.RED.value + " " + str(connection_status["pv"]) + TerminalColors.ENDC.value
if connection_status["db"]:
self.stringreply += "\nConnection to db: " + TerminalColors.GREEN.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to db: " + TerminalColors.RED.value + " " + str(connection_status["db"]) + TerminalColors.ENDC.value
if connection_status["lqs"]:
self.stringreply += "\nConnection to lqs: " + TerminalColors.GREEN.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to lqs: " + TerminalColors.RED.value + " " + str(connection_status["lqs"]) + TerminalColors.ENDC.value
if connection_status["lqsr"]:
self.stringreply += "\nConnection to lqsr: " + TerminalColors.GREEN.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to lqsr: " + TerminalColors.RED.value + " " + str(connection_status["lqsr"]) + TerminalColors.ENDC.value
if connection_status["tank"]:
self.stringreply += "\nConnection to tank: " + TerminalColors.GREEN.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to tank: " + TerminalColors.RED.value + " " + str(connection_status["tank"]) + TerminalColors.ENDC.value
if connection_status["redis"]:
self.stringreply += "\nConnection to redis: " + TerminalColors.GREEN.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value
else:
self.stringreply += "\nConnection to redis: " + TerminalColors.RED.value + " " + str(connection_status["redis"]) + TerminalColors.ENDC.value
# ------------------------------------------------------------------------------------------ #
def init_player(self):
"""
Initializes the player on Liquidsaop startup.
"""
self.stringreply = self.send_and_wait_redis("aura", "init_player", RedisChannel.IP_REPLY)
def on_play(self, info):
"""
Event handler to be called when some entry started playing.
"""
self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY)
# ------------------------------------------------------------------------------------------ #
def recreatedb(self):
print("YOU WILL GET PROBLEMS DUE TO DATABASE BLOCKING IF aura.py IS RUNNING! NO CHECKS IMPLEMENTED SO FAR!")
x = AuraDatabaseModel()
x.recreate_db()
self.stringreply = "Database recreated!"
# ------------------------------------------------------------------------------------------ #
def redis_message(self, channel, message):
self.send_redis(channel, message)
self.stringreply = "Message '"+message+"' sent to channel '"+channel+"'"
# ------------------------------------------------------------------------------------------ #
def print_message_queue(self):
self.stringreply = self.send_and_wait_redis("aura", "print_message_queue", RedisChannel.PMQ_REPLY)
# LIQUIDSOAP #
# ------------------------------------------------------------------------------------------ #
def select_mixer(self, mixername, activate=True):
# init lqs
self.init_liquidsoap_communication()
# select mixer and return the feedback
self.stringreply = self.ss.channel_activate(mixername, activate)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def set_volume(self, mixernumber, volume):
# init lqs and enable comm
self.init_liquidsoap_communication()
self.stringreply = self.ss.channel_volume(mixernumber, volume)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_channels_selected(self):
self.init_liquidsoap_communication()
am = self.ss.mixer_channels_selected()
if len(am) == 0:
self.destroy_liquidsoap_communication()
raise Exception("Guru recognized a problem: No active source!!!")
self.stringreply = str(am)
# disable connection
self.destroy_liquidsoap_communication()
# ------------------------------------------------------------------------------------------ #
def mixer_status(self):
self.init_liquidsoap_communication()
status = self.ss.mixer_status()
for k, v in status.items():
self.stringreply += "source: " + k + "\t status: " + v + "\n"
# disable connection
self.destroy_liquidsoap_communication()
# REDIS #
# ------------------------------------------------------------------------------------------ #
def get_next_file(self, type):
# redis = RedisMessenger()
# next_file = redis.get_next_file_for(type)
# if next_file == "":
# next_file = "/var/audio/blank.flac"
# self.stringreply = next_file
#self.send_redis("aura", "set_next_file " + type)
next_file = self.send_and_wait_redis("aura", "get_next_file " + type, RedisChannel.GNF_REPLY)
self.stringreply = next_file
# ------------------------------------------------------------------------------------------ #
def set_next_file(self, type, file):
#from modules.cli.redis.messenger import RedisMessenger
#redis = RedisMessenger()
#redis.set_next_file_for(type, file)
self.send_redis("aura", "set_next_file " + type + " " + file)
self.stringreply = "Set "+file+" for fallback '"+type+"'"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import json
from datetime import datetime
from threading import Event
import threading
import redis
from modules.cli.redis.messenger import RedisMessenger
from modules.cli.redis.statestore import RedisStateStore
# from modules.communication.connection_tester import ConnectionTester
from modules.base.exceptions import RedisConnectionException
from modules.cli.redis.channels import RedisChannel
from modules.base.utils import TerminalColors
# ------------------------------------------------------------------------------------------ #
class ServerRedisAdapter(threading.Thread, RedisMessenger):
debug = False
pubsub = None
config = None
redisdb = None
channel = ""
scheduler = None
redisclient = None
# connection_tester = None
soundsystem = None
socket = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
threading.Thread.__init__(self)
RedisMessenger.__init__(self, config)
# init
#threading.Thread.__init__ (self)
self.config = config
self.shutdown_event = Event()
self.channel = RedisChannel.STANDARD.value
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
self.can_send = None
self.redisclient = ClientRedisAdapter(config)
# self.connection_tester = ConnectionTester()
# ------------------------------------------------------------------------------------------ #
def run(self):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"))
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(self.channel)
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
# listener loop
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item))
item["channel"] = self.decode_if_needed(item["channel"])
item["data"] = self.decode_if_needed(item["data"])
try:
self.work(item)
except RedisConnectionException as rce:
self.logger.error(str(rce))
if not self.shutdown_event.is_set():
self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
self.pubsub.unsubscribe()
if not self.shutdown_event.is_set():
self.logger.warning("unsubscribed from " + self.channel + " and finished")
# ------------------------------------------------------------------------------------------ #
def decode_if_needed(self, val):
if isinstance(val, bytes):
return val.decode("utf-8")
return val
# ------------------------------------------------------------------------------------------ #
def listen_for_one_message(self, channel, socket_timeout=2):
self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout)
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(channel)
try:
self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds")
for item in self.pubsub.listen():
it = self.receive_message(item)
if it is not None:
break
except redis.exceptions.TimeoutError as te:
raise te
return item["data"]
# ------------------------------------------------------------------------------------------ #
def receive_message(self, item):
if item["type"] == "subscribe":
self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8"))
return None
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
self.pubsub.unsubscribe()
return item
# ------------------------------------------------------------------------------------------ #
def work(self, item):
if item["data"] == "fetch_new_programme":
#self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme)
self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "shutdown":
self.terminate()
elif item["data"] == "init_player":
self.execute(RedisChannel.IP_REPLY.value, self.soundsystem.init_player)
elif item["data"] == "get_act_programme":
self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string)
elif item["data"] == "get_status":
def get_status_string():
status = "No monitoring plugin available!"
if "monitor" in self.soundsystem.plugins:
status = self.soundsystem.plugins["monitor"].get_status()
return json.dumps(status)
self.execute(RedisChannel.GS_REPLY.value, get_status_string)
# elif item["data"] == "get_connection_status":
# self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status)
elif item["data"] == "print_message_queue":
self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue)
elif item["data"].find("set_next_file") >= 0:
playlist = item["data"].split()[1]
playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist)
elif item["data"].find("get_next_file") >= 0:
playlist = item["data"].split()[1]
#playlist = playlist[0:len(playlist)-8]
self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)
elif item["data"].find("on_play") >= 0:
source = item["data"].split("on_play ")[1]
self.execute(RedisChannel.TS_REPLY.value, self.scheduler.soundsystem.on_play, source)
elif item["data"] == "recreate_db":
self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)
elif item["data"] == "status":
return True
else:
raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"])
# ------------------------------------------------------------------------------------------ #
def execute(self, channel, f, param1=None, param2=None, param3=None):
if param1 != None:
if param2 != None:
if param3 != None:
reply = f(param1, param2, param3)
else:
reply = f(param1, param2)
else:
reply = f(param1)
else:
reply = f()
if reply is None:
reply = ""
# sometimes the sender is faster than the receiver. redis messages would be lost
time.sleep(0.1)
self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value)
# publish
self.redisclient.publish(channel, reply)
# ------------------------------------------------------------------------------------------ #
def join_comm(self):
try:
while self.is_alive():
self.logger.debug(str(datetime.now())+" joining")
self.join()
self.logger.warning("join out")
except (KeyboardInterrupt, SystemExit):
# Dem Server den Shutdown event setzen
# server.shutdown_event.set()
# Der Server wartet auf Eingabe
# Daher einen Client initiieren, der eine Nachricht schickt
self.halt()
sys.exit('Terminated')
# ------------------------------------------------------------------------------------------ #
def halt(self):
"""
Stop the server
"""
if self.shutdown_event.is_set():
return
self.shutdown_event.set()
try:
self.socket.unbind("tcp://"+self.ip+":"+self.port)
except:
pass
# self.socket.close()
# ------------------------------------------------------------------------------------------ #
def send(self, message):
"""
Send a message to the client
:param message: string
"""
# FIXME Review logic
if not self.can_send:
self.logger.debug("sending a "+str(len(message))+" long message via REDIS.")
self.socket.send(message.encode("utf-8"))
self.can_send = False
else:
self.logger.warning("cannot send message via REDIS: "+str(message))
def terminate(self):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.shutdown_event.set()
self.scheduler.terminate()
self.pubsub.close()
self.logger.info("Shutdown event received. Bye bye ...")
# ------------------------------------------------------------------------------------------ #
class ClientRedisAdapter(RedisMessenger):
def __init__(self, config):
RedisMessenger.__init__(self, config)
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
if type(channel) == RedisChannel:
channel = channel.value
self.rstore.publish(channel, message)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
class RedisChannel(Enum):
STANDARD = "aura"
DPE_REPLY = "delete_playlist_entry_reply"
FNP_REPLY = "fetch_new_programme_reply"
GAP_REPLY = "get_act_programme_reply"
GS_REPLY = "get_status_reply"
GCS_REPLY = "get_connection_status_reply"
GNF_REPLY = "get_next_file_reply"
IPE_REPLY = "insert_playlist_entry_reply"
IP_REPLY = "init_player_reply"
TS_REPLY = "track_service_reply"
MPE_REPLY = "move_playlist_entry_reply"
PMQ_REPLY = "print_message_queue_reply"
RDB_REPLY = "recreate_database_reply"
SNF_REPLY = "get_next_file_reply"
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.cli.redis.statestore import RedisStateStore
from modules.cli.redis.channels import RedisChannel
"""
Send and receive redis messages
"""
# ------------------------------------------------------------------------------------------ #
class RedisMessenger():
logger = None
rstore = None
# ------------------------------------------------------------------------------------------ #
def __init__(self, config):
super(RedisMessenger, self).__init__()
"""
Constructor
"""
self.logger = logging.getLogger("AuraEngine")
self.channel = RedisChannel.STANDARD
self.section = ''
self.rstore = RedisStateStore(config)
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Einen "Kanal" setzen - zb scheduling
@type channel: string
@param channel: Kanal/Name der Komponente
"""
self.channel = channel
if channel in self.components:
self.errnr = self.components[channel]
self.rstore.set_channel(channel)
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
@type section: string
@param section: Gültigkeitsbereich
"""
self.section = section
# # ------------------------------------------------------------------------------------------ #
# def set_mail_addresses(self, fromMail, adminMails):
# """
# Einen Sektion / Gültigkeitsbereich der Meldung setzen - zb internal
# @type section: string
# @param section: Gültigkeitsbereich
# """
# self.fromMail = fromMail
# self.adminMails = adminMails
# # ------------------------------------------------------------------------------------------ #
# def send(self, message, code, level, job, value='', section=''):
# """
# Eine Message senden
# @type message: string
# @param message: menschenverständliche Nachricht
# @type code: string
# @param code: Fehlercode - endet mit 00 bei Erfolg
# @type level: string
# @param level: Error-Level - info, warning, error, fatal
# @type job: string
# @param job: Name der ausgeführten Funktion
# @type value: string
# @param value: Ein Wert
# @type section: string
# @param section: Globale Sektion überschreiben
# """
# section = self.section if section == '' else section
# self.time = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
# self.utime = time.time()
# state = {'message':message.strip().replace("'","\\'"), 'code':self.errnr + str(code),'job':job,'value':value}
# self.rstore.set_section(section)
# self.rstore.store(level, state)
# if level == 'info' or level == 'success':
# self.logger.info(message)
# elif level == 'warning':
# self.logger.warning(message)
# elif level == 'error':
# self.logger.error(message)
# self.send_admin_mail(level, message, state)
# elif level == 'fatal':
# self.logger.critical(message)
# self.send_admin_mail(level, message, state)
# # ------------------------------------------------------------------------------------------ #
# def say_alive(self):
# """
# Soll alle 20 Sekunden von den Komponenten ausgeführt werden,
# um zu melden, dass sie am Leben sind
# """
# self.rstore.set_alive_state()
# # ------------------------------------------------------------------------------------------ #
# def get_alive_state(self, channel):
# """
# Live State abfragen
# @type channel: string
# @param channel: Channel/Komponente
# """
# return self.rstore.get_alive_state(channel)
# # ------------------------------------------------------------------------------------------ #
# def set_state(self, name, value, expires=None, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: Name des state
# @type value: string
# @param value: Wert
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.set_state(name, value, expires, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_add_event(self, name, eventtime, value, channel=None):
# """
# Kündigt einen Event an
# @type name: string
# @param name: der Name des Events
# @type eventtime: string|datetime.datetime
# @param eventtime: Datum und Zeit des events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# if type(eventtime) == type(str()):
# eventtime_str = datetime.datetime.strptime(eventtime[0:16].replace(' ','T'), "%Y-%m-%dT%H:%M").strftime("%Y-%m-%dT%H:%M")
# elif type(eventtime) is datetime.datetime:
# eventtime_str = eventtime.strftime("%Y-%m-%dT%H:%M")
# else:
# raise TypeError('eventtime must be a datetime.date or a string, not a %s' % type(eventtime))
# self.rstore.queue_add_event(eventtime_str, name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def queue_remove_events(self, name, channel=None):
# """
# Löscht Events
# @type name: string
# @param name: der Name des Events
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.queue_remove_events(name, channel)
# # ------------------------------------------------------------------------------------------ #
# def fire_event(self, name, value, channel=None):
# """
# Feuert einen Event
# @type name: string
# @param name: der Name des Events
# @type value: dict
# @param value: Werte
# @type channel: string
# @param channel: Kanal (optional)
# """
# if not channel:
# channel = self.channel
# self.rstore.fire_event(name, value, channel)
# # ------------------------------------------------------------------------------------------ #
# def get_event_queue(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# queue = self.rstore.get_event_queue(name, channel)
# return queue
# # ------------------------------------------------------------------------------------------ #
# def get_events(self, name=None, channel=None):
# """
# Holt events eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: list
# @return: Liste der Events
# """
# events = self.rstore.get_events(name, channel)
# return events
# # ------------------------------------------------------------------------------------------ #
# def get_event(self, name=None, channel=None):
# """
# Holt event eines Kanals
# @type channel: string
# @param channel: Kanal (optional)
# @rtype: dict
# @return: Event
# """
# events = self.rstore.get_events(name, channel)
# result = False
# if events:
# result = events.pop(0)
# return result
# # ------------------------------------------------------------------------------------------ #
# def send_admin_mail(self, level, message, state):
# """
# Sendent mail an Admin(s),
# @type message: string
# @param message: Die Message
# @type state: dict
# @param state: Der State
# @return result
# """
# # FIXME Make Mailer functional: Invalid constructor
# if self.fromMail and self.adminMails:
# #subject = "Possible comba problem on job " + state['job'] + " - " + level
# mailmessage = "Hi Admin,\n comba reports a possible problem\n\n"
# mailmessage = mailmessage + level + "!\n"
# mailmessage = mailmessage + message + "\n\n"
# mailmessage = mailmessage + "Additional information:\n"
# mailmessage = mailmessage + "##################################################\n"
# mailmessage = mailmessage + "Job:\t" + state['job'] + "\n"
# mailmessage = mailmessage + "Code:\t" + state['code'] + "\n"
# mailmessage = mailmessage + "Value:\t" + str(state['value']) + "\n"
# #mailer = AuraMailer(self.adminMails, self.fromMail)
# #mailer.send_admin_mail(subject, mailmessage)
# else:
# return False
# # ------------------------------------------------------------------------------------------ #
# def receive(self):
# """
# Bisher wird nichts empfangen
# """
# return ""
# # ------------------------------------------------------------------------------------------ #
# def get_next_file_for(self, playlisttype):
# next = self.rstore.db.get('next_'+playlisttype+'file')
# if next is None:
# next = b""
# return next.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def on_play(self, info):
# result = self.rstore.db.get('on_play')
# if result is None:
# result = b""
# return result.decode('utf-8')
# ------------------------------------------------------------------------------------------ #
# def set_next_file_for(self, playlisttype, file):
# self.rstore.db.set("next_" + playlisttype + "file", file)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import datetime
import json
import re
import uuid
import redis
class RedisStateStore(object):
"""Store and get Reports from redis"""
def __init__(self, config, **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.db = redis.Redis(host=config.get("redis_host"), port=config.get("redis_port"), db=config.get("redis_db"))
self.channel = '*'
self.section = '*'
self.separator = '_'
self.daily = False
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
"""
Kanal setzen
@type channel: string
@param channel: Kanal
"""
self.channel = channel
# ------------------------------------------------------------------------------------------ #
def set_section(self, section):
"""
Sektion setzen
@type section: string
@param section: Sektion
"""
self.section = section
# ------------------------------------------------------------------------------------------ #
def set_alive_state(self):
"""
Alive Funktion - alle 20 Sekunden melden, dass man noch am Leben ist
"""
self.set_state('alive', 'Hi', 21)
# ------------------------------------------------------------------------------------------ #
def get_alive_state(self, channel):
"""
Alive Status eines Channels ermitteln
@type channel: string
@param channel: der Channel
@rtype: string/None
@return: Ein String, oder None, bei negativem Ergebnis
"""
return self.get_state('alive', channel)
# ------------------------------------------------------------------------------------------ #
def set_state(self, name, value, expires=None, channel=None):
"""
Setzt einen Status
@type name: string
@param name: Name des state
@type value: string
@param value: Wert
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
key = self.__create_key__(channel + 'State', name)
if value == "":
self.db.delete(key)
else:
# publish on channel
message = json.dumps({'eventname':name, 'value': value})
self.db.publish(channel + 'Publish', message)
# store in database
self.db.set(key, value)
if(expires):
self.db.expire(key, 21)
# ------------------------------------------------------------------------------------------ #
def get_state(self, name, channel):
"""
Holt einen Status
@type name: string
@param name: Name des state
@type channel: string
@param channel: Kanal (optional)
"""
key = self.__create_key__(channel + 'State', name)
return self.db.get(key)
# ------------------------------------------------------------------------------------------ #
def queue_add_event(self, eventtime, name, value, channel=None):
"""
Kündigt einen Event an
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)
# ------------------------------------------------------------------------------------------ #
def queue_remove_events(self, name=None, channel=None):
"""
Löscht Events
@type name: string
@param name: Name des Events
@type channel: string
@param channel: Kanal (optional)
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
for delkey in keys:
self.db.delete(delkey)
# ------------------------------------------------------------------------------------------ #
def fire_event(self, name, value, channel=None):
"""
Feuert einen Event
@type name: string
@param name: Name des Events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)
# ------------------------------------------------------------------------------------------ #
def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
"""
Feuert einen Event
@type eventtime: string
@param eventtime: Datum und Zeit des events
@type value: dict
@param value: Werte
@type channel: string
@param channel: Kanal (optional)
"""
if not channel:
channel = self.channel
timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
key = self.__create_key__(channel + type, eventtime, name)
value['starts'] = eventtime[0:16]
value['eventchannel'] = channel
value['eventname'] = name
self.db.hset(key, namespace, value)
self.db.expire(key, expire)
# ------------------------------------------------------------------------------------------ #
def get_event_queue(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Evqueue_' if channel else '*Evqueue_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'evqueue')
return entries
# ------------------------------------------------------------------------------------------ #
def get_events(self, name=None, channel=None):
"""
Holt events eines Kanals
@type channel: string
@param channel: Kanal (optional)
@rtype: list
@return: Liste der Events
"""
query = channel + 'Event_' if channel else '*Event_'
query = query + '*_' + name if name else query + '*_*'
keys = self.db.keys(query)
keys.sort()
entries = self.__get_entries__(keys, 'events')
return entries
# ------------------------------------------------------------------------------------------ #
def get_next_event(self, name=None, channel=None):
"""
Holt den aktuellsten Event
@type channel: string
@param channel: Kanal (optional)
@rtype: dict/boolean
@return: ein Event oder False
"""
events = self.get_event_queue(name, channel)
if len(events) > 0:
result = events.pop(0)
else:
result = False
return result
# ------------------------------------------------------------------------------------------ #
def store(self, level, value):
"""
Hash speichern
@type level: string
@param level: der errorlevel
@type value: dict
@param value: Werte als dict
"""
microtime = str(time.time())
value['microtime'] = microtime
value['level'] = level
key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
self.db.hset(key, self.channel, value)
self.db.expire(key, 864000)
# ------------------------------------------------------------------------------------------ #
def __get_keys__(self, level ='*'):
"""
Redis-Keys nach Suchkriterium ermitteln
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Die Keys auf die das Suchkriterium zutrifft
"""
key = self.__create_key__(self.channel, self.section, level)
microtime = str(time.time())
search = microtime[0:4] + '*' if self.daily else '*'
return self.db.keys(key + self.separator + '*')
# ------------------------------------------------------------------------------------------ #
def __create_key__(self, *args):
"""
Key erschaffen - beliebig viele Argumente
@rtype: string
@return: Der key
"""
return self.separator.join(args)
def get_entries(self, level ='*'):
"""
Liste von Hashs nach Suchkriterium erhalten
@type level: string
@param level: einen Errorlevel filtern
@rtype: list
@return: Redis Hashs
"""
def tsort(x,y):
if float(x.split('_',4)[3]) > float(y.split('_',4)[3]):
return 1
elif float(x.split('_',4)[3]) < float(y.split('_',4)[3]):
return -1
else:
return 0
keys = self.__get_keys__(level)
keys.sort(tsort)
entries = self.__get_entries__(keys, self.channel)
entries = sorted(entries, key=lambda k: k['microtime'], reverse=True)
return entries
# ------------------------------------------------------------------------------------------ #
def __get_entries__(self, keys, channel):
entries = []
for key in keys:
entry = self.db.hget(key,channel)
entry = json.dumps(entry.decode('utf-8'))
if not (entry is None):
try:
entry = entry.decode('utf-8').replace('None','"None"')
entry = re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"")
entry = json.loads(entry)
entry['key'] = key
entries.append(entry)
except:
pass
return entries
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
if channel.lower().find("reply") < 0 and subscriber_count[1] == 0:
raise Exception("No subscriber! Is Aura daemon running?")
self.db.publish(channel, message)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
class TransitionType(Enum):
"""
Types of fade-in and fade-out transition.
"""
INSTANT = "instant"
FADE = "fade"
class Channel(Enum):
"""
Channel name mappings to the Liqidsoap channel IDs
"""
FILESYSTEM_A = "in_filesystem_0"
FILESYSTEM_B = "in_filesystem_1"
HTTP_A = "in_http_0"
HTTP_B = "in_http_1"
HTTPS_A = "in_https_0"
HTTPS_B = "in_https_1"
LIVE_0 = "linein_0"
LIVE_1 = "linein_1"
LIVE_2 = "linein_2"
LIVE_3 = "linein_3"
LIVE_4 = "linein_4"
def __str__(self):
return str(self.value)
class ChannelType(Enum):
"""
Engine channel types mapped to `Entry` source types.
"""
FILESYSTEM = {
"id": "fs",
"numeric": 0,
"channels": [Channel.FILESYSTEM_A, Channel.FILESYSTEM_B]
}
HTTP = {
"id": "http",
"numeric": 1,
"channels": [Channel.HTTP_A, Channel.HTTP_B]
}
HTTPS = {
"id": "https",
"numeric": 2,
"channels": [Channel.HTTPS_A, Channel.HTTPS_B]
}
LIVE = {
"id": "live",
"numeric": 3,
"channels": [
Channel.LIVE_0,
Channel.LIVE_1,
Channel.LIVE_2,
Channel.LIVE_3,
Channel.LIVE_4
]
}
@property
def channels(self):
return self.value["channels"]
@property
def numeric(self):
return self.value["numeric"]
def __str__(self):
return str(self.value["id"])
class EntryPlayState(Enum):
UNKNOWN = "unknown"
LOADING = "loading"
READY = "ready_to_play"
PLAYING = "playing"
FINISHED = "finished"
class LiquidsoapResponse(Enum):
SUCCESS = "Done"
STREAM_STATUS_POLLING = "polling"
STREAM_STATUS_STOPPED = "stopped"
STREAM_STATUS_CONNECTED = "connected"