Commit 65cafc22 authored by David Trattnig's avatar David Trattnig
Browse files

New EventDispatcher and major refactoring.

parent 1b3938d1
......@@ -21,10 +21,8 @@
import os
import sys
import meta
import signal
import logging
import unittest
import subprocess
from flask import Flask
......@@ -61,6 +59,7 @@ class AuraEngine:
server = None
messenger = None
controller = None
soundsystem = None
scheduler = None
lqs = None
lqs_startup = None
......@@ -117,6 +116,11 @@ class AuraEngine:
else:
self.logger.info(SimpleUtil.yellow("Please note, Liquidsoap needs to be started manually."))
# from modules.communication.redis.adapter import ServerRedisAdapter
# self.messenger = ServerRedisAdapter(self.config)
# self.messenger.scheduler = self.scheduler
# self.messenger.soundsystem = self.soundsystem
# self.messenger.start()
def start_lqs(self, debug_output, verbose_output):
......
......@@ -21,13 +21,12 @@
import time
import sys
import redis
from pathlib import Path
from argparse import ArgumentParser
# own libs
from modules.cli_tool.padavan import Padavan
from modules.cli.padavan import Padavan
from modules.base.exceptions import PlaylistException
from modules.base.config import AuraConfig
......
......@@ -42,14 +42,6 @@ class NoActiveEntryException(Exception):
pass
# Monitoring Exceptions
class EngineMalfunctionException(Exception):
pass
class MailingException(Exception):
pass
# Liquidsoap Execeptions
......
......@@ -19,7 +19,15 @@
import smtplib
from email.message import EmailMessage
from modules.base.exceptions import MailingException
class MailingException(Exception):
"""
Thrown when some mail cannot be sent.
"""
pass
class AuraMailer():
......@@ -29,7 +37,6 @@ class AuraMailer():
config = None
def __init__(self, config):
"""
Constructor to initialize service with Aura `config`.
......@@ -41,12 +48,10 @@ class AuraMailer():
self.admin_mails = config.get("admin_mail")
#
# PUBLIC METHODS
#
def send_admin_mail(self, subject, body):
"""
Sends an email to the administrator as defined in the configuration.
......@@ -61,7 +66,6 @@ class AuraMailer():
self.__send(mail_to, subject, body)
#
# PRIVATE METHODS
#
......
......@@ -202,6 +202,24 @@ class LiquidSoapPlayerClient(LiquidSoapClient):
return self.message
#
# General Entries
#
def entry_status(self, rid):
"""
Retrieves the status of a given entry.
Args:
rid (String): Resource ID (RID)
Returns:
Liquidsoap server response
"""
self.command("request", "status", str(rid))
return self.message
#
# Other
#
......
......@@ -20,12 +20,12 @@
import sys
import time
import json
import redis
import logging
import threading
from datetime import datetime
from threading import Event
import threading
import redis
from modules.communication.redis.messenger import RedisMessenger
from modules.communication.redis.statestore import RedisStateStore
......@@ -154,7 +154,9 @@ class ServerRedisAdapter(threading.Thread, RedisMessenger):
elif item["data"] == "get_status":
def get_status_string():
status = self.soundsystem.monitoring.get_status()
status = "No monitoring plugin available!"
if "monitor" in self.soundsystem.plugins:
status = self.soundsystem.plugins["monitor"].get_status()
return json.dumps(status)
self.execute(RedisChannel.GS_REPLY.value, get_status_string)
......
......@@ -17,15 +17,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
import datetime
from modules.communication.redis.statestore import RedisStateStore
from modules.communication.mail import AuraMailer
from modules.base.exceptions import PlaylistException
from modules.base.enum import RedisChannel
from modules.base.logger import AuraLogger
"""
Send and receive redis messages
......
......@@ -17,13 +17,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import redis
import time
import datetime
import json
import re
import uuid
import redis
class RedisStateStore(object):
......
This diff is collapsed.
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from modules.base.utils import SimpleUtil as SU
from modules.base.exceptions import NoActiveEntryException
# from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, EngineMalfunctionException, LQStreamException, LoadSourceException
from modules.base.mail import AuraMailer
from modules.plugins.monitor import AuraMonitor
from modules.core.state import PlayerStateService
from modules.plugins.api import TrackserviceHandler
class EventBinding():
"""
A binding between the event dispatcher and some event handler.
This allows you to subscribe to events in this way:
```
binding = dispatcher.attach(AuraMonitor)
binding.subscribe("on_boot").subscribe("on_play")
```
"""
dispatcher = None
instance = None
def __init__(self, dispatcher, instance):
self.dispatcher = dispatcher
self.instance = instance
def subscribe(self, event_type):
"""
Subscribes the instance to some event identified by the `event_type` string.
"""
self.dispatcher.subscribe(self.instance, event_type)
return self
def get_instances(self):
"""
Returns the object within that binding.
"""
return self.instance
class EngineEventDispatcher():
"""
Performs execution of handlers for engine events.
"""
logger = None
config = None
subscriber_registry = None
mailer = None
soundsystem = None
player_state = None
scheduler = None
api_handler = None
def __init__(self, soundsystem, scheduler):
"""
Initialize EventDispatcher
"""
self.subscriber_registry = dict()
self.logger = logging.getLogger("AuraEngine")
self.config = soundsystem.config
self.mailer = AuraMailer(self.config)
self.soundsystem = soundsystem
self.scheduler = scheduler
self.player_state = PlayerStateService(self.config)
# self.api_handler = ApiHandler(self.config)
# self.monitoring = Monitoring(self.config, self.soundsystem)
binding = self.attach(AuraMonitor)
binding.subscribe("on_boot")
binding = self.attach(TrackserviceHandler)
binding.subscribe("on_play")
def attach(self, clazz):
"""
Creates an intance of the given Class.
"""
instance = clazz(self.config, self.soundsystem)
return EventBinding(self, instance)
def subscribe(self, instance, event_type):
"""
Subscribes to some event type. Preferably use it via `EventBinding.subscribe(..)`.
"""
if not event_type in self.subscriber_registry:
self.subscriber_registry[event_type] = []
self.subscriber_registry[event_type].append(instance)
def call_event(self, event_type, args):
"""
Calls all subscribers for the given event type.
"""
if not event_type in self.subscriber_registry:
return
listeners = self.subscriber_registry[event_type]
if not listeners:
return
for listener in listeners:
method = getattr(listener, event_type)
if method:
if args:
method(args)
else:
method()
#
# Events
#
def on_initialized(self):
"""
Called when the engine is initialized e.g. connected to Liquidsoap.
"""
self.logger.debug("on_initialized(..)")
self.scheduler.on_initialized()
self.call_event("on_initialized", None)
def on_boot(self):
"""
Called when the engine is starting up. This happens after the initialization step.
"""
self.logger.debug("on_boot(..)")
self.call_event("on_boot", None)
# self.monitoring.on_boot()
def on_ready(self):
"""
Called when the engine is booted and ready to play.
"""
self.logger.debug("on_initialized(..)")
self.scheduler.on_ready()
def on_play(self, source):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
Args:
source (String): The URI of the media source currently being played
"""
self.logger.debug("on_play(..)")
self.logger.info(SU.pink("Source '%s' started playing" % source))
try:
self.player_state.store_trackservice_entry(source)
# self.player_state.get_current_entry(source)
except NoActiveEntryException:
self.on_idle()
self.call_event("on_initialized", None)
def on_stop(self, entry):
"""
The entry on the assigned channel has been stopped playing.
"""
self.logger.debug("on_stop(..)")
self.call_event("on_stop", entry)
def on_idle(self):
"""
Callend when no entry is playing
"""
self.logger.debug("on_idle(..)")
self.logger.warn(SU.red("Currently there's nothing playing!"))
self.call_event("on_idle", None)
def on_schedule_change(self, schedule):
"""
Called when the playlist or entries of the current schedule have changed.
"""
self.logger.debug("on_schedule_change(..)")
self.call_event("on_schedule_change", schedule)
def on_queue(self, entries):
"""
One or more entries have been queued and are currently pre-loaded.
"""
self.logger.debug("on_queue(..)")
self.player_state.add_to_history(entries)
self.call_event("on_queue", entries)
def on_sick(self):
"""
Called when the engine is in some unhealthy state.
"""
self.logger.debug("on_sick(..)")
self.call_event("on_sick", None)
def on_resurrect(self):
"""
Called when the engine turned healthy again after being sick.
"""
self.logger.debug("on_resurrect(..)")
self.call_event("on_resurrect", None)
def on_critical(self, subject, message, data=None):
"""
Callend when some critical event occurs
"""
self.logger.debug("on_critical(..)")
if not data: data = ""
self.mailer.send_admin_mail(subject, message + "\n\n" + str(data))
self.call_event("on_critical", (subject, message, data))
......@@ -17,36 +17,32 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
import datetime
import threading
import meta
import json
from modules.base.exceptions import NoActiveScheduleException
from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil
from modules.base.utils import SimpleUtil as SU
class StartupThread(threading.Thread):
"""
StartupThread class.
Boots the mixer and starts playing the current schedule.
Boots the engine and starts playing the current schedule.
"""
logger = None
active_entry = None
soundsystem = None
scheduler = None
monitoring = None
engine = None
def __init__(self, soundsystem):
def __init__(self, engine):
"""
Initialize the thread.
"""
threading.Thread.__init__(self)
self.logger = logging.getLogger("AuraEngine")
self.soundsystem = soundsystem
self.scheduler = soundsystem.scheduler
self.engine = engine
def run(self):
......@@ -54,15 +50,10 @@ class StartupThread(threading.Thread):
Boots the soundsystem.
"""
try:
self.soundsystem.start()
self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__))
self.scheduler.on_ready()
self.engine.start()
except NoActiveScheduleException as e:
self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.")
except Exception as e:
self.logger.error(SimpleUtil.red("Error while initializing the soundsystem: " + str(e)), e)
self.logger.error(SU.red("Error while initializing the soundsystem: " + str(e)), e)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
......@@ -22,7 +22,7 @@ from collections import deque
from modules.base.exceptions import NoActiveEntryException
from modules.base.utils import SimpleUtil, EngineUtil
from modules.database.model import SingleEntry, SingleEntryMetaData, PlaylistEntry, PlaylistEntryMetaData, TrackService
......@@ -31,7 +31,7 @@ class PlayerStateService:
PlayerStateService keeps a short history of currently playing entries. It stores the recent
active entries to a local cache `entry_history` being able to manage concurrently playing entries.
It also is in charge of storing relevant meta information of the currently playing entry to
It also is in charge of storing relevant meta information of the currently playing entry to
the TrackService table.
"""
......@@ -73,6 +73,13 @@ class PlayerStateService:
return self.entry_history[0]
def entry_for_source(self, source):
"""
Retrieves the `PlaylistEntry` matching the provied source URI.
"""
# TODO Implement
def store_trackservice_entry(self, filepath):
"""
Stores the entry identified by the given source in the Track Service.
......@@ -84,26 +91,26 @@ class PlayerStateService:
(NoActiveEntryException): In case currently nothing is playing
"""
found = False
entries = self.get_recent_entries()
# entries = self.get_recent_entries()
if not entries:
raise NoActiveEntryException
# if not entries:
# raise NoActiveEntryException
for active_entry in entries:
base_dir = self.config.get("audiofolder")
if EngineUtil.uri_to_filepath(base_dir, active_entry.source) == filepath:
trackservice = TrackService(active_entry)
trackservice.store(add=True, commit=True)
# for active_entry in entries:
# base_dir = self.config.get("audiofolder")
# if EngineUtil.uri_to_filepath(base_dir, active_entry.source) == filepath:
# trackservice = TrackService(active_entry)
# trackservice.store(add=True, commit=True)
active_entry.trackservice_id = trackservice.id
active_entry.store(add=False, commit=True)
# active_entry.trackservice_id = trackservice.id
# active_entry.store(add=False, commit=True)
self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice))
found = True
# self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice))
# found = True
if not found:
msg = "Found no entry in the recent history which matches the given source '%s'" % (filepath)
self.logger.critical(SimpleUtil.red(msg))
# if not found:
# msg = "Found no entry in the recent history which matches the given source '%s'" % (filepath)
# self.logger.critical(SimpleUtil.red(msg))
......@@ -119,53 +126,3 @@ class PlayerStateService:
msg += "]"
self.logger.info(msg)
# def adapt_trackservice_title(self, source):
# """
# Updates the track-service entry with the info from a fallback track/playlist.
# """
# liquidsoap_offset = int(self.config.lqs_delay_offset)
# scheduled_entry = self.get_active_entry(liquidsoap_offset)
# entry = SingleEntry()
# meta = SingleEntryMetaData()
# # # Validate artist and title
# # if not title:
# # title = self.config.get("fallback_title_not_available")
# # Create Entry
# entry.source = source
# entry.duration = self.fallback_manager.get_track_duration(source)
# if not entry.duration:
# self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry)))
# # Create track service log for local station fallback (type=4)
# trackservice = TrackService(entry, 4)
# trackservice.store(add=True, commit=True)
# entry.store(add=True, commit=True)