Commit c0e0737f authored by David Trattnig's avatar David Trattnig
Browse files

Generic metadata handling incl. fallbacks. #43 #44

parent 1113af30
......@@ -22,11 +22,10 @@ import datetime
from threading import Thread
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.base.exceptions import NoActiveEntryException
from modules.base.mail import AuraMailer
from modules.plugins.monitor import AuraMonitor
from modules.core.state import PlayerStateService
from modules.plugins.trackservice import TrackServiceHandler
......@@ -76,23 +75,21 @@ class EngineEventDispatcher():
subscriber_registry = None
mailer = None
soundsystem = None
player_state = None
engine = None
scheduler = None
monitor = None
def __init__(self, soundsystem, scheduler):
def __init__(self, engine, scheduler):
"""
Initialize EventDispatcher
"""
self.subscriber_registry = dict()
self.logger = logging.getLogger("AuraEngine")
self.config = soundsystem.config
self.config = AuraConfig.config()
self.mailer = AuraMailer(self.config)
self.soundsystem = soundsystem
self.engine = engine
self.scheduler = scheduler
self.player_state = PlayerStateService(self.config)
binding = self.attach(AuraMonitor)
binding.subscribe("on_boot")
......@@ -100,14 +97,17 @@ class EngineEventDispatcher():
binding.subscribe("on_resurrect")
binding = self.attach(TrackServiceHandler)
binding.subscribe("on_timeslot")
binding.subscribe("on_play")
binding.subscribe("on_metadata")
binding.subscribe("on_queue")
def attach(self, clazz):
"""
Creates an intance of the given Class.
"""
instance = clazz(self.config, self.soundsystem)
instance = clazz(self.engine)
return EventBinding(self, instance)
......@@ -168,36 +168,54 @@ class EngineEventDispatcher():
self.scheduler.on_ready()
def on_play(self, source):
def on_timeslot(self, timeslot):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing. Note that this event resolves the source URI
and passes an `PlaylistEntry` to event handlers.
Event Handler which is called by the scheduler when the current timeslot is refreshed.
Args:
source (String): The `Entry` object *or* the URI of the media source currently playing
source (String): The `PlaylistEntry` object
"""
def func(self, source):
self.logger.debug("on_play(..)")
entry = None
def func(self, timeslot):
self.logger.debug("on_timeslot(..)")
self.call_event("on_timeslot", timeslot)
thread = Thread(target = func, args = (self, timeslot))
thread.start()
if isinstance(source, str):
try:
self.logger.info(SU.pink("Source '%s' started playing. Resolving ..." % source))
entry = self.player_state.resolve_entry(source)
except NoActiveEntryException:
self.logger.error("Cannot resolve '%s'" % source)
else:
entry = source
def on_play(self, entry):
"""
Event Handler which is called by the engine when some entry is actually playing.
Args:
source (String): The `PlaylistEntry` object
"""
def func(self, entry):
self.logger.debug("on_play(..)")
# Assign timestamp for play time
entry.entry_start_actual = datetime.datetime.now()
self.call_event("on_play", entry)
thread = Thread(target = func, args = (self, source))
thread = Thread(target = func, args = (self, entry))
thread.start()
def on_metadata(self, data):
"""
Event Handler which is called by the soundsystem implementation (i.e. Liquidsoap)
when some entry is actually playing.
Args:
data (dict): A collection of metadata related to the current track
"""
def func(self, data):
self.logger.debug("on_metadata(..)")
self.call_event("on_metadata", data)
thread = Thread(target = func, args = (self, data))
thread.start()
def on_stop(self, entry):
"""
The entry on the assigned channel has been stopped playing.
......@@ -257,7 +275,6 @@ class EngineEventDispatcher():
"""
def func(self, entries):
self.logger.debug("on_queue(..)")
self.player_state.add_to_history(entries)
self.call_event("on_queue", entries)
thread = Thread(target = func, args = (self, entries))
......
......@@ -21,8 +21,15 @@ import json
import logging
import requests
from collections import deque
from datetime import datetime, timedelta
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.core.resources import ResourceClass
from modules.core.resources import ResourceUtil
from modules.scheduling.fallback_manager import FallbackType
class TrackServiceHandler():
......@@ -31,33 +38,50 @@ class TrackServiceHandler():
"""
logger = None
config = None
soundsystem = None
engine = None
playlog = None
def __init__(self, config, soundsystem):
def __init__(self, engine):
"""
Initialize.
"""
self.logger = logging.getLogger("AuraEngine")
self.config = config
self.soundsystem = soundsystem
self.config = AuraConfig.config()
self.engine = engine
self.playlog = Playlog(engine)
def on_play(self, entry):
def on_timeslot(self, timeslot=None):
"""
Some track started playing.
Some new timeslot has just started.
"""
self.store_trackservice(entry)
self.store_clock_info(entry)
if timeslot:
self.logger.info(f"Active timeslot used for trackservice '{timeslot}'")
self.playlog.set_timeslot(timeslot)
def store_trackservice(self, entry):
def on_queue(self, entries):
"""
Posts the given `PlaylistEntry` to the Engine API Playlog.
"""
data = dict()
for entry in entries:
self.playlog.add(entry)
def on_play(self, entry):
"""
Some `PlaylistEntry` started playing. This is likely only a LIVE or STREAM entry.
"""
content_class = ResourceUtil.get_content_class(entry.get_content_type())
if content_class == ResourceClass.FILE:
# Files are handled by "on_metadata" called via Liquidsoap
return
diff = (entry.entry_start_actual - entry.entry_start).total_seconds()
self.logger.info("There's a difference of %s seconds between planned and actual start of the entry" % diff)
data = {}
data["track_start"] = entry.entry_start_actual
if entry.meta_data:
data["track_artist"] = entry.meta_data.artist
......@@ -65,31 +89,76 @@ class TrackServiceHandler():
data["track_title"] = entry.meta_data.title
data["track_duration"] = entry.duration
data["track_num"] = entry.entry_num
content_class = ResourceUtil.get_content_class(entry.get_content_type())
data["track_type"] = content_class.numeric
data["playlist_id"] = entry.playlist.playlist_id
data["schedule_id"] = entry.playlist.schedule.schedule_id
data["show_id"] = entry.playlist.schedule.show_id
data["show_name"] = entry.playlist.schedule.show_name
data["log_source"] = self.config.get("api_engine_number")
self.store_trackservice(data)
self.store_clock_info(data)
def on_metadata(self, meta):
"""
Some metadata update was sent from Liquidsoap.
"""
data = {}
data["track_start"] = meta.get("on_air")
data["track_artist"] = meta.get("artist")
data["track_album"] = meta.get("album")
data["track_title"] = meta.get("title")
data["track_duration"] = 0
data["track_type"] = ResourceClass.FILE.numeric
#lqs_source = meta["source"]
entry = self.playlog.resolve_entry(meta["filename"])
if entry:
# This is a playlog according to the scheduled playlist (normal or fallback)
data["track_num"] = entry.entry_num
data["playlist_id"] = entry.playlist.playlist_id
data["schedule_id"] = entry.playlist.schedule.schedule_id
data["show_id"] = entry.playlist.schedule.show_id
data["show_name"] = entry.playlist.schedule.show_name
else:
# This is a fallback playlog which wasn't scheduled actually (e.g. station fallback)
(past, timeslot, next) = self.playlog.get_timeslots()
if timeslot:
data = {**data, **timeslot}
data["log_source"] = self.config.get("api_engine_number")
data = SU.clean_dictionary(data)
self.store_trackservice(data)
self.store_clock_info(data)
def store_trackservice(self, data):
"""
Posts the given `PlaylistEntry` to the Engine API Playlog.
"""
data = SU.clean_dictionary(data)
self.logger.info("Posting schedule update to Engine API...")
self.logger.info("Posting playlog to Engine API...")
url = self.config.get("api_engine_store_playlog")
headers = {'content-type': 'application/json'}
body = json.dumps(data, indent=4, sort_keys=True, default=str)
response = requests.post(url, data=body, headers=headers)
self.logger.info("Engine API response: %s" % response.status_code)
if response.status_code != 204 or response.status_code != 204:
msg = f"Error while posting playlog to Engine API: {response.reason} (Error {response.status_code})\n"
self.logger.info(SU.red(msg) + response.content.decode("utf-8"))
def store_clock_info(self, entry):
def store_clock_info(self, data):
"""
Posts the current and next show information to the Engine API.
"""
current_playlist = self.soundsystem.scheduler.get_active_playlist()
current_schedule = current_playlist.schedule
next_schedule = self.soundsystem.scheduler.get_next_schedules(1)
if next_schedule: next_schedule = next_schedule[0]
current_playlist = self.engine.scheduler.get_active_playlist()
(past_timeslot, current_timeslot, next_timeslot) = self.playlog.get_timeslots()
next_timeslot = self.engine.scheduler.get_next_schedules(1)
if next_timeslot: next_timeslot = next_timeslot[0]
data = dict()
data["engine_source"] = self.config.get("api_engine_number")
......@@ -112,29 +181,11 @@ class TrackServiceHandler():
entry = SU.clean_dictionary(entry)
data["current_playlist"]["entries"].append(entry)
if current_schedule:
cs = dict()
cs["schedule_id"] = current_schedule.schedule_id
cs["schedule_start"] = current_schedule.schedule_start
cs["schedule_end"] = current_schedule.schedule_end
cs["show_id"] = current_schedule.show_id
cs["show_name"] = current_schedule.show_name
cs["playlist_id"] = current_schedule.playlist_id
cs["fallback_type"] = current_schedule.fallback_state.id
cs = SU.clean_dictionary(cs)
data["current_schedule"] = cs
if next_schedule:
ns = dict()
ns["schedule_id"] = next_schedule.schedule_id
ns["schedule_start"] = next_schedule.schedule_start
ns["schedule_end"] = next_schedule.schedule_end
ns["show_id"] = next_schedule.show_id
ns["show_name"] = next_schedule.show_name
ns["playlist_id"] = next_schedule.playlist_id
ns["fallback_type"] = next_schedule.fallback_state.id
ns = SU.clean_dictionary(ns)
data["next_schedule"] = ns
if current_timeslot:
data["current_schedule"] = current_timeslot
if next_timeslot:
data["next_schedule"] = next_timeslot
data = SU.clean_dictionary(data)
......@@ -144,4 +195,189 @@ class TrackServiceHandler():
headers = {'content-type': 'application/json'}
body = json.dumps(data, indent=4, sort_keys=True, default=str)
response = requests.put(url, data=body, headers=headers)
self.logger.info("Engine API response: %s" % response.status_code)
\ No newline at end of file
if response.status_code != 204 or response.status_code != 204:
msg = f"Error while posting clock-info to Engine API: {response.reason} (Error {response.status_code})\n"
self.logger.info(SU.red(msg) + response.content.decode("utf-8"))
class Playlog:
"""
Playlog keeps a short history of currently playing entries. It stores the recent
active entries to a local cache `entry_history` being able to manage concurrently playing entries.
It also is in charge of resolving relevant meta information of the currently playing entry for
the TrackService handler.
"""
config = None
logger = None
engine = None
history = None
previous_timeslot = None
current_timeslot = None
next_timeslot = None
def __init__(self, engine):
"""
Constructor
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.engine = engine
self.history = deque([None, None, None])
self.current_timeslot = {}
self.set_timeslot(None)
def set_timeslot(self, timeslot):
"""
"""
if timeslot and self.previous_timeslot:
if self.previous_timeslot.get("schedule_start") == timeslot.get("schedule_start"):
return # Avoid overwrite by multiple calls in a row
data = {}
next_timeslot = self.engine.scheduler.get_next_schedules(1)
if next_timeslot: next_timeslot = next_timeslot[0]
if timeslot:
self.assign_fallback_playlist(data, timeslot)
data["schedule_id"] = timeslot.schedule_id
data["schedule_start"] = timeslot.schedule_start
data["schedule_end"] = timeslot.schedule_end
data["show_id"] = timeslot.show_id
data["show_name"] = timeslot.show_name
data = SU.clean_dictionary(data)
# Any previous (fake) timeslots should get the proper end now
if not self.previous_timeslot:
self.current_timeslot["schedule_end"] = timeslot.schedule_start
self.previous_timeslot = self.current_timeslot
else:
# Defaults are not existing timeslot
self.assign_fallback_playlist(data, None)
data["schedule_id"] = -1
data["show_id"] = -1
data["show_name"] = ""
if self.previous_timeslot:
data["schedule_start"] = self.previous_timeslot.get("schedule_end")
else:
data["schedule_start"] = datetime.now()
if next_timeslot:
data["schedule_end"] = next_timeslot.schedule_end
else:
# Fake the end, because the timeslot is actually not existing
data["schedule_end"] = datetime.now() + timedelta(hours=1)
if self.next_timeslot:
ns = {}
self.assign_fallback_playlist(ns, next_timeslot)
ns["schedule_id"] = next_timeslot.schedule_id
ns["schedule_start"] = next_timeslot.schedule_start
ns["schedule_end"] = next_timeslot.schedule_end
ns["show_id"] = next_timeslot.show_id
ns["show_name"] = next_timeslot.show_name
ns["playlist_id"] = next_timeslot.playlist_id
ns = SU.clean_dictionary(ns)
self.next_timeslot = ns
self.current_timeslot = data
def assign_fallback_playlist(self, data, timeslot):
"""
"""
fallback_type = None
playlist = None
if timeslot:
fallback_type, playlist = self.engine.scheduler.fallback_manager.resolve_playlist(timeslot)
if playlist:
data["playlist_id"] = playlist.playlist_id
else:
data["playlist_id"] = -1
if fallback_type:
data["fallback_type"] = fallback_type.id
else:
data["fallback_type"] = FallbackType.STATION.id
def get_timeslots(self):
"""
"""
return (self.previous_timeslot, self.current_timeslot, self.next_timeslot)
def add(self, entry):
"""
Saves the currently preloaded [`Entry`] to the local cache.
"""
self.history.pop()
self.history.appendleft(entry)
def get_recent_entries(self):
"""
Retrieves the currently playing [`Entry`] from the local cache.
"""
return self.history
def resolve_entry(self, uri):
"""
Retrieves the playlog matching the provied file URI.
Args:
path (String): The URI of the resource
"""
result = None
entries = self.get_recent_entries()
if not entries:
return None
for entry in entries:
if entry:
entry_source = entry.source
if entry.get_content_type() in ResourceClass.FILE.types:
base_dir = self.config.get("audio_source_folder")
extension = self.config.get("audio_source_extension")
entry_source = ResourceUtil.uri_to_filepath(base_dir, entry.source, extension)
if entry_source == uri:
self.logger.info("Resolved '%s' entry '%s' for URI '%s'" % (entry.get_content_type(), entry, uri))
result = entry
break
if not result:
msg = "Found no entry in the recent history which matches the given source '%s'" % (uri)
self.logger.critical(SU.red(msg))
return result
def print_entry_history(self):
"""
Prints all recents entries of the history.
"""
msg = "Active entry history:\n"
for entries in self.history:
msg += "["
for e in entries:
msg += "\n" + str(e)
msg += "]"
self.logger.info(msg)
......@@ -28,17 +28,19 @@ from datetime import datetime, timedelta
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
from modules.core.resources import ResourceClass
from modules.core.engine import Engine
from modules.core.channels import Channel
class FallbackType(Enum):
"""
Types of playlists.
"""
NONE = { "id": 0, "name": "default" } # No fallback active, default playout
SCHEDULE = { "id": 1, "name": "schedule" } # The first played when some default playlist fails
SHOW = { "id": 2, "name": "show" } # The second played when the timeslot fallback fails
STATION = { "id": 3, "name": "station" } # The last played when everything else fails
NONE = { "id": 0, "name": "default", "lqs_sources": [ Channel.QUEUE_A, Channel.QUEUE_A] } # No fallback active, default playout
SCHEDULE = { "id": 1, "name": "schedule", "lqs_sources": [ Channel.FALLBACK_QUEUE_A, Channel.FALLBACK_QUEUE_B]} # The first played when some default playlist fails
SHOW = { "id": 2, "name": "show", "lqs_sources": [ "station_folder", "station_playlist"]} # The second played when the timeslot fallback fails
STATION = { "id": 3, "name": "station", "lqs_sources": [ "station_folder", "station_playlist"] } # The last played when everything else fails
@property
def id(self):
......
......@@ -103,7 +103,7 @@ class AuraScheduler(threading.Thread):
self.engine = engine
self.engine.scheduler = self
self.is_soundsytem_init = False
# Scheduler Initialization
self.is_initialized = False
self.func_on_initialized = func_on_init
......@@ -313,6 +313,7 @@ class AuraScheduler(threading.Thread):
current_schedule = schedule
break
self.engine.event_dispatcher.on_timeslot(current_schedule)
return current_schedule
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment