Newer
Older
#
# Aura Engine API (https://gitlab.servus.at/aura/engine-api)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import sqlalchemy
from models import \
PlayLog, PlayLogSchema, TrackSchema, ActivityLog, \
ClockInfo, ClockInfoSchema, HealthHistory, HealthHistorySchema
"""
config = None
logger = None
node_type = None
sync_host = None
main_hosts = None
def __init__(self, config, logger, node_type):
self.config = config
self.logger = logger
self.node_type = NodeType.SYNC
self.main_hosts = [ config.get("main_host_1"), config.get("main_host_2") ]
if not self.main_hosts[0] and not self.main_hosts[1]:
self.logger.warn("Not a single main host defined. Be aware what you are doing.")
msg = "No sync possible as no host nodes are configured"
else:
msg = "Syncing data of hosts '%s'" % (self.main_hosts)
else:
self.node_type = NodeType.MAIN
self.sync_host = config.get("sync_host")
if not self.sync_host:
else:
msg = "Pushing data to '%s'" % (self.sync_host)
source = ActivityLog.get_active_source()
if source: self.active_source = source.source_number
if not self.active_source:
if self.node_type == NodeType.MAIN:
source_number = self.config.get("host_id")
else:
source_number = self.config.get("default_source")
self.logger.info("Active source: %s" % self.active_source)
# Init Sync API endpoints
self.api_playlog = self.config.get("sync_api_store_playlog")
self.api_healthlog = self.config.get("sync_api_store_healthlog")
self.api_clockinfo = self.config.get("sync_api_store_clockinfo")
self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg))
def current_track(self):
"""
Retrieves the currently playing track.
Returns:
"""
track = PlayLog.select_current()
return track_schema.dump(track)
page (Integer): The number of the page to return
size (Integer): The numbers of items to return
if not size or size > 50 or size < 1: size = 20
tracklist = PlayLog.paginate(page, size, None, None, False)
tracklist_schema = TrackSchema(many=True)
return tracklist_schema.dump(tracklist)
def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False):
"""
Get paginated playlog entries for since the given timestamp.
Args:
page (Integer): The number of items to skip before starting to collect the result set
size (Integer): The numbers of items to return per page
from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z")
from_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z")
skip_synced (Boolean): Optionally, don't return entries which have been posted directly before (sync node only)
tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced)
tracklist_schema = PlayLogSchema(many=True)
return tracklist_schema.dump(tracklist)
def store_playlog(self, data, plain_json):
"""
Stores the passed playlog entry.
Args:
data (Object): The data to store in the playlog
if self.config.get("enable_federation") == "false":
return
# Main Node: Alway log entry, independed of the source
# Sync Node: Only log entry when it's coming from an active source
if self.node_type == NodeType.MAIN or \
(self.node_type == NodeType.SYNC and data.log_source == self.active_source):
try:
playlog = PlayLog(data)
playlog.save()
self.logger.debug("Stored playlog for '%s'" % data.track_start)
except sqlalchemy.exc.IntegrityError as e:
self.logger.info("Playlog for '%s' is already existing in local database. Skipping..." % data.track_start)
if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog:
r = requests.post(api_url, json=plain_json)
self.logger.info("Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host))
playlog.is_synced = True
playlog.save()
else:
self.logger.error("Error while pushing playlog to sync-node: " + str(r.json()))
self.logger.error("Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e)))
else:
self.logger.info("Ditching playlog sent from an inactive source")
Retrieves the dataset required to render the studio clock.
info = ClockInfo.get_info(self.get_active_source())
clockinfo_schema = ClockInfoSchema()
return clockinfo_schema.dump(info)
def set_clock_info(self, data, plain_json):
"""
Sets the clock info for the given source (engine1, engine2, other).
Args:
source_number (Integer): Number of the engine
"""
clock_info = ClockInfo(data.engine_source, data.current_playlist, data.current_schedule, data.next_schedule)
if self.config.get("enable_federation") == "false":
return
# Main Node: Push to Sync Node, if enabled
if self.node_type == NodeType.MAIN and self.sync_host and self.api_clockinfo:
try:
api_url = self.sync_host + self.api_clockinfo
r = requests.put(api_url, json=plain_json)
if r.status_code == 204:
self.logger.info("Successfully pushed clock info for '%s' to '%s'" % (clock_info.log_time, self.sync_host))
else:
self.logger.error("HTTP %s | Error while pushing clock info to sync-node: " % (r.status_code, str(r.json())))
except Exception as e:
self.logger.error("Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e)))
def set_default_source(self, source_number):
"""
Create initial source (API Epoch) in the ActivityLog being able to sync old entries upon first start
Args:
source_number (Integer): Number of the default engine
"""
if ActivityLog.is_empty():
default_source = self.config.get("default_source")
epoch_source = ActivityLog(default_source)
epoch_source.log_time = parse("2020-02-22 02:20:20")
epoch_source.save()
self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time))
else:
self.set_active_source(source_number)
Sets the active source (engine1, engine2, other) identified by its source number.
if self.active_source != source_number:
self.active_source = source_number
Retrieves number of the currently active source (engine1, engine2, other)
Retrieves the most recent health info of the requested source
source_number (Integer): Number of the play-out source
health = HealthHistory.get_latest_entry(source_number)
health_schema = HealthHistorySchema()
return health_schema.dump(health)
def log_source_health(self, source_number, data, plain_json):
source_number (Integer): Number of the play-out source
if not source_number:
source_number = self.host_id
# Main Node: Alway log entry, independed of the source
# Sync Node: Only log entry when it's coming from an active source
if self.node_type == NodeType.MAIN or \
(self.node_type == NodeType.SYNC and source_number == self.active_source):
healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details)
self.logger.debug("Stored health info for '%s'" % str(source_number))
if self.config.get("enable_federation") == "false":
return
# Main Node: Push to Sync Node, if enabled
if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog:
# health_schema = HealthHistorySchema()
# json_healthlog = health_schema.dump(healthlog)
api_url = self.sync_host + self.api_healthlog + "/" + str(source_number)
try:
r = requests.post(api_url, json=plain_json)
self.logger.info("Successfully pushed healthlog for source '%s' to '%s'" % (str(source_number), self.sync_host))
healthlog.is_synced = True
healthlog.save()
self.logger.error("Error while pushing healthlog to sync-node: " + str(r.json()))
except Exception as e:
self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e)
else:
self.logger.info("Ditching healthlog sent from an inactive source")