# # Aura Engine API (https://gitlab.servus.at/aura/engine-api) # # Copyright (C) 2020 - The Aura Engine Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import datetime import requests from dateutil.parser import parse from base.node import NodeType from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema class ApiService(): """ Service handling for API actions. """ config = None logger = None node_type = None sync_host = None main_hosts = None active_source = None api_playlog = None api_healthlog = None def __init__(self, config, logger, node_type): """ Initialize Service. """ self.config = config self.logger = logger # Configured as Sync Node if not node_type == NodeType.MAIN: self.node_type = NodeType.SYNC self.main_hosts = [ config.get("main_host_1"), config.get("main_host_2") ] if not self.main_hosts[0] and not self.main_hosts[1]: self.logger.warn("Not a single main host defined. Be aware what you are doing.") msg = "No sync possible as no host nodes are configured" else: msg = "Syncing data of hosts '%s'" % (self.main_hosts) # Configured as Main Node else: self.node_type = NodeType.MAIN self.sync_host = config.get("sync_host") if not self.sync_host: msg = "No child node for synchronization defined" else: msg = "Pushing data to '%s'" % (self.sync_host) # Set active source source = ActivityLog.get_active_source() if source: self.active_source = source.source_number if not self.active_source: if self.node_type == NodeType.MAIN: source_number = self.config.get("host_id") else: source_number = self.config.get("default_source") self.set_default_source(source_number) self.logger.info("Active source: %s" % self.active_source) # Init Sync API endpoints self.api_playlog = self.config.get("sync_api_store_playlog") self.api_healthlog = self.config.get("sync_api_store_healthlog") self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg)) def current_track(self): """ Retrieves the currently playing track. Returns: (JSON) """ track = PlayLog.select_current() track_schema = TrackSchema() return track_schema.dump(track) def list_tracks(self, page=None, size=None): """ Lists track-service entries with pagination. Args: page (Integer): The number of the page to return size (Integer): The numbers of items to return Returns: (JSON) """ if not size or size > 50 or size < 1: size = 20 tracklist = PlayLog.paginate(page, size, None, None, False) tracklist_schema = TrackSchema(many=True) return tracklist_schema.dump(tracklist) def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False): """ Get paginated playlog entries for since the given timestamp. Args: page (Integer): The number of items to skip before starting to collect the result set size (Integer): The numbers of items to return per page from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z") from_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z") skip_synced (Boolean): Optionally, don't return entries which have been posted directly before (sync node only) Returns: (JSON) """ tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced) tracklist_schema = PlayLogSchema(many=True) return tracklist_schema.dump(tracklist) def store_playlog(self, data): """ Stores the passed playlog entry. Args: data (Object): The data to store in the playlog """ if not data.log_source: data.log_source = self.config.get("host_id") # Main Node: Alway log entry, independed of the source # Sync Node: Only log entry when it's coming from an active source if self.node_type == NodeType.MAIN or \ (self.node_type == NodeType.SYNC and data.log_source == self.active_source): try: playlog = PlayLog(data) playlog.save() self.logger.debug("Stored playlog for '%s'" % playlog.track_start) except sqlalchemy.exc.IntegrityError as e: self.logger.info("Playlog for '%s' is already existing in local database. Skipping..." % playlog.track_start) # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog: playlog_schema = PlayLogSchema() json_playlog = playlog_schema.dump(playlog) try: api_url = self.sync_host + self.api_playlog r = requests.post(api_url, json=json_playlog) if r.status_code == 204: self.logger.info("Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host)) playlog.is_synced = True playlog.save() else: self.logger.error("Error while pushing playlog to sync-node: " + str(r.json())) except Exception as e: self.logger.error("Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))) else: self.logger.info("Ditching playlog sent from an inactive source") def clock_info(self): """ Retrieves the dataset required to render the studio clock. """ return "studio clock data" def set_default_source(self, source_number): """ Create initial source (API Epoch) in the ActivityLog being able to sync old entries upon first start Args: source_number (Integer): Number of the default engine """ if ActivityLog.is_empty(): default_source = self.config.get("default_source") epoch_source = ActivityLog(default_source) epoch_source.log_time = parse("2020-02-22 02:20:20") epoch_source.save() self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time)) else: self.set_active_source(source_number) def set_active_source(self, source_number): """ Sets the active source (engine1, engine2, other) identified by its source number. Args: source_number (Integer): Number of the engine """ if source_number > 0: if self.active_source != source_number: self.active_source = source_number activity_log = ActivityLog(source_number) activity_log.save() def get_active_source(self): """ Retrieves number of the currently active source (engine1, engine2, other) Returns: (Integer) """ return self.active_source def get_source_health(self, source_number): """ Retrieves the most recent health info of the requested source Args: source_number (Integer): Number of the play-out source Returns: (JSON) """ health = HealthHistory.get_latest_entry(source_number) health_schema = HealthHistorySchema() return health_schema.dump(health) def log_source_health(self, source_number, data): """ Logs an health entry for the given source Args: source_number (Integer): Number of the play-out source data (Object): The data to store in the health log """ if not data.log_source: data.log_source = self.host_id # Main Node: Alway log entry, independed of the source # Sync Node: Only log entry when it's coming from an active source if self.node_type == NodeType.MAIN or \ (self.node_type == NodeType.SYNC and data.log_source == self.active_source): healthlog = HealthHistory(data.log_source, data.is_healthy, data.health_info) healthlog.save() self.logger.debug("Stored health info for '%s'" % healthlog.log_source) # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog: health_schema = HealthHistorySchema() json_healthlog = health_schema.dump(healthlog) try: api_url = self.sync_host + api_healthlog + "/" + data.log_source r = requests.post(api_url, json=json_healthlog) if r.status_code == 200: self.logger.info("Successfully pushed healthlog for source '%s' to '%s'" % (healthlog.log_source, self.sync_host)) playlog.is_synced = True playlog.save() else: self.logger.error("Error while pushing healthlog to sync-node: " + str(r.json())) except Exception as e: self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e) else: self.logger.info("Ditching healthlog sent from an inactive source")