diff --git a/config/sample/sample-development.engine-api.ini b/config/sample/sample-development.engine-api.ini index d112e08b109349dc298a4ce72d218f08c6038974..a87c1cfe7ebc7b2be9a3150e4b688a6fdf481035 100644 --- a/config/sample/sample-development.engine-api.ini +++ b/config/sample/sample-development.engine-api.ini @@ -47,8 +47,11 @@ sync_host="http://localhost:8010" ; main_host_1="http://localhost:8008" ; main_host_2="http://localhost:8009" ; default_source=1 - +; sync_interval=3600 +; sync_batch_size=100 +; sync_step_sleep=0.23 # API endpoints to sync data from main to child nodes sync_api_store_playlog="/api/v1/playlog/store" -sync_api_store_healthlog="/api/v1/source/health" \ No newline at end of file +sync_api_store_healthlog="/api/v1/source/health" +sync_api_get_playlog="/api/v1/playlog" \ No newline at end of file diff --git a/src/app.py b/src/app.py index 5100588fd12c5ddb3dd0f265a9e4ebea9a4689d9..3b27786bb78f50dedcdeb6520c3fa3880b4d1d1b 100644 --- a/src/app.py +++ b/src/app.py @@ -19,23 +19,25 @@ import sys import os +import atexit sys.path.append(os.path.join(os.path.dirname(__file__), '..')) import connexion -from base.config import AuraConfig -from base.logger import AuraLogger -from rest import encoder -from service import ApiService - -from models import db, ma +from base.config import AuraConfig +from base.logger import AuraLogger +from rest import encoder +from service import ApiService +from sync import SyncJob +from models import db, ma # App Initialization config = AuraConfig() logger = AuraLogger(config, "engine-api").logger +sync_job = None def build_app(app): app.json_encoder = encoder.JSONEncoder @@ -62,6 +64,13 @@ def startup(): api.run(port=port) +def shutdown(): + """ + Called when the application shuts down. + """ + sync_job.exit() + + with app.app_context(): """ Initialize Server. 
@@ -69,6 +78,9 @@ with app.app_context(): db.create_all() service = ApiService(config, logger) app.config['SERVICE'] = service + sync_job = SyncJob(config, logger, app) + sync_job.start() + atexit.register(shutdown) logger.info("Engine API server initialized.") diff --git a/src/service.py b/src/service.py index f87e6b9d9021a22b782768a8418d3bd765e9035d..095e1c14d9e57672269faeae95df81fb37195de3 100644 --- a/src/service.py +++ b/src/service.py @@ -18,12 +18,13 @@ import datetime -import requests from enum import Enum + from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema + class NodeType(Enum): """ Types of API Server deployment models. @@ -61,7 +62,7 @@ class ApiService(): if host_id == 0: node_type = NodeType.SYNC - # Configured as Sync node + # Configured as Sync Node if not node_type == NodeType.MAIN: self.node_type = NodeType.SYNC self.main_hosts = [ config.get("main_host_1"), config.get("main_host_2") ] @@ -71,7 +72,7 @@ class ApiService(): else: msg = "Syncing data of hosts '%s'" % (self.main_hosts) - # Configured as Main node + # Configured as Main Node else: self.node_type = NodeType.MAIN self.sync_host = config.get("sync_host") @@ -122,24 +123,26 @@ class ApiService(): (JSON) """ if not size or size > 50 or size < 1: size = 20 - tracklist = PlayLog.paginate(page, size, None) + tracklist = PlayLog.paginate(page, size, None, None, False) tracklist_schema = TrackSchema(many=True) return tracklist_schema.dump(tracklist) - def list_playlog(self, page=None, size=None, since_time=None): + def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False): """ Get paginated playlog entries for since the given timestamp. Args: page (Integer): The number of items to skip before starting to collect the result set size (Integer): The numbers of items to return per page - since_time (datetime): Optionally, get entries after this timestamp (e.g. 
"2020-08-29T09:12:33.001Z") + from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z") + to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z") + skip_synced (Boolean): Optionally, don't return entries which have been posted directly before (sync node only) Returns: (JSON) """ - tracklist = PlayLog.paginate(page, size, since_time) + tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced) tracklist_schema = TrackSchema(many=True) return tracklist_schema.dump(tracklist) @@ -270,3 +273,4 @@ class ApiService(): self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e) else: self.logger.info("Ditching healthlog sent from an inactive source") + diff --git a/src/sync.py b/src/sync.py new file mode 100644 index 0000000000000000000000000000000000000000..49aaaccc548d7a56530dc6e0951a73553fbfe29c --- /dev/null +++ b/src/sync.py @@ -0,0 +1,190 @@ +# +# Aura Engine API (https://gitlab.servus.at/aura/engine-api) +# +# Copyright (C) 2020 - The Aura Engine Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import threading
+import time
+import requests
+from sqlalchemy.orm import scoped_session
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy import create_engine
+
+from models import PlayLog, ActivityLog
+
+
+
+class SyncJob(threading.Thread):
+    """
+    Job periodically checking for data at one of the main nodes to be synchronized.
+
+    This job should be running on a "Sync Node" only.
+    """
+
+    config = None
+    logger = None
+    app = None
+    exit_event = None
+    sync_interval = None
+    sync_batch_size = None
+
+    # Create a scoped local database session to be thread safe
+    Session = None
+
+
+    def __init__(self, config, logger, app):
+        """
+        Initialize Job.
+        """
+        self.config = config
+        self.logger = logger
+        self.app = app
+
+        threading.Thread.__init__(self)
+        self.exit_event = threading.Event()
+        self.sync_interval = self.config.get("sync_interval")
+        self.sync_batch_size = self.config.get("sync_batch_size")
+        self.logger.info("Initialized Sync Job - Synchronizing API Nodes every %s seconds and with a max batch-size of %s." % (self.sync_interval, self.sync_batch_size))
+
+
+        engine = create_engine(self.config.get_database_uri())
+        session_factory = sessionmaker(bind=engine)
+        self.Session = scoped_session(session_factory)
+
+
+    def run(self):
+        """
+        Starts the Job.
+        """
+
+        # NOTE: runs outside the Flask app context; DB access uses the thread-local scoped session
+
+        self.synchronize()
+        while not self.exit_event.wait(self.sync_interval):
+            self.synchronize()
+            self.logger.info("Sync cycle done.")
+
+
+    def synchronize(self):
+        """
+        Performs the synchronization with the main nodes.
+        """
+        entries = None
+        synced_entries = 0
+        unsynced_sources = self.get_unsynced_history()
+        self.logger.info("\nSynchronization API Nodes: There are %s sources open to be synced." % len(unsynced_sources))
+
+        for i in range(len(unsynced_sources)):
+
+            source = unsynced_sources[i]
+            self.logger.info("Syncing source %s which is unsynced since %s" % (source.source_number, source.log_time))
+
+            # Store the next source to build a datetime range
+            next_source = None
+            if i+1 < len(unsynced_sources):
+                next_source = unsynced_sources[i+1]
+
+            try:
+                # Get unsynced entries
+                entries = self.get_unsynced_entries(source, next_source)
+
+                while len(entries) > 0:
+                    # `entries` is guaranteed non-empty here by the loop condition
+                    self.logger.info("Retrieved %s playlogs to be synced" % len(entries))
+
+                    # Store unsynced entries locally
+                    for entry in entries:
+                        playlog = PlayLog(entry)
+                        self.Session.add(playlog)
+                        self.Session.commit()
+
+                        self.logger.info("Stored synced playlog for '%s'" % playlog.track_start)
+                        synced_entries += 1
+
+                    # Sleep a little to keep the effective load down low
+                    time.sleep(self.config.get("sync_step_sleep"))
+
+                    # Get some more unsynced entries, if available
+                    entries = self.get_unsynced_entries(source, next_source)
+
+                # Update state of the source activity log to `synced=true`
+                source.is_synced = True
+                source.save()
+                self.logger.info("Sync for source %s finalized!" % source.source_number)
+
+                # For now, let it be ... if there's more to sync let's do it in the next cycle
+                self.logger.info("... successfully synchronized %s playlogs!\n\n" % synced_entries)
+
+            except Exception as e:
+                self.logger.error("Error while syncing entries (%s). Maybe there's a connectivity issue. Aborting cycle ..." % e)
+                return
+
+
+    def get_unsynced_history(self):
+        """
+        Retrieves all sources with un-synced states
+        """
+        self.Session.commit()
+        unsynced = self.Session.query(ActivityLog).filter(ActivityLog.is_synced == False).\
+            order_by(ActivityLog.log_time.asc())
+        return unsynced.all()
+
+
+    def get_unsynced_entries(self, source, next_source):
+        """
+        Retrieve unsynced entries from main node
+        """
+        response = None
+        url = None
+        to_time = None
+        if next_source:
+            to_time = next_source.log_time
+
+        try:
+            params = { "page": "0", "limit": self.sync_batch_size, "skip_synced": "true", "from": source.log_time, "to": to_time}
+            url = self.get_url(source.source_number)
+            response = requests.get(url, params=params)
+            if response.status_code == 200:
+                self.logger.info("Successfully retrieved unsynced entries from '%s'" % url)
+                return response.json()
+            else:
+                self.logger.error("Invalid status code while getting unsynced entries from remote API: " + str(response.status_code))
+        except Exception as e:
+            self.logger.error("Error while getting unsynced entries from remote API '%s': %s" % (url, e))
+            raise e
+
+
+    def get_url(self, source_number):
+        """
+        Builds an URL for the remote API.
+        """
+        url = self.config.get("main_host_" + str(source_number))
+        url += self.config.get("sync_api_get_playlog")
+        return url
+
+
+    def exit(self):
+        """
+        Called when the application shuts down.
+        """
+        self.exit_event.set()
+
+
+