From b3a6b193fee09512afd4e28bdb9fdb1905403ba3 Mon Sep 17 00:00:00 2001 From: David Trattnig <david@subsquare.at> Date: Fri, 19 Aug 2022 17:24:46 +0200 Subject: [PATCH] refact: improve fetching as a basis for #100 --- src/aura_engine/scheduling/api.py | 298 +++++++++++------------- src/aura_engine/scheduling/programme.py | 26 +-- src/aura_engine/scheduling/utils.py | 17 ++ 3 files changed, 160 insertions(+), 181 deletions(-) diff --git a/src/aura_engine/scheduling/api.py b/src/aura_engine/scheduling/api.py index 6113e5f1..f7863661 100644 --- a/src/aura_engine/scheduling/api.py +++ b/src/aura_engine/scheduling/api.py @@ -26,6 +26,8 @@ import queue import threading from aura_engine.base.api import SimpleApi +from aura_engine.base.config import AuraConfig +from aura_engine.base.lang import private from aura_engine.base.utils import SimpleUtil as SU from aura_engine.scheduling.utils import TimeslotFilter @@ -38,246 +40,228 @@ class ApiFetcher(threading.Thread): and Tank. """ + API_TIMESLOT_ID = "id" + API_PLID_PLAYLIST = "playlist_id" + API_PLID_SCHEDULE = "schedule_default_playlist_id" + API_PLID_SHOW = "show_default_playlist_id" + MODEL_TIMESLOT_ID = "timeslot_id" + MODEL_PLID_PLAYLIST = "playlist_id" + MODEL_PLID_SCHEDULE = "default_schedule_playlist_id" + MODEL_PLID_SHOW = "default_show_playlist_id" + config = None logging = None queue = None + record_mapping_timeslot = None has_already_fetched = False - fetched_timeslot_data = None stop_event = None api = None # Config for API Endpoints - steering_calendar_url = None - tank_playlist_url = None - tank_session = None - tank_secret = None + url_api_timeslots = None + url_api_playlist = None + tank_headers = None - def __init__(self, config): - self.config = config + def __init__(self): + """ + Initialize the API Fetcher. + """ + self.config = AuraConfig.config() self.logger = logging.getLogger("AuraEngine") self.api = SimpleApi() - self.steering_calendar_url = self.config.get("api_steering_calendar") - self.tank_playlist_url = self.config.get("api_tank_playlist") - self.tank_session = self.config.get("api_tank_session") - self.tank_secret = self.config.get("api_tank_secret") + self.url_api_timeslots = self.config.get("api_steering_calendar") + self.url_api_playlist = self.config.get("api_tank_playlist") self.queue = queue.Queue() self.stop_event = threading.Event() - threading.Thread.__init__(self) - def run(self): - """Fetch timeslot data from the API. + self.record_mapping_timeslot = { + ApiFetcher.MODEL_TIMESLOT_ID: ApiFetcher.API_TIMESLOT_ID, + ApiFetcher.MODEL_PLID_PLAYLIST: ApiFetcher.API_PLID_PLAYLIST, + ApiFetcher.MODEL_PLID_SCHEDULE: ApiFetcher.API_PLID_SCHEDULE, + ApiFetcher.MODEL_PLID_SHOW: ApiFetcher.API_PLID_SHOW, + } - Returns - Timeslot ([dict]): An array of retrieved timeslots dictionary + tank_session = self.config.get("api_tank_session") + tank_secret = self.config.get("api_tank_secret") + self.tank_headers = { + "Authorization": f"Bearer {tank_session}:{tank_secret}", + "content-type": "application/json", + } + + threading.Thread.__init__(self) + def run(self): + """ + Fetch timeslot data from the Steering and Tank API. """ try: - fetched_timeslots = self.fetch() - self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslots)) + self.logger.debug("Fetching timeslots from STEERING") + timeslots = self.get_current_timeslots() + self.logger.debug("Fetching playlists from TANK") + timeslots = self.add_playlists_to_timeslots(timeslots) # If nothing is fetched, return - if not fetched_timeslots: + if not timeslots: self.queue.put("fetching_aborted Nothing fetched") return None # Release the mutex - self.queue.put(fetched_timeslots) + self.queue.put(timeslots) except Exception as e: # Release the mutex - self.logger.warning("Fetching aborted due to: %s" % str(e), e) self.queue.put("fetching_aborted " + str(e)) # Terminate the thread return - # - # METHODS - # - def get_fetched_data(self): - """Retrieve fetched data from the queue.""" + """ + Retrieve fetched data from the queue. + """ return self.queue.get() - def fetch(self): - """Retrieve all required data from the API.""" - return_data = [] - - self.logger.debug("Fetching timeslots from STEERING") - self.fetched_timeslot_data = self.fetch_timeslot_data() - if not self.fetched_timeslot_data: - self.logger.critical(SU.red("No timeslots fetched from API!")) - return None - - for timeslot in self.fetched_timeslot_data: - - if "schedule_default_playlist_id" in timeslot: - timeslot["default_schedule_playlist_id"] = timeslot["schedule_default_playlist_id"] - timeslot["schedule_fallback_id"] = None - if "show_default_playlist_id" in timeslot: - timeslot["default_show_playlist_id"] = timeslot["show_default_playlist_id"] - timeslot["show_fallback_id"] = None - - self.logger.debug("Fetching playlists from TANK") - self.fetch_playlists() + def terminate(self): + """ + Terminate the thread. + """ + self.logger.info(SU.yellow("[ApiFetcher] Shutting down...")) + self.stop_event.set() - try: - for timeslot in self.fetched_timeslot_data: - - # Skip timeslot if no start or end is given - if "start" not in timeslot: - self.logger.warning( - "No start of timeslot given. Skipping timeslot: " + str(timeslot) - ) - timeslot = None - if "end" not in timeslot: - self.logger.warning( - "No end of timeslot given. Skipping timeslot: " + str(timeslot) - ) - timeslot = None - - if timeslot: - return_data.append(timeslot) - except TypeError: - self.logger.error(SU.red("Nothing fetched ...")) - self.fetched_timeslot_data = None - return None + # + # private + # - return return_data + @private + def get_current_timeslots(self): + """ + Fetch timeslot data from Steering. - def fetch_timeslot_data(self): - """Fetch timeslot data from Steering. + This method also: + - Filters invalid and unnecessary timeslots. + - Remaps any API fields to there local modal representation. Returns: - ([Timeslot]): An array of timeslots + ([Timeslot]): A list of timeslots + @private """ timeslots = None - url = self.steering_calendar_url + url = self.url_api_timeslots self.logger.debug("Fetch timeslots from Steering API...") result = self.api.get(url) timeslots = result.json if not timeslots: return None - return self.polish_timeslots(timeslots) - def fetch_playlists(self): - """Fetch all playlists including fallback playlists for every timeslot. + timeslots = self.filter_timeslots(timeslots) + + for t in timeslots: + self.remap_record(t, self.record_mapping_timeslot) + + return timeslots - This method used the class member `fetched_timeslot_data`` to iterate - over and extend timeslot data. + @private + def add_playlists_to_timeslots(self, timeslots) -> list: """ - # store fetched entries => do not have to fetch playlist_id more than once - fetched_entries = [] + Fetch and assign playlists to every timeslots. - try: - for timeslot in self.fetched_timeslot_data: - - # Get IDs of specific, default and fallback playlists - playlist_id = self.get_playlist_id(timeslot, "playlist_id") - default_schedule_playlist_id = self.get_playlist_id( - timeslot, "default_schedule_playlist_id" - ) - default_show_playlist_id = self.get_playlist_id( - timeslot, "default_show_playlist_id" - ) - schedule_fallback_id = self.get_playlist_id(timeslot, "schedule_fallback_id") - show_fallback_id = self.get_playlist_id(timeslot, "show_fallback_id") - station_fallback_id = self.get_playlist_id(timeslot, "station_fallback_id") - - # Retrieve playlist, default and the fallback playlists for every timeslot. - # If a playlist (like station_fallback) is already fetched, it is not fetched again - # but reused - timeslot["playlist"] = self.fetch_playlist(playlist_id, fetched_entries) - timeslot["default_schedule_playlist"] = self.fetch_playlist( - default_schedule_playlist_id, fetched_entries - ) - timeslot["default_show_playlist"] = self.fetch_playlist( - default_show_playlist_id, fetched_entries - ) - timeslot["schedule_fallback"] = self.fetch_playlist( - schedule_fallback_id, fetched_entries - ) - timeslot["show_fallback"] = self.fetch_playlist(show_fallback_id, fetched_entries) - timeslot["station_fallback"] = self.fetch_playlist( - station_fallback_id, fetched_entries - ) + This method retrieve all playlist types per timeslot. If a playlist is already fetched, + it is not fetched again but reused. - except Exception as e: - self.logger.error("Error while fetching playlists from API endpoints: " + str(e), e) + Args: + timeslots([Timeslot]): All timeslots fetched from Steering API + + Returns: + ([Timeslot]): All timeslots with additional playlist records from Tank API + + @private + """ + playlists = [] + + for timeslot in timeslots: + id_playlist = timeslot.get(ApiFetcher.MODEL_PLID_PLAYLIST) + id_schedule = timeslot.get(ApiFetcher.MODEL_PLID_SCHEDULE) + id_show = timeslot.get(ApiFetcher.MODEL_PLID_SHOW) + + timeslot["playlist"] = self.fetch_playlist(id_playlist, playlists) + timeslot["default_schedule_playlist"] = self.fetch_playlist(id_schedule, playlists) + timeslot["default_show_playlist"] = self.fetch_playlist(id_show, playlists) - def fetch_playlist(self, playlist_id, fetched_playlists): - """Fetch the playlist for a given timeslot. + return timeslots + + @private + def fetch_playlist(self, playlist_id: int, fetched_playlists): + """ + Fetch a playlist from Tank. + + If a playlist was already fetched within this round, it uses the existing one. Args: - playlist_id (String): The ID of the playlist - fetched_playlists ([dict]): Previously fetched playlists to avoid re-fetching + playlist_id (int): The ID of the playlist + fetched_playlists (dict): Previously fetched playlists to avoid re-fetching Returns: (Playlist): Playlist for `playlist_id` + @private """ if not playlist_id: return None + playlist_id = str(playlist_id) - playlist = None - url = self.tank_playlist_url.replace("${ID}", playlist_id) - headers = { - "Authorization": "Bearer %s:%s" % (self.tank_session, self.tank_secret), - "content-type": "application/json", - } - - # If playlist is already fetched in this round, use the existing one for playlist in fetched_playlists: if playlist["id"] == playlist_id: - self.logger.debug("Playlist #%s already fetched" % playlist_id) + self.logger.debug(f"Playlist #{playlist_id} already fetched") return playlist + playlist = None + url = self.url_api_playlist.replace("${ID}", playlist_id) self.logger.debug("Fetch playlist from Tank API...") - result = self.api.get(url, headers=headers) + result = self.api.get(url, headers=self.tank_headers) playlist = result.json if playlist: fetched_playlists.append(playlist) return playlist - def get_playlist_id(self, timeslot, id_name): - """Extract the playlist ID for a given playlist (fallback) type. - - Args: - timeslot (dict): The timeslot dictionary - id_name (String): The dictionary key holding the playlist ID - - Returns: - (Integer): The playlist ID - + @private + def filter_timeslots(self, timeslots): """ - if id_name not in timeslot: - return None - - playlist_id = str(timeslot[id_name]) - if not playlist_id or playlist_id == "None": - self.logger.debug(f"Timeslot {timeslot['id']}.{id_name} has no value") - return None - - return playlist_id - - def polish_timeslots(self, timeslots): - """Remove all timeslots which are not relevant for further processing. + Remove all timeslots which are not relevant for further processing. Also, add transparent timeslot ID assignment for more expressive use. + + Args: + timeslots (dict): The timeslots to be filtered + @private """ count_before = len(timeslots) timeslots = TimeslotFilter.filter_24h(timeslots) timeslots = TimeslotFilter.filter_past(timeslots) + timeslots = TimeslotFilter.filter_invalid(timeslots) count_after = len(timeslots) count_removed = count_before - count_after + msg = f"Removed {count_removed} unnecessary timeslots, {count_after} left" self.logger.debug(msg) - - for t in timeslots: - t["timeslot_id"] = t["id"] return timeslots - def terminate(self): - """Terminate the thread.""" - self.logger.info(SU.yellow("[ApiFetcher] Shutting down...")) - self.stop_event.set() + @private + def remap_record(self, record: dict, mapping: dict) -> dict: + """ + Maps certain fields from an API record to the fields of the internal model. + + It changes the record in place, but also returns the updated record. + + Args: + record (dict): The record to apply a re-mapping on + mapping (dict): A dictionary with source (value) and target (key) mappings + + Returns: + dict: The updated record + + @private + """ + for key, value in mapping.items(): + if key != value: + record[key] = record.pop(value) + return record diff --git a/src/aura_engine/scheduling/programme.py b/src/aura_engine/scheduling/programme.py index 63ac1a49..59ee6c9b 100644 --- a/src/aura_engine/scheduling/programme.py +++ b/src/aura_engine/scheduling/programme.py @@ -65,7 +65,7 @@ class ProgrammeService: self.logger.debug("Trying to fetch new programme from API endpoints...") # Create a fetching thread and wait until it is done - self.api_fetcher = ApiFetcher(self.config) + self.api_fetcher = ApiFetcher() self.api_fetcher.start() response = self.api_fetcher.get_fetched_data() @@ -187,7 +187,7 @@ class ProgrammeService: Retrieve the playlist to be scheduled. If no specific playlist is assigned, the default schedule or show playlist is returned. - This method does not respect any defined fallback playlists. + This method does not respect any defined default playlists. Returns: (dict, Playlist): A dictionary holding the playlist type, the currently assigned @@ -332,22 +332,6 @@ class ProgrammeStore: timeslot_db.default_show_playlist_id, timeslot["default_show_playlist"], ) - if timeslot_db.schedule_fallback_id: - self.store_playlist( - timeslot_db, - timeslot_db.schedule_fallback_id, - timeslot["schedule_fallback"], - ) - if timeslot_db.show_fallback_id: - self.store_playlist( - timeslot_db, timeslot_db.show_fallback_id, timeslot["show_fallback"] - ) - if timeslot_db.station_fallback_id: - self.store_playlist( - timeslot_db, - timeslot_db.station_fallback_id, - timeslot["station_fallback"], - ) return timeslots @@ -443,12 +427,6 @@ class ProgrammeStore: timeslot_db.default_schedule_playlist_id = timeslot["default_schedule_playlist_id"] if "default_show_playlist_id" in timeslot: timeslot_db.default_show_playlist_id = timeslot["default_show_playlist_id"] - if "schedule_fallback_id" in timeslot: - timeslot_db.schedule_fallback_id = timeslot["schedule_fallback_id"] - if "show_fallback_id" in timeslot: - timeslot_db.show_fallback_id = timeslot["show_fallback_id"] - if "station_fallback_id" in timeslot: - timeslot_db.station_fallback_id = timeslot["station_fallback_id"] self.logger.debug( SU.pink(f"Store/Update TIMESLOT havetoadd={havetoadd} - data: " + str(timeslot)) diff --git a/src/aura_engine/scheduling/utils.py b/src/aura_engine/scheduling/utils.py index 49f1526a..bbc14954 100644 --- a/src/aura_engine/scheduling/utils.py +++ b/src/aura_engine/scheduling/utils.py @@ -44,6 +44,23 @@ class TimeslotFilter: Filters timeslot dictionaries with various criteria. """ + @staticmethod + def filter_invalid(timeslots) -> bool: + """ + Remove invalid timeslots. + + Args: + timeslots (list): The timeslot record + + Returns: + (list): filtered timeslots + """ + items = [] + for timeslot in timeslots: + if "start" in timeslot and "end" in timeslot: + items.append(timeslot) + return items + @staticmethod def filter_24h(timeslots): """Filter timeslot of the last 24 hours. -- GitLab