diff --git a/engine-core.py b/engine-core.py index 9eda0b62d164911c70095ff8fd25b90a1ccea6ed..b79560b1b164b105a178361b51216e54b76a4760 100755 --- a/engine-core.py +++ b/engine-core.py @@ -21,10 +21,8 @@ import os import sys -import meta import signal import logging -import unittest import subprocess from flask import Flask @@ -61,6 +59,7 @@ class AuraEngine: server = None messenger = None controller = None + soundsystem = None scheduler = None lqs = None lqs_startup = None @@ -117,6 +116,11 @@ class AuraEngine: else: self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually.")) + # from modules.communication.redis.adapter import ServerRedisAdapter + # self.messenger = ServerRedisAdapter(self.config) + # self.messenger.scheduler = self.scheduler + # self.messenger.soundsystem = self.soundsystem + # self.messenger.start() def start_lqs(self, debug_output, verbose_output): diff --git a/guru.py b/guru.py index 99f662a140aee3611b615822ca85bd6700da3daf..19cb9d9aad77ed5245d4c345e67972ee31444c68 100755 --- a/guru.py +++ b/guru.py @@ -21,13 +21,12 @@ import time import sys + import redis -from pathlib import Path from argparse import ArgumentParser -# own libs -from modules.cli_tool.padavan import Padavan +from modules.cli.padavan import Padavan from modules.base.exceptions import PlaylistException from modules.base.config import AuraConfig diff --git a/modules/base/exceptions.py b/modules/base/exceptions.py index dbb4d1c780ce477f064ba7a92d07ce41b4bd3bc0..ec60c36c96eecc5bc8c6743600e61bee938e697f 100644 --- a/modules/base/exceptions.py +++ b/modules/base/exceptions.py @@ -42,14 +42,6 @@ class NoActiveEntryException(Exception): pass -# Monitoring Exceptions - -class EngineMalfunctionException(Exception): - pass - -class MailingException(Exception): - pass - # Liquidsoap Execeptions diff --git a/modules/communication/mail.py b/modules/base/mail.py similarity index 97% rename from modules/communication/mail.py rename to modules/base/mail.py index 5e4a45c8335243cadb408f343e80c991fbb0ebe5..d5829fe36945e98d70d81fca12d26c50ad17e50f 100644 --- a/modules/communication/mail.py +++ b/modules/base/mail.py @@ -19,7 +19,15 @@ import smtplib from email.message import EmailMessage -from modules.base.exceptions import MailingException + + + +class MailingException(Exception): + """ + Thrown when some mail cannot be sent. + """ + pass + class AuraMailer(): @@ -29,7 +37,6 @@ class AuraMailer(): config = None - def __init__(self, config): """ Constructor to initialize service with Aura `config`. @@ -41,12 +48,10 @@ class AuraMailer(): self.admin_mails = config.get("admin_mail") - # # PUBLIC METHODS # - def send_admin_mail(self, subject, body): """ Sends an email to the administrator as defined in the configuration. @@ -61,7 +66,6 @@ class AuraMailer(): self.__send(mail_to, subject, body) - # # PRIVATE METHODS # diff --git a/modules/cli_tool/padavan.py b/modules/cli/padavan.py similarity index 100% rename from modules/cli_tool/padavan.py rename to modules/cli/padavan.py diff --git a/modules/communication/liquidsoap/playerclient.py b/modules/communication/liquidsoap/playerclient.py index 83a0af1b8d409b482fb02b695f8892cb57168188..5bdb4383e09671921364914f3442ab0b203a0d57 100644 --- a/modules/communication/liquidsoap/playerclient.py +++ b/modules/communication/liquidsoap/playerclient.py @@ -202,6 +202,24 @@ class LiquidSoapPlayerClient(LiquidSoapClient): return self.message + # + # General Entries + # + + def entry_status(self, rid): + """ + Retrieves the status of a given entry. + + Args: + rid (String): Resource ID (RID) + + Returns: + Liquidsoap server response + """ + self.command("request", "status", str(rid)) + return self.message + + # # Other # diff --git a/modules/communication/redis/adapter.py b/modules/communication/redis/adapter.py index 2b5cfcc28012eecc8399fb3c7711c33b7e7b8010..5a39ba161f7a16bf87b504e100739d4ceaeb57a4 100644 --- a/modules/communication/redis/adapter.py +++ b/modules/communication/redis/adapter.py @@ -20,12 +20,12 @@ import sys import time import json -import redis -import logging -import threading - from datetime import datetime from threading import Event +import threading + +import redis + from modules.communication.redis.messenger import RedisMessenger from modules.communication.redis.statestore import RedisStateStore @@ -154,7 +154,9 @@ class ServerRedisAdapter(threading.Thread, RedisMessenger): elif item["data"] == "get_status": def get_status_string(): - status = self.soundsystem.monitoring.get_status() + status = "No monitoring plugin available!" + if "monitor" in self.soundsystem.plugins: + status = self.soundsystem.plugins["monitor"].get_status() return json.dumps(status) self.execute(RedisChannel.GS_REPLY.value, get_status_string) diff --git a/modules/communication/redis/messenger.py b/modules/communication/redis/messenger.py index 73718c4dff6e1da7d8b86884a5787e70bf354c0c..48fa0d44a417b9d0a12c6052187c6c704b17d672 100644 --- a/modules/communication/redis/messenger.py +++ b/modules/communication/redis/messenger.py @@ -17,15 +17,13 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import time + import logging -import datetime + from modules.communication.redis.statestore import RedisStateStore -from modules.communication.mail import AuraMailer -from modules.base.exceptions import PlaylistException from modules.base.enum import RedisChannel -from modules.base.logger import AuraLogger + """ Send and receive redis messages diff --git a/modules/communication/redis/statestore.py b/modules/communication/redis/statestore.py index 91cf109fb35b52771fb56953ce6e14b427ba6cb5..de7dfeb9b7d88c1ffa4bd5863cf60d754913ca54 100644 --- a/modules/communication/redis/statestore.py +++ b/modules/communication/redis/statestore.py @@ -17,13 +17,14 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import redis import time import datetime import json import re import uuid +import redis + class RedisStateStore(object): diff --git a/modules/core/engine.py b/modules/core/engine.py index 602d6d019fe5bea68a11d308e6b99bbfd861e2a9..d8a011b8e67f868530bd5c2e04f8f000b84e2b48 100644 --- a/modules/core/engine.py +++ b/modules/core/engine.py @@ -19,47 +19,46 @@ import time import logging -import json from urllib.parse import urlparse, ParseResult from contextlib import suppress from threading import Thread +import meta + from modules.base.enum import ChannelType, Channel, TransitionType, LiquidsoapResponse, EntryPlayState -from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil -from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, EngineMalfunctionException, LQStreamException, LoadSourceException +from modules.base.utils import TerminalColors, SimpleUtil as SU, EngineUtil +from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, LQStreamException, LoadSourceException from modules.communication.liquidsoap.playerclient import LiquidSoapPlayerClient # from modules.communication.liquidsoap.recorderclient import LiquidSoapRecorderClient from modules.core.startup import StartupThread from modules.core.state import PlayerStateService -from modules.core.monitor import Monitoring -from modules.communication.mail import AuraMailer +from modules.core.events import EngineEventDispatcher + class SoundSystem(): - """ - SoundSystem Class + """ + The Soundsystem Mixer Control. - Uses LiquidSoapClient, but introduces more complex commands, transactions and error handling. + This class represents a virtual mixer as an abstraction layer to the actual audio hardware. + It uses LiquidSoapClient, but introduces more complex commands, transactions and error handling. + + From one layer above it is used by `AuraScheduler` as if a virtual DJ is remote controling the mixer. """ client = None logger = None transaction = 0 channels = None scheduler = None - monitoring = None - auramailer = None + event_dispatcher = None is_liquidsoap_running = False connection_attempts = 0 disable_logging = False fade_in_active = False fade_out_active = False - - # Active Channel & Entry Handling - active_channel_type = None active_channel = None - player_state = None - + plugins=None def __init__(self, config): """ @@ -71,72 +70,53 @@ class SoundSystem(): """ self.config = config self.logger = logging.getLogger("AuraEngine") - self.client = LiquidSoapPlayerClient(config, "engine.sock") # self.lqcr = LiquidSoapRecorderClient(config, "record.sock") - self.auramailer = AuraMailer(self.config) - self.monitoring = Monitoring(config, self, self.auramailer) + self.is_active() # TODO Check if it makes sense to move it to the boot-phase + self.plugins = dict() - self.is_active() - - # Initialize Default Channels - self.active_channel = { - ChannelType.FILESYSTEM: Channel.FILESYSTEM_A, - ChannelType.HTTP: Channel.HTTP_A, - ChannelType.HTTPS: Channel.HTTPS_A, - ChannelType.LIVE: Channel.LIVE_0 - } - # self.active_entries = {} - self.player_state = PlayerStateService(config) - - def start(self): """ - Starts the sound-system. + Starts the engine. Called when the connection to the sound-system implementation + has been established. """ + self.event_dispatcher = EngineEventDispatcher(self, self.scheduler) + # Sleep needed, because the socket is created too slowly by Liquidsoap time.sleep(1) - self.enable_transaction() - time.sleep(1) - - # Initialize all channels - channels = self.mixer_channels_reload() - for c in channels: - self.channel_volume(c, "0") - - # Setting init params like a blank file - # install_dir = self.config.get("install_dir") - # channel = self.active_channel[ChannelType.FILESYSTEM] - # self.playlist_push(channel, install_dir + "/configuration/blank.flac") - - self.disable_transaction() + self.mixer_initialize() self.is_liquidsoap_running = True - self.logger.info(SimpleUtil.green("Engine Core ------[ connected ]-------- Liquidsoap")) - - # Start Monitoring - is_valid = self.monitoring.has_valid_status(False) - status = self.monitoring.get_status() - self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4)) - if not is_valid: - self.logger.info("Engine Status: " + SimpleUtil.red(status["engine"]["status"])) - raise EngineMalfunctionException - else: - self.logger.info("Engine Status: " + SimpleUtil.green("OK")) + self.event_dispatcher.on_initialized() + self.logger.info(SU.green("Engine Core ------[ connected ]-------- Liquidsoap")) + self.event_dispatcher.on_boot() + self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__)) + self.event_dispatcher.on_ready() + # + # MIXER : GENERAL + # - def is_ready(self): + + def mixer_initialize(self): """ - Returns `True` if the soundsystem is connected to Liquidsoap and is ready to be used. + - Pull all faders down to volume 0. + - Initialize default channels per type """ - return self.is_liquidsoap_running - - + self.enable_transaction() + time.sleep(1) # TODO Check is this is still required + channels = self.mixer_channels_reload() + for channel in channels: + self.channel_volume(channel, "0") + self.disable_transaction() - # - # MIXER : GENERAL - # + self.active_channel = { + ChannelType.FILESYSTEM: Channel.FILESYSTEM_A, + ChannelType.HTTP: Channel.HTTP_A, + ChannelType.HTTPS: Channel.HTTPS_A, + ChannelType.LIVE: Channel.LIVE_0 + } def mixer_status(self): @@ -149,8 +129,8 @@ class SoundSystem(): self.enable_transaction() inputs = self.mixer_channels() - for input in inputs: - inputstate[input] = self.channel_status(cnt) + for channel in inputs: + inputstate[channel] = self.channel_status(cnt) cnt = cnt + 1 self.disable_transaction() @@ -163,7 +143,6 @@ class SoundSystem(): """ if self.channels is None or len(self.channels) == 0: self.channels = self.__send_lqc_command__(self.client, "mixer", "inputs") - return self.channels @@ -177,14 +156,13 @@ class SoundSystem(): self.enable_transaction() inputs = self.mixer_channels() - for input in inputs: + for channel in inputs: status = self.channel_status(cnt) if "selected=true" in status: - activeinputs.append(input) + activeinputs.append(channel) cnt = cnt + 1 self.disable_transaction() - return activeinputs @@ -196,9 +174,9 @@ class SoundSystem(): activemixer_copy = self.mixer_channels().copy() activemixer_copy.remove(input_type) except ValueError as e: - self.logger.error("Requested channel (" + input_type + ") not in channellist. Reason: " + str(e)) + self.logger.error("Requested channel (%s) not in channel-list. Reason: %s" % (input_type, str(e))) except AttributeError: - self.logger.critical("Channellist is None") + self.logger.critical("Empty channel list") return activemixer_copy @@ -211,15 +189,6 @@ class SoundSystem(): return self.mixer_channels() - - # ------------------------------------------------------------------------------------------ # - def get_mixer_volume(self, channel): - # FIXME Is this needed; even possible? - return False - - - - # # MIXER : CONTROL SECTION # @@ -260,11 +229,10 @@ class SoundSystem(): elif entry.get_type() == ChannelType.HTTP or entry.get_type() == ChannelType.HTTPS: is_ready = self.stream_load_entry(entry) - if is_ready == True: + if is_ready: entry.status = EntryPlayState.READY - # Store in play-log cache for later reference - self.player_state.add_to_history([entry]) + self.event_dispatcher.on_queue([entry]) @@ -290,7 +258,7 @@ class SoundSystem(): raise InvalidChannelException # Determine channel - channel = self.channel_swap(entry.get_type()) + channel = self.channel_swap(entries[0].get_type()) # Queue entries for entry in entries: @@ -303,8 +271,7 @@ class SoundSystem(): if self.playlist_push(entry.channel, entry.source) == True: entry.status = EntryPlayState.READY - # Store in play-log cache for later reference - self.player_state.add_to_history(entries) + self.event_dispatcher.on_queue(entries) @@ -336,7 +303,7 @@ class SoundSystem(): self.disable_transaction() # Update active channel and type - self.active_channel[entry.get_type()] = entry.channel + self.active_channel[entry.get_type()] = entry.channel # Dear filesystem channels, please leave the room as you would like to find it! if entry.previous_channel and entry.previous_channel in ChannelType.FILESYSTEM.channels: @@ -347,7 +314,7 @@ class SoundSystem(): self.enable_transaction() self.channel_activate(entry.previous_channel.value, False) res = self.playlist_clear(entry.previous_channel) - self.logger.info("Clear Queue Response: "+res) + self.logger.info("Clear Queue Response: " + res) self.disable_transaction() Thread(target=clean_up).start() @@ -355,24 +322,19 @@ class SoundSystem(): def on_play(self, source): """ - Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) + Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) when some entry is actually playing. Args: source (String): The URI of the media source currently being played """ - self.logger.info(SimpleUtil.pink("Source '%s' started playing" % source)) - - try: - self.player_state.store_trackservice_entry(source) - except NoActiveEntryException: - self.logger.warn(SimpleUtil.red("Currently there's nothing playing!")) + self.event_dispatcher.on_play(source) def stop(self, entry, transition): """ - Stops the currently playing entry. + Stops the currently playing entry. Args: entry (Entry): The entry to stop playing @@ -382,7 +344,7 @@ class SoundSystem(): self.enable_transaction() if not entry.channel: - self.logger.warn(SimpleUtil.red("Trying to stop entry %s, but it has no channel assigned" % entry)) + self.logger.warn(SU.red("Trying to stop entry %s, but it has no channel assigned" % entry)) return if transition == TransitionType.FADE: @@ -390,9 +352,9 @@ class SoundSystem(): else: self.channel_volume(entry.channel, 0) - self.logger.info(SimpleUtil.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry))) + self.logger.info(SU.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry))) self.disable_transaction() - + self.event_dispatcher.on_stop(entry) @@ -421,7 +383,7 @@ class SoundSystem(): else: new_channel = Channel.FILESYSTEM_A msg = "Swapped filesystem channel from B > A" - + elif channel_type == ChannelType.HTTP: if previous_channel == Channel.HTTP_A: new_channel = Channel.HTTP_B @@ -438,7 +400,7 @@ class SoundSystem(): new_channel = Channel.HTTPS_A msg = "Swapped HTTPS Stream channel from B > A" - if msg: self.logger.info(SimpleUtil.pink(msg)) + if msg: self.logger.info(SU.pink(msg)) return (previous_channel, new_channel) @@ -520,34 +482,34 @@ class SoundSystem(): channels = self.mixer_channels() index = channels.index(channel) except ValueError as e: - msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e)) + msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e)) self.logger.error(msg) self.logger.info("Available channels: %s" % str(channels)) return try: if len(channel) < 1: - msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.") + msg = SU.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.") self.logger.warning(msg) else: message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume))) if not self.disable_logging: if message.find('volume=' + str(volume) + '%'): - self.logger.info(SimpleUtil.pink("Set volume of channel '%s' to %s" % (channel, str(volume)))) + self.logger.info(SU.pink("Set volume of channel '%s' to %s" % (channel, str(volume)))) else: - msg = SimpleUtil.red("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message) + msg = SU.red("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message) self.logger.warning(msg) return message except AttributeError as e: #(LQConnectionError, AttributeError): self.disable_transaction(force=True) - msg = SimpleUtil.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e)) + msg = SU.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e)) self.logger.error(msg) # - # Channel Type - Stream + # Channel Type - Stream # @@ -673,7 +635,7 @@ class SoundSystem(): """ if channel not in ChannelType.FILESYSTEM.channels: raise InvalidChannelException - self.logger.info(SimpleUtil.pink("playlist.push('%s', '%s'" % (channel, uri))) + self.logger.info(SU.pink("playlist.push('%s', '%s'" % (channel, uri))) self.enable_transaction() audio_store = self.config.get("audiofolder") @@ -723,7 +685,7 @@ class SoundSystem(): if channel not in ChannelType.FILESYSTEM.channels: raise InvalidChannelException - self.logger.info(SimpleUtil.pink("Clearing filesystem queue '%s'!" % channel)) + self.logger.info(SU.pink("Clearing filesystem queue '%s'!" % channel)) self.enable_transaction() result = self.__send_lqc_command__(self.client, channel, "playlist_clear") @@ -735,7 +697,7 @@ class SoundSystem(): # - # Fading + # Fading # @@ -761,7 +723,7 @@ class SoundSystem(): msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \ (entry.channel, str(step), str(target_volume)) - self.logger.info(SimpleUtil.pink(msg)) + self.logger.info(SU.pink(msg)) # Enable logging, which might have been disabled in a previous fade-out self.disable_logging = True @@ -772,7 +734,7 @@ class SoundSystem(): time.sleep(step) msg = "Finished with fading-in '%s'." % entry.channel - self.logger.info(SimpleUtil.pink(msg)) + self.logger.info(SU.pink(msg)) self.fade_in_active = False if not self.fade_out_active: @@ -803,7 +765,7 @@ class SoundSystem(): step = abs(fade_out_time) / entry.volume msg = "Starting to fading-out '%s'. Step is %ss." % (entry.channel, str(step)) - self.logger.info(SimpleUtil.pink(msg)) + self.logger.info(SU.pink(msg)) # Disable logging... it is going to be enabled again after fadein and -out is finished self.disable_logging = True @@ -814,7 +776,7 @@ class SoundSystem(): time.sleep(step) msg = "Finished with fading-out '%s'" % entry.channel - self.logger.info(SimpleUtil.pink(msg)) + self.logger.info(SU.pink(msg)) # Enable logging again self.fade_out_active = False @@ -834,64 +796,64 @@ class SoundSystem(): # - # ------------------------------------------------------------------------------------------ # - def recorder_stop(self): - self.enable_transaction() + # # ------------------------------------------------------------------------------------------ # + # def recorder_stop(self): + # self.enable_transaction() - for i in range(5): - if self.config.get("rec_" + str(i)) == "y": - self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop") + # for i in range(5): + # if self.config.get("rec_" + str(i)) == "y": + # self.__send_lqc_command__(self.client, "recorder_" + str(i), "stop") - self.disable_transaction() + # self.disable_transaction() - # ------------------------------------------------------------------------------------------ # - def recorder_start(self, num=-1): - if not self.is_liquidsoap_running: - if num==-1: - msg = "Want to start recorder, but LiquidSoap is not running" - else: - msg = "Want to start recorder " + str(num) + ", but LiquidSoap is not running" - self.logger.warning(msg) - return False + # # ------------------------------------------------------------------------------------------ # + # def recorder_start(self, num=-1): + # if not self.is_liquidsoap_running: + # if num==-1: + # msg = "Want to start recorder, but LiquidSoap is not running" + # else: + # msg = "Want to start recorder " + str(num) + ", but LiquidSoap is not running" + # self.logger.warning(msg) + # return False - self.enable_transaction() + # self.enable_transaction() - if num == -1: - self.recorder_start_all() - else: - self.recorder_start_one(num) + # if num == -1: + # self.recorder_start_all() + # else: + # self.recorder_start_one(num) - self.disable_transaction() + # self.disable_transaction() - # ------------------------------------------------------------------------------------------ # - def recorder_start_all(self): - if not self.is_liquidsoap_running: - self.logger.warning("Want to start all recorder, but LiquidSoap is not running") - return False + # # ------------------------------------------------------------------------------------------ # + # def recorder_start_all(self): + # if not self.is_liquidsoap_running: + # self.logger.warning("Want to start all recorder, but LiquidSoap is not running") + # return False - self.enable_transaction() - for i in range(5): - self.recorder_start_one(i) - self.disable_transaction() + # self.enable_transaction() + # for i in range(5): + # self.recorder_start_one(i) + # self.disable_transaction() - # ------------------------------------------------------------------------------------------ # - def recorder_start_one(self, num): - if not self.is_liquidsoap_running: - return False + # # ------------------------------------------------------------------------------------------ # + # def recorder_start_one(self, num): + # if not self.is_liquidsoap_running: + # return False - if self.config.get("rec_" + str(num)) == "y": - returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status") + # if self.config.get("rec_" + str(num)) == "y": + # returnvalue = self.__send_lqc_command__(self.client, "recorder", str(num), "status") - if returnvalue == "off": - self.__send_lqc_command__(self.client, "recorder", str(num), "start") + # if returnvalue == "off": + # self.__send_lqc_command__(self.client, "recorder", str(num), "start") - # ------------------------------------------------------------------------------------------ # - def get_recorder_status(self): - self.enable_transaction(self.client) - recorder_state = self.__send_lqc_command__(self.client, "record", "status") - self.disable_transaction(self.client) + # # ------------------------------------------------------------------------------------------ # + # def get_recorder_status(self): + # self.enable_transaction(self.client) + # recorder_state = self.__send_lqc_command__(self.client, "record", "status") + # self.disable_transaction(self.client) - return recorder_state + # return recorder_state @@ -981,9 +943,8 @@ class SoundSystem(): self.disable_transaction(socket=self.client, force=True) raise e else: - # also store when was last admin mail sent with which content... - # FIXME implement admin mail sending - self.logger.critical("SEND ADMIN MAIL AT THIS POINT") + self.event_dispatcher.on_critical("Criticial Liquidsoap connection issue", \ + "Could not connect to Liquidsoap (Multiple attempts)", e) raise e @@ -1000,7 +961,7 @@ class SoundSystem(): except Exception as e: self.logger.error("Cannot check if Liquidsoap is running. Reason: " + str(e)) self.is_liquidsoap_running = False - + return self.is_liquidsoap_running @@ -1069,10 +1030,11 @@ class SoundSystem(): self.__open_conn(socket) except FileNotFoundError: self.disable_transaction(socket=socket, force=True) - + subject = "CRITICAL Exception when connecting to Liquidsoap" msg = "socket file " + socket.socket_path + " not found. Is liquidsoap running?" - self.logger.critical(TerminalColors.RED.value + msg + TerminalColors.ENDC.value) - self.auramailer.send_admin_mail("CRITICAL Exception when connecting to Liquidsoap", msg) + self.logger.critical(SU.red(msg)) + self.event_dispatcher.on_critical(subject, msg, None) + # ------------------------------------------------------------------------------------------ # def disable_transaction(self, socket=None, force=False): diff --git a/modules/core/events.py b/modules/core/events.py new file mode 100644 index 0000000000000000000000000000000000000000..8fff9894ba0fd7a8e8e38f387c09361fc19c1635 --- /dev/null +++ b/modules/core/events.py @@ -0,0 +1,250 @@ +# +# Aura Engine (https://gitlab.servus.at/aura/engine) +# +# Copyright (C) 2020 - The Aura Engine Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging + +from modules.base.utils import SimpleUtil as SU +from modules.base.exceptions import NoActiveEntryException +# from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, EngineMalfunctionException, LQStreamException, LoadSourceException +from modules.base.mail import AuraMailer +from modules.plugins.monitor import AuraMonitor +from modules.core.state import PlayerStateService + + +from modules.plugins.api import TrackserviceHandler + + +class EventBinding(): + """ + A binding between the event dispatcher and some event handler. + + This allows you to subscribe to events in this way: + + ``` + binding = dispatcher.attach(AuraMonitor) + binding.subscribe("on_boot").subscribe("on_play") + ``` + """ + dispatcher = None + instance = None + + def __init__(self, dispatcher, instance): + self.dispatcher = dispatcher + self.instance = instance + + + def subscribe(self, event_type): + """ + Subscribes the instance to some event identified by the `event_type` string. + """ + self.dispatcher.subscribe(self.instance, event_type) + return self + + + def get_instances(self): + """ + Returns the object within that binding. + """ + return self.instance + + + +class EngineEventDispatcher(): + """ + Performs execution of handlers for engine events. + """ + logger = None + config = None + + subscriber_registry = None + mailer = None + soundsystem = None + player_state = None + scheduler = None + api_handler = None + + + def __init__(self, soundsystem, scheduler): + """ + Initialize EventDispatcher + """ + self.subscriber_registry = dict() + self.logger = logging.getLogger("AuraEngine") + self.config = soundsystem.config + self.mailer = AuraMailer(self.config) + self.soundsystem = soundsystem + self.scheduler = scheduler + self.player_state = PlayerStateService(self.config) + + # self.api_handler = ApiHandler(self.config) + # self.monitoring = Monitoring(self.config, self.soundsystem) + + binding = self.attach(AuraMonitor) + binding.subscribe("on_boot") + + binding = self.attach(TrackserviceHandler) + binding.subscribe("on_play") + + + + def attach(self, clazz): + """ + Creates an intance of the given Class. + """ + instance = clazz(self.config, self.soundsystem) + return EventBinding(self, instance) + + + def subscribe(self, instance, event_type): + """ + Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`. + """ + if not event_type in self.subscriber_registry: + self.subscriber_registry[event_type] = [] + self.subscriber_registry[event_type].append(instance) + + + def call_event(self, event_type, args): + """ + Calls all subscribers for the given event type. + """ + if not event_type in self.subscriber_registry: + return + listeners = self.subscriber_registry[event_type] + if not listeners: + return + for listener in listeners: + method = getattr(listener, event_type) + if method: + if args: + method(args) + else: + method() + + + # + # Events + # + + + def on_initialized(self): + """ + Called when the engine is initialized e.g. connected to Liquidsoap. + """ + self.logger.debug("on_initialized(..)") + self.scheduler.on_initialized() + self.call_event("on_initialized", None) + + + def on_boot(self): + """ + Called when the engine is starting up. This happens after the initialization step. + """ + self.logger.debug("on_boot(..)") + self.call_event("on_boot", None) + + # self.monitoring.on_boot() + + + def on_ready(self): + """ + Called when the engine is booted and ready to play. + """ + self.logger.debug("on_initialized(..)") + self.scheduler.on_ready() + + + def on_play(self, source): + """ + Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) + when some entry is actually playing. + + Args: + source (String): The URI of the media source currently being played + """ + self.logger.debug("on_play(..)") + + self.logger.info(SU.pink("Source '%s' started playing" % source)) + try: + self.player_state.store_trackservice_entry(source) + # self.player_state.get_current_entry(source) + except NoActiveEntryException: + self.on_idle() + + self.call_event("on_initialized", None) + + + def on_stop(self, entry): + """ + The entry on the assigned channel has been stopped playing. + """ + self.logger.debug("on_stop(..)") + self.call_event("on_stop", entry) + + + def on_idle(self): + """ + Callend when no entry is playing + """ + self.logger.debug("on_idle(..)") + self.logger.warn(SU.red("Currently there's nothing playing!")) + self.call_event("on_idle", None) + + + def on_schedule_change(self, schedule): + """ + Called when the playlist or entries of the current schedule have changed. + """ + self.logger.debug("on_schedule_change(..)") + self.call_event("on_schedule_change", schedule) + + + def on_queue(self, entries): + """ + One or more entries have been queued and are currently pre-loaded. + """ + self.logger.debug("on_queue(..)") + self.player_state.add_to_history(entries) + self.call_event("on_queue", entries) + + + def on_sick(self): + """ + Called when the engine is in some unhealthy state. + """ + self.logger.debug("on_sick(..)") + self.call_event("on_sick", None) + + + def on_resurrect(self): + """ + Called when the engine turned healthy again after being sick. + """ + self.logger.debug("on_resurrect(..)") + self.call_event("on_resurrect", None) + + + def on_critical(self, subject, message, data=None): + """ + Callend when some critical event occurs + """ + self.logger.debug("on_critical(..)") + if not data: data = "" + self.mailer.send_admin_mail(subject, message + "\n\n" + str(data)) + self.call_event("on_critical", (subject, message, data)) diff --git a/modules/core/startup.py b/modules/core/startup.py index fba87d47b615d252b824ccb1f40dd9fe92040272..8e5d9ddd84c4f22acf1c606a1cb3e8aa99ee1695 100644 --- a/modules/core/startup.py +++ b/modules/core/startup.py @@ -17,36 +17,32 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import time + import logging -import datetime import threading -import meta -import json from modules.base.exceptions import NoActiveScheduleException -from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil +from modules.base.utils import SimpleUtil as SU + class StartupThread(threading.Thread): """ StartupThread class. - Boots the mixer and starts playing the current schedule. + Boots the engine and starts playing the current schedule. """ logger = None active_entry = None - soundsystem = None - scheduler = None - monitoring = None + engine = None - def __init__(self, soundsystem): + + def __init__(self, engine): """ Initialize the thread. """ threading.Thread.__init__(self) self.logger = logging.getLogger("AuraEngine") - self.soundsystem = soundsystem - self.scheduler = soundsystem.scheduler + self.engine = engine def run(self): @@ -54,15 +50,10 @@ class StartupThread(threading.Thread): Boots the soundsystem. """ try: - self.soundsystem.start() - self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__)) - self.scheduler.on_ready() - + self.engine.start() + except NoActiveScheduleException as e: self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.") except Exception as e: - self.logger.error(SimpleUtil.red("Error while initializing the soundsystem: " + str(e)), e) - - - + self.logger.error(SU.red("Error while initializing the soundsystem: " + str(e)), e) diff --git a/modules/core/state.py b/modules/core/state.py index 4f12aadee78a0e65586633f8ff735e94b2508f13..71169f5632b240dc01747e529e0a68846f8eae23 100644 --- a/modules/core/state.py +++ b/modules/core/state.py @@ -1,7 +1,7 @@ # # Aura Engine (https://gitlab.servus.at/aura/engine) # -# Copyright (C) 2017-2020 - The Aura Engine Team. +# Copyright (C) 2020 - The Aura Engine Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -22,7 +22,7 @@ from collections import deque from modules.base.exceptions import NoActiveEntryException from modules.base.utils import SimpleUtil, EngineUtil -from modules.database.model import SingleEntry, SingleEntryMetaData, PlaylistEntry, PlaylistEntryMetaData, TrackService + @@ -31,7 +31,7 @@ class PlayerStateService: PlayerStateService keeps a short history of currently playing entries. It stores the recent active entries to a local cache `entry_history` being able to manage concurrently playing entries. - It also is in charge of storing relevant meta information of the currently playing entry to + It also is in charge of storing relevant meta information of the currently playing entry to the TrackService table. """ @@ -73,6 +73,13 @@ class PlayerStateService: return self.entry_history[0] + def entry_for_source(self, source): + """ + Retrieves the `PlaylistEntry` matching the provied source URI. + """ + # TODO Implement + + def store_trackservice_entry(self, filepath): """ Stores the entry identified by the given source in the Track Service. @@ -84,26 +91,26 @@ class PlayerStateService: (NoActiveEntryException): In case currently nothing is playing """ found = False - entries = self.get_recent_entries() + # entries = self.get_recent_entries() - if not entries: - raise NoActiveEntryException + # if not entries: + # raise NoActiveEntryException - for active_entry in entries: - base_dir = self.config.get("audiofolder") - if EngineUtil.uri_to_filepath(base_dir, active_entry.source) == filepath: - trackservice = TrackService(active_entry) - trackservice.store(add=True, commit=True) + # for active_entry in entries: + # base_dir = self.config.get("audiofolder") + # if EngineUtil.uri_to_filepath(base_dir, active_entry.source) == filepath: + # trackservice = TrackService(active_entry) + # trackservice.store(add=True, commit=True) - active_entry.trackservice_id = trackservice.id - active_entry.store(add=False, commit=True) + # active_entry.trackservice_id = trackservice.id + # active_entry.store(add=False, commit=True) - self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice)) - found = True + # self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice)) + # found = True - if not found: - msg = "Found no entry in the recent history which matches the given source '%s'" % (filepath) - self.logger.critical(SimpleUtil.red(msg)) + # if not found: + # msg = "Found no entry in the recent history which matches the given source '%s'" % (filepath) + # self.logger.critical(SimpleUtil.red(msg)) @@ -119,53 +126,3 @@ class PlayerStateService: msg += "]" self.logger.info(msg) - - - # def adapt_trackservice_title(self, source): - # """ - # Updates the track-service entry with the info from a fallback track/playlist. - # """ - # liquidsoap_offset = int(self.config.lqs_delay_offset) - # scheduled_entry = self.get_active_entry(liquidsoap_offset) - - # entry = SingleEntry() - # meta = SingleEntryMetaData() - - # # # Validate artist and title - # # if not title: - # # title = self.config.get("fallback_title_not_available") - - # # Create Entry - # entry.source = source - # entry.duration = self.fallback_manager.get_track_duration(source) - # if not entry.duration: - # self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry))) - - # # Create track service log for local station fallback (type=4) - # trackservice = TrackService(entry, 4) - # trackservice.store(add=True, commit=True) - - # entry.store(add=True, commit=True) - - # # Create Meta - # meta.artist = "----FIXME" - # meta.album = "" - # meta.title = "----TODO" - # meta.single_entry_id = entry.id - # meta.store(add=True, commit=True) - - # # Reference each other - # entry.meta_data_id = meta.id - # entry.trackservice_id = trackservice.id - # entry.store(add=False, commit=True) - - - # msg = "Track Service active track '%s' updated with fallback source '%s - %s'!" % (scheduled_entry, meta.artist, meta.title) - # self.logger.info(msg) - # return msg - - - # - # PRIVATE METHODS - # - diff --git a/modules/database/model.py b/modules/database/model.py index ca2504b3548f611cfcad9dbc1fb689c3cef656b7..ccb20f6a600c4f10970c48a9a2b2e5f565fe8fad 100644 --- a/modules/database/model.py +++ b/modules/database/model.py @@ -25,11 +25,11 @@ import datetime import sqlalchemy as sa from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy import orm, func, BigInteger, Boolean, Column, DateTime, Integer, String, ForeignKey, ForeignKeyConstraint +from sqlalchemy import orm, func +from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, ForeignKey, ForeignKeyConstraint from sqlalchemy.orm import relationship from sqlalchemy.sql.expression import false, true from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method -from sqlalchemy.orm import relationship from sqlalchemy import create_engine @@ -169,14 +169,14 @@ class Schedule(DB.Model, AuraDatabaseModel): @staticmethod - def select_show_on_datetime(datetime): - return DB.session.query(Schedule).filter(Schedule.schedule_start == datetime).first() + def select_show_on_datetime(date_time): + return DB.session.query(Schedule).filter(Schedule.schedule_start == date_time).first() @staticmethod def select_programme(date_from=datetime.date.today()): """ - Select all schedules starting from `date_from` or from today if no + Select all schedules starting from `date_from` or from today if no parameter is passed. Args: @@ -281,7 +281,7 @@ class Playlist(DB.Model, AuraDatabaseModel): playlist_id = Column(Integer, autoincrement=False) # , ForeignKey("schedule.playlist_id")) show_name = Column(String(256)) fallback_type = Column(Integer) - entry_count = Column(Integer) + entry_count = Column(Integer) @staticmethod @@ -344,6 +344,17 @@ class Playlist(DB.Model, AuraDatabaseModel): return DB.session.query(Playlist).filter(Playlist.playlist_id == playlist_id).order_by(Playlist.schedule_start).all() + @staticmethod + def is_empty(): + """ + Checks if the given is empty + """ + try: + return not DB.session.query(Playlist).one_or_none() + except sa.orm.exc.MultipleResultsFound: + return False + + @hybrid_property def start_unix(self): """ @@ -543,251 +554,251 @@ class PlaylistEntryMetaData(DB.Model, AuraDatabaseModel): # -class TrackService(DB.Model, AuraDatabaseModel): - """ - TrackService holding track-service items consisting of - """ - __tablename__ = 'trackservice' - - # Primary keys - id = Column(Integer, primary_key=True, autoincrement=True) - - # Foreign keys - track_start = Column(DateTime) - track_end = Column(DateTime) # Currently not used, maybe later for timing checks and multi-entry avoidance. - artificial_schedule_id = Column(Integer, ForeignKey("schedule.id")) - artificial_playlist_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"), nullable=True) - single_entry_id = Column(Integer, ForeignKey("single_entry.id"), nullable=True) - - # Data - schedule = relationship("Schedule", foreign_keys=[artificial_schedule_id], lazy="joined") - playlist_entry = relationship("PlaylistEntry", primaryjoin="and_(TrackService.artificial_playlist_entry_id==PlaylistEntry.artificial_id)", lazy="joined") - single_entry = relationship("SingleEntry", foreign_keys=[single_entry_id], lazy="joined") - - fallback_type = Column(Integer, default=0) - - - def __init__(self, entry, fallback_type=0): - """ - Initializes a trackservice entry based on a playlist entry. - """ - self.track_start = datetime.datetime.now() - # if entry.duration: - # self.track_end = self.track_start + datetime.timedelta(seconds=entry.duration) - self.fallback_type = fallback_type - - if fallback_type < 4: - self.schedule_start = entry.playlist.schedule_start - self.artificial_playlist_entry_id = entry.artificial_id - self.playlist_entry = entry - self.schedule = entry.playlist.schedule - else: - self.single_entry = entry - +# class TrackService(DB.Model, AuraDatabaseModel): +# """ +# TrackService holding track-service items consisting of +# """ +# __tablename__ = 'trackservice' - @hybrid_property - def track(self): - """ - Retrieves the track information as a dictionary. +# # Primary keys +# id = Column(Integer, primary_key=True, autoincrement=True) + +# # Foreign keys +# track_start = Column(DateTime) +# track_end = Column(DateTime) # Currently not used, maybe later for timing checks and multi-entry avoidance. +# artificial_schedule_id = Column(Integer, ForeignKey("schedule.id")) +# artificial_playlist_entry_id = Column(Integer, ForeignKey("playlist_entry.artificial_id"), nullable=True) +# single_entry_id = Column(Integer, ForeignKey("single_entry.id"), nullable=True) + +# # Data +# schedule = relationship("Schedule", foreign_keys=[artificial_schedule_id], lazy="joined") +# playlist_entry = relationship("PlaylistEntry", primaryjoin="and_(TrackService.artificial_playlist_entry_id==PlaylistEntry.artificial_id)", lazy="joined") +# single_entry = relationship("SingleEntry", foreign_keys=[single_entry_id], lazy="joined") + +# fallback_type = Column(Integer, default=0) + + +# def __init__(self, entry, fallback_type=0): +# """ +# Initializes a trackservice entry based on a playlist entry. +# """ +# self.track_start = datetime.datetime.now() +# # if entry.duration: +# # self.track_end = self.track_start + datetime.timedelta(seconds=entry.duration) +# self.fallback_type = fallback_type + +# if fallback_type < 4: +# self.schedule_start = entry.playlist.schedule_start +# self.artificial_playlist_entry_id = entry.artificial_id +# self.playlist_entry = entry +# self.schedule = entry.playlist.schedule +# else: +# self.single_entry = entry + + +# @hybrid_property +# def track(self): +# """ +# Retrieves the track information as a dictionary. - Depending on possible fallback scenarios either `playlist_entry` or `single_entry` is used as a basis: - - - Scenario 1: No fallback, all info is gathered via the playlist entry - - Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry - - Scenario 3: This type of fallback didn't get scheduled; a single entry is played - """ - if self.playlist_entry: - return self.playlist_entry.as_dict() - elif self.single_entry: - return self.single_entry.as_dict() - else: - return None - - - @hybrid_property - def show(self): - """ - Retrieves show information based on the related schedule. If no schedule - is available (e.g. when the engine is in a fallback state), then the default - show properties from `AuraConfig` are returned. - """ - show_info = {} - - if self.schedule: - show_info["name"] = self.schedule.show_name - show_info["type"] = self.schedule.type - show_info["host"] = self.schedule.show_hosts - elif self.fallback_type == 4: - show_info["name"] = config.get("fallback_show_name") - show_info["type"] = config.get("fallback_show_type") - show_info["host"] = config.get("fallback_show_host") +# Depending on possible fallback scenarios either `playlist_entry` or `single_entry` is used as a basis: + +# - Scenario 1: No fallback, all info is gathered via the playlist entry +# - Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry +# - Scenario 3: This type of fallback didn't get scheduled; a single entry is played +# """ +# if self.playlist_entry: +# return self.playlist_entry.as_dict() +# elif self.single_entry: +# return self.single_entry.as_dict() +# else: +# return None + + +# @hybrid_property +# def show(self): +# """ +# Retrieves show information based on the related schedule. If no schedule +# is available (e.g. when the engine is in a fallback state), then the default +# show properties from `AuraConfig` are returned. +# """ +# show_info = {} + +# if self.schedule: +# show_info["name"] = self.schedule.show_name +# show_info["type"] = self.schedule.type +# show_info["host"] = self.schedule.show_hosts +# elif self.fallback_type == 4: +# show_info["name"] = config.get("fallback_show_name") +# show_info["type"] = config.get("fallback_show_type") +# show_info["host"] = config.get("fallback_show_host") - return show_info +# return show_info - @staticmethod - def select_one(id): - """ - Select one specific track-service item by ID. - """ - DB.session.commit() # Required since independend session is used. - track = DB.session.query(TrackService).filter(TrackService.id == id).first() - return track +# @staticmethod +# def select_one(id): +# """ +# Select one specific track-service item by ID. +# """ +# DB.session.commit() # Required since independend session is used. +# track = DB.session.query(TrackService).filter(TrackService.id == id).first() +# return track - @staticmethod - def select_current(): - """ - Selects the currently playing track. - """ - now = datetime.datetime.now() - DB.session.commit() # Required since independend session is used. - track = DB.session.query(TrackService).\ - filter(TrackService.track_start <= str(now)).\ - order_by(TrackService.track_start.desc()).first() - return track +# @staticmethod +# def select_current(): +# """ +# Selects the currently playing track. +# """ +# now = datetime.datetime.now() +# DB.session.commit() # Required since independend session is used. +# track = DB.session.query(TrackService).\ +# filter(TrackService.track_start <= str(now)).\ +# order_by(TrackService.track_start.desc()).first() +# return track - @staticmethod - def select_last_hours(n): - """ - Selects the tracks playing in the past (`n`) hours. - """ - last_hours = datetime.datetime.today() - datetime.timedelta(hours=n) - DB.session.commit() # Required since independend session is used. - tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(last_hours)).all() - for track in tracks: - track = TrackService.select_one(track.id) - return tracks +# @staticmethod +# def select_last_hours(n): +# """ +# Selects the tracks playing in the past (`n`) hours. +# """ +# last_hours = datetime.datetime.today() - datetime.timedelta(hours=n) +# DB.session.commit() # Required since independend session is used. +# tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(last_hours)).all() +# for track in tracks: +# track = TrackService.select_one(track.id) +# return tracks - @staticmethod - def select_by_day(day): - """ - Select the track-service items for a day. - """ - day_plus_one = day + datetime.timedelta(days=1) - DB.session.commit() # Required since independend session is used. - tracks = DB.session.query(TrackService).\ - filter(TrackService.track_start >= str(day), TrackService.track_start < str(day_plus_one)).\ - order_by(TrackService.track_start.desc()).all() +# @staticmethod +# def select_by_day(day): +# """ +# Select the track-service items for a day. +# """ +# day_plus_one = day + datetime.timedelta(days=1) +# DB.session.commit() # Required since independend session is used. +# tracks = DB.session.query(TrackService).\ +# filter(TrackService.track_start >= str(day), TrackService.track_start < str(day_plus_one)).\ +# order_by(TrackService.track_start.desc()).all() - res = [] - for item in tracks: - if item.track: res.append(item) - return res +# res = [] +# for item in tracks: +# if item.track: res.append(item) +# return res - @staticmethod - def select_by_range(from_day, to_day): - """ - Selects the track-service items for a day range. - """ - DB.session.commit() - tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(from_day), - TrackService.track_start < str(to_day)).all() - return tracks +# @staticmethod +# def select_by_range(from_day, to_day): +# """ +# Selects the track-service items for a day range. +# """ +# DB.session.commit() +# tracks = DB.session.query(TrackService).filter(TrackService.track_start >= str(from_day), +# TrackService.track_start < str(to_day)).all() +# return tracks - def __str__(self): - """ - Convert to String. - """ - return "TrackID: #%s [track_start: %s, artificial_playlist_entry_id: %s]" % (str(self.id), str(self.track_start), str(self.artificial_playlist_entry_id)) +# def __str__(self): +# """ +# Convert to String. +# """ +# return "TrackID: #%s [track_start: %s, artificial_playlist_entry_id: %s]" % (str(self.id), str(self.track_start), str(self.artificial_playlist_entry_id)) -class SingleEntry(DB.Model, AuraDatabaseModel): - """ - An entry played in case of e.g. a local fallback or custom programming without a playlist nor schedule. - """ - __tablename__ = 'single_entry' +# class SingleEntry(DB.Model, AuraDatabaseModel): +# """ +# An entry played in case of e.g. a local fallback or custom programming without a playlist nor schedule. +# """ +# __tablename__ = 'single_entry' - # Primary keys - id = Column(Integer, primary_key=True) +# # Primary keys +# id = Column(Integer, primary_key=True) - # Relationships - trackservice_id = Column(Integer) #, ForeignKey("trackservice.id")) - meta_data_id = Column(Integer) #, ForeignKey("trackservice.id")) +# # Relationships +# trackservice_id = Column(Integer) #, ForeignKey("trackservice.id")) +# meta_data_id = Column(Integer) #, ForeignKey("trackservice.id")) - trackservice = relationship("TrackService", uselist=False, back_populates="single_entry") - meta_data = relationship("SingleEntryMetaData", uselist=False, back_populates="entry") +# trackservice = relationship("TrackService", uselist=False, back_populates="single_entry") +# meta_data = relationship("SingleEntryMetaData", uselist=False, back_populates="entry") - # Data - uri = Column(String(1024)) - duration = Column(BigInteger) - source = Column(String(1024)) - entry_start = Column(DateTime) +# # Data +# uri = Column(String(1024)) +# duration = Column(BigInteger) +# source = Column(String(1024)) +# entry_start = Column(DateTime) - queue_state = None # Assigned when entry is about to be queued - channel = None # Assigned when entry is actually played - status = None # Assigned when state changes +# queue_state = None # Assigned when entry is about to be queued +# channel = None # Assigned when entry is actually played +# status = None # Assigned when state changes - @hybrid_property - def entry_end(self): - return self.entry_start + datetime.timedelta(seconds=self.duration) +# @hybrid_property +# def entry_end(self): +# return self.entry_start + datetime.timedelta(seconds=self.duration) - @hybrid_property - def start_unix(self): - return time.mktime(self.entry_start.timetuple()) +# @hybrid_property +# def start_unix(self): +# return time.mktime(self.entry_start.timetuple()) - @hybrid_property - def end_unix(self): - return time.mktime(self.entry_start.timetuple()) + self.duration +# @hybrid_property +# def end_unix(self): +# return time.mktime(self.entry_start.timetuple()) + self.duration - @hybrid_property - def volume(self): - return 100 +# @hybrid_property +# def volume(self): +# return 100 - @hybrid_property - def type(self): - return EngineUtil.get_channel_type(self.uri) +# @hybrid_property +# def type(self): +# return EngineUtil.get_channel_type(self.uri) - def as_dict(self): - """ - Returns the entry as a dictionary for serialization. - """ - if self.meta_data: - return { - "duration": self.duration, - "artist": self.meta_data.artist, - "album": self.meta_data.album, - "title": self.meta_data.title - } - return None +# def as_dict(self): +# """ +# Returns the entry as a dictionary for serialization. +# """ +# if self.meta_data: +# return { +# "duration": self.duration, +# "artist": self.meta_data.artist, +# "album": self.meta_data.album, +# "title": self.meta_data.title +# } +# return None - def __str__(self): - """ - String representation of the object. - """ - time_start = SimpleUtil.fmt_time(self.start_unix) - time_end = SimpleUtil.fmt_time(self.end_unix) - track = self.source[-25:] - return "SingleEntry #%s [%s - %s | %ssec | Source: ...%s]" % (str(self.id), time_start, time_end, self.duration, track) +# def __str__(self): +# """ +# String representation of the object. +# """ +# time_start = SimpleUtil.fmt_time(self.start_unix) +# time_end = SimpleUtil.fmt_time(self.end_unix) +# track = self.source[-25:] +# return "SingleEntry #%s [%s - %s | %ssec | Source: ...%s]" % (str(self.id), time_start, time_end, self.duration, track) -class SingleEntryMetaData(DB.Model, AuraDatabaseModel): - """ - Metadata for a autonomous entry such as the artist and track name. - """ - __tablename__ = "single_entry_metadata" +# class SingleEntryMetaData(DB.Model, AuraDatabaseModel): +# """ +# Metadata for a autonomous entry such as the artist and track name. +# """ +# __tablename__ = "single_entry_metadata" - id = Column(Integer, primary_key=True) - single_entry_id = Column(Integer, ForeignKey("single_entry.id")) +# id = Column(Integer, primary_key=True) +# single_entry_id = Column(Integer, ForeignKey("single_entry.id")) - artist = Column(String(256)) - title = Column(String(256)) - album = Column(String(256)) +# artist = Column(String(256)) +# title = Column(String(256)) +# album = Column(String(256)) - entry = relationship("SingleEntry", uselist=False, back_populates="meta_data") +# entry = relationship("SingleEntry", uselist=False, back_populates="meta_data") - @staticmethod - def select_metadata_for_entry(single_entry_id): - return DB.session.query(SingleEntry).filter(SingleEntryMetaData.id == single_entry_id).first() +# @staticmethod +# def select_metadata_for_entry(single_entry_id): +# return DB.session.query(SingleEntry).filter(SingleEntryMetaData.id == single_entry_id).first() diff --git a/modules/liquidsoap/__init__.py b/modules/liquidsoap/__init__.py index 13fef6e4de1114c06ab259327621d0bd9549ff2a..8b137891791fe96927ad78e64b0aad7bded08bdc 100644 --- a/modules/liquidsoap/__init__.py +++ b/modules/liquidsoap/__init__.py @@ -1 +1 @@ -__author__ = 'gg' + diff --git a/modules/liquidsoap/helpers/__init__.py b/modules/liquidsoap/helpers/__init__.py index 56b56083d440d50ab18405800b939f6dbc319723..9df7513dceb652132e543a42460edae0571c19ec 100644 --- a/modules/liquidsoap/helpers/__init__.py +++ b/modules/liquidsoap/helpers/__init__.py @@ -1,4 +1,3 @@ -__author__ = 'michel' import os import sys diff --git a/modules/plugins/api.py b/modules/plugins/api.py new file mode 100644 index 0000000000000000000000000000000000000000..b5e28352692bab702382313a6a7542fbdc059291 --- /dev/null +++ b/modules/plugins/api.py @@ -0,0 +1,58 @@ +# +# Aura Engine (https://gitlab.servus.at/aura/engine) +# +# Copyright (C) 2020 - The Aura Engine Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + + +import logging + +class TrackserviceHandler(): + """ + ApiHandler is in charge of external API calls + """ + logger = None + config = None + soundsystem = None + + + def __init__(self, config, soundsystem): + """ + Initialize. + """ + self.logger = logging.getLogger("AuraEngine") + self.config = config + self.soundsystem = soundsystem + + + def on_play(self, entry): + """ + Some track started playing. + """ + self.logger.info("::: CALLED TRACKSERVICE HANDLER :::") + self.store_trackservice(entry) + self.store_schedule(entry) + + + def store_trackservice(self, entry): + """ + Posts the given `PlaylistEntry` to the Engine API Trackservice. + """ + + def store_schedule(self, entry): + """ + Posts the given `PlaylistEntry` to the Engine API Clock Data. + """ diff --git a/modules/core/monitor.py b/modules/plugins/monitor.py similarity index 88% rename from modules/core/monitor.py rename to modules/plugins/monitor.py index 2f6df26e548127cfef11e75a9fe1521e3e0c209a..1b061742099bfc7b361944d52e3d638743c2dcc9 100644 --- a/modules/core/monitor.py +++ b/modules/plugins/monitor.py @@ -17,33 +17,38 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import meta - -import sys +import os import urllib import logging import json -import os.path import threading import platform -from os import path from enum import Enum -from time import time, ctime, sleep -from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST - +from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST +import meta from modules.communication.redis.adapter import ClientRedisAdapter -from modules.base.utils import SimpleUtil +from modules.base.utils import SimpleUtil as SU +from modules.base.mail import AuraMailer + +# Exceptions + +class EngineMalfunctionException(Exception): + pass + + +# Status Codes + class MonitorResponseCode(Enum): OK = "OK" INVALID_STATE = "INVALID-STATE" -class Monitoring: +class AuraMonitor: """ Engine Monitoring is in charge of: @@ -66,14 +71,14 @@ class Monitoring: heartbeat_running = None - def __init__(self, config, soundsystem, mailer): + def __init__(self, config, soundsystem): """ Initialize Monitoring """ self.logger = logging.getLogger("AuraEngine") self.config = config self.soundsystem = soundsystem - self.mailer = mailer + self.mailer = AuraMailer(self.config) self.status = dict() self.status["engine"] = dict() self.status["soundsystem"] = dict() @@ -84,6 +89,9 @@ class Monitoring: self.status["api"]["engine"] = dict() self.already_invalid = False + # Register as an engine plugin + self.soundsystem.plugins["monitor"] = self + # Heartbeat settings self.heartbeat_running = False self.heartbeat_server = config.get("heartbeat_server") @@ -92,7 +100,25 @@ class Monitoring: self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM) self.engine_id = self.get_engine_id() - + + # + # EVENTS + # + + def on_boot(self): + """ + Called when the engine is booting. + """ + # Start Monitoring + is_valid = self.has_valid_status(False) + status = self.get_status() + self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4)) + if not is_valid: + self.logger.info("Engine Status: " + SU.red(status["engine"]["status"])) + raise EngineMalfunctionException + else: + self.logger.info("Engine Status: " + SU.green("[OK]")) + # # PUBLIC METHODS @@ -142,7 +168,6 @@ class Monitoring: return is_valid - # # PRIVATE METHODS # @@ -207,7 +232,7 @@ class Monitoring: if self.already_invalid: self.already_invalid = False status = json.dumps(self.get_status()) - self.logger.info(SimpleUtil.green("OK - Engine turned back into some healthy state!")+"\n"+str(status)) + self.logger.info(SU.green("OK - Engine turned back into some healthy state!")+"\n"+str(status)) self.mailer.send_admin_mail( \ "OK - Engine turned back into some HEALTHY STATE!", \ "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status)) @@ -216,7 +241,7 @@ class Monitoring: if not self.already_invalid: self.already_invalid = True status = json.dumps(self.get_status()) - self.logger.critical(SimpleUtil.red("Engine turned into some INVALID STATE!")+"\n"+str(status)) + self.logger.critical(SU.red("Engine turned into some INVALID STATE!")+"\n"+str(status)) self.mailer.send_admin_mail( \ "ERROR - Engine turned into some INVALID STATE!", \ "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status)) @@ -275,7 +300,7 @@ class Monitoring: Checks if a given directory is existing and holds content """ status = dict() - status["exists"] = path.exists(dir_path) and os.path.isdir(dir_path) + status["exists"] = os.path.exists(dir_path) and os.path.isdir(dir_path) status["has_content"] = False if status["exists"]: @@ -329,5 +354,5 @@ class Monitoring: s.connect(('<broadcast>', 0)) return s.getsockname()[0] except: - self.logger.critical(SimpleUtil.red("Error while accessing network via <broadcast>!")) + self.logger.critical(SU.red("Error while accessing network via <broadcast>!")) return "<UNKNOWN NETWORK>" \ No newline at end of file diff --git a/modules/scheduling/calendar.py b/modules/scheduling/calendar.py index 454ece20e84365a01c38c1b488b95a0b8e242ac2..d4a5376cf5e88693aad63aaa03fcc90a9fc6015c 100644 --- a/modules/scheduling/calendar.py +++ b/modules/scheduling/calendar.py @@ -19,21 +19,16 @@ -import os -import sys + import threading -import json import queue -import traceback -import urllib import logging -from datetime import datetime, timedelta +from datetime import datetime from modules.base.enum import PlaylistType from modules.base.utils import SimpleUtil from modules.database.model import Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData -from modules.communication.redis.messenger import RedisMessenger from modules.scheduling.calender_fetcher import CalendarFetcher diff --git a/modules/scheduling/fallback_manager.py b/modules/scheduling/fallback_manager.py index 00ab5f05fc2f086ff891ae96fe0584f6f7456c18..cf409a5b261219a67ccaaf910ded59891500fc62 100644 --- a/modules/scheduling/fallback_manager.py +++ b/modules/scheduling/fallback_manager.py @@ -29,7 +29,7 @@ import librosa from accessify import private, protected from modules.base.enum import PlaylistType from modules.base.utils import SimpleUtil, EngineUtil -from modules.communication.mail import AuraMailer +from modules.base.mail import AuraMailer from modules.base.enum import ChannelType diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 9294ff03507ae15a56a828e15a56f5f426f0cbeb..6aeedb3016ac77d27ed49fb7bcd986f96f77db7a 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -30,8 +30,7 @@ import threading from operator import attrgetter from datetime import datetime, timedelta -from modules.database.model import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData, SingleEntry, SingleEntryMetaData, TrackService - +from modules.database.model import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData from modules.base.exceptions import NoActiveScheduleException, NoActiveEntryException, LoadSourceException from modules.base.enum import Channel, ChannelType, TimerType, TransitionType, EntryQueueState, EntryPlayState from modules.base.utils import SimpleUtil, TerminalColors, EngineUtil @@ -54,25 +53,23 @@ def alchemyencoder(obj): else: return str(obj) - - class AuraScheduler(threading.Thread): """ Aura Scheduler Class - Retrieves data from Steering and Tank - - Stores and fires events for LiquidSoap + - Stores and fires events for the sound-system Attributes: config (AuraConfig): Holds the Engine Configuration logger: The logger - exit_event(threading.Event): Used to exit the thread if requested - soundsystem: Manages the audio streams via LiquidSoap + exit_event(threading.Event): Used to exit the thread if requested + soundsystem: Virtual mixer last_successful_fetch (datetime): Stores the last time a fetch from Steering/Tank was successful programme: The current radio programme to be played as defined in the local engine database active_entry(Show, Track): This is a Tuple consisting of the currently played `Show` and `Track` - message_timer(List<threading.Timer>): The timer queue of Liquidsoap commands for playlists/entries to be played + message_timer(List<threading.Timer>): The timer queue of sound-system commands for playlists/entries to be played """ redismessenger = None job_result = {} @@ -86,9 +83,11 @@ class AuraScheduler(threading.Thread): message_timer = [] fallback_manager = None client = None + is_soundsytem_ready = None is_initialized = None + def __init__(self, config, soundsystem, func_on_init): """ Constructor @@ -101,12 +100,12 @@ class AuraScheduler(threading.Thread): self.config = config self.logger = logging.getLogger("AuraEngine") - # self.init_error_messages() self.init_database() self.fallback_manager = FallbackManager(config, self.logger, self) self.redismessenger = RedisMessenger(config) self.soundsystem = soundsystem self.soundsystem.scheduler = self + self.is_soundsytem_init = False # Scheduler Initialization self.is_initialized = False @@ -154,8 +153,9 @@ class AuraScheduler(threading.Thread): self.func_on_initialized() # The soundsystem is ready - if self.soundsystem.is_ready(): + if self.is_soundsytem_init: self.queue_programme() + except Exception as e: self.logger.critical(SimpleUtil.red("Unhandled error while fetching & scheduling new programme! (%s)" % str(e)), e) @@ -169,6 +169,12 @@ class AuraScheduler(threading.Thread): # PUBLIC METHODS # + def on_initialized(self): + """ + Called when the sound-sytem is connected and ready to play. + """ + self.is_soundsytem_init = True + def on_ready(self): """ @@ -229,17 +235,17 @@ class AuraScheduler(threading.Thread): self.soundsystem.enable_transaction() response = self.soundsystem.playlist_seek(active_entry.channel, seconds_to_seek) self.soundsystem.disable_transaction() - self.logger.info("LiquidSoap seek response: " + response) + self.logger.info("Sound-system seek response: " + response) elif active_entry.get_type() == ChannelType.HTTP \ or active_entry.get_type() == ChannelType.HTTPS \ or active_entry.get_type() == ChannelType.LIVE: - # Pre-roll and play active entry - self.soundsystem.preroll(active_entry) - self.soundsystem.play(active_entry, TransitionType.FADE) + # Pre-roll and play active entry + self.soundsystem.preroll(active_entry) + self.soundsystem.play(active_entry, TransitionType.FADE) - # self.queue_end_of_schedule(active_schedule, True) + # self.queue_end_of_schedule(active_schedule, True) else: self.logger.critical("Unknown Entry Type: %s" % active_entry) @@ -272,7 +278,7 @@ class AuraScheduler(threading.Thread): # Check for scheduled playlist current_playlist = self.fallback_manager.resolve_playlist(current_schedule) if not current_playlist: - msg = "There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule." + msg = "There's no active playlist for a current schedule. Most likely the playlist was never available or finished before the end of the schedule." self.logger.warning(SimpleUtil.red(msg)) return None @@ -525,11 +531,11 @@ class AuraScheduler(threading.Thread): if entry.queue_state == EntryQueueState.CUT: s = "\n│ %s" % SimpleUtil.red("↓↓↓ This entry is going to be cut.") - if strike: + if strike: entry_str = SimpleUtil.strike(entry) else: entry_str = str(entry) - + entry_line = "%sEntry %s | %s" % (prefix, str(entry.entry_num+1), entry_str) return s + entry_line @@ -542,7 +548,7 @@ class AuraScheduler(threading.Thread): def get_virtual_now(self): """ - Liquidsoap is slow in executing commands, therefore it's needed to schedule + Liquidsoap is slow in executing commands, therefore it's needed to schedule actions by (n) seconds in advance, as defined in the configuration file by the property `lqs_delay_offset`. @@ -574,7 +580,7 @@ class AuraScheduler(threading.Thread): def filter_scheduling_window(self, schedules): """ - Ignore schedules which are beyond the scheduling window. The end of the scheduling window + Ignore schedules which are beyond the scheduling window. The end of the scheduling window is defined by the config option `scheduling_window_end`. This value defines the seconds minus the actual start time of the schedule. """ @@ -609,7 +615,7 @@ class AuraScheduler(threading.Thread): def queue_programme(self): """ Queues the current programme (playlists as per schedule) by creating - timed commands to Liquidsoap to enable the individual tracks of playlists. + timed commands to the sound-system to enable the individual tracks of playlists. """ # Get a clean set of the schedules within the scheduling window @@ -663,7 +669,7 @@ class AuraScheduler(threading.Thread): def queue_playlist_entries(self, schedule, entries, fade_in, fade_out): """ - Creates Liquidsoap player commands for all playlist items to be executed at the scheduled time. + Creates sound-system player commands for all playlist items to be executed at the scheduled time. Args: schedule (Schedule): The schedule this entries belong to @@ -682,7 +688,7 @@ class AuraScheduler(threading.Thread): # Mark entries which start after the end of their schedule or are cut clean_entries = self.preprocess_entries(entries, True) - # Group all filesystem entries, allowing them to be queued at once + # Group/aggregate all filesystem entries, allowing them to be queued at once for entry in clean_entries: if previous_entry == None or \ (previous_entry != None and \ @@ -831,7 +837,7 @@ class AuraScheduler(threading.Thread): # Stop function to be called when schedule ends def do_stop(schedule): - last_entry = schedule.queued_entries[-1] + last_entry = schedule.queued_entries[-1] # FIXME sometimes an issue with startup queues self.logger.info(SimpleUtil.cyan("=== stop('%s') ===" % str(last_entry.playlist.schedule))) transition_type = TransitionType.INSTANT if fade_out: @@ -1025,7 +1031,7 @@ class AuraScheduler(threading.Thread): # Check if tables exists, if not create them try: - Playlist.select_all() + Playlist.is_empty() except sqlalchemy.exc.ProgrammingError as e: errcode = e.orig.args[0] diff --git a/requirements.txt b/requirements.txt index 916b935f2b729cc4e7aa6d755ee61cb9ecb23cf2..7db9e5992267a1bd838fb94e12d7b16a51206ce6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ marshmallow-sqlalchemy==0.22.2 apispec==3.3.0 apispec-webframeworks==0.5.2 mysqlclient==1.3.12 -redis==3.4.1 +redis==3.5.3 mutagen==1.44.0 validators==0.12.1 simplejson==3.17.0