Commit 02a549b2 authored by David Trattnig's avatar David Trattnig
Browse files

Distinction between programme and API modules. #41

parent 66b67e2e
......@@ -22,13 +22,14 @@
import logging
import requests
from datetime import datetime
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.scheduling.utils import TimeslotFilter
class CalendarFetcher:
class ApiFetcher:
"""
Fetches the timeslots, playlists and playlist entries as JSON
via the API endpoints of Steering and Tank.
......@@ -46,14 +47,11 @@ class CalendarFetcher:
def __init__(self, config):
def __init__(self):
"""
Constructor
Args:
config (AuraConfig): Holds the engine configuration
"""
self.config = config
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.steering_calendar_url = self.config.get("api_steering_calendar")
self.tank_playlist_url = self.config.get("api_tank_playlist")
......@@ -234,8 +232,8 @@ class CalendarFetcher:
and transparent timeslot ID assignment for more expressive use.
"""
count_before = len(timeslots)
timeslots = self.remove_data_more_than_24h_in_the_future(timeslots)
timeslots = self.remove_data_in_the_past(timeslots)
timeslots = TimeslotFilter.filter_24h(timeslots)
timeslots = TimeslotFilter.filter_past(timeslots)
count_after = len(timeslots)
self.logger.debug("Removed %d unnecessary timeslots from response. Timeslots left: %d" % ((count_before - count_after), count_after))
......@@ -245,46 +243,5 @@ class CalendarFetcher:
def remove_data_more_than_24h_in_the_future(self, timeslots):
    """
    Removes timeslots starting more than 24 hours in the future or
    more than 12 hours in the past.

    Note: This might influence resuming (in case of a crash)
    single timeslots which are longer than 12 hours long.
    Think e.g. live broadcasts.

    Args:
        timeslots ([]): Timeslot dicts holding a "start" value
            formatted as "%Y-%m-%dT%H:%M:%S"

    Returns:
        ([]): The filtered timeslots
    """
    items = []
    now = SU.timestamp()
    # Bug fix: the upper bound was previously computed as 12 hours
    # (`now + (12*60*60)`), contradicting the variable name, the method
    # name and its caller (`TimeslotFilter.filter_24h`).
    now_plus_24hours = now + (24*60*60)
    now_minus_12hours = now - (12*60*60)

    for s in timeslots:
        start_time = datetime.strptime(s["start"], "%Y-%m-%dT%H:%M:%S")
        start_time = SU.timestamp(start_time)
        # Keep only timeslots within the [-12h, +24h] window around now
        if now_minus_12hours <= start_time <= now_plus_24hours:
            items.append(s)

    return items
