diff --git a/modules/core/events.py b/modules/core/events.py index 356b6e50c6d8a5ca954412c2f4dc72522141ab4a..e82be4ca3246c65ecd09d9a0e3de8a95046b082d 100644 --- a/modules/core/events.py +++ b/modules/core/events.py @@ -22,11 +22,10 @@ import datetime from threading import Thread +from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU -from modules.base.exceptions import NoActiveEntryException from modules.base.mail import AuraMailer from modules.plugins.monitor import AuraMonitor -from modules.core.state import PlayerStateService from modules.plugins.trackservice import TrackServiceHandler @@ -76,23 +75,21 @@ class EngineEventDispatcher(): subscriber_registry = None mailer = None - soundsystem = None - player_state = None + engine = None scheduler = None monitor = None - def __init__(self, soundsystem, scheduler): + def __init__(self, engine, scheduler): """ Initialize EventDispatcher """ self.subscriber_registry = dict() self.logger = logging.getLogger("AuraEngine") - self.config = soundsystem.config + self.config = AuraConfig.config() self.mailer = AuraMailer(self.config) - self.soundsystem = soundsystem + self.engine = engine self.scheduler = scheduler - self.player_state = PlayerStateService(self.config) binding = self.attach(AuraMonitor) binding.subscribe("on_boot") @@ -100,14 +97,17 @@ class EngineEventDispatcher(): binding.subscribe("on_resurrect") binding = self.attach(TrackServiceHandler) + binding.subscribe("on_timeslot") binding.subscribe("on_play") + binding.subscribe("on_metadata") + binding.subscribe("on_queue") def attach(self, clazz): """ Creates an intance of the given Class. """ - instance = clazz(self.config, self.soundsystem) + instance = clazz(self.engine) return EventBinding(self, instance) @@ -168,36 +168,54 @@ class EngineEventDispatcher(): self.scheduler.on_ready() - def on_play(self, source): + def on_timeslot(self, timeslot): """ - Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) - when some entry is actually playing. Note that this event resolves the source URI - and passes an `PlaylistEntry` to event handlers. + Event Handler which is called by the scheduler when the current timeslot is refreshed. Args: - source (String): The `Entry` object *or* the URI of the media source currently playing + source (String): The `PlaylistEntry` object """ - def func(self, source): - self.logger.debug("on_play(..)") - entry = None + def func(self, timeslot): + self.logger.debug("on_timeslot(..)") + self.call_event("on_timeslot", timeslot) + + thread = Thread(target = func, args = (self, timeslot)) + thread.start() + - if isinstance(source, str): - try: - self.logger.info(SU.pink("Source '%s' started playing. Resolving ..." % source)) - entry = self.player_state.resolve_entry(source) - except NoActiveEntryException: - self.logger.error("Cannot resolve '%s'" % source) - else: - entry = source + def on_play(self, entry): + """ + Event Handler which is called by the engine when some entry is actually playing. + Args: + source (String): The `PlaylistEntry` object + """ + def func(self, entry): + self.logger.debug("on_play(..)") # Assign timestamp for play time entry.entry_start_actual = datetime.datetime.now() self.call_event("on_play", entry) - thread = Thread(target = func, args = (self, source)) + thread = Thread(target = func, args = (self, entry)) thread.start() + def on_metadata(self, data): + """ + Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap) + when some entry is actually playing. + + Args: + data (dict): A collection of metadata related to the current track + """ + def func(self, data): + self.logger.debug("on_metadata(..)") + self.call_event("on_metadata", data) + + thread = Thread(target = func, args = (self, data)) + thread.start() + + def on_stop(self, entry): """ The entry on the assigned channel has been stopped playing. @@ -257,7 +275,6 @@ class EngineEventDispatcher(): """ def func(self, entries): self.logger.debug("on_queue(..)") - self.player_state.add_to_history(entries) self.call_event("on_queue", entries) thread = Thread(target = func, args = (self, entries)) diff --git a/modules/plugins/trackservice.py b/modules/plugins/trackservice.py index 9c615a44cc6df471dc9260e920be1a269acadb12..89581026b4f72610a358714c0af3bf4accb887f2 100644 --- a/modules/plugins/trackservice.py +++ b/modules/plugins/trackservice.py @@ -21,8 +21,15 @@ import json import logging import requests +from collections import deque +from datetime import datetime, timedelta + +from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU +from modules.core.resources import ResourceClass from modules.core.resources import ResourceUtil +from modules.scheduling.fallback_manager import FallbackType + class TrackServiceHandler(): @@ -31,33 +38,50 @@ class TrackServiceHandler(): """ logger = None config = None - soundsystem = None + engine = None + playlog = None - def __init__(self, config, soundsystem): + def __init__(self, engine): """ Initialize. """ self.logger = logging.getLogger("AuraEngine") - self.config = config - self.soundsystem = soundsystem + self.config = AuraConfig.config() + self.engine = engine + self.playlog = Playlog(engine) - def on_play(self, entry): + + def on_timeslot(self, timeslot=None): """ - Some track started playing. + Some new timeslot has just started. """ - self.store_trackservice(entry) - self.store_clock_info(entry) + if timeslot: + self.logger.info(f"Active timeslot used for trackservice '{timeslot}'") + self.playlog.set_timeslot(timeslot) - def store_trackservice(self, entry): + def on_queue(self, entries): """ - Posts the given `PlaylistEntry` to the Engine API Playlog. """ - data = dict() + for entry in entries: + self.playlog.add(entry) + + + def on_play(self, entry): + """ + Some `PlaylistEntry` started playing. This is likely only a LIVE or STREAM entry. + """ + content_class = ResourceUtil.get_content_class(entry.get_content_type()) + if content_class == ResourceClass.FILE: + # Files are handled by "on_metadata" called via Liquidsoap + return + diff = (entry.entry_start_actual - entry.entry_start).total_seconds() self.logger.info("There's a difference of %s seconds between planned and actual start of the entry" % diff) + + data = {} data["track_start"] = entry.entry_start_actual if entry.meta_data: data["track_artist"] = entry.meta_data.artist @@ -65,31 +89,76 @@ class TrackServiceHandler(): data["track_title"] = entry.meta_data.title data["track_duration"] = entry.duration data["track_num"] = entry.entry_num - content_class = ResourceUtil.get_content_class(entry.get_content_type()) data["track_type"] = content_class.numeric data["playlist_id"] = entry.playlist.playlist_id data["schedule_id"] = entry.playlist.schedule.schedule_id data["show_id"] = entry.playlist.schedule.show_id data["show_name"] = entry.playlist.schedule.show_name + data["log_source"] = self.config.get("api_engine_number") + + self.store_trackservice(data) + self.store_clock_info(data) + + + def on_metadata(self, meta): + """ + Some metadata update was sent from Liquidsoap. + """ + data = {} + data["track_start"] = meta.get("on_air") + data["track_artist"] = meta.get("artist") + data["track_album"] = meta.get("album") + data["track_title"] = meta.get("title") + data["track_duration"] = 0 + data["track_type"] = ResourceClass.FILE.numeric + #lqs_source = meta["source"] + + entry = self.playlog.resolve_entry(meta["filename"]) + if entry: + # This is a playlog according to the scheduled playlist (normal or fallback) + data["track_num"] = entry.entry_num + data["playlist_id"] = entry.playlist.playlist_id + data["schedule_id"] = entry.playlist.schedule.schedule_id + data["show_id"] = entry.playlist.schedule.show_id + data["show_name"] = entry.playlist.schedule.show_name + else: + # This is a fallback playlog which wasn't scheduled actually (e.g. station fallback) + (past, timeslot, next) = self.playlog.get_timeslots() + if timeslot: + data = {**data, **timeslot} + data["log_source"] = self.config.get("api_engine_number") data = SU.clean_dictionary(data) + self.store_trackservice(data) + self.store_clock_info(data) + + + + + def store_trackservice(self, data): + """ + Posts the given `PlaylistEntry` to the Engine API Playlog. + """ + data = SU.clean_dictionary(data) - self.logger.info("Posting schedule update to Engine API...") + self.logger.info("Posting playlog to Engine API...") url = self.config.get("api_engine_store_playlog") headers = {'content-type': 'application/json'} body = json.dumps(data, indent=4, sort_keys=True, default=str) response = requests.post(url, data=body, headers=headers) - self.logger.info("Engine API response: %s" % response.status_code) + if response.status_code != 204 or response.status_code != 204: + msg = f"Error while posting playlog to Engine API: {response.reason} (Error {response.status_code})\n" + self.logger.info(SU.red(msg) + response.content.decode("utf-8")) - def store_clock_info(self, entry): + def store_clock_info(self, data): """ Posts the current and next show information to the Engine API. """ - current_playlist = self.soundsystem.scheduler.get_active_playlist() - current_schedule = current_playlist.schedule - next_schedule = self.soundsystem.scheduler.get_next_schedules(1) - if next_schedule: next_schedule = next_schedule[0] + current_playlist = self.engine.scheduler.get_active_playlist() + (past_timeslot, current_timeslot, next_timeslot) = self.playlog.get_timeslots() + next_timeslot = self.engine.scheduler.get_next_schedules(1) + if next_timeslot: next_timeslot = next_timeslot[0] data = dict() data["engine_source"] = self.config.get("api_engine_number") @@ -112,29 +181,11 @@ class TrackServiceHandler(): entry = SU.clean_dictionary(entry) data["current_playlist"]["entries"].append(entry) - if current_schedule: - cs = dict() - cs["schedule_id"] = current_schedule.schedule_id - cs["schedule_start"] = current_schedule.schedule_start - cs["schedule_end"] = current_schedule.schedule_end - cs["show_id"] = current_schedule.show_id - cs["show_name"] = current_schedule.show_name - cs["playlist_id"] = current_schedule.playlist_id - cs["fallback_type"] = current_schedule.fallback_state.id - cs = SU.clean_dictionary(cs) - data["current_schedule"] = cs - - if next_schedule: - ns = dict() - ns["schedule_id"] = next_schedule.schedule_id - ns["schedule_start"] = next_schedule.schedule_start - ns["schedule_end"] = next_schedule.schedule_end - ns["show_id"] = next_schedule.show_id - ns["show_name"] = next_schedule.show_name - ns["playlist_id"] = next_schedule.playlist_id - ns["fallback_type"] = next_schedule.fallback_state.id - ns = SU.clean_dictionary(ns) - data["next_schedule"] = ns + if current_timeslot: + data["current_schedule"] = current_timeslot + + if next_timeslot: + data["next_schedule"] = next_timeslot data = SU.clean_dictionary(data) @@ -144,4 +195,189 @@ class TrackServiceHandler(): headers = {'content-type': 'application/json'} body = json.dumps(data, indent=4, sort_keys=True, default=str) response = requests.put(url, data=body, headers=headers) - self.logger.info("Engine API response: %s" % response.status_code) \ No newline at end of file + if response.status_code != 204 or response.status_code != 204: + msg = f"Error while posting clock-info to Engine API: {response.reason} (Error {response.status_code})\n" + self.logger.info(SU.red(msg) + response.content.decode("utf-8")) + + + + +class Playlog: + """ + Playlog keeps a short history of currently playing entries. It stores the recent + active entries to a local cache `entry_history` being able to manage concurrently playing entries. + + It also is in charge of resolving relevant meta information of the currently playing entry for + the TrackService handler. + """ + config = None + logger = None + engine = None + history = None + previous_timeslot = None + current_timeslot = None + next_timeslot = None + + + def __init__(self, engine): + """ + Constructor + """ + self.config = AuraConfig.config() + self.logger = logging.getLogger("AuraEngine") + self.engine = engine + self.history = deque([None, None, None]) + self.current_timeslot = {} + self.set_timeslot(None) + + + def set_timeslot(self, timeslot): + """ + """ + if timeslot and self.previous_timeslot: + if self.previous_timeslot.get("schedule_start") == timeslot.get("schedule_start"): + return # Avoid overwrite by multiple calls in a row + + data = {} + next_timeslot = self.engine.scheduler.get_next_schedules(1) + if next_timeslot: next_timeslot = next_timeslot[0] + + if timeslot: + self.assign_fallback_playlist(data, timeslot) + data["schedule_id"] = timeslot.schedule_id + data["schedule_start"] = timeslot.schedule_start + data["schedule_end"] = timeslot.schedule_end + data["show_id"] = timeslot.show_id + data["show_name"] = timeslot.show_name + data = SU.clean_dictionary(data) + + # Any previous (fake) timeslots should get the proper end now + if not self.previous_timeslot: + self.current_timeslot["schedule_end"] = timeslot.schedule_start + self.previous_timeslot = self.current_timeslot + + else: + # Defaults are not existing timeslot + self.assign_fallback_playlist(data, None) + data["schedule_id"] = -1 + data["show_id"] = -1 + data["show_name"] = "" + + if self.previous_timeslot: + data["schedule_start"] = self.previous_timeslot.get("schedule_end") + else: + data["schedule_start"] = datetime.now() + + if next_timeslot: + data["schedule_end"] = next_timeslot.schedule_end + else: + # Fake the end, because the timeslot is actually not existing + data["schedule_end"] = datetime.now() + timedelta(hours=1) + + + if self.next_timeslot: + ns = {} + self.assign_fallback_playlist(ns, next_timeslot) + ns["schedule_id"] = next_timeslot.schedule_id + ns["schedule_start"] = next_timeslot.schedule_start + ns["schedule_end"] = next_timeslot.schedule_end + ns["show_id"] = next_timeslot.show_id + ns["show_name"] = next_timeslot.show_name + ns["playlist_id"] = next_timeslot.playlist_id + ns = SU.clean_dictionary(ns) + self.next_timeslot = ns + + self.current_timeslot = data + + + + + def assign_fallback_playlist(self, data, timeslot): + """ + """ + fallback_type = None + playlist = None + + if timeslot: + fallback_type, playlist = self.engine.scheduler.fallback_manager.resolve_playlist(timeslot) + + if playlist: + data["playlist_id"] = playlist.playlist_id + else: + data["playlist_id"] = -1 + + if fallback_type: + data["fallback_type"] = fallback_type.id + else: + data["fallback_type"] = FallbackType.STATION.id + + + + + + def get_timeslots(self): + """ + """ + return (self.previous_timeslot, self.current_timeslot, self.next_timeslot) + + + def add(self, entry): + """ + Saves the currently preloaded [`Entry`] to the local cache. + """ + self.history.pop() + self.history.appendleft(entry) + + + def get_recent_entries(self): + """ + Retrieves the currently playing [`Entry`] from the local cache. + """ + return self.history + + + def resolve_entry(self, uri): + """ + Retrieves the playlog matching the provied file URI. + + Args: + path (String): The URI of the resource + """ + result = None + entries = self.get_recent_entries() + if not entries: + return None + + for entry in entries: + if entry: + entry_source = entry.source + + if entry.get_content_type() in ResourceClass.FILE.types: + base_dir = self.config.get("audio_source_folder") + extension = self.config.get("audio_source_extension") + entry_source = ResourceUtil.uri_to_filepath(base_dir, entry.source, extension) + if entry_source == uri: + self.logger.info("Resolved '%s' entry '%s' for URI '%s'" % (entry.get_content_type(), entry, uri)) + result = entry + break + + if not result: + msg = "Found no entry in the recent history which matches the given source '%s'" % (uri) + self.logger.critical(SU.red(msg)) + + return result + + + def print_entry_history(self): + """ + Prints all recents entries of the history. + """ + msg = "Active entry history:\n" + for entries in self.history: + msg += "[" + for e in entries: + msg += "\n" + str(e) + msg += "]" + self.logger.info(msg) + + diff --git a/modules/scheduling/fallback_manager.py b/modules/scheduling/fallback_manager.py index 9abbc31d731e82d94fac11aefa4392032b32bd83..fc4f73426a8f53ae3cce789ddec26baaf38416c1 100644 --- a/modules/scheduling/fallback_manager.py +++ b/modules/scheduling/fallback_manager.py @@ -28,17 +28,19 @@ from datetime import datetime, timedelta from modules.base.utils import SimpleUtil as SU from modules.base.mail import AuraMailer from modules.core.resources import ResourceClass -from modules.core.engine import Engine +from modules.core.channels import Channel + + class FallbackType(Enum): """ Types of playlists. """ - NONE = { "id": 0, "name": "default" } # No fallback active, default playout - SCHEDULE = { "id": 1, "name": "schedule" } # The first played when some default playlist fails - SHOW = { "id": 2, "name": "show" } # The second played when the timeslot fallback fails - STATION = { "id": 3, "name": "station" } # The last played when everything else fails + NONE = { "id": 0, "name": "default", "lqs_sources": [ Channel.QUEUE_A, Channel.QUEUE_A] } # No fallback active, default playout + SCHEDULE = { "id": 1, "name": "schedule", "lqs_sources": [ Channel.FALLBACK_QUEUE_A, Channel.FALLBACK_QUEUE_B]} # The first played when some default playlist fails + SHOW = { "id": 2, "name": "show", "lqs_sources": [ "station_folder", "station_playlist"]} # The second played when the timeslot fallback fails + STATION = { "id": 3, "name": "station", "lqs_sources": [ "station_folder", "station_playlist"] } # The last played when everything else fails @property def id(self): diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index 131f701777496479c129a4580b99bfb2ba302f9c..34e8a420def83db70e990e29e669e337c2df4c92 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -103,7 +103,7 @@ class AuraScheduler(threading.Thread): self.engine = engine self.engine.scheduler = self self.is_soundsytem_init = False - + # Scheduler Initialization self.is_initialized = False self.func_on_initialized = func_on_init @@ -313,6 +313,7 @@ class AuraScheduler(threading.Thread): current_schedule = schedule break + self.engine.event_dispatcher.on_timeslot(current_schedule) return current_schedule