Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Show changes
Showing
with 2431 additions and 0 deletions
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Service and command classes used for scheduling.
- AuraScheduler: Threaded service which performs the scheduling.
- TimeslotCommand: Inherits `EngineExecuter` to perform timeslot commands.
- PlayCommand: Inherits `EngineExecuter` to perform play commands on playlist items.
"""
from __future__ import annotations
import logging
import threading
import time
import confuse
import aura_engine.engine as engine
import aura_engine.scheduling.timetable as timetable
import aura_engine.scheduling.utils as utils
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.control import EngineExecutor
from aura_engine.core.channels import LoadSourceException
from aura_engine.resources import ResourceClass, ResourceUtil
from aura_engine.scheduling.domain import PlaylistItem, PlayState, Timeslot
#
# EngineExecutor Commands
#
class TimeslotCommand(EngineExecutor):
"""
Command for triggering start and end of timeslot events.
"""
engine: engine.Engine
config: confuse.Configuration
def __init__(self, engine: engine.Engine, timeslot: Timeslot):
"""
Initialize the timeslot command.
Args:
engine (engine.Engine): The engine
timeslot (Timeslot): The timeslot which is starting at this time
"""
self.config = AuraConfig.instance.config
self.engine = engine
now = SU.timestamp()
fade_out_time = float(self.config.scheduler.fade_out_time)
start_fade_in = timeslot.get_start() - now
start_fade_out = timeslot.get_end() - now - fade_out_time
self.logger.debug(
f"Fading in timeslot in {start_fade_in} seconds at {SU.fmt_time(timeslot.get_start())}"
f" | Timeslot: {timeslot}"
)
self.logger.debug(
f"Fading out timeslot in {start_fade_out} seconds at"
f" {SU.fmt_time(timeslot.get_end() - fade_out_time)} | Timeslot: {timeslot}"
)
# Initialize the "fade in" EngineExecuter and instantiate a connected child EngineExecuter
# for "fade out" when the parent is ready
id = timeslot.get_id()
super().__init__(
f"TIMESLOT#{id}",
None,
timeslot.get_start(),
self.do_start_timeslot,
timeslot,
)
EngineExecutor(
f"TIMESLOT#{id}",
self,
timeslot.get_end() - fade_out_time,
self.do_end_timeslot,
timeslot,
)
def do_start_timeslot(self, timeslot: Timeslot):
"""
Indicate the start of the timeslot by sending a `on_timeslot_start` event.
"""
self.logger.info(SU.cyan(f"=== on_timeslot_start('{timeslot}') ==="))
self.engine.event_dispatcher.on_timeslot_start(timeslot)
def do_end_timeslot(self, timeslot: Timeslot):
"""
Indicate the start of the timeslot by sending a `on_timeslot_end` event.
Also resetting the used channel.
"""
self.logger.info(SU.cyan(f"=== on_timeslot_end('{timeslot}') ==="))
self.engine.event_dispatcher.on_timeslot_end(timeslot)
def has_direct_successor():
timetable: timetable.TimetableService = self.engine.scheduler.timetable
next_timeslot: Timeslot = timetable.get_next_timeslots(1)
if next_timeslot:
next_timeslot = next_timeslot[0]
if next_timeslot.get_start() > timeslot.get_end():
return False
return True
else:
return False
if not has_direct_successor():
self.engine.player.stop(engine.Player.TransitionType.FADE)
class PlayCommand(EngineExecutor):
"""
Command for triggering timed preloading and playing as a child command.
"""
engine: engine.Engine
config: confuse.Configuration
def __init__(self, engine: engine.Engine, items: list[PlaylistItem]):
"""Initialize the play command.
Args:
engine (engine.Engine): The engine
items ([PlaylistItem]): One or more playlist items to be started
"""
self.config = AuraConfig.instance.config
self.engine = engine
preload_offset = self.config.scheduler.preload_offset
first_item: PlaylistItem = items[0]
start_preload = first_item.get_start() - preload_offset
# TODO: Question: could get_start() return int instead of float?
start_play = int(first_item.get_start())
msg = (
f"Preloading items at {SU.fmt_time(start_preload)}, {preload_offset} seconds before"
f" playing it at {SU.fmt_time(start_play)}"
)
self.logger.info(msg)
# Initialize the "preload" EngineExecuter and attach a child `PlayCommand` to the
# "on_ready" event handler
id = first_item.get_playlist().get_timeslot().get_id()
super().__init__(f"PRELOAD#{id}", None, start_preload, self.do_preload, items)
EngineExecutor(f"PLAY#{id}", self, start_play, self.do_play, items)
def do_preload(self, items: list[PlaylistItem]):
"""
Preload the items.
Args:
items ([PlaylistItem]): The set of playlist items to be pre-loaded.
"""
items_str = ResourceUtil.get_items_string(items)
try:
first_item: PlaylistItem = items[0]
if first_item.get_content_type() in ResourceClass.FILE.types:
self.logger.info(SU.cyan(f"=== preload_group('{items_str}') ==="))
self.engine.player.preload_group(items)
else:
self.logger.info(SU.cyan(f"=== preload('{items_str}') ==="))
self.engine.player.preload(items[0])
except LoadSourceException:
self.logger.critical(SU.red(f"Could not preload items {items_str}"))
last_item: PlaylistItem = items[-1]
if not last_item.play.is_ready():
msg = f"Items didn't reach 'ready' state during preloading (Items: {items_str})"
last_item.play.state = PlayState.PlayStateType.TIMEOUT
self.logger.warning(SU.red(msg))
def do_play(self, items: list[PlaylistItem]):
"""
Play the items.
Args:
items ([PlaylistItem]): The set of playlist items to be played.
"""
log_interval = 3
last_log = 0
items_str = ResourceUtil.get_items_string(items)
self.logger.info(SU.cyan(f"=== play('{items_str}') ==="))
last_item: PlaylistItem = items[-1]
# NOTE: this may not break and keep on displaying messages if the play state
# is never set to READY or TIMEOUT.
while not last_item.play.is_ready():
now = SU.timestamp()
if now - last_log > log_interval:
self.logger.critical(SU.red(f"PLAY: Item(s) not yet ready ({items_str})"))
last_log = now
if last_item.play.state == PlayState.PlayStateType.TIMEOUT:
self.logger.warning(SU.red("PLAY: Preloading timed out."))
break
time.sleep(1)
self.engine.player.play(items[0], engine.Player.TransitionType.FADE)
timetable_renderer: utils.TimetableRenderer = self.engine.scheduler.timetable_renderer
self.logger.info(timetable_renderer.get_ascii_timeslots())
#
# Scheduler
#
class AuraScheduler(threading.Thread):
"""
The Scheduler.
The program scheduler has two main duties:
- Retrieve scheduling data from the API, stored in the timetable.
- Execute engine action commands in an automated, timed fashion.
"""
# Command class initialization. Can be overridden in test cases.
TimeslotCommandClass: EngineExecutor = TimeslotCommand
PlayCommandClass: EngineExecutor = PlayCommand
config: confuse.Configuration
logger: logging.Logger
engine: engine.Engine
timetable: timetable.TimetableService
timetable_renderer: utils.TimetableRenderer
exit_event: threading.Event
is_initialized: bool
is_engine_ready: bool
def __init__(self, engine: engine.Engine):
"""
Initialize the scheduler.
Args:
engine (engine.Engine): The engine.
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
self.timetable = timetable.TimetableService(self.config.general.cache_dir)
self.timetable_renderer = utils.TimetableRenderer(self)
self.engine = engine
self.engine.scheduler = self
# FIXME: is_soundsytem_init is never used
self.is_soundsytem_init = False
self.exit_event: threading.Event
# Scheduler Initialization
self.is_initialized = False
self.is_engine_ready = False
def boot(self):
"""
Start the scheduler in a thread.
"""
# Init scheduling thread
threading.Thread.__init__(self)
self.exit_event = threading.Event()
self.start()
def run(self):
"""
Execute the scheduler.
Called when thread is started via `start()`. It does the following:
1. Refresh timetable periodically from the API depending on the
`fetching_frequency` defined in the engine configuration.
2. Merge the timetable with the latest, local version.
3. Queue all timeslots of the timetable
On every cycle the configuration file is reloaded, to allow modifications while running
the engine.
"""
while not self.exit_event.is_set():
try:
AuraConfig.instance.load_config()
seconds_to_wait = self.config.scheduler.fetching_frequency
msg = f"== start fetching new timeslots (every {seconds_to_wait} seconds) =="
self.logger.info(SU.cyan(msg))
# Load some stuff from the API in any case
self.timetable.refresh()
# Queue only when the engine is ready to play
if self.is_initialized:
self.queue_program()
except Exception as e:
msg = f"Unhandled error while fetching & scheduling new program! ({str(e)})"
self.logger.critical(SU.red(msg), e)
# Keep on working anyway
EngineExecutor.log_commands()
self.exit_event.wait(seconds_to_wait)
#
# EVENT HANDLERS
#
def on_ready(self):
"""
Handle `on_ready` event.
Called when the engine has finished booting and is ready to play.
1. First the item currently to be played is scheduled.
2. All following items are queued.
"""
self.is_initialized = True
self.logger.info(self.timetable_renderer.get_ascii_timeslots())
self.play_active_item()
self.queue_startup_items()
#
# METHODS
#
def get_timetable(self) -> timetable.TimetableService:
"""
Get the timetable service.
Returns:
(TimetableService): The timetable service.
"""
return self.timetable
def play_active_item(self):
"""
Play currently active playlist item, as per timetable.
Plays the item scheduled for the very current moment and forwards to the scheduled
position in time. Usually called when the Engine boots.
"""
sleep_offset = 10
active_timeslot: Timeslot = self.timetable.get_current_timeslot()
# Schedule any available fallback playlist
if active_timeslot:
if active_timeslot.is_virtual():
self.logger.info(f"Skipping virtual timeslot: {active_timeslot}")
else:
# Create command timer to indicate the start of the timeslot
AuraScheduler.TimeslotCommandClass(self.engine, active_timeslot)
active_item: PlaylistItem | None = self.timetable.get_current_item()
if not active_item:
return
# In case of a file-system source, we need to fast-forward to the current marker as per
# position in the timeslot
if active_item.get_content_type() in ResourceClass.FILE.types:
# Calculate the seconds we have to fast-forward
now = engine.Engine.engine_time()
seconds_to_roll = now - active_item.get_start()
# If the roll exceeds the length of the current track,
# there's no need to do anything - the scheduler takes care of the rest
if (seconds_to_roll + sleep_offset) > active_item.duration:
msg = "The FFWD [>>] range exceeds the length of the item. \
Drink some tea and wait for the sound of the next item."
self.logger.info(msg)
return
else:
# Preload and play active item
AuraScheduler.PlayCommandClass(self.engine, [active_item])
# Fast-forward to the scheduled position
# TODO The roll should happen before the channel is active, avoiding the roll being
# audible.
if seconds_to_roll > 0:
# Without plenty of timeout (10s) the roll doesn't work
# TODO Check if this is still the case with Liquidsoap 2
# TODO Think if this should be externalized as a command for better testability
def async_preroll(seconds_to_roll):
seconds_to_roll += sleep_offset
time.sleep(sleep_offset)
self.logger.info(f"Going to fast-forward {seconds_to_roll} seconds")
rolled = self.engine.player.roll(active_item.play.channel, seconds_to_roll)
if rolled:
self.logger.info("Pre-roll done")
thread = threading.Thread(target=async_preroll, args=(seconds_to_roll,))
thread.start()
elif (
active_item.get_content_type() in ResourceClass.STREAM.types
or active_item.get_content_type() in ResourceClass.LIVE.types
):
# Preload and play active item
AuraScheduler.PlayCommandClass(self.engine, [active_item])
else:
self.logger.critical(f"Unknown content type for item {active_item}")
return
def queue_program(self):
"""
Queue the current program.
Playlists of every timeslot are queued by creating timed commands to the sound-system to
enable the individual tracks of playlists.
"""
# Get a clean set of the timeslots within the scheduling window
timeslots = self.timetable.get_next_timeslots(window_aware=True)
# Queue the timeslots, their playlists and items
if not timeslots:
self.logger.info(SU.green("No program to be queued."))
else:
next_timeslot: Timeslot
for next_timeslot in timeslots:
if next_timeslot.is_virtual():
self.logger.info(f"Skipping virtual timeslot: {next_timeslot}")
continue
# Create command timer to indicate the start of the timeslot
AuraScheduler.TimeslotCommandClass(self.engine, next_timeslot)
playlist = next_timeslot.get_current_playlist()
if playlist:
self.queue_playlist_items(next_timeslot, playlist.items)
self.logger.info(SU.green("Finished queuing program."))
def queue_startup_items(self):
"""
Queue all items after the one currently playing upon startup.
Don't use this method in any other scenario, as it doesn't respect the scheduling window.
"""
current_timeslot: Timeslot = self.timetable.get_current_timeslot()
# Queue the (rest of the) currently playing timeslot upon startup
if current_timeslot:
current_playlist = current_timeslot.get_current_playlist()
if current_playlist:
active_item: PlaylistItem = self.timetable.get_current_item()
if active_item:
# Queue open items for current playlist
rest_of_playlist = active_item.get_all_next(True)
self.queue_playlist_items(current_timeslot, rest_of_playlist)
def queue_playlist_items(self, timeslot: Timeslot, items: list[PlaylistItem]):
"""
Create player commands for all playlist items to be executed at the scheduled time.
Since each scheduled playlist can consist of multiple item types such as *file*, *live*,
and *stream*, the play-out of the timeslot is actually a bit more complex.
Before any playlist items of the timeslot can be turned into sound, they need to be
aggregated, queued and pre-loaded.
1. First, all items are aggregated when they hold filesystem items.
Given you have a playlist with 10 items, the first 4 are consisting of files, the
next two of a a stream and a live source. The last 4 are files again.
These items are now aggregated into 4 groups: one for the files, one for the stream,
one for the live item and another one for files.
For each group a timer for executing the next step is created.
2. Now, the playlist items are going to be "pre-loaded".
This means that filesystem items are queued and pre-loaded and items which are
based on audio streams are buffered.
This is required to allow a seamless play-out, when its time to do so (in the next
stage).
Due to their nature, playlist items which hold live audio sources are not affected by
this stage at all.
Args:
timeslot (Timeslot): The timeslot this items belong to.
items ([PlaylistItem]): The playlist items to be scheduled for playout.
"""
item_groups = []
item_groups.append([])
index = 0
# Group/aggregate all filesystem items, allowing them to be queued at once
item: PlaylistItem
previous_item: PlaylistItem = None
for item in items:
if previous_item is None or (
previous_item.get_content_type() == item.get_content_type()
and item.get_content_type() in ResourceClass.FILE.types
):
item_groups[index].append(item)
else:
index += 1
item_groups.append([])
item_groups[index].append(item)
previous_item = item
self.logger.info(f"Built {len(item_groups)} item group(s)")
# Timeslot function calls
if len(items) > 0 and len(item_groups) > 0:
for items in item_groups:
if not isinstance(items, list):
raise ValueError(f"Invalid Item Group: {str(items)}")
# Create command timers for each item group
AuraScheduler.PlayCommandClass(self.engine, items)
else:
self.logger.warn(SU.red(f"Nothing to schedule for timeslot {timeslot}"))
def terminate(self):
"""
Call when thread is stopped or termination signal is received.
"""
self.logger.info(SU.yellow("[Scheduler] Shutting down..."))
self.timetable.terminate()
self.exit_event.set()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Services representing and dealing with the program timetable.
- TimetableService: Representation of the timetable containing timeslots for being scheduled.
- TimetableMerger: Processor to merge remote timeslots with the local ones.
"""
from __future__ import annotations
import logging
import os
from datetime import datetime
import confuse
import jsonpickle
import aura_engine.engine as engine
import aura_engine.scheduling.api as api
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
class TimetableService:
"""
Timetable Service.
The current program timetable as per scheduling calendar. The timetable is a set of
timeslots over the period of 24 hours.
"""
config: confuse.Configuration
logger: logging.Logger
timetable: list[Timeslot] | None = None
api_fetcher: api.ApiFetcher | None = None
last_successful_fetch: datetime | None = None
cache_location: str | None = None
timetable_file: str | None = None
restorable: bool
def __init__(self, cache_location: str):
"""
Initialize.
Args:
cache_location (str): Path to folder where timetable.json is stored.
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
jsonpickle.set_encoder_options("json", sort_keys=False, indent=2)
if cache_location[-1] != "/":
cache_location += "/"
cache_location += "timetable"
os.makedirs(cache_location, exist_ok=True)
self.cache_location = cache_location
self.timetable_file = self.cache_location + "/timetable.json"
self.restorable = False
@synchronized
def refresh(self):
"""
Update the timetable.
1. Fetch the latest timeslots from the API or the API cache.
2. Merge with the current timetable.
3. Persist to `timetable.json`
"""
self.logger.debug("Trying to fetch new timeslots from API endpoints...")
if not self.api_fetcher:
self.api_fetcher = api.ApiFetcher()
self.api_fetcher.start()
response = self.api_fetcher.fetch()
self.api_fetcher = None
if response.code == 0:
if len(response.timeslots) > 0:
self.last_successful_fetch = datetime.now()
msg = f"Finished fetching current timeslots from API ({len(response)})"
self.logger.info(SU.green(msg))
self.merge_timetable(response.timeslots)
# FIXME: for some unknown reason storing the timetable fails. This does not affect
# the playout, since storing the timetable is not in use atm.
# self.persist_timetable()
else:
self.logger.warning("Program fetched from Steering contains no timeslots!")
else:
msg = SU.red(f"Keep using current timetable, as API returned: {response.message}")
self.logger.warning(msg)
self.logger.debug("Exception in API Fetcher: \n" + str(response.exception))
def merge_timetable(self, remote_timeslots: list):
"""
Merge the fetched timeslots with the current local timetable.
Args:
remote_timeslots (list): the timeslots fetched from the API.
"""
merger: TimetableMerger = TimetableMerger()
self.timetable = merger.merge(self.timetable, remote_timeslots)
self.logger.info(SU.green("Merged timetable"))
def persist_timetable(self):
"""
Stores the current timetable to the local JSON file used for caching.
"""
if self.timetable_file is None:
self.logger.error(SU.red("No timetable file specified."))
return
# Ensure the directory exists
os.makedirs(os.path.dirname(self.timetable_file), exist_ok=True)
try:
with open(self.timetable_file, "w") as file:
file.write(jsonpickle.encode(self.timetable, unpicklable=self.restorable))
self.logger.info(SU.green("timetable.json stored"))
except FileNotFoundError as fnf_error:
self.logger.error(SU.red(f"File not found: {self.timetable_file}. {fnf_error}"))
except PermissionError as perm_error:
self.logger.error(
SU.red(f"Permission denied for file: {self.timetable_file}. {perm_error}")
)
except Exception as e:
self.logger.error(SU.red(f"Unexpected error while storing {self.timetable_file}: {e}"))
def get_current_item(self) -> PlaylistItem | None:
"""
Retrieve the current playlist item which should be played as per timetable.
Returns:
(PlaylistItem): The track which is (or should) currently be on air.
"""
now: float = engine.Engine.engine_time()
# If necessary initialize timetable
if not self.timetable:
self.refresh()
# Check for current timeslot
current_timeslot: Timeslot = self.get_current_timeslot()
if not current_timeslot:
self.logger.warning(SU.red("There's no active timeslot"))
return None
# Check for scheduled playlist
current_playlist: Playlist = current_timeslot.get_current_playlist()
if not current_playlist:
msg = (
"There's no (default) playlist assigned to the current timeslot."
" Most likely a fallback will make things okay again."
)
self.logger.warning(SU.red(msg))
return None
# Iterate over playlist items and store the current one
current_item = None
for item in current_playlist.items:
if item.get_start() <= now and now <= item.get_end():
current_item = item
break
if not current_item:
# Nothing playing ... fallback will kick-in
msg = f"There's no current item in playlist {current_playlist}"
self.logger.warning(SU.red(msg))
return None
return current_item
def get_current_timeslot(self) -> Timeslot:
"""
Retrieve the timeslot currently to be played.
Returns:
(Timeslot): The current timeslot.
"""
current_timeslot = None
now = engine.Engine.engine_time()
# Iterate over all timeslots and find the one to be played right now
if self.timetable:
timeslot: Timeslot
for timeslot in self.timetable:
if timeslot.get_start() <= now and now < timeslot.get_end():
current_timeslot = timeslot
break
return current_timeslot
def get_next_timeslots(self, max_count: int = 0, window_aware=False) -> list[Timeslot]:
"""
Retrieve the timeslots to be played after the current one.
The method allows to return only a max count of timeslots. Also, timeslots which are
not within the scheduling window can be optionally omitted.
The scheduling window behaviour has these effects, when scheduling timeslots:
- Before the scheduling window: Timeslots can still be deleted in Steering and the
playout will respect this.
- During the scheduling window: Timeslots and it's playlists are queued as timed
commands.
- After the scheduling window: Such timeslots are ignored, because it doesn't make
sense anymore to schedule them before the next timeslot starts.
Args:
max_count (Integer): Maximum of timeslots to return. If `0` is passed, all existing
ones are returned.
window_aware (bool): If set to true, only timeslots within the scheduling window are
returned.
Returns:
([Timeslot]): The upcoming timeslots.
"""
now = engine.Engine.engine_time()
next_timeslots = []
if not self.timetable:
return []
ts: Timeslot
for ts in self.timetable:
if ts.get_start() > now:
in_window = self.is_timeslot_in_window(ts)
if not window_aware or in_window:
if (len(next_timeslots) < max_count) or max_count == 0:
next_timeslots.append(ts)
else:
break
else:
start = SU.fmt_time(ts.get_start())
end = SU.fmt_time(ts.get_end())
t1: int = self.config.scheduler.scheduling_window_start
t2: int = self.config.scheduler.scheduling_window_end
msg = f"Skip timeslot #{ts.get_id()} [{start} - {end}] "
msg += f"- not in scheduling window T¹-{t1}s to T²-{t2}s"
self.logger.debug(msg)
return next_timeslots
def is_timeslot_in_window(self, timeslot: Timeslot) -> bool:
"""
Check if the timeslot is within the scheduling window.
The scheduling window represents the soonest and latest
point some timeslot can be scheduled.
Args:
timeslot (Timeslot): The timeslot to check.
Returns
(bool): True if it is within the window.
"""
now = engine.Engine.engine_time()
window_start = self.config.scheduler.scheduling_window_start
window_end = self.config.scheduler.scheduling_window_end
if timeslot.get_start() - window_start < now and timeslot.get_end() - window_end > now:
return True
return False
def terminate(self):
"""
Call this when the thread is stopped or a signal to terminate is received.
"""
self.logger.info(SU.yellow("[TimetableService] Shutting down..."))
if self.api_fetcher:
self.api_fetcher.terminate()
class TimetableMerger:
"""
Compare and merge the local with the remote set of timeslots.
"""
config: confuse.Configuration
logger: logging.Logger
def __init__(self):
"""
Initialize.
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
def build_map(
self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
) -> dict[str, dict]:
"""
Build a map of local and remote timeslots relations.
The start time of any timeslot acts as the key. The map value holds a dictionary, where
the `local` key is the local timeslot, and the `remote` key holds the remote timeslot.
Args:
local_timeslots ([Timeslot]): The current timetable.
remote_timeslots ([Timeslot]): The latest timeslots from the API.
Returns:
({str: {}}): Map with timestamp as key and a dictionary referencing timeslots.
"""
timeslot_map: dict[str, dict] = {}
if local_timeslots:
for ts in local_timeslots:
if not timeslot_map.get(str(ts.get_start())):
timeslot_map[str(ts.get_start())] = {}
idx = timeslot_map[str(ts.get_start())]
# Existing virtual timeslots are ignored, because they are likely updated remotely.
# if not ts.is_virtual():
idx["local"] = ts
idx["remote"] = None
if remote_timeslots:
for ts in remote_timeslots:
if not timeslot_map.get(str(ts.get_start())):
timeslot_map[str(ts.get_start())] = {}
idx = timeslot_map[str(ts.get_start())]
idx["remote"] = ts
if not idx.get("local"):
idx["local"] = None
return timeslot_map
@synchronized
def merge(
self, local_timeslots: list[Timeslot] | None, remote_timeslots: list[Timeslot] | None
) -> list[Timeslot]:
"""
Merge strategy for local and remote timeslots.
Args:
local_timeslots (Timeslot]): Local timeslots.
remote_timeslots (Timeslot]): Remote timeslots.
Returns:
[Timeslot]: The merged timeslots.
"""
merged_ts = []
timeslot_map = self.build_map(local_timeslots, remote_timeslots)
if not timeslot_map:
return merged_ts
scheduling_window_start = self.config.scheduler.scheduling_window_start
now: float = SU.timestamp()
resolution: str = ""
merge_info = SU.cyan("\nMap for timetable merge:")
for timestamp, ts_relation in timeslot_map.items():
local: Timeslot = ts_relation.get("local")
remote: Timeslot = ts_relation.get("remote")
if (float(timestamp) - scheduling_window_start) < now:
# It's past the scheduling window start, so keep the local one as is
if local:
# Only keep timeslots which have not yet ended.
if local.end > now:
merged_ts.append(local)
resolution = "add | currently playing"
else:
resolution = "skip | out of scheduling window"
else:
# No local timeslot, so add it in any case.
resolution = "add"
merged_ts.append(remote)
else:
if local and not remote:
# Timeslot was deleted remotely, remove any local one
resolution = "remove"
elif not local and remote:
# Timeslot was added remotely
resolution = "add"
merged_ts.append(remote)
elif local and remote:
# Timeslot existing locally, was updated or did not change remotely
# Update the local timeslot with possibly changed data
local.update(remote)
merged_ts.append(local)
resolution = "update"
else:
# Relations w/o local and remote timeslots should not happen
self.logger.critical(SU.red("Skipping invalid merge case!"))
resolution = "skip | invalid merge case"
local_str = "local:" + str(local).ljust(25)
remote_str = "remote: " + str(remote).ljust(25)
merge_info += SU.cyan(
f"\n\tTIME:{timestamp} - {local_str} | {remote_str} ↦ ({resolution})"
)
self.logger.debug(merge_info + "\n")
return merged_ts
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A collection of utilities used for scheduling.
"""
from __future__ import annotations
import logging
import confuse
import aura_engine.scheduling as scheduling
from aura_engine.base.config import AuraConfig
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.resources import ResourceUtil
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
class M3UPlaylistProcessor:
"""
Render a M3U Playlist as a engine compatible playlist dictionary.
"""
logging: logging.Logger
config: confuse.Configuration
playlist_folder = None
def __init__(self):
"""
Initialize.
"""
self.config = AuraConfig.instance.config
self.logger = logging.getLogger("engine")
self.playlist_folder = ResourceUtil.playlist_folder_path()
def spread(self, item: PlaylistItem) -> list:
"""
Generate individual playlist items out of any given M3U playlist.
The expected source format is 'm3u://%FILENAME%'. If that is the case, it reads the
M3U file is read and builds individual `PlaylistItem` objects based on the M3U contents.
The evaluation is skipped if no playlist folder is configured. If the M3U file is not
readable, it fails silently by returning an empty list.
Args:
item (PlaylistItem): The potential M3U items to spread.
Returns:
[PlaylitItem]: List containing one or more items.
"""
if not self.playlist_folder:
return item
result = []
m3u_items = None
# It's a M3U Playlist which needs to be spread
if item.source and item.source.startswith("m3u://"):
playlist_name = item.source.split("m3u://")[1]
self.logger.info(f"Spreading items of M3U playlist '{playlist_name}'")
m3u_items = self.read_m3u_file(self.playlist_folder + playlist_name)
result += m3u_items
# It's an ordinary item to be taken as it is
else:
result.append(item)
return result
def read_m3u_file(self, source_file) -> list:
"""
Read items from an M3U file.
Args:
source_file (str): Path to the M3U file.
Returns:
[PlaylistItem]: One or more playlist items.
"""
items: PlaylistItem = []
try:
with open(source_file, "r") as file:
lines = file.readlines()
for i in range(0, len(lines)):
if lines[i].startswith("#EXTINF:"):
metadata = lines[i].split("#EXTINF:")[1].split(",")
source = "file://" + lines[i + 1].strip()
duration = int(metadata[0])
artist = metadata[1].split(" - ")[0].strip()
title = metadata[1].split(" - ")[1].strip()
m = PlaylistItem.Metadata(artist, "", title)
item = PlaylistItem(source, duration, 100, m)
items.append(item)
except EnvironmentError:
self.logger.error(f"Skipping unreadable M3U playlist file '{source_file}'")
return items
class TimetableRenderer:
"""
Display current and next timeslots in ASCII for maintenance and debugging.
"""
logger: logging.Logger
scheduler: scheduling.scheduler.AuraScheduler
timetable: scheduling.timetable.TimetableService
def __init__(self, scheduler: scheduling.scheduler.AuraScheduler):
self.logger = logging.getLogger("engine")
self.scheduler = scheduler
self.timetable = scheduler.get_timetable()
def get_ascii_timeslots(self) -> str:
"""
Create a printable version of the current timetable.
The output contains playlists and items as per timeslot.
Returns:
(String): An ASCII representation of the current and next timeslots.
"""
active_timeslot: Timeslot = self.timetable.get_current_timeslot()
s = "\n\n SCHEDULED NOW:"
s += "\n".ljust(102, "-")
if active_timeslot:
planned_playlist: Playlist = active_timeslot.get_current_playlist()
s += f"\n│ Playing timeslot {active_timeslot}"
if planned_playlist:
t = planned_playlist.get_type()
s += f"\n│ └── {t}: {planned_playlist}"
active_item: PlaylistItem = self.timetable.get_current_item()
item_num: int = 1
if active_item:
# Display previous playlist items
prev_items: list[PlaylistItem] = active_item.get_all_prev()
for item in prev_items:
s += self.build_item_string("\n│ └── ", item_num, item, True)
item_num += 1
# Display current playlist item
active_str = SU.green(f"PLAYING > {active_item}]")
s += f"\n│ └── Item {item_num} | {active_str}"
item_num += 1
# Display next playlist items
next_items: list[PlaylistItem] = active_item.get_all_next()
items = self.preprocess_items(next_items, False)
s += self.build_playlist_string(items, item_num)
else:
s += "\n"
s += SU.red("No playlist assigned. Station fallback will take over.")
else:
s += "\n│ Nothing. "
s += "\n".ljust(102, "-")
s += "\n SCHEDULED NEXT:"
s += "\n".ljust(102, "-")
next_timeslots: list[Timeslot] = self.timetable.get_next_timeslots()
if not next_timeslots:
s += "\n│ Nothing. "
else:
timeslot: Timeslot
for timeslot in next_timeslots:
playlist: Playlist = timeslot.get_current_playlist()
if playlist:
t = SU.cyan(str(playlist.get_type()))
s += f"\n│ Queued timeslot {timeslot} "
s += f"\n│ └── {t}: {playlist}"
if playlist.get_duration() > timeslot.get_end() - timeslot.get_start():
warning = SU.red("↑↑↑ Playlist ends after timeslot!")
s += f"\n{warning}! "
items = self.preprocess_items(playlist.items, False)
s += self.build_playlist_string(items, 1)
else:
s += f"\n│ Fallback ({timeslot}). "
s += "\n".ljust(102, "-") + "\n\n"
return s
def build_playlist_string(self, items: list[PlaylistItem], num_start: int) -> str:
"""
Return a stringified list of items.
Args:
items ([PlaylistItem]): The items.
num_start (int): The number of the first item.
Returns:
(str): The resulting string to be displayed.
"""
s = ""
is_out_of_timeslot = False
item: PlaylistItem
for item in items:
if (
item.play_queue_state == PlaylistItem.QueueState.PAST_TIMESLOT
and not is_out_of_timeslot
):
warning = SU.red("↓↓↓ These 'out of timeslot' items won't be played.")
s += f"\n{warning}"
is_out_of_timeslot = True
s += self.build_item_string("\n│ └── ", num_start, item, is_out_of_timeslot)
num_start += 1
return s
def build_item_string(self, prefix: str, num: int, item: PlaylistItem, strike: bool) -> str:
"""
Return an stringified item.
Args:
prefix (str): The prefix.
num (int): The number of the item in the playlist.
strike (bool): If the string should be striked through.
Returns:
(str): The item as string to be displayed.
"""
s = ""
if item.play_queue_state == PlaylistItem.QueueState.CUT:
warning = SU.red("↓↓↓ This item is going to be cut.")
s = f"\n{warning}"
if strike:
item_str = SU.strike(item)
else:
item_str = str(item)
return s + f"{prefix}Item {num} | {item_str}"
def preprocess_items(self, items: list[PlaylistItem], rm_past_ts: bool) -> list[PlaylistItem]:
"""
Analyse and mark items which are going to be cut or excluded.
Args:
items ([PlaylistItem]): The playlist items to be scheduled for playout
rm_past_ts (Boolean): If `True` items which are 'out of timeslot' are not returned
Returns:
([PlaylistItem]): The list of processed playlist items
"""
clean_items = []
item: PlaylistItem
for item in items:
timeslot: Timeslot = item.get_playlist().get_timeslot()
if item.get_start() >= timeslot.get_end():
msg = f"Filtered item ({item}) after end-of timeslot ({timeslot}) ... SKIPPED"
self.logger.debug(msg)
item.play_queue_state = PlaylistItem.QueueState.PAST_TIMESLOT
elif item.get_end() > timeslot.get_end():
item.play_queue_state = PlaylistItem.QueueState.CUT
else:
item.play_queue_state = PlaylistItem.QueueState.OKAY
if (
not item.play_queue_state == PlaylistItem.QueueState.PAST_TIMESLOT
or not rm_past_ts
):
clean_items.append(item)
return clean_items
""" Contains all the data models used in inputs/outputs """
from .playout_episode import PlayoutEpisode
from .playout_program_entry import PlayoutProgramEntry
from .playout_schedule import PlayoutSchedule
from .playout_show import PlayoutShow
from .time_slot import TimeSlot
__all__ = (
"PlayoutEpisode",
"PlayoutProgramEntry",
"PlayoutSchedule",
"PlayoutShow",
"TimeSlot",
)
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="PlayoutEpisode")
@attr.s(auto_attribs=True)
class PlayoutEpisode:
"""
Attributes:
id (int):
title (Union[Unset, str]): Title of the note.
"""
id: int
title: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
id = self.id
title = self.title
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"id": id,
}
)
if title is not UNSET:
field_dict["title"] = title
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
id = d.pop("id")
title = d.pop("title", UNSET)
playout_episode = cls(
id=id,
title=title,
)
playout_episode.additional_properties = d
return playout_episode
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, TypeVar
import attr
from dateutil.parser import isoparse
if TYPE_CHECKING:
from ..models.playout_episode import PlayoutEpisode
from ..models.playout_schedule import PlayoutSchedule
from ..models.playout_show import PlayoutShow
from ..models.time_slot import TimeSlot
T = TypeVar("T", bound="PlayoutProgramEntry")
@attr.s(auto_attribs=True)
class PlayoutProgramEntry:
"""
Attributes:
id (str):
start (datetime.datetime):
end (datetime.datetime):
show_id (int):
show (PlayoutShow):
timeslot_id (Optional[int]):
playlist_id (Optional[int]):
timeslot (Optional[TimeSlot]):
episode (Optional[PlayoutEpisode]):
schedule (Optional[PlayoutSchedule]):
"""
id: str
start: datetime.datetime
end: datetime.datetime
show_id: int
show: "PlayoutShow"
timeslot_id: Optional[int]
playlist_id: Optional[int]
timeslot: Optional["TimeSlot"]
episode: Optional["PlayoutEpisode"]
schedule: Optional["PlayoutSchedule"]
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
id = self.id
start = self.start.isoformat()
end = self.end.isoformat()
show_id = self.show_id
show = self.show.to_dict()
timeslot_id = self.timeslot_id
playlist_id = self.playlist_id
timeslot = self.timeslot.to_dict() if self.timeslot else None
episode = self.episode.to_dict() if self.episode else None
schedule = self.schedule.to_dict() if self.schedule else None
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"id": id,
"start": start,
"end": end,
"showId": show_id,
"show": show,
"timeslotId": timeslot_id,
"playlistId": playlist_id,
"timeslot": timeslot,
"episode": episode,
"schedule": schedule,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.playout_episode import PlayoutEpisode
from ..models.playout_schedule import PlayoutSchedule
from ..models.playout_show import PlayoutShow
from ..models.time_slot import TimeSlot
d = src_dict.copy()
id = d.pop("id")
start = isoparse(d.pop("start"))
end = isoparse(d.pop("end"))
show_id = d.pop("showId")
show = PlayoutShow.from_dict(d.pop("show"))
timeslot_id = d.pop("timeslotId")
playlist_id = d.pop("playlistId")
_timeslot = d.pop("timeslot")
timeslot: Optional[TimeSlot]
if _timeslot is None:
timeslot = None
else:
timeslot = TimeSlot.from_dict(_timeslot)
_episode = d.pop("episode")
episode: Optional[PlayoutEpisode]
if _episode is None:
episode = None
else:
episode = PlayoutEpisode.from_dict(_episode)
_schedule = d.pop("schedule")
schedule: Optional[PlayoutSchedule]
if _schedule is None:
schedule = None
else:
schedule = PlayoutSchedule.from_dict(_schedule)
playout_program_entry = cls(
id=id,
start=start,
end=end,
show_id=show_id,
show=show,
timeslot_id=timeslot_id,
playlist_id=playlist_id,
timeslot=timeslot,
episode=episode,
schedule=schedule,
)
playout_program_entry.additional_properties = d
return playout_program_entry
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="PlayoutSchedule")
@attr.s(auto_attribs=True)
class PlayoutSchedule:
"""
Attributes:
id (int):
default_playlist_id (Union[Unset, None, int]): A tank ID in case the timeslot's playlist_id is empty.
"""
id: int
default_playlist_id: Union[Unset, None, int] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
id = self.id
default_playlist_id = self.default_playlist_id
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"id": id,
}
)
if default_playlist_id is not UNSET:
field_dict["defaultPlaylistId"] = default_playlist_id
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
id = d.pop("id")
default_playlist_id = d.pop("defaultPlaylistId", UNSET)
playout_schedule = cls(
id=id,
default_playlist_id=default_playlist_id,
)
playout_schedule.additional_properties = d
return playout_schedule
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="PlayoutShow")
@attr.s(auto_attribs=True)
class PlayoutShow:
"""
Attributes:
id (int):
name (str): Name of this Show.
default_playlist_id (Union[Unset, None, int]):
"""
id: int
name: str
default_playlist_id: Union[Unset, None, int] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
id = self.id
name = self.name
default_playlist_id = self.default_playlist_id
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"id": id,
"name": name,
}
)
if default_playlist_id is not UNSET:
field_dict["defaultPlaylistId"] = default_playlist_id
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
id = d.pop("id")
name = d.pop("name")
default_playlist_id = d.pop("defaultPlaylistId", UNSET)
playout_show = cls(
id=id,
name=name,
default_playlist_id=default_playlist_id,
)
playout_show.additional_properties = d
return playout_show
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
import datetime
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from dateutil.parser import isoparse
from ..types import UNSET, Unset
T = TypeVar("T", bound="TimeSlot")
@attr.s(auto_attribs=True)
class TimeSlot:
"""
Attributes:
end (datetime.datetime):
id (int):
note_id (int):
show_id (int):
start (datetime.datetime):
memo (Union[Unset, str]): Memo for this timeslot.
playlist_id (Union[Unset, None, int]): Playlist ID of this timeslot.
repetition_of_id (Union[Unset, None, int]): This timeslot is a repetition of `Timeslot` ID.
schedule_id (Union[Unset, int]): `Schedule` ID of this timeslot.
"""
end: datetime.datetime
id: int
note_id: int
show_id: int
start: datetime.datetime
memo: Union[Unset, str] = UNSET
playlist_id: Union[Unset, None, int] = UNSET
repetition_of_id: Union[Unset, None, int] = UNSET
schedule_id: Union[Unset, int] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
end = self.end.isoformat()
id = self.id
note_id = self.note_id
show_id = self.show_id
start = self.start.isoformat()
memo = self.memo
playlist_id = self.playlist_id
repetition_of_id = self.repetition_of_id
schedule_id = self.schedule_id
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"end": end,
"id": id,
"noteId": note_id,
"showId": show_id,
"start": start,
}
)
if memo is not UNSET:
field_dict["memo"] = memo
if playlist_id is not UNSET:
field_dict["playlistId"] = playlist_id
if repetition_of_id is not UNSET:
field_dict["repetitionOfId"] = repetition_of_id
if schedule_id is not UNSET:
field_dict["scheduleId"] = schedule_id
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
end = isoparse(d.pop("end"))
id = d.pop("id")
note_id = d.pop("noteId")
show_id = d.pop("showId")
start = isoparse(d.pop("start"))
memo = d.pop("memo", UNSET)
playlist_id = d.pop("playlistId", UNSET)
repetition_of_id = d.pop("repetitionOfId", UNSET)
schedule_id = d.pop("scheduleId", UNSET)
time_slot = cls(
end=end,
id=id,
note_id=note_id,
show_id=show_id,
start=start,
memo=memo,
playlist_id=playlist_id,
repetition_of_id=repetition_of_id,
schedule_id=schedule_id,
)
time_slot.additional_properties = d
return time_slot
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
# Marker file for PEP 561
\ No newline at end of file
""" Contains some shared types for properties """
from http import HTTPStatus
from typing import BinaryIO, Generic, Literal, MutableMapping, Optional, Tuple, TypeVar
import attr
class Unset:
def __bool__(self) -> Literal[False]:
return False
UNSET: Unset = Unset()
FileJsonType = Tuple[Optional[str], BinaryIO, Optional[str]]
@attr.s(auto_attribs=True)
class File:
"""Contains information for file uploads"""
payload: BinaryIO
file_name: Optional[str] = None
mime_type: Optional[str] = None
def to_tuple(self) -> FileJsonType:
"""Return a tuple representation that httpx will accept for multipart/form-data"""
return self.file_name, self.payload, self.mime_type
T = TypeVar("T")
@attr.s(auto_attribs=True)
class Response(Generic[T]):
"""A response from an endpoint"""
status_code: HTTPStatus
content: bytes
headers: MutableMapping[str, str]
parsed: Optional[T]
__all__ = ["File", "Response", "FileJsonType"]
""" Contains all the data models used in inputs/outputs """
from .file import File
from .file_metadata import FileMetadata
from .file_source import FileSource
from .import_ import Import
from .playlist import Playlist
from .playlist_entry import PlaylistEntry
__all__ = (
"File",
"FileMetadata",
"FileSource",
"Import",
"Playlist",
"PlaylistEntry",
)
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.file_metadata import FileMetadata
from ..models.file_source import FileSource
T = TypeVar("T", bound="File")
@attr.s(auto_attribs=True)
class File:
"""
Attributes:
created (Union[Unset, str]):
duration (Union[Unset, float]):
id (Union[Unset, int]):
metadata (Union[Unset, FileMetadata]):
show_name (Union[Unset, str]):
size (Union[Unset, int]):
source (Union[Unset, FileSource]):
updated (Union[Unset, str]):
"""
created: Union[Unset, str] = UNSET
duration: Union[Unset, float] = UNSET
id: Union[Unset, int] = UNSET
metadata: Union[Unset, "FileMetadata"] = UNSET
show_name: Union[Unset, str] = UNSET
size: Union[Unset, int] = UNSET
source: Union[Unset, "FileSource"] = UNSET
updated: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
created = self.created
duration = self.duration
id = self.id
metadata: Union[Unset, Dict[str, Any]] = UNSET
if not isinstance(self.metadata, Unset):
metadata = self.metadata.to_dict()
show_name = self.show_name
size = self.size
source: Union[Unset, Dict[str, Any]] = UNSET
if not isinstance(self.source, Unset):
source = self.source.to_dict()
updated = self.updated
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if created is not UNSET:
field_dict["created"] = created
if duration is not UNSET:
field_dict["duration"] = duration
if id is not UNSET:
field_dict["id"] = id
if metadata is not UNSET:
field_dict["metadata"] = metadata
if show_name is not UNSET:
field_dict["showName"] = show_name
if size is not UNSET:
field_dict["size"] = size
if source is not UNSET:
field_dict["source"] = source
if updated is not UNSET:
field_dict["updated"] = updated
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.file_metadata import FileMetadata
from ..models.file_source import FileSource
d = src_dict.copy()
created = d.pop("created", UNSET)
duration = d.pop("duration", UNSET)
id = d.pop("id", UNSET)
_metadata = d.pop("metadata", UNSET)
metadata: Union[Unset, FileMetadata]
if isinstance(_metadata, Unset):
metadata = UNSET
else:
metadata = FileMetadata.from_dict(_metadata)
show_name = d.pop("showName", UNSET)
size = d.pop("size", UNSET)
_source = d.pop("source", UNSET)
source: Union[Unset, FileSource]
if isinstance(_source, Unset):
source = UNSET
else:
source = FileSource.from_dict(_source)
updated = d.pop("updated", UNSET)
file = cls(
created=created,
duration=duration,
id=id,
metadata=metadata,
show_name=show_name,
size=size,
source=source,
updated=updated,
)
file.additional_properties = d
return file
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="FileMetadata")
@attr.s(auto_attribs=True)
class FileMetadata:
"""
Attributes:
album (Union[Unset, str]):
artist (Union[Unset, str]): actually a full-text index would be nice here...
isrc (Union[Unset, str]):
organization (Union[Unset, str]):
title (Union[Unset, str]):
"""
album: Union[Unset, str] = UNSET
artist: Union[Unset, str] = UNSET
isrc: Union[Unset, str] = UNSET
organization: Union[Unset, str] = UNSET
title: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
album = self.album
artist = self.artist
isrc = self.isrc
organization = self.organization
title = self.title
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if album is not UNSET:
field_dict["album"] = album
if artist is not UNSET:
field_dict["artist"] = artist
if isrc is not UNSET:
field_dict["isrc"] = isrc
if organization is not UNSET:
field_dict["organization"] = organization
if title is not UNSET:
field_dict["title"] = title
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
album = d.pop("album", UNSET)
artist = d.pop("artist", UNSET)
isrc = d.pop("isrc", UNSET)
organization = d.pop("organization", UNSET)
title = d.pop("title", UNSET)
file_metadata = cls(
album=album,
artist=artist,
isrc=isrc,
organization=organization,
title=title,
)
file_metadata.additional_properties = d
return file_metadata
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.import_ import Import
T = TypeVar("T", bound="FileSource")
@attr.s(auto_attribs=True)
class FileSource:
"""
Attributes:
hash_ (Union[Unset, str]):
import_ (Union[Unset, Import]):
uri (Union[Unset, str]):
"""
hash_: Union[Unset, str] = UNSET
import_: Union[Unset, "Import"] = UNSET
uri: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
hash_ = self.hash_
import_: Union[Unset, Dict[str, Any]] = UNSET
if not isinstance(self.import_, Unset):
import_ = self.import_.to_dict()
uri = self.uri
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if hash_ is not UNSET:
field_dict["hash"] = hash_
if import_ is not UNSET:
field_dict["import"] = import_
if uri is not UNSET:
field_dict["uri"] = uri
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.import_ import Import
d = src_dict.copy()
hash_ = d.pop("hash", UNSET)
_import_ = d.pop("import", UNSET)
import_: Union[Unset, Import]
if isinstance(_import_, Unset):
import_ = UNSET
else:
import_ = Import.from_dict(_import_)
uri = d.pop("uri", UNSET)
file_source = cls(
hash_=hash_,
import_=import_,
uri=uri,
)
file_source.additional_properties = d
return file_source
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="Import")
@attr.s(auto_attribs=True)
class Import:
"""
Attributes:
error (Union[Unset, str]):
state (Union[Unset, str]):
"""
error: Union[Unset, str] = UNSET
state: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
error = self.error
state = self.state
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if error is not UNSET:
field_dict["error"] = error
if state is not UNSET:
field_dict["state"] = state
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
error = d.pop("error", UNSET)
state = d.pop("state", UNSET)
import_ = cls(
error=error,
state=state,
)
import_.additional_properties = d
return import_
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.playlist_entry import PlaylistEntry
T = TypeVar("T", bound="Playlist")
@attr.s(auto_attribs=True)
class Playlist:
"""
Attributes:
created (Union[Unset, str]):
description (Union[Unset, str]):
entries (Union[Unset, List['PlaylistEntry']]):
id (Union[Unset, int]):
playout_mode (Union[Unset, str]):
show_name (Union[Unset, str]):
updated (Union[Unset, str]):
"""
created: Union[Unset, str] = UNSET
description: Union[Unset, str] = UNSET
entries: Union[Unset, List["PlaylistEntry"]] = UNSET
id: Union[Unset, int] = UNSET
playout_mode: Union[Unset, str] = UNSET
show_name: Union[Unset, str] = UNSET
updated: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
created = self.created
description = self.description
entries: Union[Unset, List[Dict[str, Any]]] = UNSET
if not isinstance(self.entries, Unset):
entries = []
for entries_item_data in self.entries:
entries_item = entries_item_data.to_dict()
entries.append(entries_item)
id = self.id
playout_mode = self.playout_mode
show_name = self.show_name
updated = self.updated
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if created is not UNSET:
field_dict["created"] = created
if description is not UNSET:
field_dict["description"] = description
if entries is not UNSET:
field_dict["entries"] = entries
if id is not UNSET:
field_dict["id"] = id
if playout_mode is not UNSET:
field_dict["playoutMode"] = playout_mode
if show_name is not UNSET:
field_dict["showName"] = show_name
if updated is not UNSET:
field_dict["updated"] = updated
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.playlist_entry import PlaylistEntry
d = src_dict.copy()
created = d.pop("created", UNSET)
description = d.pop("description", UNSET)
entries = []
_entries = d.pop("entries", UNSET)
for entries_item_data in _entries or []:
entries_item = PlaylistEntry.from_dict(entries_item_data)
entries.append(entries_item)
id = d.pop("id", UNSET)
playout_mode = d.pop("playoutMode", UNSET)
show_name = d.pop("showName", UNSET)
updated = d.pop("updated", UNSET)
playlist = cls(
created=created,
description=description,
entries=entries,
id=id,
playout_mode=playout_mode,
show_name=show_name,
updated=updated,
)
playlist.additional_properties = d
return playlist
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.file import File
T = TypeVar("T", bound="PlaylistEntry")
@attr.s(auto_attribs=True)
class PlaylistEntry:
"""
Attributes:
duration (Union[Unset, float]):
file (Union[Unset, File]):
uri (Union[Unset, str]):
"""
duration: Union[Unset, float] = UNSET
file: Union[Unset, "File"] = UNSET
uri: Union[Unset, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
duration = self.duration
file: Union[Unset, Dict[str, Any]] = UNSET
if not isinstance(self.file, Unset):
file = self.file.to_dict()
uri = self.uri
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if duration is not UNSET:
field_dict["duration"] = duration
if file is not UNSET:
field_dict["file"] = file
if uri is not UNSET:
field_dict["uri"] = uri
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.file import File
d = src_dict.copy()
duration = d.pop("duration", UNSET)
_file = d.pop("file", UNSET)
file: Union[Unset, File]
if isinstance(_file, Unset):
file = UNSET
else:
file = File.from_dict(_file)
uri = d.pop("uri", UNSET)
playlist_entry = cls(
duration=duration,
file=file,
uri=uri,
)
playlist_entry.additional_properties = d
return playlist_entry
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties
# Marker file for PEP 561
\ No newline at end of file
""" Contains some shared types for properties """
from http import HTTPStatus
from typing import BinaryIO, Generic, Literal, MutableMapping, Optional, Tuple, TypeVar
import attr
class Unset:
def __bool__(self) -> Literal[False]:
return False
UNSET: Unset = Unset()
FileJsonType = Tuple[Optional[str], BinaryIO, Optional[str]]
@attr.s(auto_attribs=True)
class File:
"""Contains information for file uploads"""
payload: BinaryIO
file_name: Optional[str] = None
mime_type: Optional[str] = None
def to_tuple(self) -> FileJsonType:
"""Return a tuple representation that httpx will accept for multipart/form-data"""
return self.file_name, self.payload, self.mime_type
T = TypeVar("T")
@attr.s(auto_attribs=True)
class Response(Generic[T]):
"""A response from an endpoint"""
status_code: HTTPStatus
content: bytes
headers: MutableMapping[str, str]
parsed: Optional[T]
__all__ = ["File", "Response", "FileJsonType"]