def remove_data_in_the_past(self, timeslots):
    """
    Filters out timeslots which have already ended, keeping every
    future timeslot plus the single one that is on air right now.
    """
    now = SU.timestamp()

    def is_relevant(slot):
        # Parse start/end into Unix timestamps for comparison
        starts_at = SU.timestamp(datetime.strptime(slot["start"], "%Y-%m-%dT%H:%M:%S"))
        ends_at = SU.timestamp(datetime.strptime(slot["end"], "%Y-%m-%dT%H:%M:%S"))
        # Future timeslots stay; so does the one currently playing
        return starts_at >= now or starts_at < now < ends_at

    return [slot for slot in timeslots if is_relevant(slot)]
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
import queue
import logging
from datetime import datetime
from src.base.utils import SimpleUtil as SU
from src.scheduling.models import Timeslot, Playlist, PlaylistEntry, PlaylistEntryMetaData
from src.scheduling.calender_fetcher import CalendarFetcher
class AuraCalendarService(threading.Thread):
    """
    The `AuraCalendarService` retrieves all current schedules and related
    playlists including audio files from the configured API endpoints and
    stores it in the local database.

    To perform the API queries it utilizes the CalendarFetcher class.

    Attributes:
        #FIXME Review attributes not needed.
    """
    # Queue used to hand the fetch result (or an error string) back to the caller
    queue = None
    # Engine configuration (AuraConfig)
    config = None
    logger = None
    fetched_timeslot_data = None
    # Fetcher performing the actual Steering/Tank API calls
    calendar_fetcher = None
    # Event used to signal thread termination via `stop()`
    stop_event = None

    def __init__(self, config):
        """
        Initializes the class.

        Args:
            config (AuraConfig): The configuration
        """
        threading.Thread.__init__(self)
        self.config = config
        self.logger = logging.getLogger("AuraEngine")
        self.queue = queue.Queue()
        self.stop_event = threading.Event()
        self.calendar_fetcher = CalendarFetcher(config)

    def get_queue(self):
        """
        Retrieves the queue of fetched data.
        """
        return self.queue

    def run(self):
        """
        Fetch calendar data and store it in the database. Also handles local deletion of remotely
        deleted schedules.

        Returns:
            Timeslot ([]): An array of retrieved timeslots passed via `self.queue`
        """
        result = []
        now_unix = SU.timestamp()
        scheduling_window_start = self.config.get("scheduling_window_start")

        try:
            fetched_timeslot_data = self.calendar_fetcher.fetch()
            self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslot_data))

            # If nothing is fetched, return
            if not fetched_timeslot_data:
                self.queue.put("fetching_aborted Nothing fetched")
                return

            # Check if existing timeslots have been deleted
            local_timeslots = Timeslot.get_timeslots(datetime.now())
            for local_timeslot in local_timeslots:
                # Only allow deletion of timeslots which are deleted before the start of the scheduling window
                if local_timeslot.start_unix > now_unix:
                    if (local_timeslot.start_unix - scheduling_window_start) > now_unix:
                        # Filter the local timeslot from the fetched ones
                        existing_timeslot = list(filter(lambda new_timeslot: \
                            new_timeslot["timeslot_id"] == local_timeslot.timeslot_id, fetched_timeslot_data))
                        if existing_timeslot:
                            # self.logger.debug("Timeslot #%s is still existing remotely!" % (local_timeslot.timeslot_id))
                            pass
                        else:
                            self.logger.info("Timeslot #%s has been deleted remotely, hence also delete it locally [%s]" % \
                                (local_timeslot.timeslot_id, str(local_timeslot)))
                            local_timeslot.delete(commit=True)
                            self.logger.info("Deleted local timeslot #%s from database" % local_timeslot.timeslot_id)
                    else:
                        # Scheduling window already started: keep the local copy to avoid disrupting playout
                        msg = "Timeslot #%s has been deleted remotely. Since the scheduling window has already started, it won't be deleted locally." % \
                            local_timeslot.timeslot_id
                        self.logger.error(SU.red(msg))

            # Process fetched timeslots
            for timeslot in fetched_timeslot_data:
                # Check timeslot for validity
                if "start" not in timeslot:
                    self.logger.warning("No 'start' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
                    continue
                if "end" not in timeslot:
                    self.logger.warning("No 'end' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
                    continue

                # Store the timeslot
                timeslot_db = self.store_timeslot(timeslot)

                # Store playlists to play: the primary one plus any assigned fallbacks
                self.store_playlist(timeslot_db, timeslot_db.playlist_id, timeslot["playlist"])
                if timeslot_db.schedule_fallback_id:
                    self.store_playlist(timeslot_db, timeslot_db.schedule_fallback_id, timeslot["schedule_fallback"])
                if timeslot_db.show_fallback_id:
                    self.store_playlist(timeslot_db, timeslot_db.show_fallback_id, timeslot["show_fallback"])
                if timeslot_db.station_fallback_id:
                    self.store_playlist(timeslot_db, timeslot_db.station_fallback_id, timeslot["station_fallback"])
                result.append(timeslot_db)

            # Release the mutex
            self.queue.put(result)
        except Exception as e:
            # Release the mutex
            # NOTE(review): the trailing `e` argument is interpreted by logging as a
            # %-format argument for an already-formatted message and will raise a
            # formatting error at emit time — `exc_info=e` was likely intended; confirm.
            self.logger.warning("Fetching aborted due to: %s" % str(e), e)
            self.queue.put("fetching_aborted " + str(e))
            # terminate the thread
            return

    def store_timeslot(self, timeslot):
        """
        Stores the given timeslot to the database.

        Args:
            timeslot (dict): The timeslot data as fetched from the API

        Returns:
            (Timeslot): The stored timeslot model
        """
        # Re-use an existing record for this start time, otherwise create a new one
        timeslot_db = Timeslot.for_datetime(timeslot["start"])
        havetoadd = False

        if not timeslot_db:
            self.logger.debug("no timeslot with given timeslot id in database => create new")
            timeslot_db = Timeslot()
            havetoadd = True

        timeslot_db.show_id = timeslot["show_id"]
        timeslot_db.timeslot_id = timeslot["timeslot_id"]
        timeslot_db.timeslot_start = timeslot["start"]
        timeslot_db.timeslot_end = timeslot["end"]
        timeslot_db.show_name = timeslot["show_name"]
        timeslot_db.show_hosts = timeslot["show_hosts"]
        timeslot_db.is_repetition = timeslot["is_repetition"]
        timeslot_db.funding_category = timeslot["show_fundingcategory"]
        timeslot_db.languages = timeslot["show_languages"]
        timeslot_db.type = timeslot["show_type"]
        timeslot_db.category = timeslot["show_categories"]
        timeslot_db.topic = timeslot["show_topics"]
        timeslot_db.musicfocus = timeslot["show_musicfocus"]
        timeslot_db.playlist_id = timeslot["playlist_id"]
        timeslot_db.schedule_fallback_id = timeslot["schedule_fallback_id"]
        timeslot_db.show_fallback_id = timeslot["show_fallback_id"]
        timeslot_db.station_fallback_id = timeslot["station_fallback_id"]

        timeslot_db.store(add=havetoadd, commit=True)
        return timeslot_db

    def store_playlist(self, timeslot_db, playlist_id, fetched_playlist):
        """
        Stores the Playlist to the database.

        Args:
            timeslot_db (Timeslot): The timeslot the playlist belongs to
            playlist_id (int): The ID of the playlist
            fetched_playlist (dict): The playlist data as fetched from the API

        Returns:
            (Playlist): The stored playlist model, or None when there is nothing to store
        """
        # Nothing to do when the timeslot has no such playlist assigned
        if not playlist_id or not fetched_playlist:
            self.logger.debug(f"Playlist ID#{playlist_id} is not available!")
            return

        playlist_db = Playlist.select_playlist_for_timeslot(timeslot_db.timeslot_start, playlist_id)
        havetoadd = False

        if not playlist_db:
            playlist_db = Playlist()
            havetoadd = True

        self.logger.debug("Storing playlist %d for timeslot (%s)" % (playlist_id, str(timeslot_db)))
        playlist_db.playlist_id = playlist_id
        playlist_db.timeslot_start = timeslot_db.timeslot_start
        playlist_db.show_name = timeslot_db.show_name
        if "entries" in fetched_playlist:
            playlist_db.entry_count = len(fetched_playlist["entries"])
        else:
            playlist_db.entry_count = 0
        playlist_db.store(havetoadd, commit=True)

        if playlist_db.entry_count > 0:
            self.store_playlist_entries(timeslot_db, playlist_db, fetched_playlist)
        return playlist_db

    def store_playlist_entries(self, timeslot_db, playlist_db, fetched_playlist):
        """
        Stores the playlist entries to the database.

        Args:
            timeslot_db (Timeslot): The timeslot the playlist belongs to
            playlist_db (Playlist): The playlist the entries belong to
            fetched_playlist (dict): The playlist data as fetched from the API
        """
        entry_num = 0
        # Running start time: each entry starts where the previous one ends
        time_marker = playlist_db.start_unix

        self.expand_entry_duration(timeslot_db, fetched_playlist)
        self.delete_orphaned_entries(playlist_db, fetched_playlist)

        for entry in fetched_playlist["entries"]:
            entry_db = PlaylistEntry.select_playlistentry_for_playlist(playlist_db.artificial_id, entry_num)
            havetoadd = False
            if not entry_db:
                entry_db = PlaylistEntry()
                havetoadd = True

            entry_db.entry_start = datetime.fromtimestamp(time_marker)
            entry_db.artificial_playlist_id = playlist_db.artificial_id
            entry_db.entry_num = entry_num
            entry_db.duration = SU.nano_to_seconds(entry["duration"])

            if "uri" in entry:
                # FIXME Refactor mix of uri/filename/file/source
                entry_db.uri = entry["uri"]
                entry_db.source = entry["uri"]
            if "filename" in entry:
                # A local filename overrides the URI as playback source
                entry_db.source = entry["filename"]

            entry_db.store(havetoadd, commit=True)

            if "file" in entry:
                self.store_playlist_entry_metadata(entry_db, entry["file"]["metadata"])

            entry_num = entry_num + 1
            time_marker += entry_db.duration

    def delete_orphaned_entries(self, playlist_db, fetched_playlist):
        """
        Deletes all playlist entries which are beyond the current playlist's `entry_count`.
        Such entries might be existing due to a remotely changed playlist, which now has
        less entries than before.

        Args:
            playlist_db (Playlist): The locally stored playlist
            fetched_playlist (dict): The playlist data as fetched from the API
        """
        new_last_idx = len(fetched_playlist["entries"])
        existing_last_idx = PlaylistEntry.count_entries(playlist_db.artificial_id)-1

        # No orphans: the local playlist is not longer than the fetched one
        if existing_last_idx < new_last_idx:
            return

        for entry_num in range(new_last_idx, existing_last_idx+1, 1):
            PlaylistEntry.delete_entry(playlist_db.artificial_id, entry_num)
            self.logger.info(SU.yellow("Deleted playlist entry %s:%s" % (playlist_db.artificial_id, entry_num)))
            # NOTE(review): this increment is redundant — the `for` loop rebinds
            # `entry_num` on each iteration anyway.
            entry_num += 1

    def expand_entry_duration(self, timeslot_db, fetched_playlist):
        """
        If some playlist entry doesn't have a duration assigned, its duration is expanded to the
        remaining duration of the playlist (= timeslot duration minus playlist entries with duration).
        If there's more than one entry without duration, such entries are removed from the playlist.

        Args:
            timeslot_db (Timeslot): The timeslot the playlist belongs to
            fetched_playlist (dict): The playlist data as fetched from the API
        """
        total_seconds = (timeslot_db.timeslot_end - timeslot_db.timeslot_start).total_seconds()
        total_duration = SU.seconds_to_nano(total_seconds)
        actual_duration = 0
        missing_duration = []
        idx = 0

        # Collect indices of entries without duration and sum up the known durations
        for entry in fetched_playlist["entries"]:
            if not "duration" in entry:
                missing_duration.append(idx)
            else:
                actual_duration += entry["duration"]
            idx += 1

        if len(missing_duration) == 1:
            # Exactly one open entry: give it all the remaining timeslot time
            fetched_playlist["entries"][missing_duration[0]]["duration"] = total_duration - actual_duration
            self.logger.info("Expanded duration of playlist entry #%s:%s" % (fetched_playlist["id"], missing_duration[0]))
        elif len(missing_duration) > 1:
            # This case should actually never happen, as TANK doesn't allow more than one entry w/o duration anymore
            # NOTE(review): `[1:-1]` also skips the *last* entry without duration;
            # `[1:]` (delete all but the first) may have been intended — confirm.
            for i in reversed(missing_duration[1:-1]):
                self.logger.error(SU.red("Deleted Playlist Entry without duration: %s" % \
                    str(fetched_playlist["entries"][i])))
                del fetched_playlist["entries"][i]

    def store_playlist_entry_metadata(self, entry_db, metadata):
        """
        Stores the meta-data for a PlaylistEntry.

        Args:
            entry_db (PlaylistEntry): The entry the meta-data belongs to
            metadata (dict): The meta-data as fetched from the API
        """
        metadata_db = PlaylistEntryMetaData.select_metadata_for_entry(entry_db.artificial_id)
        havetoadd = False
        if not metadata_db:
            metadata_db = PlaylistEntryMetaData()
            havetoadd = True

        metadata_db.artificial_entry_id = entry_db.artificial_id

        # Missing tags default to an empty string rather than NULL
        if "artist" in metadata:
            metadata_db.artist = metadata["artist"]
        else:
            metadata_db.artist = ""

        if "album" in metadata:
            metadata_db.album = metadata["album"]
        else:
            metadata_db.album = ""

        if "title" in metadata:
            metadata_db.title = metadata["title"]
        else:
            metadata_db.title = ""

        metadata_db.store(havetoadd, commit=True)

    def stop(self):
        # Signal the fetching thread to terminate
        self.stop_event.set()
......@@ -21,22 +21,24 @@
import logging
import threading
import queue
from datetime import datetime
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.core.engine import Engine
from src.scheduling.calendar import AuraCalendarService
from src.scheduling.models import Timeslot
from src.scheduling.models import Timeslot, Playlist, PlaylistEntry, PlaylistEntryMetaData
from src.scheduling.api import ApiFetcher
class Programme():
class ProgrammeService():
"""
The current programme of the calendar. The programme is consisting of a set of timeslots.
The current programme of the calendar. The programme is a set of timeslots for the current day.
"""
config = None
logger = None
......@@ -61,10 +63,10 @@ class Programme():
# Fetch programme from API endpoints
self.logger.debug("Trying to fetch new programe from API endpoints...")
acs = AuraCalendarService(self.config)
queue = acs.get_queue()
acs.start() # start fetching thread
response = queue.get() # wait for the end
pp = ProgrammePersistence()
pp_queue = pp.get_queue()
pp.start() # start fetching thread
response = pp_queue.get() # wait for the end
self.logger.debug("... Programme fetch via API done!")
# Reset last successful fetch state
......@@ -100,8 +102,7 @@ class Programme():
def load_programme_from_db(self):
"""
Loads the programme from Engine's database and enables
them via `self.enable_entries(..)`. After that, the
Loads the programme from Engine's database and enables them via `self.enable_entries(..)`. After that, the
current message queue is printed to the console.
"""
self.programme = Timeslot.get_timeslots()
......@@ -230,4 +231,327 @@ class Programme():
timeslot.start_unix - window_end > now_unix:
return True
return False
\ No newline at end of file
return False
class ProgrammePersistence(threading.Thread):
"""
The `ProgrammePersistence` service retrieves all current schedules and related
playlists including audio files from the configured API endpoints and stores
it in the local database.
To perform the API queries it utilizes the ApiFetcher class.
"""
queue = None
config = None
logger = None
fetched_timeslot_data = None
api_fetcher = None
stop_event = None
def __init__(self):
"""
Initializes the class.
"""
threading.Thread.__init__(self)
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.queue = queue.Queue()
self.stop_event = threading.Event()
self.api_fetcher = ApiFetcher()
def get_queue(self):
"""
Retrieves the queue of fetched data.
"""
return self.queue
def run(self):
"""
Fetch calendar data and store it in the database. Also handles local deletion of remotely
deleted schedules.
Returns
Timeslot ([]): An arrar of retrieved timeslots passed via `self.queue`
"""
result = []
now_unix = SU.timestamp()
scheduling_window_start = self.config.get("scheduling_window_start")
try:
fetched_timeslot_data = self.api_fetcher.fetch()
self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslot_data))
# If nothing is fetched, return
if not fetched_timeslot_data:
self.queue.put("fetching_aborted Nothing fetched")
return
# Check if existing timeslots have been deleted
local_timeslots = Timeslot.get_timeslots(datetime.now())
for local_timeslot in local_timeslots:
# Only allow deletion of timeslots which are deleted before the start of the scheduling window
if local_timeslot.start_unix > now_unix:
if (local_timeslot.start_unix - scheduling_window_start) > now_unix:
# Filter the local timeslot from the fetched ones
existing_timeslot = list(filter(lambda new_timeslot: \
new_timeslot["timeslot_id"] == local_timeslot.timeslot_id, fetched_timeslot_data))
if existing_timeslot:
# self.logger.debug("Timeslot #%s is still existing remotely!" % (local_timeslot.timeslot_id))
pass
else:
self.logger.info("Timeslot #%s has been deleted remotely, hence also delete it locally [%s]" % \
(local_timeslot.timeslot_id, str(local_timeslot)))
local_timeslot.delete(commit=True)
self.logger.info("Deleted local timeslot #%s from database" % local_timeslot.timeslot_id)
else:
msg = "Timeslot #%s has been deleted remotely. Since the scheduling window has already started, it won't be deleted locally." % \
local_timeslot.timeslot_id
self.logger.error(SU.red(msg))
# Process fetched timeslots
for timeslot in fetched_timeslot_data:
# Check timeslot for validity
if "start" not in timeslot:
self.logger.warning("No 'start' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
if "end" not in timeslot:
self.logger.warning("No 'end' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
# Store the timeslot
timeslot_db = self.store_timeslot(timeslot)
# Store playlists to play