#
# Aura Engine API (https://gitlab.servus.at/aura/engine-api)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from aura_engine_api.base.node import NodeType
from aura_engine_api.models import (
ActivityLog,
ClockInfo,
ClockInfoSchema,
HealthHistory,
HealthHistorySchema,
PlayLog,
PlayLogSchema,
TrackSchema,
)
from aura_engine_api.rest import util
"""
# Attribute defaults. `__init__` below assigns `config`, `logger`, `main_hosts`,
# `node_type` and `sync_host`.
# NOTE(review): indentation is lost in this copy of the file; presumably these are
# class-level attributes of the service class -- confirm against the original.
config = None  # configuration object, read via `config.get(key)`
logger = None  # logger used for all messages of the service
node_type = None  # compared against `NodeType.MAIN` / `NodeType.SYNC` elsewhere in this file
sync_host = None  # read from the "sync_host" setting
main_hosts = None  # list built from the "main_host_1" / "main_host_2" settings
def __init__(self, config, logger, node_type):
    """
    Initialize the service: resolve the node role, its sync peers and the active source.

    NOTE(review): this method was rebuilt from a copy of the file that had lost its
    indentation and several lines. The role branching, the lookup of the last active
    source and the call to `set_default_source` are reconstructions -- confirm them
    against the upstream file before merging.

    Args:
        config: Configuration object; settings are read via `config.get(key)`
        logger: Logger receiving all messages of this service
        node_type (NodeType): Role of this node. `NodeType.MAIN` pushes data to a
            sync host, any other value is treated as `NodeType.SYNC`
    """
    self.config = config
    self.logger = logger

    if node_type == NodeType.MAIN:
        # Main node: optionally pushes its data to one sync host
        self.node_type = NodeType.MAIN
        self.sync_host = config.get("sync_host")
        if not self.sync_host:
            msg = "No child node for synchronization defined"
        elif self.config.get("enable_federation") == "false":
            # Keeps `msg` bound for the summary log at the end of this method
            msg = "Federation is disabled"
        else:
            msg = "Pushing data to '%s'" % (self.sync_host)
    else:
        # Sync node: receives the data of up to two main hosts
        self.node_type = NodeType.SYNC
        self.main_hosts = [config.get("main_host_1"), config.get("main_host_2")]
        if not self.main_hosts[0] and not self.main_hosts[1]:
            self.logger.warn("Not a single main host defined. Be aware what you are doing.")
            msg = "No sync possible as no host nodes are configured"
        else:
            msg = "Syncing data of hosts '%s'" % (self.main_hosts)

    # Set the active source. Initialised first so the check below never reads an
    # attribute that was not assigned yet.
    self.active_source = None
    # NOTE(review): the line defining `source` is missing from the copy this was
    # rebuilt from; `ActivityLog.get_active_source()` is an assumption -- confirm.
    source = ActivityLog.get_active_source()
    if source:
        self.active_source = source.source_number
    if not self.active_source:
        if self.node_type == NodeType.MAIN:
            source_number = self.config.get("host_id")
        else:
            source_number = self.config.get("default_source")
        # The computed number was previously discarded; hand it to the default-source
        # setup, which is documented as running "upon first start".
        self.set_default_source(source_number)
    self.logger.info("Active source: %s" % self.active_source)

    # Init Sync API endpoints
    self.api_playlog = self.config.get("sync_api_store_playlog")
    self.api_healthlog = self.config.get("sync_api_store_healthlog")
    self.api_clockinfo = self.config.get("sync_api_store_clockinfo")

    self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg))
"""
# NOTE(review): orphaned method body -- its `def` line and docstring are not present
# in this copy of the file, so the method name and signature cannot be stated here.
# Fetches one entry via `PlayLog.select_current()` and returns its serialized form.
track = PlayLog.select_current()
# `track_schema` is not assigned in the visible lines; presumably a `TrackSchema()`
# instance created on a line that is missing -- confirm.
return track_schema.dump(track)
def list_tracks(self, page=None, size=None, from_time=None, to_time=None):
    """
    Lists track entries of the playlog with pagination.

    Args:
        page (Integer): The number of the page to return
        size (Integer): The numbers of items to return. Missing values and values
            outside of 1..50 fall back to 20
        from_time (datetime): Optionally, get entries after this timestamp
            (e.g. "2020-08-29T09:12:33.001Z")
        to_time (datetime): Optionally, get entries before this timestamp
            (e.g. "2020-08-29T09:12:33.001Z")

    Returns:
        The page of entries as dumped by `TrackSchema(many=True)`
    """
    if not size or size > 50 or size < 1:
        size = 20
    # The trailing `False` fills the position `list_playlog` uses for `skip_synced`
    tracklist = PlayLog.paginate(page, size, from_time, to_time, False)
    tracklist_schema = TrackSchema(many=True)
    return tracklist_schema.dump(tracklist)
def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False):
    """
    Get paginated playlog entries since the given timestamp.

    Args:
        page (Integer): The page to return (forwarded unchanged to `PlayLog.paginate`)
        size (Integer): The numbers of items to return per page
        from_time (datetime): Optionally, get entries after this timestamp
            (e.g. "2020-08-29T09:12:33.001Z")
        to_time (datetime): Optionally, get entries before this timestamp
            (e.g. "2020-08-29T09:12:33.001Z")
        skip_synced (Boolean): Optionally, don't return entries which have been posted
            directly before (sync node only)

    Returns:
        The page of entries as dumped by `PlayLogSchema(many=True)`
    """
    tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced)
    tracklist_schema = PlayLogSchema(many=True)
    return tracklist_schema.dump(tracklist)
def store_playlog(self, data, plain_json):
    """
    Stores the passed playlog entry and, on a main node, pushes it to the sync node.

    NOTE(review): this method was rebuilt from a copy of the file that had lost its
    indentation and several lines (the `except` clauses, the URL assignment and the
    status check). Those parts are reconstructions modelled on `set_clock_info` --
    confirm them against the upstream file.

    Args:
        data (Object): The data to store in the playlog
        plain_json: The same entry as plain JSON; sent as request body to the sync node
    """
    # Main Node: Always log entry, independent of the source
    # Sync Node: Only log entry when it's coming from an active source
    if self.node_type == NodeType.MAIN or (
        self.node_type == NodeType.SYNC and data.log_source == self.active_source
    ):
        try:
            playlog = PlayLog(data)
            playlog.save()
            self.logger.debug("Stored playlog for '%s'" % data.track_start)
        except Exception:
            # NOTE(review): the original exception type is missing from the copy this
            # was rebuilt from; presumably a database integrity error -- narrow this.
            self.logger.info(
                "Playlog for '%s' is already existing in local database. Skipping..."
                % data.track_start
            )

        if self.config.get("enable_federation") == "false":
            return

        # Main Node: Push to Sync Node, if enabled
        if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog:
            # Bound before the `try`, so the error handler can always name the URL
            api_url = self.sync_host + self.api_playlog
            try:
                r = requests.post(api_url, json=plain_json)
                # 204 is the success status `set_clock_info` checks for;
                # presumably the playlog endpoint answers the same way -- confirm.
                if r.status_code == 204:
                    self.logger.info(
                        "Successfully pushed playlog for '%s' to '%s'"
                        % (playlog.track_start, self.sync_host)
                    )
                    playlog.is_synced = True
                    playlog.save()
                else:
                    self.logger.error(
                        "Error while pushing playlog to sync-node: " + str(r.json())
                    )
            except Exception as e:
                # Best effort: a failing push must not break the local store
                self.logger.error(
                    "Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))
                )
    else:
        self.logger.info("Ditching playlog sent from an inactive source")
# NOTE(review): orphaned method body -- its `def` line and the docstring delimiters are
# not present in this copy of the file. The next line is the surviving docstring text.
Retrieves the dataset required to render the studio clock.
# Load the clock info stored for the currently active source.
info = ClockInfo.get_info(self.get_active_source())
# `datetime.datetime.now()` is a naive local timestamp.
# NOTE(review): assumes `util.deserialize_datetime` also returns a naive datetime,
# otherwise the comparison below would fail -- confirm.
now = datetime.datetime.now()
# Drop the current timeslot from the result once its end lies in the past.
if "current_timeslot" in info and info["current_timeslot"]["timeslot_end"]:
timeslot_end = util.deserialize_datetime(info["current_timeslot"]["timeslot_end"])
if timeslot_end < now:
info["current_timeslot"] = None
clockinfo_schema = ClockInfoSchema()
return clockinfo_schema.dump(info)
def set_clock_info(self, data, plain_json):
    """
    Sets the clock info for the given source (engine1, engine2, other).

    The record of the currently active source is updated, or created when none
    exists yet. On a main node with federation enabled the payload is then pushed
    to the sync node.

    NOTE(review): the `else`/`except` branches of the push were rebuilt from the log
    arguments that survived in a damaged copy of this file; the log level of the
    non-204 branch is an assumption -- confirm against the upstream file.

    Args:
        data (Object): Clock info providing `engine_source`, `planned_playlist`,
            `current_timeslot` and `upcoming_timeslots`
        plain_json: The same data as plain JSON; sent as request body to the sync node
    """
    clock_info = ClockInfo.get(self.get_active_source())
    # Bound in both cases, so the update/save decision below cannot hit an unset name
    is_existing = bool(clock_info)
    if not is_existing:
        clock_info = ClockInfo()

    clock_info.set_info(
        data.engine_source,
        data.planned_playlist,
        data.current_timeslot,
        data.upcoming_timeslots,
    )

    if is_existing:
        clock_info.update()
    else:
        clock_info.save()

    if self.config.get("enable_federation") == "false":
        return

    # Main Node: Push to Sync Node, if enabled
    if self.node_type == NodeType.MAIN and self.sync_host and self.api_clockinfo:
        # Bound before the `try`, so the error handler can always name the URL
        api_url = self.sync_host + self.api_clockinfo
        try:
            r = requests.put(api_url, json=plain_json)
            if r.status_code == 204:
                self.logger.info(
                    "Successfully pushed clock info for '%s' to '%s'"
                    % (clock_info.log_time, self.sync_host)
                )
            else:
                self.logger.error(
                    "HTTP %s | Error while pushing clock info to sync-node: %s",
                    r.status_code,
                    str(r.json()),
                )
        except Exception as e:
            # Best effort: a failing push must not break the local update
            self.logger.error(
                "Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e))
            )
def set_default_source(self, source_number):
    """
    Seed the ActivityLog with an initial source (API Epoch) when it is still empty, so
    old entries can be synced upon first start. Otherwise activate the given source.

    Args:
        source_number (Integer): Number of the default engine
    """
    if not ActivityLog.is_empty():
        self.set_active_source(source_number)
        return

    # Empty log: the epoch entry uses the configured default source, not the argument
    epoch_source = ActivityLog(self.config.get("default_source"))
    epoch_source.log_time = parse("2020-02-22 02:20:20")
    epoch_source.save()
    epoch_details = (epoch_source.source_number, epoch_source.log_time)
    self.logger.info("Created API epoch source %s:%s" % epoch_details)
# NOTE(review): orphaned fragment -- the `def` line and the docstring delimiters are not
# present in this copy of the file. `self.set_active_source(source_number)` is called
# from `set_default_source`, so this presumably is the body of that method -- confirm.
# The next line is the surviving docstring text.
Sets the active source (engine1, engine2, other) identified by its source number.
# Reassign only when the source actually changes.
if self.active_source != source_number:
self.active_source = source_number
# NOTE(review): only this docstring line survived; the `def` line and the body are not
# present in this copy of the file. `self.get_active_source()` is called elsewhere in
# this file, so this presumably documents that method -- confirm.
Retrieves number of the currently active source (engine1, engine2, other)
# NOTE(review): orphaned method body -- its `def` line and the docstring delimiters are
# not present in this copy of the file. The next two lines are surviving docstring text.
Retrieves the most recent health info of the requested source
source_number (Integer): Number of the play-out source
# Look up the latest health entry of the source and return its serialized form.
health = HealthHistory.get_latest_entry(source_number)
health_schema = HealthHistorySchema()
return health_schema.dump(health)
def log_source_health(self, source_number, data, plain_json):
    """
    Stores a health entry for the given source and, on a main node, pushes it to the
    sync node.

    NOTE(review): this method was rebuilt from a copy of the file that had lost its
    indentation and several lines (the local `save()`, the `try`, the status check and
    the `except` clause). Those parts are reconstructions modelled on `set_clock_info`
    -- confirm them against the upstream file.

    Args:
        source_number (Integer): Number of the play-out source; falls back to the
            configured "host_id" when empty
        data (Object): Health data providing `log_time`, `is_healthy` and `details`
        plain_json: The same data as plain JSON; sent as request body to the sync node
    """
    if not source_number:
        # No `host_id` attribute is assigned anywhere in the visible code; read the
        # "host_id" setting instead, as `__init__` does.
        source_number = self.config.get("host_id")

    # Main Node: Always log entry, independent of the source
    # Sync Node: Only log entry when it's coming from an active source
    if self.node_type == NodeType.MAIN or (
        self.node_type == NodeType.SYNC and source_number == self.active_source
    ):
        healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details)
        # Persist first, so the "Stored" message below is true
        healthlog.save()
        self.logger.debug("Stored health info for '%s'" % str(source_number))

        if self.config.get("enable_federation") == "false":
            return

        # Main Node: Push to Sync Node, if enabled
        if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog:
            # Bound before the `try`, so the error handler can always name the URL
            api_url = self.sync_host + self.api_healthlog + "/" + str(source_number)
            try:
                r = requests.post(api_url, json=plain_json)
                # 204 is the success status `set_clock_info` checks for;
                # presumably the healthlog endpoint answers the same way -- confirm.
                if r.status_code == 204:
                    self.logger.info(
                        "Successfully pushed healthlog for source '%s' to '%s'"
                        % (str(source_number), self.sync_host)
                    )
                    healthlog.is_synced = True
                    healthlog.save()
                else:
                    self.logger.error(
                        "Error while pushing healthlog to sync-node: " + str(r.json())
                    )
            except Exception as e:
                # Best effort: a failing push must not break the local store.
                # The exception is formatted into the message; passing it as an extra
                # logging argument without a placeholder breaks log formatting.
                self.logger.error(
                    "Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))
                )
    else:
        self.logger.info("Ditching healthlog sent from an inactive source")