diff --git a/config/sample/sample-development.engine-api.ini b/config/sample/sample-development.engine-api.ini index 7c706a5809570c63130c08bf3f25af0ae9f916fb..d112e08b109349dc298a4ce72d218f08c6038974 100644 --- a/config/sample/sample-development.engine-api.ini +++ b/config/sample/sample-development.engine-api.ini @@ -49,4 +49,6 @@ sync_host="http://localhost:8010" ; default_source=1 -sync_api_store_playlog="/api/v1/playlog/store" \ No newline at end of file +# API endpoints to sync data from main to child nodes +sync_api_store_playlog="/api/v1/playlog/store" +sync_api_store_healthlog="/api/v1/source/health" \ No newline at end of file diff --git a/src/models.py b/src/models.py index 9034b0c60d1e57039fceb491683bda6de1cf1282..18bb1d7d911f49c30b624a8f5e72592169bd2aca 100644 --- a/src/models.py +++ b/src/models.py @@ -201,6 +201,7 @@ class ActivityLog(db.Model): self.is_synced = False + @staticmethod def get_active_source(): """ Retrieves the currently active source. @@ -217,7 +218,7 @@ class ActivityLog(db.Model): -class HealthHistoryEntry(db.Model): +class HealthHistory(db.Model): """ Table holding an history of health information for sources. """ @@ -227,10 +228,10 @@ class HealthHistoryEntry(db.Model): log_time = Column(DateTime, primary_key=True) # Columns - source_number = Column(Integer) - is_healthy = Column(Boolean) + log_source = Column(Integer) # The source the history entry relates to + is_healthy = Column(Boolean) # Indicates if source is "healthy enough" to be used for play-out is_synced = Column(Boolean) # Only relevant for main nodes, in a multi-node setup - health_info = Column(String(2048)) + health_info = Column(String(2048)) # Stringified JSON object, or other, if needed def __init__(self, source_number, is_healthy, health_info): @@ -238,14 +239,30 @@ class HealthHistoryEntry(db.Model): Initializes an health entry. """ self.log_time = datetime.datetime.now() - self.source_number = source_number + self.log_source = source_number self.is_healthy = is_healthy self.is_synced = False self.health_info = health_info + @staticmethod + def get_latest_entry(source_number): + """ + Retrieves the most recent health history entry for the given source number. + """ + return db.session.query(HealthHistory).filter(HealthHistory.log_source == source_number).first() + + def save(self): db.session.add(self) db.session.commit() + +class HealthHistorySchema(ma.SQLAlchemyAutoSchema): + """ + Schema for health history entries. + """ + class Meta: + model = HealthHistory + sqla_session = db.session \ No newline at end of file diff --git a/src/rest/controllers/internal_controller.py b/src/rest/controllers/internal_controller.py index 41a73dae8cce87dadf011d0ecb914184c04042c1..7872e79272e9c55d3fe21d6deb61d7a25c2d7395 100644 --- a/src/rest/controllers/internal_controller.py +++ b/src/rest/controllers/internal_controller.py @@ -1,5 +1,6 @@ import connexion import six +import datetime from flask import current_app @@ -72,7 +73,8 @@ def get_source_health(number): # noqa: E501 :rtype: HealthLog """ - return 'do some magic!' + service = current_app.config['SERVICE'] + return service.get_source_health(number) def list_playlog(date_time, page=None, limit=None, skip_synced=None): # noqa: E501 @@ -96,7 +98,7 @@ def list_playlog(date_time, page=None, limit=None, skip_synced=None): # noqa: E return service.list_playlog(page, limit, date_time) -def log_source_health(number): # noqa: E501 +def log_source_health(body, number): # noqa: E501 """Log health info Logs another health entry for the given engine # noqa: E501 @@ -106,7 +108,10 @@ def log_source_health(number): # noqa: E501 :rtype: None """ - return 'do some magic!' + if connexion.request.is_json: + body = HealthLog.from_dict(connexion.request.get_json()) # noqa: E501 + service = current_app.config['SERVICE'] + service.log_source_health(number, body) def set_active_source(number): # noqa: E501 diff --git a/src/rest/swagger/swagger.yaml b/src/rest/swagger/swagger.yaml index 0675ebe79b7f3dc7281bc21f9585bfd4b37ef034..cfea2b75b60b44e1a3393d4ff35ad599a7253256 100644 --- a/src/rest/swagger/swagger.yaml +++ b/src/rest/swagger/swagger.yaml @@ -314,9 +314,14 @@ paths: style: simple explode: false schema: - maximum: 2 minimum: 1 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HealthLog' + required: true responses: "200": description: health info logged diff --git a/src/rest/test/test_internal_controller.py b/src/rest/test/test_internal_controller.py index d63b18bb53a425f633dced75a07177202bd857ab..3213fe9c6268d206ff03eddd7f2932ed04808a57 100644 --- a/src/rest/test/test_internal_controller.py +++ b/src/rest/test/test_internal_controller.py @@ -111,9 +111,12 @@ class TestInternalController(BaseTestCase): Log health info """ + body = HealthLog() response = self.client.open( - '/api/v1/source/health/{number}'.format(number=2), - method='POST') + '/source/health/{number}'.format(number=2), + method='POST', + data=json.dumps(body), + content_type='application/json') self.assert200(response, 'Response body is : ' + response.data.decode('utf-8')) diff --git a/src/service.py b/src/service.py index 1852fdca7347906fe3bde3e22b06afc9f6d9a3b3..f87e6b9d9021a22b782768a8418d3bd765e9035d 100644 --- a/src/service.py +++ b/src/service.py @@ -21,7 +21,7 @@ import datetime import requests from enum import Enum -from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog +from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema class NodeType(Enum): @@ -44,6 +44,9 @@ class ApiService(): sync_host = None main_hosts = None active_source = None + api_playlog = None + api_healthlog = None + def __init__(self, config, logger): """ @@ -78,7 +81,8 @@ class ApiService(): msg = "Pushing data to '%s'" % (self.sync_host) # Set active source - self.active_source = ActivityLog.get_active_source().source_number + source = ActivityLog.get_active_source() + if source: self.active_source = source.source_number if not self.active_source: if self.node_type == NodeType.MAIN: source_number = self.config.get("host_id") @@ -87,8 +91,11 @@ class ApiService(): self.set_active_source(source_number) self.logger.info("Active source: %s" % self.active_source) - self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg)) + # Init Sync API endpoints + self.api_playlog = self.config.get("sync_api_store_playlog") + self.api_healthlog = self.config.get("sync_api_store_healthlog") + self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg)) def current_track(self): @@ -96,7 +103,7 @@ class ApiService(): Retrieves the currently playing track. Returns: - (PlayLogEntry) + (JSON) """ track = PlayLog.select_current() track_schema = TrackSchema() @@ -112,7 +119,7 @@ class ApiService(): size (Integer): The numbers of items to return Returns: - (List[PlayLogEntry]) + (JSON) """ if not size or size > 50 or size < 1: size = 20 tracklist = PlayLog.paginate(page, size, None) @@ -130,7 +137,7 @@ class ApiService(): since_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z") Returns: - (List[PlayLogEntry]) + (JSON) """ tracklist = PlayLog.paginate(page, size, since_time) tracklist_schema = TrackSchema(many=True) @@ -141,8 +148,8 @@ class ApiService(): """ Stores the passed playlog entry. - Returns: - (PlayLogEntry) + Args: + data (Object): The data to store in the playlog """ if not data.log_source: data.log_source = self.host_id @@ -157,12 +164,12 @@ class ApiService(): self.logger.debug("Stored playlog for '%s'" % playlog.track_start) # Main Node: Push to Sync Node, if enabled - if self.node_type == NodeType.MAIN and self.sync_host: + if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog: playlog_schema = PlayLogSchema() json_playlog = playlog_schema.dump(playlog) try: - api_url = self.sync_host + self.config.get("sync_api_store_playlog") + api_url = self.sync_host + self.api_playlog r = requests.post(api_url, json=json_playlog) if r.status_code == 200: self.logger.info("Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host)) @@ -175,7 +182,7 @@ class ApiService(): else: self.logger.info("Ditching playlog sent from an inactive source") - + def clock_info(self): """ Retrieves the dataset required to render the studio clock. @@ -216,17 +223,50 @@ class ApiService(): Returns: - (HealthHistoryEntry) + (JSON) """ - return 'do some magic!' + health = HealthHistory.get_latest_entry(source_number) + health_schema = HealthHistorySchema() + return health_schema.dump(health) - def log_source_health(self, source_number): + def log_source_health(self, source_number, data): """ - Logs another health entry for the given source + Logs an health entry for the given source Args: source_number (Integer): Number of the play-out source + data (Object): The data to store in the health log """ - return 'do some magic!' + if not data.log_source: + data.log_source = self.host_id + + # Main Node: Alway log entry, independed of the source + # Sync Node: Only log entry when it's coming from an active source + if self.node_type == NodeType.MAIN or \ + (self.node_type == NodeType.SYNC and data.log_source == self.active_source): + + healthlog = HealthHistory(data.log_source, data.is_healthy, data.health_info) + healthlog.save() + self.logger.debug("Stored health info for '%s'" % healthlog.log_source) + + # Main Node: Push to Sync Node, if enabled + if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog: + + health_schema = HealthHistorySchema() + json_healthlog = health_schema.dump(healthlog) + + try: + api_url = self.sync_host + api_healthlog + "/" + data.log_source + r = requests.post(api_url, json=json_healthlog) + if r.status_code == 200: + self.logger.info("Successfully pushed healthlog for source '%s' to '%s'" % (healthlog.log_source, self.sync_host)) + playlog.is_synced = True + playlog.save() + else: + self.logger.error("Error while pushing healthlog to sync-node: " + r.json()) + except Exception as e: + self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e) + else: + self.logger.info("Ditching healthlog sent from an inactive source")