Skip to content
Snippets Groups Projects
service.py 13.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • David Trattnig's avatar
    David Trattnig committed
    #
    # Aura Engine API (https://gitlab.servus.at/aura/engine-api)
    #
    # Copyright (C) 2020 - The Aura Engine Team.
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    import datetime
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    import requests
    import sqlalchemy
    
    David Trattnig's avatar
    David Trattnig committed
    from dateutil.parser import parse
    
    
    from aura_engine_api.base.node import NodeType
    from aura_engine_api.models import (
    
        ActivityLog,
        ClockInfo,
        ClockInfoSchema,
        HealthHistory,
        HealthHistorySchema,
        PlayLog,
        PlayLogSchema,
        TrackSchema,
    )
    
    from aura_engine_api.rest import util
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    class ApiService:
        """
        Service handling for API actions.

        An instance keeps the node configuration (main or sync node), the number of the
        currently active source and the sync-API endpoint paths read from the configuration.
        """

        # Configuration object and logger handed to the constructor
        config = None
        logger = None

        # Role of this node and its configured federation peers
        node_type = None
        sync_host = None
        main_hosts = None

        # Number of the currently active source
        active_source = None

        # Sync-API endpoint paths appended to the sync host when pushing data
        api_playlog = None
        api_healthlog = None
        api_clockinfo = None
    
    David Trattnig's avatar
    David Trattnig committed
    
    
        def __init__(self, config, logger, node_type):
            """
            Initialize Service.

            Determines the node role, restores or initializes the active source and reads the
            sync-API endpoint paths from the configuration.

            Args:
                config (Object): Configuration providing `get(key)`
                logger (Object): Logger used for all messages of the service
                node_type (NodeType): `NodeType.MAIN` configures a main node, any other value
                    configures a sync node
            """
            self.config = config
            self.logger = logger

            # Configured as Sync Node
            if not node_type == NodeType.MAIN:
                self.node_type = NodeType.SYNC
                self.main_hosts = [config.get("main_host_1"), config.get("main_host_2")]
                if not self.main_hosts[0] and not self.main_hosts[1]:
                    # `warning` replaces the deprecated `warn` alias
                    self.logger.warning(
                        "Not a single main host defined. Be aware what you are doing."
                    )
                    msg = "No sync possible as no host nodes are configured"
                else:
                    msg = "Syncing data of hosts '%s'" % (self.main_hosts)

            # Configured as Main Node
            else:
                msg = ""
                self.node_type = NodeType.MAIN
                self.sync_host = config.get("sync_host")
                if not self.sync_host:
                    msg = "No child node for synchronization defined"
                else:
                    # Only announce pushing when federation is not explicitly disabled
                    if not self.config.get("enable_federation") == "false":
                        msg = "Pushing data to '%s'" % (self.sync_host)

            # Set active source
            source = ActivityLog.get_active_source()
            if source:
                self.active_source = source.source_number

            if not self.active_source:
                # No source logged yet: main nodes fall back to their own host id,
                # sync nodes to the configured default source
                if self.node_type == NodeType.MAIN:
                    source_number = self.config.get("host_id")
                else:
                    source_number = self.config.get("default_source")
                self.set_default_source(source_number)

            self.logger.info("Active source: %s" % self.active_source)

            # Init Sync API endpoints
            self.api_playlog = self.config.get("sync_api_store_playlog")
            self.api_healthlog = self.config.get("sync_api_store_healthlog")
            self.api_clockinfo = self.config.get("sync_api_store_clockinfo")

            self.logger.info("Running in '%s' mode. %s." % (self.node_type, msg))
    
        def current_track(self):
            """
            Retrieves the currently playing track.

            Returns:
                (JSON)
            """
            track = PlayLog.select_current()
            track_schema = TrackSchema()
            return track_schema.dump(track)
    
    
        def list_tracks(self, page=None, size=None, from_time=None, to_time=None):
            """
            Lists track-service entries with pagination.

            Args:
                page (Integer): The number of the page to return
                size (Integer): The numbers of items to return. A missing value or one outside
                    of 1 to 50 falls back to 20
                from_time (datetime): Optionally, get entries after this timestamp
                    (e.g. "2020-08-29T09:12:33.001Z")
                to_time (datetime): Optionally, get entries before this timestamp
                    (e.g. "2020-08-29T09:12:33.001Z")

            Returns:
                (JSON)
            """
            if not size or size > 50 or size < 1:
                size = 20

            # The last argument is the `skip_synced` flag, which is never applied here
            tracklist = PlayLog.paginate(page, size, from_time, to_time, False)
            tracklist_schema = TrackSchema(many=True)
            return tracklist_schema.dump(tracklist)
    
        def list_playlog(self, page=None, size=None, from_time=None, to_time=None, skip_synced=False):
            """
            Get paginated playlog entries, optionally limited to a time range.

            Args:
                page (Integer): The number of items to skip before starting to collect the result set
                    (defaults to 0)
                size (Integer): The numbers of items to return per page
                from_time (datetime): Optionally, get entries after this timestamp
                    (e.g. "2020-08-29T09:12:33.001Z")
                to_time (datetime): Optionally, get entries before this timestamp
                    (e.g. "2020-08-29T09:12:33.001Z")
                skip_synced (Boolean): Optionally, don't return entries which have been posted directly
                    before (sync node only)

            Returns:
                (JSON)
            """
            if not page:
                page = 0

            tracklist = PlayLog.paginate(page, size, from_time, to_time, skip_synced)
            tracklist_schema = PlayLogSchema(many=True)
            return tracklist_schema.dump(tracklist)
    
    
        def store_playlog(self, data, plain_json):
            """
            Stores the passed playlog entry.

            On a main node the entry is additionally pushed to the sync node, unless federation
            is disabled or no sync host / playlog endpoint is configured.

            Args:
                data (Object): The data to store in the playlog
                plain_json (Object): The JSON payload posted to the sync node
            """
            if not data.log_source:
                data.log_source = self.config.get("host_id")

            # Main Node: Always log entry, independent of the source
            # Sync Node: Only log entry when it's coming from an active source
            if self.node_type == NodeType.MAIN or (
                self.node_type == NodeType.SYNC and data.log_source == self.active_source
            ):
                try:
                    playlog = PlayLog(data)
                    playlog.save()
                    self.logger.debug("Stored playlog for '%s'" % data.track_start)
                except sqlalchemy.exc.IntegrityError:
                    self.logger.info(
                        "Playlog for '%s' is already existing in local database. Skipping..."
                        % data.track_start
                    )

                if self.config.get("enable_federation") == "false":
                    return

                # Main Node: Push to Sync Node, if enabled
                if self.node_type == NodeType.MAIN and self.sync_host and self.api_playlog:
                    # Built before the try block so the error handler can always name the URL
                    api_url = self.sync_host + self.api_playlog
                    try:
                        r = requests.post(api_url, json=plain_json)
                        if r.status_code == 204:
                            self.logger.info(
                                "Successfully pushed playlog for '%s' to '%s'"
                                % (playlog.track_start, self.sync_host)
                            )
                            # Remember the successful push on the local entry
                            playlog.is_synced = True
                            playlog.save()
                        else:
                            self.logger.error(
                                "Error while pushing playlog to sync-node: " + str(r.json())
                            )
                    except Exception as e:
                        # Best effort: a failed push must not fail the local store
                        self.logger.error(
                            "Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))
                        )
            else:
                self.logger.info("Ditching playlog sent from an inactive source")
    
        def get_clock_info(self):
            """
            Retrieves the dataset required to render the studio clock.

            A current timeslot whose end lies in the past is removed from the result.

            Returns:
                (JSON)
            """
            info = ClockInfo.get_info(self.get_active_source())
            now = datetime.datetime.now()

            # The key may be present but hold no timeslot (this method itself resets it to
            # `None`), so check the value before reading its end time
            current_timeslot = info["current_timeslot"] if "current_timeslot" in info else None
            if current_timeslot and current_timeslot["timeslot_end"]:
                timeslot_end = util.deserialize_datetime(current_timeslot["timeslot_end"])
                if timeslot_end < now:
                    info["current_timeslot"] = None

            clockinfo_schema = ClockInfoSchema()
            return clockinfo_schema.dump(info)
    
    
        def set_clock_info(self, data, plain_json):
    
            """
            Sets the clock info for the given source (engine1, engine2, other).
            """
    
            if data.engine_source <= 0:
    
            is_existing = False
    
            clock_info = ClockInfo.get(self.get_active_source())
            if clock_info:
    
                is_existing = True
            else:
                clock_info = ClockInfo()
    
    
            clock_info.set_info(
                data.engine_source,
                data.planned_playlist,
                data.current_timeslot,
    
                data.upcoming_timeslots,
    
            if is_existing:
                clock_info.update()
            else:
                clock_info.save()
    
            if self.config.get("enable_federation") == "false":
                return
    
    
            # Main Node: Push to Sync Node, if enabled
            if self.node_type == NodeType.MAIN and self.sync_host and self.api_clockinfo:
                try:
                    api_url = self.sync_host + self.api_clockinfo
                    r = requests.put(api_url, json=plain_json)
                    if r.status_code == 204:
    
                        self.logger.info(
                            "Successfully pushed clock info for '%s' to '%s'"
                            % (clock_info.log_time, self.sync_host)
                        )
    
                        self.logger.error(
    
    Lars Kruse's avatar
    Lars Kruse committed
                            "HTTP %s | Error while pushing clock info to sync-node: %s",
                            r.status_code,
                            str(r.json()),
    
                except Exception as e:
    
                    self.logger.error(
    
                        "Error while putting clock info to sync-node API '%s'!\n%s" % (api_url, str(e))
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    David Trattnig's avatar
    David Trattnig committed
        def set_default_source(self, source_number):
            """
            Create initial source (API Epoch) in the ActivityLog being able to sync old entries upon
            first start.

            If the ActivityLog already holds entries, no epoch entry is created and the passed
            source is set as the active source instead.

            Args:
                source_number (Integer): Number of the default engine
            """
            if ActivityLog.is_empty():
                # The epoch entry always uses the configured default source, not `source_number`
                default_source = self.config.get("default_source")
                epoch_source = ActivityLog(default_source)
                epoch_source.log_time = parse("2020-02-22 02:20:20")
                epoch_source.save()
                self.logger.info(
                    "Created API epoch source %s:%s"
                    % (epoch_source.source_number, epoch_source.log_time)
                )
            else:
                self.set_active_source(source_number)
    
    
    David Trattnig's avatar
    David Trattnig committed
        def set_active_source(self, source_number):
    
    David Trattnig's avatar
    David Trattnig committed
            """
    
    David Trattnig's avatar
    David Trattnig committed
            Sets the active source (engine1, engine2, other) identified by its source number.
    
    David Trattnig's avatar
    David Trattnig committed
    
            Args:
    
    David Trattnig's avatar
    David Trattnig committed
                source_number (Integer): Number of the engine
    
    David Trattnig's avatar
    David Trattnig committed
            """
    
            if source_number > 0:
    
                if self.active_source != source_number:
                    self.active_source = source_number
    
                    activity_log = ActivityLog(source_number)
    
                    activity_log.save()
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    David Trattnig's avatar
    David Trattnig committed
        def get_active_source(self):
    
    David Trattnig's avatar
    David Trattnig committed
            """
    
    David Trattnig's avatar
    David Trattnig committed
            Retrieves number of the currently active source (engine1, engine2, other)
    
    David Trattnig's avatar
    David Trattnig committed
    
            Returns:
    
    David Trattnig's avatar
    David Trattnig committed
                (Integer)
    
    David Trattnig's avatar
    David Trattnig committed
            """
    
    David Trattnig's avatar
    David Trattnig committed
            return self.active_source
    
    David Trattnig's avatar
    David Trattnig committed
    
    
    David Trattnig's avatar
    David Trattnig committed
        def get_source_health(self, source_number):
            """
            Retrieves the most recent health info of the requested source

            Args:
                source_number (Integer): Number of the play-out source

            Returns:
                (JSON)
            """
            health = HealthHistory.get_latest_entry(source_number)
            health_schema = HealthHistorySchema()
            return health_schema.dump(health)
    
    David Trattnig's avatar
    David Trattnig committed
    
    
        def log_source_health(self, source_number, data, plain_json):
            """
            Logs an health entry for the given source

            On a main node the entry is additionally pushed to the sync node, unless federation
            is disabled or no sync host / healthlog endpoint is configured.

            Args:
                source_number (Integer): Number of the play-out source. Falls back to the
                    configured `host_id` when not given
                data (Object): The data to store in the health log
                plain_json (Object): The JSON payload posted to the sync node
            """
            if not source_number:
                # The service has no `host_id` attribute; the host id lives in the configuration
                source_number = self.config.get("host_id")

            # Main Node: Always log entry, independent of the source
            # Sync Node: Only log entry when it's coming from an active source
            if self.node_type == NodeType.MAIN or (
                self.node_type == NodeType.SYNC and source_number == self.active_source
            ):
                healthlog = HealthHistory(source_number, data.log_time, data.is_healthy, data.details)
                healthlog.save()
                self.logger.debug("Stored health info for '%s'" % str(source_number))

                if self.config.get("enable_federation") == "false":
                    return

                # Main Node: Push to Sync Node, if enabled
                if self.node_type == NodeType.MAIN and self.sync_host and self.api_healthlog:
                    api_url = self.sync_host + self.api_healthlog + "/" + str(source_number)
                    try:
                        r = requests.post(api_url, json=plain_json)
                        if r.status_code == 200:
                            self.logger.info(
                                "Successfully pushed healthlog for source '%s' to '%s'"
                                % (str(source_number), self.sync_host)
                            )
                            # Remember the successful push on the local entry
                            healthlog.is_synced = True
                            healthlog.save()
                        else:
                            self.logger.error(
                                "Error while pushing healthlog to sync-node: " + str(r.json())
                            )
                    except Exception as e:
                        # Best effort: a failed push must not fail the local store. The error
                        # is part of the message, as there is no placeholder for a log argument
                        self.logger.error(
                            "Error while posting to sync-node API '%s'!\n%s" % (api_url, str(e))
                        )
            else:
                self.logger.info("Ditching healthlog sent from an inactive source")