diff --git a/README.md b/README.md
index d99b3c6e193947846f57a3538f2f70c5eba577b5..c511c06ff1e71b06ef44c8ec2c9aea42e9a36fd9 100644
--- a/README.md
+++ b/README.md
@@ -57,14 +57,21 @@ To run the server on a Docker container, please execute the following from the r
 Adding some entry to the playlog:
 
 ```bash
-curl -d '{ "track_start": "2020-06-25 15:38:44", "track_artist": "Aphex Twin", "track_title": "Windowlicker", "log_source": 1 }' -H "Content-Type: application/json" -X POST http://localhost:8008/api/v1/playlog/store
+curl -d '{ "track_start": "2020-06-27 19:14:00", "track_artist": "Mazzy Star", "track_title": "Fade Into You", "log_source": 1 }' -H "Content-Type: application/json" -X POST http://localhost:8008/api/v1/playlog/store
 ```
 
-The attribute `log_source` is a numeric presentation of the engine (`1` or `2`) where this log entry
-is coming from. If you are using additional audio sources, such as automated music player, you can
-define your own numeric id for such device.
+This newly added entry can be queried in your browser in one of the following ways:
 
-All other API endpoints are listed in the interactive documentation:
+```bash
+# Get the latest entry
+http://localhost:8008/api/v1/trackservice/current
+# Get a set of the most recent entries
+http://localhost:8008/api/v1/trackservice/
+# Fetch a specific page (here: the three most recent entries)
+http://localhost:8008/api/v1/trackservice?page=0&limit=3
+```
+
+All other API endpoints are listed in the interactive documentation:
 
 ```bash
 http://localhost:8008/api/v1/ui/
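For completeness, the same store-and-query round trip from the README section above, sketched in Python. This assumes the server is reachable on `localhost:8008` (as in the docker setup) and that the `requests` package is installed; the endpoints are the ones documented above:

```python
import requests

BASE = "http://localhost:8008/api/v1"

# Store a playlog entry (same payload as the curl example)
entry = {
    "track_start": "2020-06-27 19:14:00",
    "track_artist": "Mazzy Star",
    "track_title": "Fade Into You",
    "log_source": 1,
}
resp = requests.post(f"{BASE}/playlog/store", json=entry)
resp.raise_for_status()

# Query the three most recent entries (first page)
recent = requests.get(f"{BASE}/trackservice", params={"page": 0, "limit": 3})
print(recent.json())
```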
+ """ + db.session.commit() + return not db.session.query(ActivityLog).one_or_none() + + @staticmethod def get_active_source(): """ diff --git a/src/service.py b/src/service.py index a37d0958ed73a1b2515d8eba1e4d99e70b2e5f6e..9ae980765d1d30668f75c307d92a36af5f533e7e 100644 --- a/src/service.py +++ b/src/service.py @@ -20,6 +20,8 @@ import datetime import requests +from dateutil.parser import parse + from base.node import NodeType from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema @@ -74,7 +76,7 @@ class ApiService(): source_number = self.config.get("host_id") else: source_number = self.config.get("default_source") - self.set_active_source(source_number) + self.set_default_source(source_number) self.logger.info("Active source: %s" % self.active_source) # Init Sync API endpoints @@ -181,6 +183,23 @@ class ApiService(): return "studio clock data" + def set_default_source(self, source_number): + """ + Create initial source (API Epoch) in the ActivityLog being able to sync old entries upon first start + + Args: + source_number (Integer): Number of the default engine + """ + if ActivityLog.is_empty(): + default_source = self.config.get("default_source") + epoch_source = ActivityLog(default_source) + epoch_source.log_time = parse("2020-02-22 02:20:20") + epoch_source.save() + self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time)) + else: + self.set_active_source(source_number) + + def set_active_source(self, source_number): """ Sets the active source (engine1, engine2, other) identified by its source number. diff --git a/src/sync.py b/src/sync.py index 07a0a30461d80c5947cde7092a0c94da72a9597c..e2c0cb15b6deab6c3eba4bcc73cda7dddf46455b 100644 --- a/src/sync.py +++ b/src/sync.py @@ -19,12 +19,15 @@ import threading import requests +import time -from sqlalchemy.orm import scoped_session -from sqlalchemy.orm import sessionmaker -from sqlalchemy import create_engine +from sqlalchemy import create_engine +from sqlalchemy.orm import scoped_session +from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import IntegrityError, InvalidRequestError -from models import PlayLog, ActivityLog +from rest.models.play_log import PlayLog as PlayLogAPI +from models import PlayLog, PlayLogSchema, ActivityLog @@ -64,7 +67,8 @@ class SyncJob(threading.Thread): engine = create_engine(self.config.get_database_uri()) session_factory = sessionmaker(autoflush=True, autocommit=False, bind=engine) self.Session = scoped_session(session_factory) - + + def run(self): """ @@ -72,8 +76,11 @@ class SyncJob(threading.Thread): """ self.synchronize() while not self.exit_event.wait(self.sync_interval): - self.synchronize() - self.logger.info("Sync cycle done.") + try: + self.synchronize() + self.logger.info("Sync cycle done.\n\n") + except Exception: + self.logger.info("Error while syncing entries. Maybe there's some connectivity issue. Aborting cycle ...") def synchronize(self): @@ -83,10 +90,9 @@ class SyncJob(threading.Thread): entries = None synced_entries = 0 unsynced_sources = self.get_unsynced_history() - self.logger.info("\nSynchronization API Nodes: There are %s sources open to be synced." % len(unsynced_sources)) + self.logger.info("Synchronization of API Nodes: There are %s sources open to be synced." 
diff --git a/src/service.py b/src/service.py
index a37d0958ed73a1b2515d8eba1e4d99e70b2e5f6e..9ae980765d1d30668f75c307d92a36af5f533e7e 100644
--- a/src/service.py
+++ b/src/service.py
@@ -20,6 +20,8 @@ import datetime
 
 import requests
 
+from dateutil.parser import parse
+
 from base.node import NodeType
 from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema
 
@@ -74,7 +76,7 @@ class ApiService():
             source_number = self.config.get("host_id")
         else:
             source_number = self.config.get("default_source")
-        self.set_active_source(source_number)
+        self.set_default_source(source_number)
         self.logger.info("Active source: %s" % self.active_source)
 
     # Init Sync API endpoints
@@ -181,6 +183,23 @@ class ApiService():
 
         return "studio clock data"
 
+    def set_default_source(self, source_number):
+        """
+        Create the initial source (API epoch) in the ActivityLog so that old entries can be synced upon the first start.
+
+        Args:
+            source_number (Integer): Number of the default engine
+        """
+        if ActivityLog.is_empty():
+            default_source = self.config.get("default_source")
+            epoch_source = ActivityLog(default_source)
+            epoch_source.log_time = parse("2020-02-22 02:20:20")
+            epoch_source.save()
+            self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time))
+        else:
+            self.set_active_source(source_number)
+
+
     def set_active_source(self, source_number):
         """
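A small illustration of what the hard-coded epoch marker becomes (`dateutil` is imported in the hunk above; the date itself is the project's arbitrary "API epoch", i.e. the earliest `log_time` a first-start sync will reach back to):

```python
from dateutil.parser import parse

# The "API epoch": lower bound for the very first sync cycle
epoch = parse("2020-02-22 02:20:20")
print(repr(epoch))  # -> datetime.datetime(2020, 2, 22, 2, 20, 20)
```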
diff --git a/src/sync.py b/src/sync.py
index 07a0a30461d80c5947cde7092a0c94da72a9597c..e2c0cb15b6deab6c3eba4bcc73cda7dddf46455b 100644
--- a/src/sync.py
+++ b/src/sync.py
@@ -19,12 +19,15 @@
 import threading
 import requests
+import time
 
-from sqlalchemy.orm import scoped_session
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine
+from sqlalchemy.orm import scoped_session
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import IntegrityError, InvalidRequestError
 
-from models import PlayLog, ActivityLog
+from rest.models.play_log import PlayLog as PlayLogAPI
+from models import PlayLog, PlayLogSchema, ActivityLog
 
 
@@ -64,7 +67,8 @@ class SyncJob(threading.Thread):
         engine = create_engine(self.config.get_database_uri())
         session_factory = sessionmaker(autoflush=True, autocommit=False, bind=engine)
         self.Session = scoped_session(session_factory)
-
+
+
 
     def run(self):
         """
@@ -72,8 +76,11 @@
         """
         self.synchronize()
         while not self.exit_event.wait(self.sync_interval):
-            self.synchronize()
-            self.logger.info("Sync cycle done.")
+            try:
+                self.synchronize()
+                self.logger.info("Sync cycle done.")
+            except Exception as e:
+                self.logger.warning("Error while syncing entries, possibly a connectivity issue. Aborting cycle ... (%s)" % str(e))
 
 
     def synchronize(self):
@@ -83,10 +90,9 @@
         entries = None
         synced_entries = 0
         unsynced_sources = self.get_unsynced_history()
-        self.logger.info("\nSynchronization API Nodes: There are %s sources open to be synced." % len(unsynced_sources))
+        self.logger.info("Synchronization of API Nodes: There are %s sources open to be synced." % len(unsynced_sources))
 
         for i in range(len(unsynced_sources)):
-
             source = unsynced_sources[i]
             self.logger.info("Syncing source %s which is unsynced since %s" % (source.source_number, source.log_time))
 
@@ -94,48 +100,72 @@
             next_source = None
             if i+1 < len(unsynced_sources):
                 next_source = unsynced_sources[i+1]
-
+            else:
+                next_source = self.create_next_source_log(source)
+
+            # Get unsynced entries
+            page = 0
+            entries = self.get_unsynced_entries(source, next_source, page)
+
+            while entries:
+                self.logger.info("Retrieved %s playlogs to be synced" % len(entries))
+
+                # Store unsynced entries locally
+                for entry in entries:
+
+                    try:
+                        self.Session.begin_nested()
+                        entry = PlayLogAPI.from_dict(entry)
+                        playlog = PlayLog(entry)
+                        playlog.is_synced = True
+                        self.db_save(playlog)
+                        self.logger.info("Stored synced playlog for '%s'" % playlog.track_start)
+                        synced_entries += 1
+                    except (IntegrityError, InvalidRequestError):
+                        # Roll back to the savepoint and skip the duplicate
+                        self.Session.rollback()
+                        self.logger.info("Playlog for '%s' already exists in the local database. Skipping ..." % playlog.track_start)
+
+
+                # Sleep a little to keep the effective load low
+                time.sleep(self.config.get("sync_step_sleep"))
+
+                # Get some more unsynced entries, if available
+                page += 1
+                entries = self.get_unsynced_entries(source, next_source, page)
+
+            # Update state of the source activity log to `synced=true`
             try:
-                # Get unsynced entries
-                entries = self.get_unsynced_entries(source, next_source)
-
-                while len(entries) > 0:
-                    if not entries: self.logger.info("Retrieved no entries to be synced")
-                    else: self.logger.info("Retrieved %s playlogs to be synced" % len(entries))
-
-                    # Store unsynced entries locally
-                    for entry in entries:
-
-                        try:
-                            playlog = PlayLog(entry)
-                            self.Session.add(playlog)
-                            self.Session.commit()
-                            self.logger.info("Stored synced playlog for '%s'" % playlog.track_start)
-                            synced_entries += 1
-                        except sqlalchemy.exc.IntegrityError as e:
-                            self.logger.info("Playlog for '%s' is already existing in local database. Skipping..." % playlog.track_start)
-
-                    # Sleep a little to keep the effective load down low
-                    time.sleep(self.config.get("sync_step_sleep"))
-
-                    # Get some more unsynced entries, if available
-                    entries = self.get_unsynced_entries(source, next_source)
-
-                # Update state of the source activity log to `synced=true`
-                try:
-                    source.is_synced = True
-                    self.Session.add(source)
-                    self.Session.commit()
-                    self.logger.info("Sync for source %s finalized!" % source.source_number)
-                except Exception as e:
-                    self.logger.error("Cannot finalize sync state for source=%s - Reason: %s" % (source.source_number, str(e)))
-
-                # For now, let it be ... if there's more to sync let's do it in the next cycle
-                self.logger.info("... sucessfully synchronized %s playlogs!\n\n" % synced_entries)
+                source.is_synced = True
+                self.db_save(source)
+                self.logger.info("Sync for source %s:%s finalized!" % (source.source_number, source.log_time))
+            except Exception as e:
+                self.logger.error("Cannot finalize sync state for source=%s:%s - Reason: %s" % (source.source_number, source.log_time, str(e)))
 
-            except Exception:
-                self.logger.info("Error while syncing entries. Maybe there's a connectivity issue. Aborting cycle ...")
-                return
+            # For now, let it be ... if there's more to sync let's do it in the next cycle
+            self.logger.info("... successfully synchronized %s playlogs!" % synced_entries)
+
+
+
+    def create_next_source_log(self, current_source):
+        """
+        Create and store the next source in the ActivityLog. It is actually the same
+        as the current one, but acts as a reference for the current sync and as a marker
+        for the entry point of the next sync, avoiding unnecessary sync cycles.
+        """
+        next_source = ActivityLog(current_source.source_number)
+        self.Session.add(next_source)
+        return next_source
+
+
+    def db_save(self, model):
+        """
+        Store some object to the database using the local, scoped session.
+        """
+        self.Session.add(model)
+        self.Session.flush()
+        self.Session.commit()
+
 
 
     def get_unsynced_history(self):
@@ -148,7 +178,7 @@
 
         return unsynced.all()
 
 
-    def get_unsynced_entries(self, source, next_source):
+    def get_unsynced_entries(self, source, next_source, page):
         """
         Retrieve unsynced entries from main node
         """
@@ -159,16 +189,18 @@
             to_time = next_source.log_time
 
         try:
-            params = { "page": "0", "limit": self.sync_batch_size, "skip_synced": "true", "from_date": source.log_time, "to_date": to_time}
+            params = { "page": page, "limit": self.sync_batch_size, "skip_synced": "true", "from_date": source.log_time, "to_date": to_time}
             url = self.get_url(source.source_number)
             response = requests.get(url, params=params)
             if response.status_code == 200:
-                self.logger.info("Successfully retrieved unsynced entries from '%s'" % url)
+                self.logger.info("Response from '%s' OK (200)" % url)
                 return response.json()
             else:
-                self.logger.error("Invalid status code while getting unsynced entries from remote API: " + str(response.status_code))
+                msg = "Invalid status code while getting unsynced entries from remote API: " + str(response.status_code)
+                self.logger.warning(msg)
+                raise Exception(msg)
         except Exception as e:
-            self.logger.error("Error while getting unsynced entries from remote API '%s'!\n%s" % (url, str(e)))
+            self.logger.warning("Error while getting unsynced entries from remote API '%s'!\n%s" % (url, str(e)))
             raise e
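The per-entry `begin_nested()` / `rollback()` sequence in `synchronize()` follows SQLAlchemy's SAVEPOINT pattern for bulk inserts that may hit duplicates: each row gets its own savepoint, so one `IntegrityError` can be rolled back without poisoning the rest of the session. A self-contained toy sketch of the idea — the `Row` model and the in-memory SQLite database are stand-ins for illustration, not the project's `PlayLog`:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Row(Base):
    """Toy stand-in for PlayLog: track_start acts as a unique natural key."""
    __tablename__ = "rows"
    id = Column(Integer, primary_key=True)
    track_start = Column(String, unique=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

def store_or_skip(session, row):
    """Insert row; on a duplicate, roll back to the SAVEPOINT and keep the session usable."""
    try:
        with session.begin_nested():  # SAVEPOINT per row
            session.add(row)          # flushed (and checked) when the block exits
        return True
    except IntegrityError:
        return False                  # duplicate skipped, outer transaction intact

print(store_or_skip(session, Row(track_start="2020-06-27 19:14:00")))  # True
print(store_or_skip(session, Row(track_start="2020-06-27 19:14:00")))  # False
session.commit()
```

The same property is what lets `synchronize()` log and skip already-present playlogs mid-batch and still finalize the source's `is_synced` flag in the same session afterwards.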