From 27fba39f2a5867833d3a984edb0901c152b9273c Mon Sep 17 00:00:00 2001 From: Lars Kruse <devel@sumpfralle.de> Date: Mon, 14 Mar 2022 20:00:45 +0100 Subject: [PATCH] style: apply black and isort style --- meta.py | 2 +- setup.py | 12 +- src/app.py | 49 ++++---- src/base/config.py | 29 ++--- src/base/logger.py | 19 ++- src/base/node.py | 14 ++- src/models.py | 287 +++++++++++++++++++++++++-------------------- src/service.py | 165 ++++++++++++++++---------- src/sync.py | 123 +++++++++++-------- 9 files changed, 407 insertions(+), 293 deletions(-) diff --git a/meta.py b/meta.py index 1d232ff..b2857d9 100644 --- a/meta.py +++ b/meta.py @@ -7,4 +7,4 @@ __version__ = "0.9.0" __version_info__ = (0, 9, 0) __maintainer__ = "David Trattnig" __email__ = "david.trattnig@subsquare.at" -__status__ = "Development" \ No newline at end of file +__status__ = "Development" diff --git a/setup.py b/setup.py index f9c96b8..412590f 100644 --- a/setup.py +++ b/setup.py @@ -17,9 +17,9 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. - import sys -from setuptools import setup, find_packages + +from setuptools import find_packages, setup NAME = "engine-api" VERSION = "0.1.0" @@ -42,12 +42,10 @@ setup( keywords=["OpenAPI", "AURA Engine API"], install_requires=REQUIRES, packages=find_packages(), - package_data={'': ['src/rest/swagger/swagger.yaml']}, + package_data={"": ["src/rest/swagger/swagger.yaml"]}, include_package_data=True, - entry_points={ - 'console_scripts': ['src.app=src.app.__main__:main'] - }, + entry_points={"console_scripts": ["src.app=src.app.__main__:main"]}, long_description="""\ This is the AURA Engine API. Read more at https://gitlab.servus.at/aura/engine. - """ + """, ) diff --git a/src/app.py b/src/app.py index 18e3042..fba8d53 100644 --- a/src/app.py +++ b/src/app.py @@ -17,22 +17,22 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import sys -import os import atexit +import os +import sys -sys.path.append(os.path.join(os.path.dirname(__file__), '..')) +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) import connexion import meta -from base.config import AuraConfig -from base.logger import AuraLogger -from base.node import NodeType -from rest import encoder -from service import ApiService -from sync import SyncJob -from models import db, ma +from base.config import AuraConfig +from base.logger import AuraLogger +from base.node import NodeType +from models import db, ma +from rest import encoder +from service import ApiService +from sync import SyncJob # Read command line parameters @@ -48,28 +48,34 @@ config = AuraConfig(config_file) logger = AuraLogger(config, "engine-api").logger sync_job = None + def build_app(app): app.json_encoder = encoder.JSONEncoder app.config["SQLALCHEMY_DATABASE_URI"] = config.get_database_uri() - app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False - app.config['ENV'] = "development" - app.config['FLASK_ENV'] = "development" + app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + app.config["ENV"] = "development" + app.config["FLASK_ENV"] = "development" if config.get("debug_flask") == "true": - app.config['DEBUG'] = True + app.config["DEBUG"] = True db.init_app(app) ma.init_app(app) return app -api = connexion.App(__name__, specification_dir='rest/swagger', arguments={'title': 'AURA Engine API'}) -api.add_api('swagger.yaml', pythonic_params=True) + +api = connexion.App( + __name__, specification_dir="rest/swagger", arguments={"title": "AURA Engine API"} +) +api.add_api("swagger.yaml", pythonic_params=True) app = build_app(api.app) + @app.after_request def after_request(response): header = response.headers - header['Access-Control-Allow-Origin'] = config.get("api_cors") + header["Access-Control-Allow-Origin"] = config.get("api_cors") return response + def startup(): """ Startup Server. @@ -98,7 +104,7 @@ with app.app_context(): node_type = NodeType.SYNC service = ApiService(config, logger, node_type) - app.config['SERVICE'] = service + app.config["SERVICE"] = service # Run sync job only in SYNC NODE mode if node_type == NodeType.SYNC: @@ -115,9 +121,12 @@ with app.app_context(): federation = "enabled" if config.get("enable_federation") == "false": federation = "disabled" - splash = "\n\n â–‘Eâ–‘Nâ–‘Gâ–‘Iâ–‘Nâ–‘Eâ–‘â–‘â–‘Aâ–‘Pâ–‘Iâ–‘ - v%s running as %s node - Federation %s.\n\n" % (meta.__version__, type, federation) + splash = ( + "\n\n â–‘Eâ–‘Nâ–‘Gâ–‘Iâ–‘Nâ–‘Eâ–‘â–‘â–‘Aâ–‘Pâ–‘Iâ–‘ - v%s running as %s node - Federation %s.\n\n" + % (meta.__version__, type, federation) + ) logger.info(splash) -if __name__ == '__main__': +if __name__ == "__main__": startup() diff --git a/src/base/config.py b/src/base/config.py index c5547a9..51bdc56 100644 --- a/src/base/config.py +++ b/src/base/config.py @@ -17,13 +17,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. +import logging import os import os.path import sys -import logging - -from pathlib import Path from configparser import ConfigParser +from pathlib import Path class AuraConfig: @@ -32,10 +31,10 @@ class AuraConfig: Holds the Aura Configuration as in the file `engine-api.ini`. """ + ini_path = "" logger = None - def __init__(self, ini_path="/etc/aura/engine-api.ini"): """ Initializes the configuration, defaults to `/etc/aura/engine.ini`. @@ -45,7 +44,9 @@ class AuraConfig: Args: ini_path(String): The path to the configuration file `engine-api.ini` """ - default_ini_path = "%s/config/engine-api.ini" % Path(__file__).parent.parent.parent.absolute() + default_ini_path = ( + "%s/config/engine-api.ini" % Path(__file__).parent.parent.parent.absolute() + ) if ini_path: config_file = Path(ini_path) @@ -58,8 +59,6 @@ class AuraConfig: self.logger = logging.getLogger("AuraEngineApi") self.load_config() - - def set(self, key, value): """ Setter for some specific config property. @@ -73,8 +72,6 @@ class AuraConfig: except: self.__dict__[key] = str(value) - - def get(self, key, default=None): """ Getter for some specific config property. @@ -87,7 +84,9 @@ class AuraConfig: if default: self.set(key, default) else: - self.logger.warning("Key " + key + " not found in configfile " + self.ini_path + "!") + self.logger.warning( + "Key " + key + " not found in configfile " + self.ini_path + "!" + ) return None if key == "loglevel": @@ -112,8 +111,6 @@ class AuraConfig: value = os.path.expandvars(value) return value - - def load_config(self): """ Set config defaults and load settings from file @@ -123,7 +120,7 @@ class AuraConfig: sys.exit(1) # Read the file - f = open(self.ini_path, 'r') + f = open(self.ini_path, "r") ini_str = f.read() f.close() @@ -137,15 +134,13 @@ class AuraConfig: for section in config_parser.sections(): for key, value in config_parser.items(section): - v = config_parser.get(section, key).replace('"', '').strip() + v = config_parser.get(section, key).replace('"', "").strip() self.set(key, v) # Custom overrides and defaults self.set("install_dir", os.path.realpath(__file__ + "..")) self.set("api_prefix", "/api/v1") - - def get_database_uri(self): """ Retrieves the database connection string. @@ -165,4 +160,4 @@ class AuraConfig: # "db_name" is expected to be either a relative or an absolute path to the sqlite file return f"sqlite:///{db_name}.db" else: - return f"Error: invalid database type '{db_type}'" \ No newline at end of file + return f"Error: invalid database type '{db_type}'" diff --git a/src/base/logger.py b/src/base/logger.py index 3825a92..2dd11b9 100644 --- a/src/base/logger.py +++ b/src/base/logger.py @@ -22,25 +22,24 @@ import logging from .config import AuraConfig -class AuraLogger(): +class AuraLogger: """ AuraLogger Class Logger for all Aura Engine components. The default logger is `AuraEngine`. Other loggers are defined - by passing a custom name on instantiation. - + by passing a custom name on instantiation. + The logger respects the log-level as defined in the engine's configuration file. """ + config = None logger = None - - def __init__(self, config, name="AuraEngine"): """ - Constructor to create a new logger defined by + Constructor to create a new logger defined by the passed name. Args: @@ -49,8 +48,6 @@ class AuraLogger(): self.config = config self.__create_logger(name) - - def __create_logger(self, name): """ Creates the logger instance for the given name. @@ -66,7 +63,9 @@ class AuraLogger(): if not self.logger.hasHandlers(): # create file handler for logger - file_handler = logging.FileHandler(self.config.get("logdir") + "/"+name+".log") + file_handler = logging.FileHandler( + self.config.get("logdir") + "/" + name + ".log" + ) file_handler.setLevel(lvl) # create stream handler for logger @@ -89,4 +88,4 @@ class AuraLogger(): self.logger.debug("Added handlers to logger") else: - self.logger.debug("Reused logger") \ No newline at end of file + self.logger.debug("Reused logger") diff --git a/src/base/node.py b/src/base/node.py index 4d8ee8d..c371205 100644 --- a/src/base/node.py +++ b/src/base/node.py @@ -1,4 +1,3 @@ - # # Aura Engine API (https://gitlab.servus.at/aura/engine-api) # @@ -20,13 +19,20 @@ from enum import Enum -from models import PlayLog, PlayLogSchema, TrackSchema, ActivityLog, HealthHistory, HealthHistorySchema - +from models import ( + ActivityLog, + HealthHistory, + HealthHistorySchema, + PlayLog, + PlayLogSchema, + TrackSchema, +) class NodeType(Enum): """ Types of API Server deployment models. """ + MAIN = "main" - SYNC = "sync" \ No newline at end of file + SYNC = "sync" diff --git a/src/models.py b/src/models.py index 42b7a2c..af75c76 100644 --- a/src/models.py +++ b/src/models.py @@ -1,4 +1,3 @@ - # # Aura Engine API (https://gitlab.servus.at/aura/engine-api) # @@ -21,11 +20,11 @@ import datetime import json -from sqlalchemy import create_engine, Column, DateTime, String, Integer, Boolean -from sqlalchemy.event import listen -from flask_sqlalchemy import SQLAlchemy -from flask_marshmallow import Marshmallow -from marshmallow import Schema, fields, post_dump +from flask_marshmallow import Marshmallow +from flask_sqlalchemy import SQLAlchemy +from marshmallow import Schema, fields, post_dump +from sqlalchemy import Boolean, Column, DateTime, Integer, String, create_engine +from sqlalchemy.event import listen db = SQLAlchemy() ma = Marshmallow() @@ -35,65 +34,66 @@ class PlayLog(db.Model): """ Table holding play-log entries. """ - __tablename__ = 'playlog' + + __tablename__ = "playlog" # Primary Key - track_start = Column(DateTime, primary_key=True) + track_start = Column(DateTime, primary_key=True) # Columns - track_artist = Column(String(256)) - track_album = Column(String(256)) - track_title = Column(String(256)) - track_duration = Column(Integer) - track_type = Column(Integer) - track_num = Column(Integer) - playlist_id = Column(Integer) - timeslot_id = Column(Integer) - show_id = Column(Integer) - show_name = Column(String(256)) - log_source = Column(Integer) # The play-out source which this log is coming from (e.g. engine1, engine2) - is_synced = Column(Boolean) # Only relevant for main nodes, in a multi-node setup - - + track_artist = Column(String(256)) + track_album = Column(String(256)) + track_title = Column(String(256)) + track_duration = Column(Integer) + track_type = Column(Integer) + track_num = Column(Integer) + playlist_id = Column(Integer) + timeslot_id = Column(Integer) + show_id = Column(Integer) + show_name = Column(String(256)) + log_source = Column( + Integer + ) # The play-out source which this log is coming from (e.g. engine1, engine2) + is_synced = Column(Boolean) # Only relevant for main nodes, in a multi-node setup def __init__(self, data): """ Initializes a trackservice entry """ - self.track_start = data.track_start - self.track_artist = data.track_artist - self.track_album = data.track_album - self.track_title = data.track_title - self.track_duration = data.track_duration - self.track_type = data.track_type - self.track_num = data.track_num - self.playlist_id = data.playlist_id - self.timeslot_id = data.timeslot_id - self.show_id = data.show_id - self.show_name = data.show_name - self.log_source = data.log_source - self.is_synced = False + self.track_start = data.track_start + self.track_artist = data.track_artist + self.track_album = data.track_album + self.track_title = data.track_title + self.track_duration = data.track_duration + self.track_type = data.track_type + self.track_num = data.track_num + self.playlist_id = data.playlist_id + self.timeslot_id = data.timeslot_id + self.show_id = data.show_id + self.show_name = data.show_name + self.log_source = data.log_source + self.is_synced = False if not self.track_duration: self.track_duration = 0 - def save(self): db.session.add(self) db.session.commit() - @staticmethod def get(start_time): """ Selects the playlog identified by start time. """ db.session.commit() - track = db.session.query(PlayLog).\ - filter(PlayLog.track_start <= str(start_time)).\ - order_by(PlayLog.track_start.desc()).first() + track = ( + db.session.query(PlayLog) + .filter(PlayLog.track_start <= str(start_time)) + .order_by(PlayLog.track_start.desc()) + .first() + ) return track - @staticmethod def select_recent(): """ @@ -102,14 +102,15 @@ class PlayLog(db.Model): db.session.commit() now = datetime.datetime.now() - track = db.session.query(PlayLog).\ - order_by(PlayLog.track_start.desc()).\ - filter(PlayLog.track_start <= str(now)).first() + track = ( + db.session.query(PlayLog) + .order_by(PlayLog.track_start.desc()) + .filter(PlayLog.track_start <= str(now)) + .first() + ) return track - - @staticmethod def select_current(): """ @@ -131,7 +132,6 @@ class PlayLog(db.Model): return None - @staticmethod def select_for_timeslot(timeslot_id): """ @@ -147,10 +147,12 @@ class PlayLog(db.Model): # Invalid Timeslot ID if timeslot_id == -1: playlogs = [] - result = db.session.query(PlayLog).\ - order_by(PlayLog.track_start.desc()).\ - filter(PlayLog.track_start >= str(before12h)).\ - limit(50) + result = ( + db.session.query(PlayLog) + .order_by(PlayLog.track_start.desc()) + .filter(PlayLog.track_start >= str(before12h)) + .limit(50) + ) for playlog in result.all(): if playlog.timeslot_id != -1: break @@ -158,14 +160,15 @@ class PlayLog(db.Model): # Valid Timeslot ID else: - result = db.session.query(PlayLog).\ - order_by(PlayLog.track_start.desc()).\ - filter(PlayLog.timeslot_id == timeslot_id) + result = ( + db.session.query(PlayLog) + .order_by(PlayLog.track_start.desc()) + .filter(PlayLog.timeslot_id == timeslot_id) + ) playlogs = result.all() return playlogs - @staticmethod def paginate(page, page_size, from_time=None, to_time=None, skip_synced=False): """ @@ -179,12 +182,16 @@ class PlayLog(db.Model): def q(page=0, page_size=None): query = db.session.query(PlayLog).order_by(PlayLog.track_start.desc()) if isinstance(from_time, datetime.datetime): - query = query.filter(PlayLog.track_start >= from_time.isoformat(' ', 'seconds')) + query = query.filter( + PlayLog.track_start >= from_time.isoformat(" ", "seconds") + ) if isinstance(to_time, datetime.datetime): - query = query.filter(PlayLog.track_start <= to_time.isoformat(' ', 'seconds')) + query = query.filter( + PlayLog.track_start <= to_time.isoformat(" ", "seconds") + ) if skip_synced == True: query = query.filter(PlayLog.is_synced == False) - listen(query, 'before_compile', apply_limit(page, page_size), retval=True) + listen(query, "before_compile", apply_limit(page, page_size), retval=True) print("Paginate Query: " + str(query)) return query @@ -195,11 +202,11 @@ class PlayLog(db.Model): if page: query = query.offset(page * page_size) return query + return wrapped return q(page, page_size) - @staticmethod def select_last_hours(n): """ @@ -207,11 +214,14 @@ class PlayLog(db.Model): """ db.session.commit() last_hours = datetime.datetime.today() - datetime.timedelta(hours=n) - tracks = db.session.query(PlayLog).filter(PlayLog.track_start >= str(last_hours)).\ - order_by(PlayLog.track_start.desc()).all() + tracks = ( + db.session.query(PlayLog) + .filter(PlayLog.track_start >= str(last_hours)) + .order_by(PlayLog.track_start.desc()) + .all() + ) return tracks - @staticmethod def select_by_day(day): """ @@ -219,32 +229,44 @@ class PlayLog(db.Model): """ db.session.commit() day_plus_one = day + datetime.timedelta(days=1) - tracks = db.session.query(PlayLog).\ - filter(PlayLog.track_start >= str(day), PlayLog.track_start < str(day_plus_one)).\ - order_by(PlayLog.track_start.desc()).all() + tracks = ( + db.session.query(PlayLog) + .filter( + PlayLog.track_start >= str(day), PlayLog.track_start < str(day_plus_one) + ) + .order_by(PlayLog.track_start.desc()) + .all() + ) return tracks - @staticmethod def select_by_range(from_day, to_day): """ Selects the track-service items for a day range. """ db.session.commit() - tracks = db.session.query(PlayLog).filter(PlayLog.track_start >= str(from_day),\ - PlayLog.track_start < str(to_day)).order_by(PlayLog.track_start.desc()).all() + tracks = ( + db.session.query(PlayLog) + .filter( + PlayLog.track_start >= str(from_day), PlayLog.track_start < str(to_day) + ) + .order_by(PlayLog.track_start.desc()) + .all() + ) return tracks - def __str__(self): - return "Track [track_start: %s, track_title: %s]" % (str(self.track_start), str(self.track_title)) - + return "Track [track_start: %s, track_title: %s]" % ( + str(self.track_start), + str(self.track_title), + ) class PlayLogSchema(ma.SQLAlchemyAutoSchema): """ Schema for playlog entries. """ + class Meta: model = PlayLog sqla_session = db.session @@ -254,8 +276,7 @@ class PlayLogSchema(ma.SQLAlchemyAutoSchema): @post_dump def remove_skip_values(self, data, many=False): return { - key: value for key, value in data.items() - if value not in self.SKIP_VALUES + key: value for key, value in data.items() if value not in self.SKIP_VALUES } @@ -263,6 +284,7 @@ class TrackSchema(ma.SQLAlchemySchema): """ Schema for trackservice entries. """ + class Meta: model = PlayLog sqla_session = db.session @@ -277,9 +299,8 @@ class TrackSchema(ma.SQLAlchemySchema): "playlist_id", "timeslot_id", "show_id", - "show_name" - ) - + "show_name", + ) class ActivityLog(db.Model): @@ -288,15 +309,15 @@ class ActivityLog(db.Model): Only used in "SYNC" deployment mode. """ - __tablename__ = 'activity_log' + + __tablename__ = "activity_log" # Primary Key - log_time = Column(DateTime, primary_key=True) + log_time = Column(DateTime, primary_key=True) # Columns - source_number = Column(Integer) - is_synced = Column(Boolean) - + source_number = Column(Integer) + is_synced = Column(Boolean) def __init__(self, source_number): """ @@ -306,7 +327,6 @@ class ActivityLog(db.Model): self.source_number = source_number self.is_synced = False - @staticmethod def is_empty(): """ @@ -315,39 +335,39 @@ class ActivityLog(db.Model): db.session.commit() return not db.session.query(ActivityLog).one_or_none() - @staticmethod def get_active_source(): """ Retrieves the currently active source. """ db.session.commit() - source = db.session.query(ActivityLog).\ - order_by(ActivityLog.log_time.desc()).first() + source = ( + db.session.query(ActivityLog).order_by(ActivityLog.log_time.desc()).first() + ) return source - def save(self): db.session.add(self) db.session.commit() - class HealthHistory(db.Model): """ Table holding an history of health information for sources. """ - __tablename__ = 'health_history' + + __tablename__ = "health_history" # Primary Key - log_time = Column(DateTime, primary_key=True) + log_time = Column(DateTime, primary_key=True) # Columns - log_source = Column(Integer) # The source the history entry relates to - is_healthy = Column(Boolean) # Indicates if source is "healthy enough" to be used for play-out - is_synced = Column(Boolean) # Only relevant for main nodes, in a multi-node setup - health_info = Column(String(4096)) # Stringified JSON object or other, if needed - + log_source = Column(Integer) # The source the history entry relates to + is_healthy = Column( + Boolean + ) # Indicates if source is "healthy enough" to be used for play-out + is_synced = Column(Boolean) # Only relevant for main nodes, in a multi-node setup + health_info = Column(String(4096)) # Stringified JSON object or other, if needed def __init__(self, source_number, log_time, is_healthy, health_info): """ @@ -359,32 +379,33 @@ class HealthHistory(db.Model): self.is_synced = False self.health_info = health_info - @staticmethod def get_latest_entry(source_number): """ Retrieves the most recent health history entry for the given source number. """ - return db.session.query(HealthHistory).filter(HealthHistory.log_source == source_number).\ - order_by(HealthHistory.log_time.desc()).first() - + return ( + db.session.query(HealthHistory) + .filter(HealthHistory.log_source == source_number) + .order_by(HealthHistory.log_time.desc()) + .first() + ) def save(self): db.session.add(self) db.session.commit() - class HealthHistorySchema(ma.SQLAlchemyAutoSchema): """ Schema for health history entries. """ + class Meta: model = HealthHistory sqla_session = db.session - class ClockInfo(db.Model): """ Table holding information for the current and next show to be displayed by the studio clock. @@ -396,26 +417,35 @@ class ClockInfo(db.Model): The stringified objects allow easy, future extension of the properties, without the need to change the database model. """ - __tablename__ = 'clock_info' + + __tablename__ = "clock_info" # Primary Key - log_source = Column(Integer, primary_key=True) # The source this entry was updated from ("1" for engine1, "2" for engine2) + log_source = Column( + Integer, primary_key=True + ) # The source this entry was updated from ("1" for engine1, "2" for engine2) # Columns - log_time = Column(DateTime) - current_track = None # Populated live from within `get_info(..)` - planned_playlist = Column(String(4096)) # Stringified "#/components/schemas/Playlist" OpenAPI JSON object - current_timeslot = Column(String(2048)) # Stringified "#/components/schemas/Timeslot" OpenAPI JSON object - next_timeslot = Column(String(2048)) # Stringified "#/components/schemas/Timeslot" OpenAPI JSON object - + log_time = Column(DateTime) + current_track = None # Populated live from within `get_info(..)` + planned_playlist = Column( + String(4096) + ) # Stringified "#/components/schemas/Playlist" OpenAPI JSON object + current_timeslot = Column( + String(2048) + ) # Stringified "#/components/schemas/Timeslot" OpenAPI JSON object + next_timeslot = Column( + String(2048) + ) # Stringified "#/components/schemas/Timeslot" OpenAPI JSON object def __init__(self): """ Initializes an clock info entry. """ - - def set_info(self, source_number, planned_playlist, current_timeslot, next_timeslot): + def set_info( + self, source_number, planned_playlist, current_timeslot, next_timeslot + ): """ Sets the values for a clock info entry. """ @@ -434,14 +464,16 @@ class ClockInfo(db.Model): else: self.next_timeslot = None - @staticmethod def get(source_number): """ Retrieves the clock info for the given source number. """ - return db.session.query(ClockInfo).filter(ClockInfo.log_source == source_number).first() - + return ( + db.session.query(ClockInfo) + .filter(ClockInfo.log_source == source_number) + .first() + ) @staticmethod def get_info(source_number): @@ -450,7 +482,11 @@ class ClockInfo(db.Model): """ track_schema = TrackSchema() info = dict() - data = db.session.query(ClockInfo).filter(ClockInfo.log_source == source_number).first() + data = ( + db.session.query(ClockInfo) + .filter(ClockInfo.log_source == source_number) + .first() + ) current_track = PlayLog.select_current() planned_playlist_id = -1 playlogs = None @@ -478,11 +514,16 @@ class ClockInfo(db.Model): most_recent_track = PlayLog.select_recent() # Is the most recent track part of the current timeslot? - if most_recent_track.timeslot_id == info["current_timeslot"]["timeslot_id"]: + if ( + most_recent_track.timeslot_id + == info["current_timeslot"]["timeslot_id"] + ): # Get the actual playlogs of the current timeslot, until now playlog_schema = PlayLogSchema(many=True) - playlogs = PlayLog.select_for_timeslot(most_recent_track.timeslot_id) + playlogs = PlayLog.select_for_timeslot( + most_recent_track.timeslot_id + ) playlogs.sort(key=lambda track: track.track_start, reverse=False) info["current_playlogs"] = playlog_schema.dump(playlogs) if info["current_playlogs"] == None: @@ -491,7 +532,9 @@ class ClockInfo(db.Model): # Invalid timeslots (e.g. in fallback scenarios) get a virtual start date of the first fallback track if info["current_timeslot"]["timeslot_id"] == -1: if playlogs and playlogs[0]: - info["current_timeslot"]["timeslot_start"] = playlogs[0].track_start + info["current_timeslot"]["timeslot_start"] = playlogs[ + 0 + ].track_start # Get the next timeslot if data.next_timeslot: @@ -499,26 +542,22 @@ class ClockInfo(db.Model): else: info["next_timeslot"] = {} - return info - - def save(self): db.session.add(self) db.session.commit() - def update(self): db.session.merge(self) db.session.commit() - class ClockInfoSchema(ma.SQLAlchemySchema): """ Schema for trackservice entries. """ + class Meta: model = ClockInfo sqla_session = db.session @@ -529,5 +568,5 @@ class ClockInfoSchema(ma.SQLAlchemySchema): "planned_playlist", "current_playlogs", "current_timeslot", - "next_timeslot" - ) \ No newline at end of file + "next_timeslot", + ) diff --git a/src/service.py b/src/service.py index f343932..0c0e829 100644 --- a/src/service.py +++ b/src/service.py @@ -18,21 +18,27 @@ import datetime -import requests import json -import sqlalchemy +import requests +import sqlalchemy from dateutil.parser import parse from base.node import NodeType +from models import ( + ActivityLog, + ClockInfo, + ClockInfoSchema, + HealthHistory, + HealthHistorySchema, + PlayLog, + PlayLogSchema, + TrackSchema, +) from rest import util -from models import \ - PlayLog, PlayLogSchema, TrackSchema, ActivityLog, \ - ClockInfo, ClockInfoSchema, HealthHistory, HealthHistorySchema - -class ApiService(): +class ApiService: """ Service handling for API actions. """ @@ -47,7 +53,6 @@ class ApiService(): api_healthlog = None api_clockinfo = None - def __init__(self, config, logger, node_type): """ Initialize Service. @@ -58,9 +63,11 @@ class ApiService(): # Configured as Sync Node if not node_type == NodeType.MAIN: self.node_type = NodeType.SYNC - self.main_hosts = [ config.get("main_host_1"), config.get("main_host_2") ] + self.main_hosts = [config.get("main_host_1"), config.get("main_host_2")] if not self.main_hosts[0] and not self.main_hosts[1]: - self.logger.warn("Not a single main host defined. Be aware what you are doing.") + self.logger.warn( + "Not a single main host defined. Be aware what you are doing." + ) msg = "No sync possible as no host nodes are configured" else: msg = "Syncing data of hosts '%s'" % (self.main_hosts) @@ -74,12 +81,12 @@ class ApiService(): msg = "No child node for synchronization defined" else: if not self.config.get("enable_federation") == "false": - msg = "Pushing data to '%s'" % (self.sync_host) - + msg = "Pushing data to '%s'" % (self.sync_host) # Set active source source = ActivityLog.get_active_source() - if source: self.active_source = source.source_number + if source: + self.active_source = source.source_number if not self.active_source: if self.node_type == NodeType.MAIN: source_number = self.config.get("host_id") @@ -95,10 +102,9 @@ class ApiService(): self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg)) - - def current_track(self): + def current_track(self): """ - Retrieves the currently playing track. + Retrieves the currently playing track. Returns: (JSON) @@ -107,8 +113,7 @@ class ApiService(): track_schema = TrackSchema() return track_schema.dump(track) - - def list_tracks(self, page=None, size=None, from_time=None, to_time=None): + def list_tracks(self, page=None, size=None, from_time=None, to_time=None): """ Lists track-service entries with pagination. @@ -120,15 +125,17 @@ class ApiService(): Returns: (JSON) """ - if not size or size > 50 or size < 1: size = 20 + if not size or size > 50 or size < 1: + size = 20 tracklist = PlayLog.paginate(page, size, from_time, to_time, False) tracklist_schema = TrackSchema(many=True) return tracklist_schema.dump(tracklist) - - def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False): + def list_playlog( + self, page=None, size=None, from_time=None, to_time=None, skip_synced=False + ): """ - Get paginated playlog entries for since the given timestamp. + Get paginated playlog entries for since the given timestamp. Args: page (Integer): The number of items to skip before starting to collect the result set @@ -140,13 +147,13 @@ class ApiService(): Returns: (JSON) """ - if not page: page = 0 + if not page: + page = 0 tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced) tracklist_schema = PlayLogSchema(many=True) return tracklist_schema.dump(tracklist) - - def store_playlog(self, data, plain_json): + def store_playlog(self, data, plain_json): """ Stores the passed playlog entry. @@ -158,68 +165,85 @@ class ApiService(): # Main Node: Alway log entry, independed of the source # Sync Node: Only log entry when it's coming from an active source - if self.node_type == NodeType.MAIN or \ - (self.node_type == NodeType.SYNC and data.log_source == self.active_source): + if self.node_type == NodeType.MAIN or ( + self.node_type == NodeType.SYNC and data.log_source == self.active_source + ): try: playlog = PlayLog(data) playlog.save() self.logger.debug("Stored playlog for '%s'" % data.track_start) except sqlalchemy.exc.IntegrityError as e: - self.logger.info("Playlog for '%s' is already existing in local database. Skipping..." % data.track_start) + self.logger.info( + "Playlog for '%s' is already existing in local database. Skipping..." + % data.track_start + ) if self.config.get("enable_federation") == "false": return - + # Main Node: Push to Sync Node, if enabled if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog: try: api_url = self.sync_host + self.api_playlog r = requests.post(api_url, json=plain_json) if r.status_code == 204: - self.logger.info("Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host)) + self.logger.info( + "Successfully pushed playlog for '%s' to '%s'" + % (playlog.track_start, self.sync_host) + ) playlog.is_synced = True playlog.save() else: - self.logger.error("Error while pushing playlog to sync-node: " + str(r.json())) + self.logger.error( + "Error while pushing playlog to sync-node: " + str(r.json()) + ) except Exception as e: - self.logger.error("Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))) + self.logger.error( + "Error while posting to sync-node API '%s'!\n%s" + % (api_url, str(e)) + ) else: self.logger.info("Ditching playlog sent from an inactive source") - def get_clock_info(self): """ - Retrieves the dataset required to render the studio clock. + Retrieves the dataset required to render the studio clock. """ info = ClockInfo.get_info(self.get_active_source()) now = datetime.datetime.now() if "current_timeslot" in info and info["current_timeslot"]["timeslot_end"]: - timeslot_end = util.deserialize_datetime(info["current_timeslot"]["timeslot_end"]) + timeslot_end = util.deserialize_datetime( + info["current_timeslot"]["timeslot_end"] + ) if timeslot_end < now: info["current_timeslot"] = None - + clockinfo_schema = ClockInfoSchema() return clockinfo_schema.dump(info) - def set_clock_info(self, data, plain_json): """ Sets the clock info for the given source (engine1, engine2, other). """ if data.engine_source <= 0: return - + is_existing = False - clock_info = ClockInfo.get(self.get_active_source()) - if clock_info: + clock_info = ClockInfo.get(self.get_active_source()) + if clock_info: is_existing = True else: clock_info = ClockInfo() - clock_info.set_info(data.engine_source, data.planned_playlist, data.current_timeslot, data.next_timeslot) - + clock_info.set_info( + data.engine_source, + data.planned_playlist, + data.current_timeslot, + data.next_timeslot, + ) + if is_existing: clock_info.update() else: @@ -234,12 +258,20 @@ class ApiService(): api_url = self.sync_host + self.api_clockinfo r = requests.put(api_url, json=plain_json) if r.status_code == 204: - self.logger.info("Successfully pushed clock info for '%s' to '%s'" % (clock_info.log_time, self.sync_host)) + self.logger.info( + "Successfully pushed clock info for '%s' to '%s'" + % (clock_info.log_time, self.sync_host) + ) else: - self.logger.error("HTTP %s | Error while pushing clock info to sync-node: " % (r.status_code, str(r.json()))) + self.logger.error( + "HTTP %s | Error while pushing clock info to sync-node: " + % (r.status_code, str(r.json())) + ) except Exception as e: - self.logger.error("Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e))) - + self.logger.error( + "Error while putting clock info to sync-node API '%s'!\n%s" + % (api_url, str(e)) + ) def set_default_source(self, source_number): """ @@ -253,11 +285,13 @@ class ApiService(): epoch_source = ActivityLog(default_source) epoch_source.log_time = parse("2020-02-22 02:20:20") epoch_source.save() - self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time)) + self.logger.info( + "Created API epoch source %s:%s" + % (epoch_source.source_number, epoch_source.log_time) + ) else: self.set_active_source(source_number) - def set_active_source(self, source_number): """ Sets the active source (engine1, engine2, other) identified by its source number. @@ -271,7 +305,6 @@ class ApiService(): activity_log = ActivityLog(source_number) activity_log.save() - def get_active_source(self): """ Retrieves number of the currently active source (engine1, engine2, other) @@ -281,7 +314,6 @@ class ApiService(): """ return self.active_source - def get_source_health(self, source_number): """ Retrieves the most recent health info of the requested source @@ -297,7 +329,6 @@ class ApiService(): health_schema = HealthHistorySchema() return health_schema.dump(health) - def log_source_health(self, source_number, data, plain_json): """ Logs an health entry for the given source @@ -312,10 +343,13 @@ class ApiService(): # Main Node: Alway log entry, independed of the source # Sync Node: Only log entry when it's coming from an active source - if self.node_type == NodeType.MAIN or \ - (self.node_type == NodeType.SYNC and source_number == self.active_source): + if self.node_type == NodeType.MAIN or ( + self.node_type == NodeType.SYNC and source_number == self.active_source + ): - healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details) + healthlog = HealthHistory( + source_number, data.log_time, data.is_healthy, data.details + ) healthlog.save() self.logger.debug("Stored health info for '%s'" % str(source_number)) @@ -323,22 +357,33 @@ class ApiService(): return # Main Node: Push to Sync Node, if enabled - if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog: + if ( + self.node_type == NodeType.MAIN + and self.sync_host + and self.api_healthlog + ): # health_schema = HealthHistorySchema() # json_healthlog = health_schema.dump(healthlog) api_url = self.sync_host + self.api_healthlog + "/" + str(source_number) - try: + try: r = requests.post(api_url, json=plain_json) if r.status_code == 200: - self.logger.info("Successfully pushed healthlog for source '%s' to '%s'" % (str(source_number), self.sync_host)) + self.logger.info( + "Successfully pushed healthlog for source '%s' to '%s'" + % (str(source_number), self.sync_host) + ) healthlog.is_synced = True healthlog.save() else: - self.logger.error("Error while pushing healthlog to sync-node: " + str(r.json())) + self.logger.error( + "Error while pushing healthlog to sync-node: " + + str(r.json()) + ) except Exception as e: - self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e) + self.logger.error( + "Error while posting to sync-node API '%s'!" % (api_url), e + ) else: self.logger.info("Ditching healthlog sent from an inactive source") - diff --git a/src/sync.py b/src/sync.py index 52b5224..cceaba3 100644 --- a/src/sync.py +++ b/src/sync.py @@ -18,18 +18,15 @@ import threading -import requests import time -from sqlalchemy import create_engine -from sqlalchemy.orm import scoped_session -from sqlalchemy.orm import sessionmaker -from sqlalchemy.exc import IntegrityError, InvalidRequestError - -from rest.models.play_log import PlayLog as PlayLogAPI -from models import PlayLog, PlayLogSchema, ActivityLog - +import requests +from sqlalchemy import create_engine +from sqlalchemy.exc import IntegrityError, InvalidRequestError +from sqlalchemy.orm import scoped_session, sessionmaker +from models import ActivityLog, PlayLog, PlayLogSchema +from rest.models.play_log import PlayLog as PlayLogAPI class SyncJob(threading.Thread): @@ -48,7 +45,6 @@ class SyncJob(threading.Thread): Session = None - def __init__(self, config, logger, app): """ Initialize Job. @@ -61,27 +57,29 @@ class SyncJob(threading.Thread): self.exit_event = threading.Event() self.sync_interval = self.config.get("sync_interval") self.sync_batch_size = self.config.get("sync_batch_size") - self.logger.info("Initialized Sync Job - Synchronizing API Nodes every %s seconds and with a max batch-size of %s." % (self.sync_interval, self.sync_batch_size)) + self.logger.info( + "Initialized Sync Job - Synchronizing API Nodes every %s seconds and with a max batch-size of %s." + % (self.sync_interval, self.sync_batch_size) + ) # Create a scoped local database session to be thread safe engine = create_engine(self.config.get_database_uri()) session_factory = sessionmaker(autoflush=True, autocommit=False, bind=engine) self.Session = scoped_session(session_factory) - - def run(self): """ Starts the Job. - """ + """ self.synchronize() while not self.exit_event.wait(self.sync_interval): - try: + try: self.synchronize() self.logger.info("Sync cycle done.\n\n") except Exception: - self.logger.info("Error while syncing entries. Maybe there's some connectivity issue. Aborting cycle ...") - + self.logger.info( + "Error while syncing entries. Maybe there's some connectivity issue. Aborting cycle ..." + ) def synchronize(self): """ @@ -90,16 +88,22 @@ class SyncJob(threading.Thread): entries = None synced_entries = 0 unsynced_sources = self.get_unsynced_history() - self.logger.info("Synchronization of API Nodes: There are %s sources open to be synced." % len(unsynced_sources)) + self.logger.info( + "Synchronization of API Nodes: There are %s sources open to be synced." + % len(unsynced_sources) + ) for i in range(len(unsynced_sources)): source = unsynced_sources[i] - self.logger.info("Syncing source %s which is unsynced since %s" % (source.source_number, source.log_time)) + self.logger.info( + "Syncing source %s which is unsynced since %s" + % (source.source_number, source.log_time) + ) # Store the next source to build a datetime range next_source = None - if i+1 < len(unsynced_sources): - next_source = unsynced_sources[i+1] + if i + 1 < len(unsynced_sources): + next_source = unsynced_sources[i + 1] else: next_source = self.create_next_source_log(source) @@ -108,24 +112,32 @@ class SyncJob(threading.Thread): entries = self.get_unsynced_entries(source, next_source, page) while entries and len(entries) > 0: - if not entries: self.logger.info("Retrieved no entries to be synced") - else: self.logger.info("Retrieved %s playlogs to be synced" % len(entries)) + if not entries: + self.logger.info("Retrieved no entries to be synced") + else: + self.logger.info( + "Retrieved %s playlogs to be synced" % len(entries) + ) # Store unsynced entries locally for entry in entries: try: self.Session.begin_nested() - entry = PlayLogAPI.from_dict(entry) + entry = PlayLogAPI.from_dict(entry) playlog = PlayLog(entry) playlog.is_synced = True self.db_save(playlog) - self.logger.info("Stored synced playlog for '%s'" % playlog.track_start) + self.logger.info( + "Stored synced playlog for '%s'" % playlog.track_start + ) synced_entries += 1 except (IntegrityError, InvalidRequestError) as e: self.Session.rollback() - self.logger.info("Playlog for '%s' is already existing in local database. Skipping..." % playlog.track_start) - + self.logger.info( + "Playlog for '%s' is already existing in local database. Skipping..." + % playlog.track_start + ) # Sleep a little to keep the effective load down low time.sleep(self.config.get("sync_step_sleep")) @@ -138,26 +150,31 @@ class SyncJob(threading.Thread): try: source.is_synced = True self.db_save(source) - self.logger.info("Sync for source %s:%s finalized!" % (source.source_number, source.log_time)) + self.logger.info( + "Sync for source %s:%s finalized!" + % (source.source_number, source.log_time) + ) except Exception as e: - self.logger.error("Cannot finalize sync state for source=%s:%s - Reason: %s" % (source.source_number, source.log_time, str(e))) + self.logger.error( + "Cannot finalize sync state for source=%s:%s - Reason: %s" + % (source.source_number, source.log_time, str(e)) + ) # For now, let it be ... if there's more to sync let's do it in the next cycle - self.logger.info("... successfully synchronized %s playlogs!" % synced_entries) - - + self.logger.info( + "... successfully synchronized %s playlogs!" % synced_entries + ) def create_next_source_log(self, current_source): """ Create and store the next source in the ActivityLog. It's actually the same, - as the current, but acts as a references for the current sync and as a marker - for the entrypoint of the next sync -> to avoid unneccessary sync cycles. + as the current, but acts as a references for the current sync and as a marker + for the entrypoint of the next sync -> to avoid unneccessary sync cycles. """ next_source = ActivityLog(current_source.source_number) self.Session.add(next_source) return next_source - def db_save(self, model): """ Store some object to the database using the local, scoped session. @@ -165,19 +182,19 @@ class SyncJob(threading.Thread): self.Session.add(model) self.Session.flush() self.Session.commit() - - def get_unsynced_history(self): """ Retrieves all sources with un-synced states - """ + """ self.Session.commit() - unsynced = self.Session.query(ActivityLog).filter(ActivityLog.is_synced == False).\ - order_by(ActivityLog.log_time.asc()) + unsynced = ( + self.Session.query(ActivityLog) + .filter(ActivityLog.is_synced == False) + .order_by(ActivityLog.log_time.asc()) + ) return unsynced.all() - def get_unsynced_entries(self, source, next_source, page): """ Retrieve unsynced entries from main node @@ -189,21 +206,32 @@ class SyncJob(threading.Thread): to_time = next_source.log_time try: - params = { "page": page, "limit": self.sync_batch_size, "skip_synced": "true", "from_date": source.log_time, "to_date": to_time} + params = { + "page": page, + "limit": self.sync_batch_size, + "skip_synced": "true", + "from_date": source.log_time, + "to_date": to_time, + } url = self.get_url(source.source_number) response = requests.get(url, params=params) if response.status_code == 200: self.logger.info("Response from '%s' OK (200)" % url) return response.json() else: - msg = "Invalid status code while getting unsynced entries from remote API: " + str(response.status_code) + msg = ( + "Invalid status code while getting unsynced entries from remote API: " + + str(response.status_code) + ) self.logger.warn(msg) raise Exception(msg) except Exception as e: - self.logger.warn("Error while getting unsynced entries from remote API '%s'!\n%s" % (url, str(e))) + self.logger.warn( + "Error while getting unsynced entries from remote API '%s'!\n%s" + % (url, str(e)) + ) raise e - def get_url(self, source_number): """ Builds an URL for the remote API. @@ -212,13 +240,8 @@ class SyncJob(threading.Thread): url += self.config.get("sync_api_get_playlog") return url - def exit(self): """ Called when the application shuts down. """ self.exit_event.set() - - - - -- GitLab