# Skip to content
# Snippets Groups Projects
# timetable.py 15.35 KiB
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


"""
Services representing and dealing with the program timetable.

    - TimetableService: Representation of the timetable containing timeslots for being scheduled.
    - TimetableMerger: Processor to merge remote timeslots with the local ones.
"""

from __future__ import annotations

import logging
import os
from datetime import datetime

import confuse
import jsonpickle

import aura_engine.engine as engine
import aura_engine.scheduling.api as api
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot


class TimetableService:
    """
    Timetable Service.

    The current program timetable as per scheduling calendar. The timetable is a set of
    timeslots over the period of 24 hours.

    The service fetches timeslots via `api.ApiFetcher`, merges them into the local timetable
    using `TimetableMerger` and answers queries for the current and the upcoming timeslots.
    """

    # Engine configuration, taken from `AuraConfig.instance.config`.
    config: confuse.Configuration
    # Logger named "engine".
    logger: logging.Logger
    # The merged timeslots; stays `None` until a merge has happened.
    timetable: list[Timeslot] | None = None
    # Fetcher used while `refresh()` is running; reset to `None` once the fetch is done.
    api_fetcher: api.ApiFetcher | None = None
    # Local time of the last fetch which succeeded and returned at least one timeslot.
    last_successful_fetch: datetime | None = None
    # Folder holding the timetable cache (`<given cache location>/timetable`).
    cache_location: str | None = None
    # Full path to the `timetable.json` cache file.
    timetable_file: str | None = None
    # Passed as `unpicklable` flag to jsonpickle when the timetable is persisted.
    restorable: bool

    def __init__(self, cache_location: str):
        """
        Initialize.

        Creates the sub-folder `timetable` within the given cache location, if it does not
        exist yet. The file `timetable.json` is located inside that sub-folder.

        Args:
            cache_location (str): Path to the cache folder, with or without trailing slash.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")
        jsonpickle.set_encoder_options("json", sort_keys=False, indent=2)

        # `os.path.join` handles a present or missing trailing slash and, unlike indexing the
        # last character, does not raise an `IndexError` for an empty string.
        self.cache_location = os.path.join(cache_location, "timetable")
        os.makedirs(self.cache_location, exist_ok=True)
        self.timetable_file = os.path.join(self.cache_location, "timetable.json")
        self.restorable = False

    @synchronized
    def refresh(self):
        """
        Update the timetable.

            1. Fetch the latest timeslots from the API or the API cache.
            2. Merge with the current timetable.
            3. Persist to `timetable.json` (currently disabled, see FIXME below).

        If the API reports an error or delivers no timeslots, the current timetable is kept
        as is and a warning is logged.
        """
        self.logger.debug("Trying to fetch new timeslots from API endpoints...")

        if not self.api_fetcher:
            self.api_fetcher = api.ApiFetcher()

        try:
            self.api_fetcher.start()
            response = self.api_fetcher.fetch()
        finally:
            # The fetcher is discarded after every fetch. Also do so when fetching raised,
            # otherwise the next refresh would re-use the spent instance.
            self.api_fetcher = None

        if response.code != 0:
            msg = SU.red(f"Keep using current timetable, as API returned: {response.message}")
            self.logger.warning(msg)
            self.logger.debug("Exception in API Fetcher: \n" + str(response.exception))
            return

        if not response.timeslots:
            self.logger.warning("Program fetched from Steering contains no timeslots!")
            return

        self.last_successful_fetch = datetime.now()
        # Report the number of fetched timeslots, not the length of the response object.
        msg = f"Finished fetching current timeslots from API ({len(response.timeslots)})"
        self.logger.info(SU.green(msg))
        self.merge_timetable(response.timeslots)
        # FIXME: for some unknown reason storing the timetable fails. This does not affect
        # the playout, since storing the timetable is not in use atm.
        # self.persist_timetable()

    def merge_timetable(self, remote_timeslots: list):
        """
        Merge the fetched timeslots with the current local timetable.

        The result of the merge replaces `self.timetable`.

        Args:
            remote_timeslots (list): the timeslots fetched from the API.
        """
        merger: TimetableMerger = TimetableMerger()
        self.timetable = merger.merge(self.timetable, remote_timeslots)
        self.logger.info(SU.green("Merged timetable"))

    def persist_timetable(self):
        """
        Store the current timetable to the local JSON file used for caching.

        Failures are logged and not raised.
        """
        if self.timetable_file is None:
            self.logger.error(SU.red("No timetable file specified."))
            return

        try:
            # Ensure the directory exists
            os.makedirs(os.path.dirname(self.timetable_file), exist_ok=True)

            # Serialize before opening the file: opening with "w" truncates it, so a failing
            # encoder would otherwise wipe the previously stored timetable.
            payload = jsonpickle.encode(self.timetable, unpicklable=self.restorable)
            with open(self.timetable_file, "w") as file:
                file.write(payload)
            self.logger.info(SU.green("timetable.json stored"))
        except FileNotFoundError as fnf_error:
            self.logger.error(SU.red(f"File not found: {self.timetable_file}. {fnf_error}"))
        except PermissionError as perm_error:
            self.logger.error(
                SU.red(f"Permission denied for file: {self.timetable_file}. {perm_error}")
            )
        except Exception as e:
            self.logger.error(SU.red(f"Unexpected error while storing {self.timetable_file}: {e}"))

    def get_current_item(self) -> PlaylistItem | None:
        """
        Retrieve the current playlist item which should be played as per timetable.

        In case there is no timetable yet, a refresh is triggered first.

        Returns:
            (PlaylistItem): The track which is (or should) currently be on air. `None` if there
                is no active timeslot, no playlist for the timeslot or no item matching the
                current engine time.
        """
        # If necessary initialize timetable
        if not self.timetable:
            self.refresh()

        # Check for current timeslot
        current_timeslot: Timeslot | None = self.get_current_timeslot()
        if not current_timeslot:
            self.logger.warning(SU.red("There's no active timeslot"))
            return None

        # Check for scheduled playlist
        current_playlist: Playlist = current_timeslot.get_current_playlist()
        if not current_playlist:
            msg = (
                "There's no (default) playlist assigned to the current timeslot."
                " Most likely a fallback will make things okay again."
            )
            self.logger.warning(SU.red(msg))
            return None

        # Read the time only now: the refresh above fetches from the API, so a timestamp taken
        # before it may be older than the one used for picking the timeslot.
        now: float = engine.Engine.engine_time()

        # Return the first item which spans the current time
        for item in current_playlist.items:
            if item.get_start() <= now <= item.get_end():
                return item

        # Nothing playing ... fallback will kick-in
        msg = f"There's no current item in playlist {current_playlist}"
        self.logger.warning(SU.red(msg))
        return None

    def get_current_timeslot(self) -> Timeslot | None:
        """
        Retrieve the timeslot currently to be played.

        A timeslot is current if the engine time is at or after its start and before its end.

        Returns:
            (Timeslot): The current timeslot or `None` if there is none.
        """
        if not self.timetable:
            return None

        now = engine.Engine.engine_time()

        # Iterate over all timeslots and find the one to be played right now
        timeslot: Timeslot
        for timeslot in self.timetable:
            if timeslot.get_start() <= now < timeslot.get_end():
                return timeslot

        return None

    def get_next_timeslots(self, max_count: int = 0, window_aware=False) -> list[Timeslot]:
        """
        Retrieve the timeslots to be played after the current one.

        The method allows to return only a max count of timeslots. Also, timeslots which are
        not within the scheduling window can be optionally omitted.

        The scheduling window behaviour has these effects, when scheduling timeslots:

            - Before the scheduling window: Timeslots can still be deleted in Steering and the
              playout will respect this.
            - During the scheduling window: Timeslots and their playlists are queued as timed
              commands.
            - After the scheduling window: Such timeslots are ignored, because it doesn't make
              sense anymore to schedule them before the next timeslot starts.

        Args:
            max_count (Integer): Maximum of timeslots to return. If `0` is passed, all existing
                ones are returned.
            window_aware (bool): If set to true, only timeslots within the scheduling window are
                returned.

        Returns:
            ([Timeslot]): The upcoming timeslots, in the order they appear in the timetable.
        """
        if not self.timetable:
            return []

        now = engine.Engine.engine_time()
        next_timeslots: list[Timeslot] = []

        ts: Timeslot
        for ts in self.timetable:
            if ts.get_start() <= now:
                # Already started or in the past
                continue

            # The window check is only relevant (and only evaluated) when requested.
            if window_aware and not self.is_timeslot_in_window(ts):
                start = SU.fmt_time(ts.get_start())
                end = SU.fmt_time(ts.get_end())
                t1: int = self.config.scheduler.scheduling_window_start
                t2: int = self.config.scheduler.scheduling_window_end
                msg = f"Skip timeslot #{ts.get_id()} [{start} - {end}] "
                msg += f"- not in scheduling window T¹-{t1}s to T²-{t2}s"
                self.logger.debug(msg)
                continue

            # Stop as soon as the requested amount is reached; `0` means no limit.
            if max_count != 0 and len(next_timeslots) >= max_count:
                break
            next_timeslots.append(ts)

        return next_timeslots

    def is_timeslot_in_window(self, timeslot: Timeslot) -> bool:
        """
        Check if the timeslot is within the scheduling window.

        The scheduling window represents the soonest and latest
        point some timeslot can be scheduled. It opens `scheduling_window_start` before the
        start of the timeslot and closes `scheduling_window_end` before its end, as configured
        in the `scheduler` section.

        Args:
            timeslot (Timeslot): The timeslot to check.

        Returns:
            (bool): True if it is within the window.
        """
        now = engine.Engine.engine_time()
        window_start = self.config.scheduler.scheduling_window_start
        window_end = self.config.scheduler.scheduling_window_end

        return timeslot.get_start() - window_start < now and timeslot.get_end() - window_end > now

    def terminate(self):
        """
        Call this when the thread is stopped or a signal to terminate is received.

        Terminates the API fetcher, if one is currently set.
        """
        self.logger.info(SU.yellow("[TimetableService] Shutting down..."))
        if self.api_fetcher:
            self.api_fetcher.terminate()


class TimetableMerger:
    """
    Compare and merge the local with the remote set of timeslots.
    """

    # Engine configuration, taken from `AuraConfig.instance.config`.
    config: confuse.Configuration
    # Logger named "engine".
    logger: logging.Logger

    def __init__(self):
        """
        Initialize.
        """
        self.config = AuraConfig.instance.config
        self.logger = logging.getLogger("engine")

    def build_map(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> dict[str, dict]:
        """
        Build a map of local and remote timeslots relations.

        The start time of any timeslot acts as the key. The map value holds a dictionary, where
        the `local` key is the local timeslot, and the `remote` key holds the remote timeslot.
        Both keys are always present; the value is `None` if there is no timeslot with that
        start time on the given side.

        Args:
            local_timeslots ([Timeslot]): The current timetable.
            remote_timeslots ([Timeslot]): The latest timeslots from the API.

        Returns:
            ({str: {}}): Map with timestamp as key and a dictionary referencing timeslots.
        """
        timeslot_map: dict[str, dict] = {}

        # Existing virtual timeslots are ignored, because they are likely updated remotely.
        # if not ts.is_virtual():
        for ts in local_timeslots or []:
            timeslot_map[str(ts.get_start())] = {"local": ts, "remote": None}

        for ts in remote_timeslots or []:
            # Attach to the local entry with the same start time, or create a remote-only one.
            relation = timeslot_map.setdefault(str(ts.get_start()), {"local": None})
            relation["remote"] = ts

        return timeslot_map

    @synchronized
    def merge(
        self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
    ) -> list[Timeslot]:
        """
        Merge strategy for local and remote timeslots.

        Timeslots starting before `now + scheduling_window_start` are kept in their local
        state as long as they have not ended; if there is no local one, the remote timeslot is
        added. For all later timeslots the remote state wins: they are added, updated or
        removed according to the remote set.

        Args:
            local_timeslots ([Timeslot]): Local timeslots.
            remote_timeslots ([Timeslot]): Remote timeslots.

        Returns:
            [Timeslot]: The merged timeslots, ordered by start time.
        """
        merged_ts: list[Timeslot] = []
        timeslot_map = self.build_map(local_timeslots, remote_timeslots)
        if not timeslot_map:
            return merged_ts

        scheduling_window_start = self.config.scheduler.scheduling_window_start
        now: float = SU.timestamp()
        resolution: str = ""
        merge_info = [SU.cyan("\nMap for timetable merge:")]

        # The map lists all local entries first and remote-only entries last. Walk it by start
        # time instead, so a remotely added timeslot ends up at its chronological position and
        # not at the end of the timetable.
        ordered_relations = sorted(timeslot_map.items(), key=lambda entry: float(entry[0]))

        for timestamp, ts_relation in ordered_relations:
            local: Timeslot | None = ts_relation.get("local")
            remote: Timeslot | None = ts_relation.get("remote")

            if (float(timestamp) - scheduling_window_start) < now:
                # It's past the scheduling window start, so keep the local one as is
                if local:
                    # Only keep timeslots which have not yet ended.
                    if local.end > now:
                        merged_ts.append(local)
                        resolution = "add | currently playing"
                    else:
                        resolution = "skip | out of scheduling window"
                else:
                    # No local timeslot, so add it in any case.
                    resolution = "add"
                    merged_ts.append(remote)
            else:
                if local and not remote:
                    # Timeslot was deleted remotely, remove any local one
                    resolution = "remove"
                elif not local and remote:
                    # Timeslot was added remotely
                    resolution = "add"
                    merged_ts.append(remote)
                elif local and remote:
                    # Timeslot existing locally, was updated or did not change remotely
                    # Update the local timeslot with possibly changed data
                    local.update(remote)
                    merged_ts.append(local)
                    resolution = "update"
                else:
                    # Relations w/o local and remote timeslots should not happen
                    self.logger.critical(SU.red("Skipping invalid merge case!"))
                    resolution = "skip | invalid merge case"

            local_str = "local:" + str(local).ljust(25)
            remote_str = "remote: " + str(remote).ljust(25)
            merge_info.append(
                SU.cyan(f"\n\tTIME:{timestamp} - {local_str} | {remote_str} ↦ ({resolution})")
            )

        self.logger.debug("".join(merge_info) + "\n")
        return merged_ts