Skip to content
Snippets Groups Projects
Commit b3a6b193 authored by David Trattnig's avatar David Trattnig
Browse files

refact: improve fetching as a basis for #100

parent 776d38a9
No related branches found
No related tags found
No related merge requests found
Pipeline #2271 passed
...@@ -26,6 +26,8 @@ import queue ...@@ -26,6 +26,8 @@ import queue
import threading import threading
from aura_engine.base.api import SimpleApi from aura_engine.base.api import SimpleApi
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import private
from aura_engine.base.utils import SimpleUtil as SU from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.utils import TimeslotFilter from aura_engine.scheduling.utils import TimeslotFilter
...@@ -38,246 +40,228 @@ class ApiFetcher(threading.Thread): ...@@ -38,246 +40,228 @@ class ApiFetcher(threading.Thread):
and Tank. and Tank.
""" """
API_TIMESLOT_ID = "id"
API_PLID_PLAYLIST = "playlist_id"
API_PLID_SCHEDULE = "schedule_default_playlist_id"
API_PLID_SHOW = "show_default_playlist_id"
MODEL_TIMESLOT_ID = "timeslot_id"
MODEL_PLID_PLAYLIST = "playlist_id"
MODEL_PLID_SCHEDULE = "default_schedule_playlist_id"
MODEL_PLID_SHOW = "default_show_playlist_id"
config = None config = None
logging = None logging = None
queue = None queue = None
record_mapping_timeslot = None
has_already_fetched = False has_already_fetched = False
fetched_timeslot_data = None
stop_event = None stop_event = None
api = None api = None
# Config for API Endpoints # Config for API Endpoints
steering_calendar_url = None url_api_timeslots = None
tank_playlist_url = None url_api_playlist = None
tank_session = None tank_headers = None
tank_secret = None
def __init__(self, config): def __init__(self):
self.config = config """
Initialize the API Fetcher.
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine") self.logger = logging.getLogger("AuraEngine")
self.api = SimpleApi() self.api = SimpleApi()
self.steering_calendar_url = self.config.get("api_steering_calendar") self.url_api_timeslots = self.config.get("api_steering_calendar")
self.tank_playlist_url = self.config.get("api_tank_playlist") self.url_api_playlist = self.config.get("api_tank_playlist")
self.tank_session = self.config.get("api_tank_session")
self.tank_secret = self.config.get("api_tank_secret")
self.queue = queue.Queue() self.queue = queue.Queue()
self.stop_event = threading.Event() self.stop_event = threading.Event()
threading.Thread.__init__(self)
def run(self): self.record_mapping_timeslot = {
"""Fetch timeslot data from the API. ApiFetcher.MODEL_TIMESLOT_ID: ApiFetcher.API_TIMESLOT_ID,
ApiFetcher.MODEL_PLID_PLAYLIST: ApiFetcher.API_PLID_PLAYLIST,
ApiFetcher.MODEL_PLID_SCHEDULE: ApiFetcher.API_PLID_SCHEDULE,
ApiFetcher.MODEL_PLID_SHOW: ApiFetcher.API_PLID_SHOW,
}
Returns tank_session = self.config.get("api_tank_session")
Timeslot ([dict]): An array of retrieved timeslots dictionary tank_secret = self.config.get("api_tank_secret")
self.tank_headers = {
"Authorization": f"Bearer {tank_session}:{tank_secret}",
"content-type": "application/json",
}
threading.Thread.__init__(self)
def run(self):
"""
Fetch timeslot data from the Steering and Tank API.
""" """
try: try:
fetched_timeslots = self.fetch() self.logger.debug("Fetching timeslots from STEERING")
self.logger.debug("Timeslot data fetched from API: " + str(fetched_timeslots)) timeslots = self.get_current_timeslots()
self.logger.debug("Fetching playlists from TANK")
timeslots = self.add_playlists_to_timeslots(timeslots)
# If nothing is fetched, return # If nothing is fetched, return
if not fetched_timeslots: if not timeslots:
self.queue.put("fetching_aborted Nothing fetched") self.queue.put("fetching_aborted Nothing fetched")
return None return None
# Release the mutex # Release the mutex
self.queue.put(fetched_timeslots) self.queue.put(timeslots)
except Exception as e: except Exception as e:
# Release the mutex # Release the mutex
self.logger.warning("Fetching aborted due to: %s" % str(e), e)
self.queue.put("fetching_aborted " + str(e)) self.queue.put("fetching_aborted " + str(e))
# Terminate the thread # Terminate the thread
return return
#
# METHODS
#
def get_fetched_data(self): def get_fetched_data(self):
"""Retrieve fetched data from the queue.""" """
Retrieve fetched data from the queue.
"""
return self.queue.get() return self.queue.get()
def fetch(self): def terminate(self):
"""Retrieve all required data from the API.""" """
return_data = [] Terminate the thread.
"""
self.logger.debug("Fetching timeslots from STEERING") self.logger.info(SU.yellow("[ApiFetcher] Shutting down..."))
self.fetched_timeslot_data = self.fetch_timeslot_data() self.stop_event.set()
if not self.fetched_timeslot_data:
self.logger.critical(SU.red("No timeslots fetched from API!"))
return None
for timeslot in self.fetched_timeslot_data:
if "schedule_default_playlist_id" in timeslot:
timeslot["default_schedule_playlist_id"] = timeslot["schedule_default_playlist_id"]
timeslot["schedule_fallback_id"] = None
if "show_default_playlist_id" in timeslot:
timeslot["default_show_playlist_id"] = timeslot["show_default_playlist_id"]
timeslot["show_fallback_id"] = None
self.logger.debug("Fetching playlists from TANK")
self.fetch_playlists()
try: #
for timeslot in self.fetched_timeslot_data: # private
#
# Skip timeslot if no start or end is given
if "start" not in timeslot:
self.logger.warning(
"No start of timeslot given. Skipping timeslot: " + str(timeslot)
)
timeslot = None
if "end" not in timeslot:
self.logger.warning(
"No end of timeslot given. Skipping timeslot: " + str(timeslot)
)
timeslot = None
if timeslot:
return_data.append(timeslot)
except TypeError:
self.logger.error(SU.red("Nothing fetched ..."))
self.fetched_timeslot_data = None
return None
return return_data @private
def get_current_timeslots(self):
"""
Fetch timeslot data from Steering.
def fetch_timeslot_data(self): This method also:
"""Fetch timeslot data from Steering. - Filters invalid and unnecessary timeslots.
- Remaps any API fields to there local modal representation.
Returns: Returns:
([Timeslot]): An array of timeslots ([Timeslot]): A list of timeslots
@private
""" """
timeslots = None timeslots = None
url = self.steering_calendar_url url = self.url_api_timeslots
self.logger.debug("Fetch timeslots from Steering API...") self.logger.debug("Fetch timeslots from Steering API...")
result = self.api.get(url) result = self.api.get(url)
timeslots = result.json timeslots = result.json
if not timeslots: if not timeslots:
return None return None
return self.polish_timeslots(timeslots)
def fetch_playlists(self): timeslots = self.filter_timeslots(timeslots)
"""Fetch all playlists including fallback playlists for every timeslot.
for t in timeslots:
self.remap_record(t, self.record_mapping_timeslot)
return timeslots
This method used the class member `fetched_timeslot_data`` to iterate @private
over and extend timeslot data. def add_playlists_to_timeslots(self, timeslots) -> list:
""" """
# store fetched entries => do not have to fetch playlist_id more than once Fetch and assign playlists to every timeslots.
fetched_entries = []
try: This method retrieve all playlist types per timeslot. If a playlist is already fetched,
for timeslot in self.fetched_timeslot_data: it is not fetched again but reused.
# Get IDs of specific, default and fallback playlists
playlist_id = self.get_playlist_id(timeslot, "playlist_id")
default_schedule_playlist_id = self.get_playlist_id(
timeslot, "default_schedule_playlist_id"
)
default_show_playlist_id = self.get_playlist_id(
timeslot, "default_show_playlist_id"
)
schedule_fallback_id = self.get_playlist_id(timeslot, "schedule_fallback_id")
show_fallback_id = self.get_playlist_id(timeslot, "show_fallback_id")
station_fallback_id = self.get_playlist_id(timeslot, "station_fallback_id")
# Retrieve playlist, default and the fallback playlists for every timeslot.
# If a playlist (like station_fallback) is already fetched, it is not fetched again
# but reused
timeslot["playlist"] = self.fetch_playlist(playlist_id, fetched_entries)
timeslot["default_schedule_playlist"] = self.fetch_playlist(
default_schedule_playlist_id, fetched_entries
)
timeslot["default_show_playlist"] = self.fetch_playlist(
default_show_playlist_id, fetched_entries
)
timeslot["schedule_fallback"] = self.fetch_playlist(
schedule_fallback_id, fetched_entries
)
timeslot["show_fallback"] = self.fetch_playlist(show_fallback_id, fetched_entries)
timeslot["station_fallback"] = self.fetch_playlist(
station_fallback_id, fetched_entries
)
except Exception as e: Args:
self.logger.error("Error while fetching playlists from API endpoints: " + str(e), e) timeslots([Timeslot]): All timeslots fetched from Steering API
Returns:
([Timeslot]): All timeslots with additional playlist records from Tank API
@private
"""
playlists = []
for timeslot in timeslots:
id_playlist = timeslot.get(ApiFetcher.MODEL_PLID_PLAYLIST)
id_schedule = timeslot.get(ApiFetcher.MODEL_PLID_SCHEDULE)
id_show = timeslot.get(ApiFetcher.MODEL_PLID_SHOW)
timeslot["playlist"] = self.fetch_playlist(id_playlist, playlists)
timeslot["default_schedule_playlist"] = self.fetch_playlist(id_schedule, playlists)
timeslot["default_show_playlist"] = self.fetch_playlist(id_show, playlists)
def fetch_playlist(self, playlist_id, fetched_playlists): return timeslots
"""Fetch the playlist for a given timeslot.
@private
def fetch_playlist(self, playlist_id: int, fetched_playlists):
"""
Fetch a playlist from Tank.
If a playlist was already fetched within this round, it uses the existing one.
Args: Args:
playlist_id (String): The ID of the playlist playlist_id (int): The ID of the playlist
fetched_playlists ([dict]): Previously fetched playlists to avoid re-fetching fetched_playlists (dict): Previously fetched playlists to avoid re-fetching
Returns: Returns:
(Playlist): Playlist for `playlist_id` (Playlist): Playlist for `playlist_id`
@private
""" """
if not playlist_id: if not playlist_id:
return None return None
playlist_id = str(playlist_id)
playlist = None
url = self.tank_playlist_url.replace("${ID}", playlist_id)
headers = {
"Authorization": "Bearer %s:%s" % (self.tank_session, self.tank_secret),
"content-type": "application/json",
}
# If playlist is already fetched in this round, use the existing one
for playlist in fetched_playlists: for playlist in fetched_playlists:
if playlist["id"] == playlist_id: if playlist["id"] == playlist_id:
self.logger.debug("Playlist #%s already fetched" % playlist_id) self.logger.debug(f"Playlist #{playlist_id} already fetched")
return playlist return playlist
playlist = None
url = self.url_api_playlist.replace("${ID}", playlist_id)
self.logger.debug("Fetch playlist from Tank API...") self.logger.debug("Fetch playlist from Tank API...")
result = self.api.get(url, headers=headers) result = self.api.get(url, headers=self.tank_headers)
playlist = result.json playlist = result.json
if playlist: if playlist:
fetched_playlists.append(playlist) fetched_playlists.append(playlist)
return playlist return playlist
def get_playlist_id(self, timeslot, id_name): @private
"""Extract the playlist ID for a given playlist (fallback) type. def filter_timeslots(self, timeslots):
Args:
timeslot (dict): The timeslot dictionary
id_name (String): The dictionary key holding the playlist ID
Returns:
(Integer): The playlist ID
""" """
if id_name not in timeslot: Remove all timeslots which are not relevant for further processing.
return None
playlist_id = str(timeslot[id_name])
if not playlist_id or playlist_id == "None":
self.logger.debug(f"Timeslot {timeslot['id']}.{id_name} has no value")
return None
return playlist_id
def polish_timeslots(self, timeslots):
"""Remove all timeslots which are not relevant for further processing.
Also, add transparent timeslot ID assignment for more expressive use. Also, add transparent timeslot ID assignment for more expressive use.
Args:
timeslots (dict): The timeslots to be filtered
@private
""" """
count_before = len(timeslots) count_before = len(timeslots)
timeslots = TimeslotFilter.filter_24h(timeslots) timeslots = TimeslotFilter.filter_24h(timeslots)
timeslots = TimeslotFilter.filter_past(timeslots) timeslots = TimeslotFilter.filter_past(timeslots)
timeslots = TimeslotFilter.filter_invalid(timeslots)
count_after = len(timeslots) count_after = len(timeslots)
count_removed = count_before - count_after count_removed = count_before - count_after
msg = f"Removed {count_removed} unnecessary timeslots, {count_after} left" msg = f"Removed {count_removed} unnecessary timeslots, {count_after} left"
self.logger.debug(msg) self.logger.debug(msg)
for t in timeslots:
t["timeslot_id"] = t["id"]
return timeslots return timeslots
def terminate(self): @private
"""Terminate the thread.""" def remap_record(self, record: dict, mapping: dict) -> dict:
self.logger.info(SU.yellow("[ApiFetcher] Shutting down...")) """
self.stop_event.set() Maps certain fields from an API record to the fields of the internal model.
It changes the record in place, but also returns the updated record.
Args:
record (dict): The record to apply a re-mapping on
mapping (dict): A dictionary with source (value) and target (key) mappings
Returns:
dict: The updated record
@private
"""
for key, value in mapping.items():
if key != value:
record[key] = record.pop(value)
return record
...@@ -65,7 +65,7 @@ class ProgrammeService: ...@@ -65,7 +65,7 @@ class ProgrammeService:
self.logger.debug("Trying to fetch new programme from API endpoints...") self.logger.debug("Trying to fetch new programme from API endpoints...")
# Create a fetching thread and wait until it is done # Create a fetching thread and wait until it is done
self.api_fetcher = ApiFetcher(self.config) self.api_fetcher = ApiFetcher()
self.api_fetcher.start() self.api_fetcher.start()
response = self.api_fetcher.get_fetched_data() response = self.api_fetcher.get_fetched_data()
...@@ -187,7 +187,7 @@ class ProgrammeService: ...@@ -187,7 +187,7 @@ class ProgrammeService:
Retrieve the playlist to be scheduled. Retrieve the playlist to be scheduled.
If no specific playlist is assigned, the default schedule or show playlist is returned. If no specific playlist is assigned, the default schedule or show playlist is returned.
This method does not respect any defined fallback playlists. This method does not respect any defined default playlists.
Returns: Returns:
(dict, Playlist): A dictionary holding the playlist type, the currently assigned (dict, Playlist): A dictionary holding the playlist type, the currently assigned
...@@ -332,22 +332,6 @@ class ProgrammeStore: ...@@ -332,22 +332,6 @@ class ProgrammeStore:
timeslot_db.default_show_playlist_id, timeslot_db.default_show_playlist_id,
timeslot["default_show_playlist"], timeslot["default_show_playlist"],
) )
if timeslot_db.schedule_fallback_id:
self.store_playlist(
timeslot_db,
timeslot_db.schedule_fallback_id,
timeslot["schedule_fallback"],
)
if timeslot_db.show_fallback_id:
self.store_playlist(
timeslot_db, timeslot_db.show_fallback_id, timeslot["show_fallback"]
)
if timeslot_db.station_fallback_id:
self.store_playlist(
timeslot_db,
timeslot_db.station_fallback_id,
timeslot["station_fallback"],
)
return timeslots return timeslots
...@@ -443,12 +427,6 @@ class ProgrammeStore: ...@@ -443,12 +427,6 @@ class ProgrammeStore:
timeslot_db.default_schedule_playlist_id = timeslot["default_schedule_playlist_id"] timeslot_db.default_schedule_playlist_id = timeslot["default_schedule_playlist_id"]
if "default_show_playlist_id" in timeslot: if "default_show_playlist_id" in timeslot:
timeslot_db.default_show_playlist_id = timeslot["default_show_playlist_id"] timeslot_db.default_show_playlist_id = timeslot["default_show_playlist_id"]
if "schedule_fallback_id" in timeslot:
timeslot_db.schedule_fallback_id = timeslot["schedule_fallback_id"]
if "show_fallback_id" in timeslot:
timeslot_db.show_fallback_id = timeslot["show_fallback_id"]
if "station_fallback_id" in timeslot:
timeslot_db.station_fallback_id = timeslot["station_fallback_id"]
self.logger.debug( self.logger.debug(
SU.pink(f"Store/Update TIMESLOT havetoadd={havetoadd} - data: " + str(timeslot)) SU.pink(f"Store/Update TIMESLOT havetoadd={havetoadd} - data: " + str(timeslot))
......
...@@ -44,6 +44,23 @@ class TimeslotFilter: ...@@ -44,6 +44,23 @@ class TimeslotFilter:
Filters timeslot dictionaries with various criteria. Filters timeslot dictionaries with various criteria.
""" """
@staticmethod
def filter_invalid(timeslots) -> bool:
"""
Remove invalid timeslots.
Args:
timeslots (list): The timeslot record
Returns:
(list): filtered timeslots
"""
items = []
for timeslot in timeslots:
if "start" in timeslot and "end" in timeslot:
items.append(timeslot)
return items
@staticmethod @staticmethod
def filter_24h(timeslots): def filter_24h(timeslots):
"""Filter timeslot of the last 24 hours. """Filter timeslot of the last 24 hours.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment