#
# Aura Engine API (https://gitlab.servus.at/aura/engine-api)
#
# Copyright (C) 2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import datetime
import requests
import sqlalchemy

from dateutil.parser import parse

from base.node import NodeType
from rest import util
from models import \
    PlayLog, PlayLogSchema, TrackSchema, ActivityLog, \
    ClockInfo, ClockInfoSchema, HealthHistory, HealthHistorySchema


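# Illustrative usage sketch (not executed; assumes `config` is the dict-like
# engine configuration read below and `logger` is a standard `logging.Logger`):
#
#   service = ApiService(config, logger, NodeType.MAIN)
#   service.store_playlog(entry, entry_as_plain_json)  # hypothetical variable names
#   clock_info = service.get_clock_info()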

class ApiService:
    """
    Service handling for API actions.
    """

    config = None
    logger = None
    node_type = None
    sync_host = None
    main_hosts = None
    active_source = None
    api_playlog = None
    api_healthlog = None
    api_clockinfo = None


    def __init__(self, config, logger, node_type):
        """
        Initialize Service.
        """
        self.config = config
        self.logger = logger

        # Configured as Sync Node
        if node_type != NodeType.MAIN:
            self.node_type = NodeType.SYNC
            self.main_hosts = [config.get("main_host_1"), config.get("main_host_2")]
            if not self.main_hosts[0] and not self.main_hosts[1]:
                self.logger.warning("No main host is defined. Be aware of what you are doing.")
                msg = "No sync possible as no host nodes are configured"
            else:
                msg = "Syncing data of hosts '%s'" % (self.main_hosts)

        # Configured as Main Node
        else:
            self.node_type = NodeType.MAIN
            self.sync_host = config.get("sync_host")
            if not self.sync_host:
                msg = "No child node for synchronization defined"
            else:
                msg = "Pushing data to '%s'" % (self.sync_host)

        # Set active source
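        # Prefer the most recent source recorded in the ActivityLog; on a fresh
        # database fall back to this host's id (main node) or the configured
        # default source (sync node).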
        source = ActivityLog.get_active_source()
        if source: self.active_source = source.source_number
        if not self.active_source:
            if self.node_type == NodeType.MAIN:
                source_number = self.config.get("host_id")
            else:
                source_number = self.config.get("default_source")
            self.set_default_source(source_number)
        self.logger.info("Active source: %s" % self.active_source)

        # Init Sync API endpoints
        self.api_playlog = self.config.get("sync_api_store_playlog")
        self.api_healthlog = self.config.get("sync_api_store_healthlog")
        self.api_clockinfo = self.config.get("sync_api_store_clockinfo")

        self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg))


    def current_track(self):  
        """
        Retrieves the currently playing track.  

        Returns:
            (JSON)
        """
        track = PlayLog.select_current()
        track_schema = TrackSchema()
        return track_schema.dump(track)


    def list_tracks(self, page=None, size=None, from_time=None, to_time=None):  
        """
        Lists track-service entries with pagination.

        Args:
            page (Integer): The number of the page to return
            size (Integer): The numbers of items to return
            from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z")
            to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z")
        Returns:
            (JSON)
        """
        if not size or size > 50 or size < 1: size = 20
        tracklist = PlayLog.paginate(page, size, from_time, to_time, False)
        tracklist_schema = TrackSchema(many=True)
        return tracklist_schema.dump(tracklist)


    def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False):  
        """
        Get paginated playlog entries since the given timestamp.

        Args:
            page (Integer): The number of the page to return
            size (Integer): The numbers of items to return per page
            from_time (datetime): Optionally, get entries after this timestamp (e.g. "2020-08-29T09:12:33.001Z")
            to_time (datetime): Optionally, get entries before this timestamp (e.g. "2020-08-29T09:12:33.001Z")
            skip_synced (Boolean): Optionally, don't return entries which have been posted directly before (sync node only)

        Returns:
            (JSON)
        """
        if not page: page = 0
        tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced)
        tracklist_schema = PlayLogSchema(many=True)
        return tracklist_schema.dump(tracklist)


    def store_playlog(self, data, plain_json): 
        """
        Stores the passed playlog entry.

        Args:
            data (Object): The data to store in the playlog
            plain_json (Object): The playlog entry as plain JSON, forwarded to the sync node
        """
        if not data.log_source:
            data.log_source = self.config.get("host_id")

        # Main Node: Always log the entry, independent of the source
        # Sync Node: Only log the entry when it comes from the active source
        if self.node_type == NodeType.MAIN or \
            (self.node_type == NodeType.SYNC and data.log_source == self.active_source):

            try:
                playlog = PlayLog(data)
                playlog.save()
                self.logger.debug("Stored playlog for '%s'" % data.track_start)
            except sqlalchemy.exc.IntegrityError:
                self.logger.info("Playlog for '%s' already exists in the local database. Skipping..." % data.track_start)

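            # Federation disabled: keep the entry local and skip pushing to the sync node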
            if self.config.get("enable_federation") == "false":
                return
                
            # Main Node: Push to Sync Node, if enabled
            if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog:
                try:
                    api_url = self.sync_host + self.api_playlog
                    r = requests.post(api_url, json=plain_json)
                    if r.status_code == 204:
                        self.logger.info("Successfully pushed playlog for '%s' to '%s'" % (playlog.track_start, self.sync_host))
                        playlog.is_synced = True
                        playlog.save()
                    else:
                        self.logger.error("Error while pushing playlog to sync-node: " + str(r.json()))
                except Exception as e:
                    self.logger.error("Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e)))
        else:
            self.logger.info("Ditching playlog sent from an inactive source")


    def get_clock_info(self):
        """
        Retrieves the dataset required to render the studio clock.  
        """
        info = ClockInfo.get_info(self.get_active_source())

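        # Drop the current timeslot from the response if it has already ended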
        now = datetime.datetime.now()
        if info["current_timeslot"] and info["current_timeslot"]["timeslot_end"]:
            timeslot_end = util.deserialize_datetime(info["current_timeslot"]["timeslot_end"])
            if timeslot_end < now:
                info["current_timeslot"] = None
          
        clockinfo_schema = ClockInfoSchema()
        return clockinfo_schema.dump(info)


    def set_clock_info(self, data, plain_json):
        """
        Sets the clock info for the given source (engine1, engine2, other).

        Args:
            data (Object): The clock info data (engine source, current playlist, current and next timeslot)
            plain_json (Object): The clock info as plain JSON, forwarded to the sync node
        """
        if data.engine_source <= 0:
            return
        
        is_existing = False
        clock_info = ClockInfo.get(self.get_active_source()) 
        if clock_info: 
            is_existing = True
        else:
            clock_info = ClockInfo()

        clock_info.set_info(data.engine_source, data.current_playlist, data.current_timeslot, data.next_timeslot)
        
        if is_existing:
            clock_info.update()
        else:
            clock_info.save()

        if self.config.get("enable_federation") == "false":
            return

        # Main Node: Push to Sync Node, if enabled
        if self.node_type == NodeType.MAIN and self.sync_host and self.api_clockinfo:
            try:
                api_url = self.sync_host + self.api_clockinfo
                r = requests.put(api_url, json=plain_json)
                if r.status_code == 204:
                    self.logger.info("Successfully pushed clock info for '%s' to '%s'" % (clock_info.log_time, self.sync_host))
                else:
                    self.logger.error("HTTP %s | Error while pushing clock info to sync-node: " % (r.status_code, str(r.json())))
            except Exception as e:
                self.logger.error("Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e)))              


    def set_default_source(self, source_number):
        """
        Creates the initial source (API epoch) in the ActivityLog, so that old entries can be synced upon first start.

        Args:
            source_number (Integer): Number of the default engine
        """
        if ActivityLog.is_empty():
            default_source = self.config.get("default_source")
            epoch_source = ActivityLog(default_source)
            epoch_source.log_time = parse("2020-02-22 02:20:20")
            epoch_source.save()
            self.logger.info("Created API epoch source %s:%s" % (epoch_source.source_number, epoch_source.log_time))
        else:
            self.set_active_source(source_number)


    def set_active_source(self, source_number):
        """
        Sets the active source (engine1, engine2, other) identified by its source number.

        Args:
            source_number (Integer): Number of the engine
        """
        if source_number > 0:
            if self.active_source != source_number:
                self.active_source = source_number
                activity_log = ActivityLog(source_number)
                activity_log.save()


    def get_active_source(self):
        """
        Retrieves number of the currently active source (engine1, engine2, other)

        Returns:
            (Integer)
        """
        return self.active_source


    def get_source_health(self, source_number):
        """
        Retrieves the most recent health info of the requested source

        Args:
            source_number (Integer): Number of the play-out source

        Returns:
            (JSON)
        """
        health = HealthHistory.get_latest_entry(source_number)
        health_schema = HealthHistorySchema()
        return health_schema.dump(health)


    def log_source_health(self, source_number, data, plain_json):
        """
        Logs a health entry for the given source

        Args:
            source_number (Integer): Number of the play-out source
            data (Object): The data to store in the health log
            plain_json (Object): The health info as plain JSON, forwarded to the sync node
        """
        if not source_number:
            source_number = self.config.get("host_id")

        # Main Node: Always log the entry, independent of the source
        # Sync Node: Only log the entry when it comes from the active source
        if self.node_type == NodeType.MAIN or \
            (self.node_type == NodeType.SYNC and source_number == self.active_source):

            healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details)
            healthlog.save()
            self.logger.debug("Stored health info for '%s'" % str(source_number))

            if self.config.get("enable_federation") == "false":
                return

            # Main Node: Push to Sync Node, if enabled
            if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog:

                # health_schema = HealthHistorySchema()
                # json_healthlog = health_schema.dump(healthlog)
                api_url = self.sync_host + self.api_healthlog + "/" + str(source_number)

                try:                    
                    r = requests.post(api_url, json=plain_json)
                    if r.status_code == 200:
                        self.logger.info("Successfully pushed healthlog for source '%s' to '%s'" % (str(source_number), self.sync_host))
                        healthlog.is_synced = True
                        healthlog.save()
                    else:
                        self.logger.error("Error while pushing healthlog to sync-node: " + str(r.json()))
                except Exception as e:
                    self.logger.error("Error while posting to sync-node API '%s'!" % (api_url), e)
        else:
            self.logger.info("Ditching healthlog sent from an inactive source")