Commit 9ba83127 authored by David Trattnig

Sync Logic.

parent 72d32a5d
Pipeline #759 passed
@@ -47,8 +47,11 @@ sync_host="http://localhost:8010"
 ; main_host_1="http://localhost:8008"
 ; main_host_2="http://localhost:8009"
 ; default_source=1
+; sync_interval=3600
+; sync_batch_size=100
+; sync_step_sleep=0.23

 # API endpoints to sync data from main to child nodes
 sync_api_store_playlog="/api/v1/playlog/store"
-sync_api_store_healthlog="/api/v1/source/health"
\ No newline at end of file
+sync_api_store_healthlog="/api/v1/source/health"
+sync_api_get_playlog="/api/v1/playlog"
\ No newline at end of file
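The three new settings drive the sync job introduced in this commit. A minimal sketch of how they might be consumed, assuming (as the code below does) that AuraConfig resolves the ini file itself; the numeric casts are an assumption, since the return type of AuraConfig.get() is not visible in this diff:

    import time
    from base.config import AuraConfig

    config = AuraConfig()
    sync_interval = int(config.get("sync_interval"))        # seconds between sync cycles (assumed cast)
    sync_batch_size = int(config.get("sync_batch_size"))    # max playlogs fetched per request (assumed cast)
    sync_step_sleep = float(config.get("sync_step_sleep"))  # pause after storing each entry (assumed cast)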
@@ -19,23 +19,25 @@
 import sys
 import os
+import atexit

 sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

 import connexion

-from base.config import AuraConfig
-from base.logger import AuraLogger
-from rest import encoder
-from service import ApiService
-from models import db, ma
+from base.config import AuraConfig
+from base.logger import AuraLogger
+from rest import encoder
+from service import ApiService
+from sync import SyncJob
+from models import db, ma

 # App Initialization
 config = AuraConfig()
 logger = AuraLogger(config, "engine-api").logger
+sync_job = None

 def build_app(app):
     app.json_encoder = encoder.JSONEncoder
@@ -62,6 +64,13 @@ def startup():
     api.run(port=port)

+def shutdown():
+    """
+    Called when the application shuts down.
+    """
+    sync_job.exit()
+
 with app.app_context():
     """
     Initialize Server.
@@ -69,6 +78,9 @@ with app.app_context():
     db.create_all()
     service = ApiService(config, logger)
     app.config['SERVICE'] = service
+    sync_job = SyncJob(config, logger, app)
+    sync_job.start()
+    atexit.register(shutdown)

     logger.info("Engine API server initialized.")
@@ -18,12 +18,13 @@
 import datetime
 import requests

 from enum import Enum
 from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema

 class NodeType(Enum):
     """
     Types of API Server deployment models.
@@ -61,7 +62,7 @@ class ApiService():
         if host_id == 0:
             node_type = NodeType.SYNC

-        # Configured as Sync node
+        # Configured as Sync Node
         if not node_type == NodeType.MAIN:
             self.node_type = NodeType.SYNC
             self.main_hosts = [ config.get("main_host_1"), config.get("main_host_2") ]
@@ -71,7 +72,7 @@ class ApiService():
         else:
             msg = "Syncing data of hosts '%s'" % (self.main_hosts)

-        # Configured as Main node
+        # Configured as Main Node
         else:
             self.node_type = NodeType.MAIN
             self.sync_host = config.get("sync_host")
@@ -122,24 +123,26 @@ class ApiService():
             (JSON)
         """
         if not size or size > 50 or size < 1: size = 20
-        tracklist = PlayLog.paginate(page, size, None)
+        tracklist = PlayLog.paginate(page, size, None, None, False)
         tracklist_schema = TrackSchema(many=True)
         return tracklist_schema.dump(tracklist)

-    def list_playlog(self, page=None, size=None, since_time=None):
+    def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False):
         """
         Get paginated playlog entries for the given timestamp range.

         Args:
             page (Integer): The number of items to skip before starting to collect the result set
             size (Integer): The number of items to return per page
-            since_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z")
+            from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z")
+            to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z")
+            skip_synced (Boolean): Optionally, don't return entries which have already been pushed directly to the sync node (sync node only)

         Returns:
             (JSON)
         """
-        tracklist = PlayLog.paginate(page, size, since_time)
+        tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced)
         tracklist_schema = TrackSchema(many=True)
         return tracklist_schema.dump(tracklist)
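For illustration, this is roughly how the sync job below consumes the extended endpoint; host, port, and timestamp are sample values taken from the defaults in this commit, not a prescribed call:

    import requests

    # Query a main node for playlogs after a given timestamp, skipping entries
    # that node already pushed to the sync node itself; "limit" caps the batch size.
    params = {
        "page": "0",
        "limit": "100",
        "skip_synced": "true",
        "from": "2020-08-29T09:12:33.001Z",
    }
    response = requests.get("http://localhost:8008/api/v1/playlog", params=params)
    if response.status_code == 200:
        entries = response.json()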
@@ -270,3 +273,4 @@ class ApiService():
             self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e)
         else:
             self.logger.info("Ditching healthlog sent from an inactive source")
#
# Aura Engine API (https://gitlab.servus.at/aura/engine-api)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import threading
import requests

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

from models import PlayLog, ActivityLog
class SyncJob(threading.Thread):
    """
    Job periodically checking for data at one of the main nodes to be synchronized.

    This job should be running on a "Sync Node" only.
    """
    config = None
    logger = None
    app = None
    exit_event = None
    sync_interval = None
    sync_batch_size = None

    # Create a scoped local database session to be thread-safe
    Session = None

    def __init__(self, config, logger, app):
        """
        Initialize the job.
        """
        self.config = config
        self.logger = logger
        self.app = app

        threading.Thread.__init__(self)
        self.exit_event = threading.Event()
        self.sync_interval = self.config.get("sync_interval")
        self.sync_batch_size = self.config.get("sync_batch_size")
        self.logger.info("Initialized Sync Job - Synchronizing API nodes every %s seconds with a max batch size of %s." % (self.sync_interval, self.sync_batch_size))

        engine = create_engine(self.config.get_database_uri())
        session_factory = sessionmaker(bind=engine)
        self.Session = scoped_session(session_factory)
    def run(self):
        """
        Starts the job.
        """
        self.synchronize()

        while not self.exit_event.wait(self.sync_interval):
            self.synchronize()
            self.logger.info("Sync cycle done.")
    def synchronize(self):
        """
        Performs the synchronization with the main nodes.
        """
        synced_entries = 0
        unsynced_sources = self.get_unsynced_history()
        self.logger.info("Synchronizing API nodes: %s sources are waiting to be synced." % len(unsynced_sources))

        for i in range(len(unsynced_sources)):
            source = unsynced_sources[i]
            self.logger.info("Syncing source %s which is unsynced since %s" % (source.source_number, source.log_time))

            # Store the next source to build a datetime range
            next_source = None
            if i+1 < len(unsynced_sources):
                next_source = unsynced_sources[i+1]

            try:
                # Get unsynced entries
                entries = self.get_unsynced_entries(source, next_source)
                if not entries:
                    self.logger.info("Retrieved no entries to be synced")

                while entries:
                    self.logger.info("Retrieved %s playlogs to be synced" % len(entries))

                    # Store unsynced entries locally
                    for entry in entries:
                        playlog = PlayLog(entry)
                        self.Session.add(playlog)
                        self.Session.commit()
                        self.logger.info("Stored synced playlog for '%s'" % playlog.track_start)
                        synced_entries += 1

                        # Sleep a little to keep the effective load down low
                        time.sleep(self.config.get("sync_step_sleep"))

                    # Get some more unsynced entries, if available
                    entries = self.get_unsynced_entries(source, next_source)

                # Update state of the source activity log to `synced=true`
                source.is_synced = True
                source.save()
                self.logger.info("Sync for source %s finalized!" % source.source_number)

                # For now, let it be ... if there's more to sync let's do it in the next cycle
                self.logger.info("... successfully synchronized %s playlogs!" % synced_entries)
            except Exception:
                self.logger.info("Error while syncing entries. Maybe there's a connectivity issue. Aborting cycle ...")
                return
    def get_unsynced_history(self):
        """
        Retrieves all sources with un-synced states.
        """
        # End any stale transaction so the query below sees fresh data
        self.Session.commit()
        unsynced = self.Session.query(ActivityLog).filter(ActivityLog.is_synced == False).\
            order_by(ActivityLog.log_time.asc())
        return unsynced.all()
    def get_unsynced_entries(self, source, next_source):
        """
        Retrieves unsynced entries from a main node.
        """
        url = None
        to_time = None

        if next_source:
            to_time = next_source.log_time

        try:
            params = { "page": "0", "limit": self.sync_batch_size, "skip_synced": "true", "from": source.log_time, "to": to_time }
            url = self.get_url(source.source_number)
            response = requests.get(url, params=params)

            if response.status_code == 200:
                self.logger.info("Successfully retrieved unsynced entries from '%s'" % url)
                return response.json()
            else:
                self.logger.error("Invalid status code while getting unsynced entries from remote API: " + str(response.status_code))
        except Exception as e:
            self.logger.error("Error while getting unsynced entries from remote API '%s'!" % (url), e)
            raise e
    def get_url(self, source_number):
        """
        Builds a URL for the remote API.
        """
        url = self.config.get("main_host_" + str(source_number))
        url += self.config.get("sync_api_get_playlog")
        return url
    def exit(self):
        """
        Called when the application shuts down.
        """
        self.exit_event.set()
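For orientation, with the sample config values at the top of this commit uncommented, the URL builder resolves as follows (illustration only):

    # get_url(1) -> "http://localhost:8008" + "/api/v1/playlog"
    #            -> "http://localhost:8008/api/v1/playlog"
    # get_url(2) -> "http://localhost:8009/api/v1/playlog"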