Commit 05b6b59b authored by David Trattnig's avatar David Trattnig
Browse files

Separation of concerns: API vs. Persistence. #41

parent 3d4c2760
......@@ -21,23 +21,26 @@
import logging
import requests
import queue
import threading
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.scheduling.utils import TimeslotFilter
class ApiFetcher:
class ApiFetcher(threading.Thread):
"""
Fetches the timeslots, playlists and playlist entries as JSON
via the API endpoints of Steering and Tank.
"""
config = None
logging = None
queue = None
has_already_fetched = False
fetched_timeslot_data = None
stop_event = None
# Config for API Endpoints
steering_calendar_url = None
......@@ -47,23 +50,63 @@ class ApiFetcher:
def __init__(self):
def __init__(self, config):
"""
Constructor
"""
self.config = AuraConfig.config()
self.config = config
self.logger = logging.getLogger("AuraEngine")
self.steering_calendar_url = self.config.get("api_steering_calendar")
self.tank_playlist_url = self.config.get("api_tank_playlist")
self.tank_session = self.config.get("api_tank_session")
self.tank_secret = self.config.get("api_tank_secret")
self.queue = queue.Queue()
self.stop_event = threading.Event()
threading.Thread.__init__(self)
def run(self):
"""
Fetch timeslot data from the API.
Returns
Timeslot ([dict]): An array of retrieved timeslots dictionary
"""
try:
fetched_timeslots = self.fetch()
self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslots))
# If nothing is fetched, return
if not fetched_timeslots:
self.queue.put("fetching_aborted Nothing fetched")
return None
# Release the mutex
self.queue.put(fetched_timeslots)
except Exception as e:
# Release the mutex
self.logger.warning("Fetching aborted due to: %s" % str(e), e)
self.queue.put("fetching_aborted " + str(e))
# Terminate the thread
return
#
# PUBLIC METHODS
# METHODS
#
def get_fetched_data(self):
"""
Retrieves the fetched data from the queue.
"""
return self.queue.get()
def fetch(self):
"""
Retrieve all required data from the API.
......@@ -107,11 +150,6 @@ class ApiFetcher:
#
# PRIVATE METHODS
#
def fetch_timeslot_data(self):
"""
Fetches timeslot data from Steering.
......@@ -243,5 +281,9 @@ class ApiFetcher:
def terminate(self):
"""
Terminates the thread.
"""
self.logger.info("Shutting down API fetcher...")
self.stop_event.set()
......@@ -21,8 +21,6 @@
import logging
import threading
import queue
from datetime import datetime
......@@ -42,8 +40,10 @@ class ProgrammeService():
"""
config = None
logger = None
programme = None
timeslots = None
api_fetcher = None
last_successful_fetch = None
programme_store = None
def __init__(self):
......@@ -51,68 +51,55 @@ class ProgrammeService():
Constructor
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.logger = logging.getLogger("AuraEngine")
self.programme_store = ProgrammeStore()
def refresh(self):
"""
Fetch the latest programme from `AuraCalendarService` which stores it to the database.
After that, the programme is in turn loaded from the database and stored in `self.programme`.
Fetch the latest programme from `ProgrammeStore` which stores it to the database.
After that, the programme is in turn loaded from the database and stored in `self.timeslots`.
"""
# Fetch programme from API endpoints
self.logger.debug("Trying to fetch new programe from API endpoints...")
pp = ProgrammePersistence()
pp_queue = pp.get_queue()
pp.start() # start fetching thread
response = pp_queue.get() # wait for the end
self.logger.debug("... Programme fetch via API done!")
# Create a fetching thread and wait until it's done
self.api_fetcher = ApiFetcher(self.config)
self.api_fetcher.start()
response = self.api_fetcher.get_fetched_data()
# Reset last successful fetch state
lsf = self.last_successful_fetch
self.last_successful_fetch = None
if response is None:
msg = SU.red("Trying to load programme from Engine Database, because AuraCalendarService returned an empty response.")
msg = SU.red("Trying to load programme from Engine Database, because ApiFetcher returned an empty response.")
self.logger.warning(msg)
elif type(response) is list:
self.programme = response
if self.programme is not None and len(self.programme) > 0:
self.last_successful_fetch = datetime.now()
self.logger.info(SU.green("Finished fetching current programme from API"))
if len(self.programme) == 0:
self.timeslots = response
if self.timeslots is not None and len(self.timeslots) > 0:
self.last_successful_fetch = datetime.now()
self.programme_store.store_timeslots(response)
self.logger.info(SU.green(f"Finished fetching current programme from API ({len(response)})"))
if len(self.timeslots) == 0:
self.logger.critical("Programme fetched from Steering/Tank has no entries!")
elif response.startswith("fetching_aborted"):
msg = SU.red("Trying to load programme from database only, because fetching was being aborted from AuraCalendarService! Reason: ")
msg = SU.red("Trying to load programme from database only, because fetching was being aborted from ApiFetcher! Reason: ")
self.logger.warning(msg + response[16:])
else:
msg = SU.red("Trying to load programme from database only, because of an unknown response from AuraCalendarService: " + response)
msg = SU.red("Trying to load programme from database only, because of an unknown response from ApiFetcher: " + response)
self.logger.warning(msg)
# Always load latest programme from the database
self.last_successful_fetch = lsf
self.load_programme_from_db()
self.logger.info(SU.green("Finished loading current programme from database (%s timeslots)" % str(len(self.programme))))
for timeslot in self.programme:
self.timeslots = self.programme_store.load_timeslots()
self.logger.info(SU.green("Finished loading current programme from database (%s timeslots)" % str(len(self.timeslots))))
for timeslot in self.timeslots:
self.logger.debug("\tTimeslot %s with Playlist %s" % (str(timeslot), str(timeslot.playlist)))
def load_programme_from_db(self):
"""
Loads the programme from Engine's database and enables them via `self.enable_entries(..)`. After that, the
current message queue is printed to the console.
"""
self.programme = Timeslot.get_timeslots()
if not self.programme:
self.logger.critical(SU.red("Could not load programme from database. We are in big trouble my friend!"))
return
def get_current_entry(self):
"""
Retrieves the current `PlaylistEntry` which should be played as per programme.
......@@ -123,8 +110,8 @@ class ProgrammeService():
now_unix = Engine.engine_time()
# Load programme if necessary
if not self.programme:
self.load_programme_from_db()
if not self.timeslots:
self.refresh()
# Check for current timeslot
current_timeslot = self.get_current_timeslot()
......@@ -167,8 +154,8 @@ class ProgrammeService():
now_unix = Engine.engine_time()
# Iterate over all timeslots and find the one to be played right now
if self.programme:
for timeslot in self.programme:
if self.timeslots:
for timeslot in self.timeslots:
if timeslot.start_unix <= now_unix and now_unix < timeslot.end_unix:
current_timeslot = timeslot
break
......@@ -190,7 +177,7 @@ class ProgrammeService():
now_unix = Engine.engine_time()
next_timeslots = []
for timeslot in self.programme:
for timeslot in self.timeslots:
if timeslot.start_unix > now_unix:
if (len(next_timeslots) < max_count) or max_count == 0:
next_timeslots.append(timeslot)
......@@ -235,105 +222,82 @@ class ProgrammeService():
def terminate(self):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.logger.info("Shutting down programme service ...")
if self.api_fetcher:
self.api_fetcher.terminate()
class ProgrammePersistence(threading.Thread):
class ProgrammeStore():
"""
The `ProgrammePersistence` service retrieves all current schedules and related
The `ProgrammeStore` service retrieves all current schedules and related
playlists including audio files from the configured API endpoints and stores
it in the local database.
To perform the API queries it utilizes the ApiFetcher class.
"""
queue = None
config = None
logger = None
fetched_timeslot_data = None
api_fetcher = None
stop_event = None
def __init__(self):
"""
Initializes the class.
"""
threading.Thread.__init__(self)
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.queue = queue.Queue()
self.stop_event = threading.Event()
self.api_fetcher = ApiFetcher()
def get_queue(self):
"""
Retrieves the queue of fetched data.
def load_timeslots(self):
"""
return self.queue
Loads the programme from the database.
"""
try:
timeslots = Timeslot.get_timeslots(datetime.now())
except Exception as e:
self.logger.critical(SU.red("Could not load programme from database. We are in big trouble my friend!"), e)
return timeslots
def run(self):
def store_timeslots(self, fetched_timeslots):
"""
Fetch calendar data and store it in the database. Also handles local deletion of remotely
deleted schedules.
Returns
Timeslot ([]): An arrar of retrieved timeslots passed via `self.queue`
Stores the fetched timeslots to the database.
"""
result = []
# Check if existing timeslots have been deleted
self.update_deleted_timeslots(fetched_timeslots)
try:
fetched_timeslot_data = self.api_fetcher.fetch()
self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslot_data))
# Process fetched timeslots
for timeslot in fetched_timeslots:
# If nothing is fetched, return
if not fetched_timeslot_data:
self.queue.put("fetching_aborted Nothing fetched")
return
# Check if existing timeslots have been deleted
self.update_deleted_timeslots(fetched_timeslot_data)
# Process fetched timeslots
for timeslot in fetched_timeslot_data:
# Check timeslot for validity
if "start" not in timeslot:
self.logger.warning("No 'start' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
if "end" not in timeslot:
self.logger.warning("No 'end' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
# Store the timeslot
timeslot_db = self.store_timeslot(timeslot)
# Store playlists to play
self.store_playlist(timeslot_db, timeslot_db.playlist_id, timeslot["playlist"])
if timeslot_db.schedule_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.schedule_fallback_id, timeslot["schedule_fallback"])
if timeslot_db.show_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.show_fallback_id, timeslot["show_fallback"])
if timeslot_db.station_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.station_fallback_id, timeslot["station_fallback"])
result.append(timeslot_db)
# Release the mutex
self.queue.put(result)
except Exception as e:
# Release the mutex
self.logger.warning("Fetching aborted due to: %s" % str(e), e)
self.queue.put("fetching_aborted " + str(e))
# Check timeslot for validity
if "start" not in timeslot:
self.logger.warning("No 'start' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
if "end" not in timeslot:
self.logger.warning("No 'end' of timeslot given. Skipping the timeslot: %s " % str(timeslot))
continue
# Store the timeslot
timeslot_db = self.store_timeslot(timeslot)
# terminate the thread
return
# Store playlists to play
self.store_playlist(timeslot_db, timeslot_db.playlist_id, timeslot["playlist"])
if timeslot_db.schedule_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.schedule_fallback_id, timeslot["schedule_fallback"])
if timeslot_db.show_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.show_fallback_id, timeslot["show_fallback"])
if timeslot_db.station_fallback_id:
self.store_playlist(timeslot_db, timeslot_db.station_fallback_id, timeslot["station_fallback"])
def update_deleted_timeslots(self, fetched_timeslot_data):
def update_deleted_timeslots(self, fetched_timeslots):
"""
Checks if some timeslot has been deleted remotely, so delete it locally too.
......@@ -342,7 +306,7 @@ class ProgrammePersistence(threading.Thread):
means no data got retrieved.
Args:
fetched_timeslot_data ([dict]): List of timeslot dictionaries from the API
fetched_timeslots ([dict]): List of timeslot dictionaries from the API
"""
now_unix = SU.timestamp()
scheduling_window_start = self.config.get("scheduling_window_start")
......@@ -355,7 +319,7 @@ class ProgrammePersistence(threading.Thread):
# Filter the local timeslot from the fetched ones
existing_remotely = list(filter(lambda new_timeslot: \
new_timeslot["timeslot_id"] == local_timeslot.timeslot_id, fetched_timeslot_data))
new_timeslot["timeslot_id"] == local_timeslot.timeslot_id, fetched_timeslots))
if not existing_remotely:
# Only allow deletion of timeslots which are deleted before the start of the scheduling window
......@@ -561,5 +525,3 @@ class ProgrammePersistence(threading.Thread):
def stop(self):
self.stop_event.set()
......@@ -368,8 +368,10 @@ class AuraScheduler(threading.Thread):
"""
Called when thread is stopped or a signal to terminate is received.
"""
self.exit_event.set()
self.logger.info("Shutting down scheduler ...")
self.programme.terminate()
self.exit_event.set()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment