# # Aura Engine API (https://gitlab.servus.at/aura/engine-api) # # Copyright (C) 2020 - The Aura Engine Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import datetime import requests import sqlalchemy from dateutil.parser import parse from aura_engine_api.base.node import NodeType from aura_engine_api.models import ( ActivityLog, ClockInfo, ClockInfoSchema, HealthHistory, HealthHistorySchema, PlayLog, PlayLogSchema, TrackSchema, ) from aura_engine_api.rest import util class ApiService: """ Service handling for API actions. """ config = None logger = None node_type = None sync_host = None main_hosts = None active_source = None api_playlog = None api_healthlog = None api_clockinfo = None def __init__(self, config, logger, node_type): """ Initialize Service. """ self.config = config self.logger = logger # Configured as Sync Node if not node_type == NodeType.MAIN: self.node_type = NodeType.SYNC self.main_hosts = [config.get("main_host_1"), config.get("main_host_2")] if not self.main_hosts[0] and not self.main_hosts[1]: self.logger.warn("Not a single main host defined. Be aware what you are doing.") msg = "No sync possible as no host nodes are configured" else: msg = "Syncing data of hosts '%s'" % (self.main_hosts) # Configured as Main Node else: msg = "" self.node_type = NodeType.MAIN self.sync_host = config.get("sync_host") if not self.sync_host: msg = "No child node for synchronization defined" else: if not self.config.get("enable_federation") == "false": msg = "Pushing data to '%s'" % (self.sync_host) # Set active source source = ActivityLog.get_active_source() if source: self.active_source = source.source_number if not self.active_source: if self.node_type == NodeType.MAIN: source_number = self.config.get("host_id") else: source_number = self.config.get("default_source") self.set_default_source(source_number) self.logger.info("Active source: %s" % self.active_source) # Init Sync API endpoints self.api_playlog = self.config.get("sync_api_store_playlog") self.api_healthlog = self.config.get("sync_api_store_healthlog") self.api_clockinfo = self.config.get("sync_api_store_clockinfo") self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg)) def current_track(self): """ Retrieves the currently playing track. Returns: (JSON) """ track = PlayLog.select_current() track_schema = TrackSchema() return track_schema.dump(track) def list_tracks(self, page=None, size=None, from_time=None, to_time=None): """ Lists track-service entries with pagination. Args: page (Integer): The number of the page to return size (Integer): The numbers of items to return from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z") to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z") Returns: (JSON) """ if not size or size > 50 or size < 1: size = 20 tracklist = PlayLog.paginate(page, size, from_time, to_time, False) tracklist_schema = TrackSchema(many=True) return tracklist_schema.dump(tracklist) def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False): """ Get paginated playlog entries for since the given timestamp. Args: page (Integer): The number of items to skip before starting to collect the result set size (Integer): The numbers of items to return per page from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z") to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z") skip_synced (Boolean): Optionally, don't return entries which have been posted directly before (sync node only) Returns: (JSON) """ if not page: page = 0 tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced) tracklist_schema = PlayLogSchema(many=True) return tracklist_schema.dump(tracklist) def store_playlog(self, data, plain_json): """ Stores the passed playlog entry. Args: data (Object): The data to store in the playlog """ if not data.log_source: data.log_source = self.config.get("host_id") # Main Node: Always log entry, independent of the source # Sync Node: Only log entry when it's coming from an active source if self.node_type == NodeType.MAIN or ( self.node_type == NodeType.SYNC and data.log_source == self.active_source ): try: playlog = PlayLog(data) playlog.save() self.logger.debug("Stored playlog for '%s'" % data.track_start) except sqlalchemy.exc.IntegrityError: self.logger.info( "Playlog for '%s' is already existing in local database. Skipping..." % data.track_start ) if self.config.get("enable_federation") == "false": return # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog: try: api_url = self.sync_host + self.api_playlog r = requests.post(api_url, json=plain_json) if r.status_code == 204: self.logger.info( "Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host) ) playlog.is_synced = True playlog.save() else: self.logger.error( "Error while pushing playlog to sync-node: " + str(r.json()) ) except Exception as e: self.logger.error( "Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e)) ) else: self.logger.info("Ditching playlog sent from an inactive source") def get_clock_info(self): """ Retrieves the dataset required to render the studio clock. """ info = ClockInfo.get_info(self.get_active_source()) now = datetime.datetime.now() if "current_timeslot" in info and info["current_timeslot"]["timeslot_end"]: timeslot_end = util.deserialize_datetime(info["current_timeslot"]["timeslot_end"]) if timeslot_end < now: info["current_timeslot"] = None clockinfo_schema = ClockInfoSchema() return clockinfo_schema.dump(info) def set_clock_info(self, data, plain_json): """ Sets the clock info for the given source (engine1, engine2, other). """ if data.engine_source <= 0: return is_existing = False clock_info = ClockInfo.get(self.get_active_source()) if clock_info: is_existing = True else: clock_info = ClockInfo() clock_info.set_info( data.engine_source, data.planned_playlist, data.current_timeslot, data.upcoming_timeslots, ) if is_existing: clock_info.update() else: clock_info.save() if self.config.get("enable_federation") == "false": return # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_clockinfo: try: api_url = self.sync_host + self.api_clockinfo r = requests.put(api_url, json=plain_json) if r.status_code == 204: self.logger.info( "Successfully pushed clock info for '%s' to '%s'" % (clock_info.log_time, self.sync_host) ) else: self.logger.error( "HTTP %s | Error while pushing clock info to sync-node: %s", r.status_code, str(r.json()), ) except Exception as e: self.logger.error( "Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e)) ) def set_default_source(self, source_number): """ Create initial source (API Epoch) in the ActivityLog being able to sync old entries upon first start. Args: source_number (Integer): Number of the default engine """ if ActivityLog.is_empty(): default_source = self.config.get("default_source") epoch_source = ActivityLog(default_source) epoch_source.log_time = parse("2020-02-22 02:20:20") epoch_source.save() self.logger.info( "Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time) ) else: self.set_active_source(source_number) def set_active_source(self, source_number): """ Sets the active source (engine1, engine2, other) identified by its source number. Args: source_number (Integer): Number of the engine """ if source_number > 0: if self.active_source != source_number: self.active_source = source_number activity_log = ActivityLog(source_number) activity_log.save() def get_active_source(self): """ Retrieves number of the currently active source (engine1, engine2, other) Returns: (Integer) """ return self.active_source def get_source_health(self, source_number): """ Retrieves the most recent health info of the requested source Args: source_number (Integer): Number of the play-out source Returns: (JSON) """ health = HealthHistory.get_latest_entry(source_number) health_schema = HealthHistorySchema() return health_schema.dump(health) def log_source_health(self, source_number, data, plain_json): """ Logs an health entry for the given source Args: source_number (Integer): Number of the play-out source data (Object): The data to store in the health log """ if not source_number: source_number = self.host_id # Main Node: Always log entry, independent of the source # Sync Node: Only log entry when it's coming from an active source if self.node_type == NodeType.MAIN or ( self.node_type == NodeType.SYNC and source_number == self.active_source ): healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details) healthlog.save() self.logger.debug("Stored health info for '%s'" % str(source_number)) if self.config.get("enable_federation") == "false": return # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog: # health_schema = HealthHistorySchema() # json_healthlog = health_schema.dump(healthlog) api_url = self.sync_host + self.api_healthlog + "/" + str(source_number) try: r = requests.post(api_url, json=plain_json) if r.status_code == 200: self.logger.info( "Successfully pushed healthlog for source '%s' to '%s'" % (str(source_number), self.sync_host) ) healthlog.is_synced = True healthlog.save() else: self.logger.error( "Error while pushing healthlog to sync-node: " + str(r.json()) ) except Exception as e: self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e) else: self.logger.info("Ditching healthlog sent from an inactive source")