diff --git a/aura.py b/aura.py index 5294a8f26d6b2849128612edae20116309df73c7..addb0edf8d2f1b2820fd6e3a714d6ccfa3f759f8 100755 --- a/aura.py +++ b/aura.py @@ -85,15 +85,11 @@ class Aura: # Check if the database has to be re-created if self.config.get("recreate_db") is not None: - AuraScheduler(self.config) + AuraScheduler(self.config, None) # Create scheduler and Liquidsoap communicator self.liquidsoapcommunicator = LiquidSoapCommunicator(self.config) - self.scheduler = AuraScheduler(self.config) - - # Give both a reference of each other - self.liquidsoapcommunicator.scheduler = self.scheduler - self.scheduler.liquidsoapcommunicator = self.liquidsoapcommunicator + self.scheduler = AuraScheduler(self.config, self.liquidsoapcommunicator) # Create the Redis adapter self.messenger = ServerRedisAdapter(self.config) @@ -107,7 +103,6 @@ class Aura: # And finally wait for redis message / start listener thread self.messenger.start() - self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__)) # diff --git a/configuration/sample.engine.ini b/configuration/sample.engine.ini index 979a5d6393d3c2bda2b98d5b26d4b29bf18c62ea..265e1eb10cf97a21b0f1274c6485d421c0e22e2d 100644 --- a/configuration/sample.engine.ini +++ b/configuration/sample.engine.ini @@ -75,6 +75,9 @@ fade_out_time="2.5" # all these settings from here to the bottom require a restart of the liquidsoap server +# Liquidsoap execution delay; Crucial to keep things in sync +lqs_delay_offset=1 + [user] # the user and group under which this software will run daemongroup="david" diff --git a/guru.py b/guru.py index 32de082f356a0a301e3fd75ea115961c0736c7cd..e0130a90631de39d53ca225a232957ba367c044c 100755 --- a/guru.py +++ b/guru.py @@ -133,8 +133,9 @@ class Guru(): self.parser.add_argument("-gnf", "--get-next-file-for", action="store", dest="get_file_for", default=False, metavar="PLAYLISTTYPE", help="For which type you wanna GET a next audio file?") self.parser.add_argument("-snf", "--set-next-file-for", action="store", dest="set_file_for", default=False, metavar=("PLAYLISTTYPE", "FILE"), nargs=2, help="For which type you wanna SET a next audio file?") self.parser.add_argument("-np", "--now-playing", action="store_true", dest="now_playing", default=False, help="Which source is now playing") + self.parser.add_argument("-ip", "--init-player", action="store_true", dest="init_player", default=False, help="Reset liquidsoap volume and mixer activations?") - self.parser.add_argument("-ts", "--adapt-trackservice-title", action="store", dest="adapt_trackservice_title", default=False, metavar="INFO", help="Update trackservice entry due to fallback") + self.parser.add_argument("-ts", "--on_play", action="store", dest="on_play", default=False, metavar="INFO", help="Event handling when some entry started playing") if len(sys.argv) == 1: raise ValueError("No Argument passed!") diff --git a/libraries/database/broadcasts.py b/libraries/database/broadcasts.py index b110830906ba2e20862352efb62a0ace2f488a2b..2c2baf61f0a5b4089109640ba6a40f827c4154b1 100644 --- a/libraries/database/broadcasts.py +++ b/libraries/database/broadcasts.py @@ -377,20 +377,6 @@ class Playlist(DB.Model, AuraDatabaseModel): return total - @hybrid_property - def current_entry(self): - """ - Retrieves the entry to be played at the very, current point in time. - """ - now_unix = SimpleUtil.timestamp() - - for entry in self.entries: - if entry.start_unix < now_unix < entry.end_unix: - return entry - - return None - - def as_dict(self): """ Returns the playlist as a dictionary for serialization. @@ -435,6 +421,9 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel): duration = Column(BigInteger) filename = Column(String(1024)) entry_start = Column(DateTime) + queue_state = None # Assigned when entry is about to be queued + channel = None # Assigned when entry is actually played + # relationships playlist = relationship("Playlist", uselist=False, back_populates="entries") @@ -454,26 +443,16 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel): @hybrid_property def end_unix(self): - return time.mktime(self.entry_start.timetuple()) + self.duration + return time.mktime(self.entry_end.timetuple()) @hybrid_property def volume(self): - return 100 + return 100 # FIXME Make DB Column @hybrid_property def type(self): return EngineUtil.get_channel_type(self.uri) - @hybrid_property - def channel(self): - type = EngineUtil.get_channel_type(self.uri) - if type == ChannelType.FILESYSTEM: - return Channel.FILESYSTEM_A - elif type == ChannelType.STREAM: - return Channel.STREAM_A - else: - return "foo:bar" - #FIXME Extend & finalize!! def get_prev_entries(self): """ @@ -489,17 +468,25 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel): return prev_entries - def get_next_entries(self): + def get_next_entries(self, schedule_sensitive=True): """ Retrieves all following entries as part of the current entry's playlist. + Args: + schedule_sensitive (Boolean): If `True` entries which start after \ + the end of the schedule are excluded + Returns: (List): List of PlaylistEntry """ next_entries = [] for entry in self.playlist.entries: if entry.entry_start > self.entry_start: - next_entries.append(entry) + if schedule_sensitive: + if entry.entry_start < self.playlist.schedule.schedule_end: + next_entries.append(entry) + else: + next_entries.append(entry) return next_entries @@ -523,8 +510,8 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel): """ time_start = SimpleUtil.fmt_time(self.start_unix) time_end = SimpleUtil.fmt_time(self.end_unix) - track = self.filename[-15:] - return "PlaylistEntry ID#%s [%s - %s | %ssec | Track: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track) + track = self.filename[-25:] + return "PlaylistEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.artificial_id), time_start, time_end, self.duration, track) @@ -606,7 +593,7 @@ class TrackService(DB.Model, AuraDatabaseModel): - Scenario 1: No fallback, all info is gathered via the playlist entry - Scenario 2: Fallback-type > 0, info is also gathered via the defined playlist entry - - Scenario 3: This type of fallback didn't get scheduled; a local audio-file is played + - Scenario 3: This type of fallback didn't get scheduled; a single entry is played """ if self.playlist_entry: return self.playlist_entry.as_dict() @@ -784,8 +771,8 @@ class SingleEntry(DB.Model, AuraDatabaseModel): """ time_start = SimpleUtil.fmt_time(self.start_unix) time_end = SimpleUtil.fmt_time(self.end_unix) - track = self.filename[-15:] - return "SingleEntry ID#%s [%s - %s | %ssec | Track: ...%s]" % (str(self.id), time_start, time_end, self.duration, track) + track = self.filename[-25:] + return "SingleEntry #%s [%s - %s | %ssec | Track: ...%s]" % (str(self.id), time_start, time_end, self.duration, track) diff --git a/meta.py b/meta.py index c6346325522210536d5904e87bdfcc5685795f46..04c630b7e506da494a9447aad2222ba1ce6508ed 100644 --- a/meta.py +++ b/meta.py @@ -1,5 +1,10 @@ # Meta -__version__ = '0.6.1' +__author__ = "David Trattnig and Gottfried Gaisbauer" +__copyright__ = "Copyright 2017-2020, Aura Engine Team" +__credits__ = ["David Trattnig", "Gottfried Gaisbauer", "Michael Liebler"] __license__ = "GNU Affero General Public License (AGPL) Version 3" +__version__ = "0.6.1" __version_info__ = (0, 6, 1) -__author__ = 'David Trattnig <david.trattnig@subsquare.at>' \ No newline at end of file +__maintainer__ = "David Trattnig" +__email__ = "david.trattnig@subsquare.at" +__status__ = "Development" \ No newline at end of file diff --git a/modules/base/enum.py b/modules/base/enum.py index d31a4a03317502d5b3612e830f1db95d8f4720af..1d4fe25d04eb2155fbeaaea893add2cc22fd1b21 100644 --- a/modules/base/enum.py +++ b/modules/base/enum.py @@ -112,3 +112,8 @@ class TimerType(Enum): FADEIN = "fadein" FADEOUT = "fadeout" + +class EntryQueueState(Enum): + OKAY = "ok" + CUT = "cut" + OUT_OF_SCHEDULE = "oos" diff --git a/modules/base/exceptions.py b/modules/base/exceptions.py index 3967279a61afc390f9501adbceb7706e1a2a26c9..5bc15f1b65027de27539d043d3cfa04cab5a06c5 100644 --- a/modules/base/exceptions.py +++ b/modules/base/exceptions.py @@ -29,6 +29,8 @@ class NoProgrammeLoadedException(Exception): pass +class NoActiveScheduleException(Exception): + pass # Mixer Exceptions @@ -39,6 +41,8 @@ class InvalidChannelException(Exception): class PlaylistException(Exception): pass +class NoActiveEntryException(Exception): + pass # Monitoring Exceptions diff --git a/modules/cli_tool/padavan.py b/modules/cli_tool/padavan.py index 04296f1daf9ef4a5e949ca23f5302e9bd0ee4abc..93e1d32f361fb943ed89cca2d0868e616b6e4ea8 100644 --- a/modules/cli_tool/padavan.py +++ b/modules/cli_tool/padavan.py @@ -92,8 +92,8 @@ class Padavan: elif self.args.init_player: self.init_player() - elif self.args.adapt_trackservice_title: - self.adapt_trackservice_title(self.args.adapt_trackservice_title) + elif self.args.on_play: + self.on_play(self.args.on_play) elif self.args.recreatedb: self.recreatedb() @@ -217,12 +217,12 @@ class Padavan: - def adapt_trackservice_title(self, info): + def on_play(self, info): """ - Updates the tracks-service with the info on currently played fallback track. + Event handler to be called when some entry started playing. """ - self.stringreply = self.send_and_wait_redis("aura", "adapt_trackservice_title " + info, RedisChannel.GNF_REPLY) + self.stringreply = self.send_and_wait_redis("aura", "on_play " + info, RedisChannel.GNF_REPLY) # ------------------------------------------------------------------------------------------ # def recreatedb(self): diff --git a/modules/communication/liquidsoap/communicator.py b/modules/communication/liquidsoap/communicator.py index faef1600b39a8b51bfa5cad511fab77d0b8477fb..eccf3f8458ab209134c373dc3f661c470fba6ed7 100644 --- a/modules/communication/liquidsoap/communicator.py +++ b/modules/communication/liquidsoap/communicator.py @@ -28,11 +28,13 @@ import json from modules.communication.liquidsoap.playerclient import LiquidSoapPlayerClient # from modules.communication.liquidsoap.recorderclient import LiquidSoapRecorderClient -from modules.communication.liquidsoap.initthread import LiquidSoapInitThread +from modules.core.startup import StartupThread +from modules.core.state import PlayerStateService from modules.communication.mail import AuraMailer -from libraries.enum.auraenumerations import TerminalColors, ScheduleEntryType -from modules.base.exceptions import LQConnectionError +from modules.base.enum import ChannelType, Channel, TransitionType +from modules.base.utils import TerminalColors, SimpleUtil +from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException from libraries.exceptions.exception_logger import ExceptionLogger @@ -51,12 +53,17 @@ class LiquidSoapCommunicator(ExceptionLogger): auramailer = None is_liquidsoap_running = False connection_attempts = 0 - active_channel = None disable_logging = False fade_in_active = False fade_out_active = False - # ------------------------------------------------------------------------------------------ # + # Active Channel & Entry Handling + active_channel_type = None + active_channel = None + player_state = None + # active_entries = None + + def __init__(self, config): """ Initializes the communicator by establishing a Socket connection @@ -80,144 +87,309 @@ class LiquidSoapCommunicator(ExceptionLogger): self.auramailer = AuraMailer(self.config) self.is_liquidsoap_up_and_running() + # Initialize Default Channels + self.active_channel = { + ChannelType.FILESYSTEM: Channel.FILESYSTEM_A, + ChannelType.STREAM: Channel.STREAM_A, + ChannelType.LIVE: Channel.LIVE_0 + } + # self.active_entries = {} + self.player_state = PlayerStateService(config) + + + + def start(self): + """ + Starts the soundsystem. + """ + # Sleep needed, because the socket is created too slowly by Liquidsoap + time.sleep(1) + self.enable_transaction() + time.sleep(1) + + self.mixer_start() + + # Setting init params like a blank file + install_dir = self.config.get("install_dir") + channel = self.active_channel[ChannelType.FILESYSTEM] + self.playlist_push(channel, install_dir + "/configuration/blank.flac") + + self.disable_transaction() + self.is_liquidsoap_running = True + self.logger.info(SimpleUtil.green("Engine Core ------[ connected ]-------- Liquidsoap")) + + + def is_ready(self): + """ + Returns `True` if the soundsystem is ready to be used. + """ + return self.is_liquidsoap_running + # - # CHANNELS - # + # MIXER : GENERAL + # + + + def mixer_start(self): + # Reset channels and reload them + channels = self.reload_channels() + + # For all available channels + for c in channels: + # Set volume to zero + self.channel_volume(c, "0") + # And activate this channel + self.channel_activate(c, True) # ------------------------------------------------------------------------------------------ # - def get_active_channel(self): - """ - Retrieves the active channel from programme. + # def set_volume(self, mixernumber, volume): + # #return self.client.command("mixer", 'volume', mixernumber, str(volume)) + # return self.__send_lqc_command__(self.client, "mixer", "volume", mixernumber, volume) - Returns: - (String): The channel type, empty string if no channel is active. + # ------------------------------------------------------------------------------------------ # + def get_active_mixer(self): + """ + get active mixer in liquidsoap server + :return: """ - active_entry = self.scheduler.get_active_entry() - if active_entry is None: - return "" - return active_entry.type + activeinputs = [] + + # enable more control over the connection + self.enable_transaction() + + inputs = self.get_all_channels() + + cnt = 0 + for input in inputs: + status = self.__get_mixer_status__(cnt) + + if "selected=true" in status: + activeinputs.append(input) + + cnt = cnt + 1 + + self.disable_transaction() + + return activeinputs # ------------------------------------------------------------------------------------------ # - def http_start_stop(self, start): - if start: - cmd = "start" - else: - cmd = "stop" + def get_mixer_status(self): + inputstate = {} - try: - self.enable_transaction() - self.__send_lqc_command__(self.client, "http", cmd) - self.disable_transaction() - except LQConnectionError: - # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further - pass + self.enable_transaction() + + inputs = self.get_all_channels() + + cnt = 0 + for input in inputs: + inputstate[input] = self.__get_mixer_status__(cnt) + cnt = cnt + 1 + + self.disable_transaction() + + return inputstate + + + # ------------------------------------------------------------------------------------------ # + def get_mixer_volume(self, channel): + return False + + # ------------------------------------------------------------------------------------------ # + def __get_mixer_status__(self, mixernumber): + return self.__send_lqc_command__(self.client, "mixer", "status", mixernumber) + + + # + # MIXER : CHANNELS + # + + + # FIXME Currently not used, except for test class + # def get_active_channel(self): + # """ + # Retrieves the active channel from programme. + + # Returns: + # (String): The channel type, empty string if no channel is active. + # """ + # active_entry = self.scheduler.get_active_entry() + # if active_entry is None: + # return "" + # return active_entry.channel # ------------------------------------------------------------------------------------------ # - def activate(self, new_entry, cue_in=0.0): + + + + def play(self, entry, transition): """ - Activates a new Playlist Entry. + Plays a new `Entry`. In case of a new schedule (or some intented, immediate transition), + a clean channel is selected and transitions between old and new channel is performed. Args: - new_entry (PlaylistEntry): The track to be played - cue_in (Float): Start/cue-time of track (For some reason Liquidsoap doesn't acknowledge this yet) + entry (PlaylistEntry): The audio source to be played + transition (TransitionType): The type of transition to use e.g. fade-out. + queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so; + otherwise a new channel of the same type is activated Raises: (LQConnectionError): In case connecting to LiquidSoap isn't possible """ - - # Grab the actual active entry - active_entry = self.scheduler.get_active_entry() - # Set default channel, if no previous track is available - current_channel = ScheduleEntryType.FILESYSTEM - if active_entry: - current_channel = active_entry.type - try: self.enable_transaction() - if current_channel == new_entry.type: - # TODO Add logic, if some track on the same channel isn't finished yet, - # it should be transitioned using a second filesystem channel. - # - self.activate_same_channel(new_entry, cue_in) + # channel = self.active_channel[entry.type] + # prev_channel = channel + # already_active = False + + #FIXME + # queue=False + + # if self.active_channel_type == entry.type: + # msg = SimpleUtil.pink("Channel type %s already active!" % str(entry.type)) + # self.logger.info(msg) + # already_active = True + + self.player_state.set_active_entry(entry) + + entry.channel = self.channel_swap(entry.type) + # entry.channel = channel + + # PLAYLIST + if entry.type == ChannelType.FILESYSTEM: + # if not queue: + + self.playlist_push(entry.channel, entry.filename) + + # STREAM + elif entry.type == ChannelType.STREAM: + self.set_http_url(entry.channel, entry.source) + self.http_start_stop(entry.channel, True) + + # LIVE else: - self.activate_different_channel(new_entry, cue_in, current_channel) - self.disable_transaction() + # TODO Select correct LINE-OUT channels as per entry + pass + + # if not already_active: + # self.channel_transition(prev_channel, channel, entry.volume, 0) + + # Assign selected channel + + + # Move channel volume all the way up + if transition == TransitionType.FADE: + self.fade_in(entry) + else: + self.channel_volume(entry.channel, entry.volume) + + # Update active channel and type + #self.active_channel_type = entry.type + self.active_channel[entry.type] = entry.channel + - self.scheduler.update_track_service(new_entry) + self.disable_transaction() + except LQConnectionError: # we already caught and handled this error in __send_lqc_command__, # but we do not want to execute this function further and pass the exception pass - def activate_same_channel(self, entry, cue_in=0.0, activate_different_channel=False): + + def on_play(self, source): + """ + Event Handler which is called by soundsystem implementation (i.e. Liquidsoap) + when some entry is actually playing. + """ + self.logger.info(SimpleUtil.pink("Source '%s' started playing" % source)) + + try: + self.player_state.store_trackservice_entry(source) + except NoActiveEntryException: + self.logger.warn(SimpleUtil.red("Currently there's nothing playing!")) + + + + def stop(self, entry, transition): """ - Activates a playlist entry for the current channel. + Stops the currently playing entry. Args: - entry (PlaylistEntry): The entry to play. - cue_in (Float): A value in seconds where to cue the start of the entry. + entry (Entry): The entry to stop playing + transition (TransitionType): The type of transition to use e.g. fade-out. """ - if not activate_different_channel: - self.logger.info(TerminalColors.PINK.value + entry.type.value + " already active!" + TerminalColors.ENDC.value) - # Check if it needs to be pushed to a filesystem queue or stream - if entry.type == ScheduleEntryType.FILESYSTEM: - uri = entry.filename - if cue_in > 0.0: - uri = "annotate:liq_cue_in=\"%s\":%s" % (str(cue_in), entry.filename) - self.playlist_push(uri) - self.active_channel = entry.type + try: + self.enable_transaction() - elif entry.type == ScheduleEntryType.STREAM: - self.set_http_url(entry.source) - self.http_start_stop(True) - self.active_channel = entry.type + if not entry.channel: + self.logger.warn("Trying to stop entry %s, but it has no channel assigned" % entry) + return + + if transition == TransitionType.FADE: + self.fade_out(entry) + else: + self.channel_volume(entry.channel, 0) - # else: # live - # Nothing to do when we are live => just leave it as is + # self.playlist_clear(entry.channel) + self.logger.info(SimpleUtil.pink("Stopped channel '%s' for entry %s" % (entry.channel, entry))) - self.active_channel = entry.type + self.disable_transaction() - # Set active channel to wanted volume - if not activate_different_channel: - self.channel_volume(entry.type.value, entry.volume) + except LQConnectionError: + # we already caught and handled this error in __send_lqc_command__, + # but we do not want to execute this function further and pass the exception + pass - def activate_different_channel(self, entry, cue_in, active_type): - """ - Activates a playlist entry for a channel other then the currently active one. + # def channel_transition(self, source_channel, target_channel, target_volume=100, transition_type=0): + + # # Default: target_channel = 100% volume, source_channel = 0% volume + # if transition_type == 0: - Args: - entry (PlaylistEntry): The entry to play. - cue_in (Float): A value in seconds where to cue the start of the entry. - active_type (ScheduleEntryType): The type of the currently active channel - """ - self.logger.info(TerminalColors.PINK.value + "LiquidSoapCommunicator is activating " + entry.type.value + " & deactivating " + active_type.value + "!" + TerminalColors.ENDC.value) + # # Set volume of channel + # self.channel_volume(target_channel, target_volume) - # Reuse of this function, because activate_same_channel and activate_different_channel - # are doing pretty the same except setting of the volume to zero - self.activate_same_channel(entry, cue_in, True) + # # Mute source channel + # if target_channel != source_channel: + # self.channel_volume(source_channel, 0) - # Set other channels to zero volume - others = self.all_inputs_but(entry.getChannel()) - for o in others: - self.channel_volume(o, 0) + # # Set other channels to zero volume + # # others = self.all_inputs_but(target_channel) + # # self.logger.info("Setting Volume=0 for channels: %s" % str(others)) + # # for o in others: + # # self.channel_volume(o, 0) - # Set active channel to wanted volume - self.channel_volume(entry.type.value, entry.volume) + def channel_swap(self, channel_type): + active_channel = self.active_channel[channel_type] + channel = None + msg = None + + if channel_type == ChannelType.FILESYSTEM: + if active_channel == Channel.FILESYSTEM_A: + channel = Channel.FILESYSTEM_B + msg = "Swapped filesystem channel from A > B" + else: + channel = Channel.FILESYSTEM_A + msg = "Swapped filesystem channel from B > A" + + elif channel_type == ChannelType.STREAM: + if active_channel == Channel.STREAM_A: + channel = Channel.STREAM_B + msg = "Swapped stream channel from A > B" + else: + channel = Channel.STREAM_A + msg = "Swapped stream channel from B > A" + + if msg: self.logger.info(SimpleUtil.pink(msg)) + # self.active_channel[channel_type] = channel + return channel - def clear_queue(self): - """ - Removes all tracks currently queued. - """ - self.logger.info(TerminalColors.PINK.value + "LiquidSoapCommunicator is clearing the filesystem queue!" + TerminalColors.ENDC.value) - return self.__send_lqc_command__(self.client, "fs", "clear", ) # ------------------------------------------------------------------------------------------ # @@ -263,248 +435,274 @@ class LiquidSoapCommunicator(ExceptionLogger): # ------------------------------------------------------------------------------------------ # def channel_volume(self, channel, volume): """ - set volume of a channel - @type channel: string - @param channel: Channel - @type volume: int - @param volume: Volume between 0 and 100 + Set volume of a channel + + Args: + channel (Channel): The channel + volume (Integer) Volume between 0 and 100 """ + channel = str(channel) try: - channels = self.get_all_channels() - index = channels.index(channel) + if str(volume) == "100": + channels = self.get_all_channels() + index = channels.index(channel) + else: + channels = self.get_all_channels() + index = channels.index(channel) except ValueError as e: - self.logger.error("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e)) + msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "!. Reason: " + str(e)) + self.logger.error(msg) return try: if len(channel) < 1: - self.logger.warning("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.") + msg = SimpleUtil.red("Cannot set volume of channel " + channel + " to " + str(volume) + "! There are no channels.") + self.logger.warning(msg) else: message = self.__send_lqc_command__(self.client, "mixer", "volume", str(index), str(int(volume))) if not self.disable_logging: if message.find('volume=' + str(volume) + '%'): - self.logger.debug("Set volume of channel " + channel + " to " + str(volume)) + self.logger.info(SimpleUtil.pink("Set volume of channel '%s' to %s" % (channel, str(volume)))) else: - self.logger.warning("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message) + msg = SimpleUtil.red("Setting volume of channel " + channel + " gone wrong! Liquidsoap message: " + message) + self.logger.warning(msg) return message except AttributeError as e: #(LQConnectionError, AttributeError): self.disable_transaction(force=True) - self.logger.error("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e)) - - - - # ------------------------------------------------------------------------------------------ # - def set_http_url(self, uri): - return self.__send_lqc_command__(self.client, "http", "url", uri) + msg = SimpleUtil.red("Ran into exception when setting volume of channel " + channel + ". Reason: " + str(e)) + self.logger.error(msg) # - # Playlist + # Channel Type - Stream # - def activate_playlist(self, playlist, cue_in=0.0): - """ - Activates a new Playlist. + def stream_start(self, url): + try: + self.enable_transaction() + self.__send_lqc_command__(self.client, "http", "url", url) + self.__send_lqc_command__(self.client, "http", "start") + self.disable_transaction() + except LQConnectionError: + # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further + pass - Args: - new_entry (Playlist): The playlist to be played - cue_in (Float): Start/cue-time of track (For some reason Liquidsoap doesn't acknowledge this yet) - - Raises: - (LQConnectionError): In case connecting to LiquidSoap isn't possible - """ - - # Grab the actual active entry - active_entry = self.scheduler.get_active_entry() - # Set default channel, if no previous track is available - current_channel = ScheduleEntryType.FILESYSTEM - if active_entry: - current_channel = active_entry.type + def stream_stop(self, url): try: - # FIXME clearing creates some serious timing issues - # To activate this feature we'd need some more sophisticated - # Liquidsoap logic, such as >= 2 filesystem channels and - # possiblities to pause pre-queued channels or cleaning them - # after each completed schedule. + self.enable_transaction() + self.__send_lqc_command__(self.client, "http", "start") + self.disable_transaction() + except LQConnectionError: + # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further + pass - # self.enable_transaction() - # #if active_entry: - # #self.fade_out(active_entry) - # res = self.clear_queue() - # self.logger.info("Clear Queue Response: "+res) - # self.disable_transaction() + def http_start_stop(self, start): + if start: + cmd = "start" + else: + cmd = "stop" + try: self.enable_transaction() - self.reload_channels() - # self.fade_in(playlist.entries[0]) - for new_entry in playlist.entries: - if current_channel == new_entry.type: - self.activate_same_channel(new_entry, cue_in) - else: - self.activate_different_channel(new_entry, cue_in, current_channel) - current_channel = new_entry.type - + self.__send_lqc_command__(self.client, "http", cmd) self.disable_transaction() - - self.logger.critical("FIXME: Implement TrackService") - #self.scheduler.update_track_service(new_entry) except LQConnectionError: - # we already caught and handled this error in __send_lqc_command__, - # but we do not want to execute this function further and pass the exception + # we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further pass # ------------------------------------------------------------------------------------------ # - def playlist_push(self, uri): - """ - Adds an filesystem URI to the playlist + def set_http_url(self, uri): + return self.__send_lqc_command__(self.client, "http", "url", uri) - Args: - uri (String): The URI of the file - Returns: - LiquidSoap Response - """ - return self.__send_lqc_command__(self.client, "fs", "push", uri) - # ------------------------------------------------------------------------------------------ # - def playlist_seek(self, seconds_to_seek): - """ - Forwards the player (n) seconds. + # + # Channel Type - Playlist + # - Args: - seconds_to_seeks (Float): The seconds to skip - """ - return self.__send_lqc_command__(self.client, "fs", "seek", str(seconds_to_seek)) + # FIXME + # def playlist_activate(self, playlist, cue_in=0.0): + # """ + # Activates a new Playlist. + # Args: + # new_entry (Playlist): The playlist to be played + # cue_in (Float): Start/cue-time of track (For some reason Liquidsoap doesn't acknowledge this yet) + + # Raises: + # (LQConnectionError): In case connecting to LiquidSoap isn't possible + # """ + + # # Grab the actual active entry + # # active_entry = self.scheduler.get_active_entry() - # - # Mixer - # + # # Set default channel, if no previous track is available + # current_channel = self.active_channel[ChannelType.FILESYSTEM] + # # if active_entry: + # # current_channel = active_entry.channel - # ------------------------------------------------------------------------------------------ # - def set_volume(self, mixernumber, volume): - return self.__send_lqc_command__(self.client, "mixer", "volume", mixernumber, volume) + # try: + # # FIXME clearing creates some serious timing issues + # # To activate this feature we'd need some more sophisticated + # # Liquidsoap logic, such as >= 2 filesystem channels and + # # possiblities to pause pre-queued channels or cleaning them + # # after each completed schedule. - # ------------------------------------------------------------------------------------------ # - def get_active_mixer(self): - """ - get active mixer in liquidsoap server - :return: - """ - activeinputs = [] + # # self.enable_transaction() + # # #if active_entry: + # # #self.fade_out(active_entry) + # # res = self.playlist_clear(current_channel) + # # self.logger.info("Clear Queue Response: "+res) + # # self.disable_transaction() - # enable more control over the connection - self.enable_transaction() - inputs = self.get_all_channels() + # self.enable_transaction() + # self.reload_channels() + # # self.fade_in(playlist.entries[0]) + # # FIXME rework + # for new_entry in playlist.entries: + # if current_channel == new_entry.channel: + # self.activate_same_channel(new_entry, cue_in) + # else: + # self.activate_different_channel(new_entry, cue_in, current_channel) + # current_channel = new_entry.channel - cnt = 0 - for input in inputs: - status = self.__get_mixer_status__(cnt) + # self.disable_transaction() - if "selected=true" in status: - activeinputs.append(input) + # # self.logger.critical("FIXME: Implement TrackService") + # #self.scheduler.update_track_service(new_entry) + # except LQConnectionError: + # # we already caught and handled this error in __send_lqc_command__, + # # but we do not want to execute this function further and pass the exception + # pass - cnt = cnt + 1 - self.disable_transaction() - return activeinputs + def playlist_push(self, channel, uri): + """ + Adds an filesystem URI to the given `ChannelType.FILESYSTEM` channel. - # ------------------------------------------------------------------------------------------ # - def get_mixer_status(self): - inputstate = {} + Args: + uri (String): The URI of the file + Returns: + LiquidSoap Response + """ + if channel not in ChannelType.FILESYSTEM.channels: + raise InvalidChannelException + self.logger.info(SimpleUtil.pink("playlist.push('%s', '%s'" % (channel, uri))) + return self.__send_lqc_command__(self.client, channel, "playlist_push", uri) - self.enable_transaction() - inputs = self.get_all_channels() - cnt = 0 - for input in inputs: - inputstate[input] = self.__get_mixer_status__(cnt) - cnt = cnt + 1 + def playlist_seek(self, channel, seconds_to_seek): + """ + Forwards the player of the given `ChannelType.FILESYSTEM` channel by (n) seconds. - self.disable_transaction() + Args: + seconds_to_seeks (Float): The seconds to skip + """ + if channel not in ChannelType.FILESYSTEM.channels: + raise InvalidChannelException + return self.__send_lqc_command__(self.client, channel, "playlist_seek", str(seconds_to_seek)) - return inputstate - # ------------------------------------------------------------------------------------------ # - def get_mixer_volume(self, channel): - return False + def playlist_clear(self, channel): + """ + Removes all tracks currently queued in the given `ChannelType.FILESYSTEM` channel. + """ + if channel not in ChannelType.FILESYSTEM.channels: + raise InvalidChannelException + + self.logger.info(SimpleUtil.pink("Clearing filesystem queue '%s'!" % channel)) + return self.__send_lqc_command__(self.client, channel, "playlist_clear") + - # ------------------------------------------------------------------------------------------ # - def __get_mixer_status__(self, mixernumber): - return self.__send_lqc_command__(self.client, "mixer", "status", mixernumber) # # Fading # - # ------------------------------------------------------------------------------------------ # - def fade_in(self, new_entry): + + def fade_in(self, entry): + """ + Performs a fade-in for the given `entry` to the `entry.volume` loudness + at channel `entry.channel`. + """ try: fade_in_time = float(self.config.get("fade_in_time")) if fade_in_time > 0: self.fade_in_active = True - target_volume = new_entry.volume + target_volume = entry.volume step = fade_in_time / target_volume - self.logger.info("Starting to fading " + new_entry.type.value + " in. step is " + str(step) + "s. target volume is " + str(target_volume)) + msg = "Starting to fading-in '%s'. Step is %ss and target volume is %s." % \ + (entry.channel, str(step), str(target_volume)) + self.logger.info(SimpleUtil.pink(msg)) + # Enable logging, which might have been disabled in a previous fade-out self.disable_logging = True self.client.disable_logging = True for i in range(target_volume): - self.channel_volume(new_entry.type.value, i + 1) + self.channel_volume(entry.channel.value, i + 1) time.sleep(step) - self.logger.info("Finished with fading " + new_entry.type.value + " in.") + msg = "Finished with fading-in '%s'." % entry.channel + self.logger.info(SimpleUtil.pink(msg)) self.fade_in_active = False if not self.fade_out_active: self.disable_logging = False self.client.disable_logging = False + except LQConnectionError as e: self.logger.critical(str(e)) return True - # ------------------------------------------------------------------------------------------ # - def fade_out(self, old_entry): + + + def fade_out(self, entry): + """ + Performs a fade-out for the given `entry` at channel `entry.channel`. + """ try: fade_out_time = float(self.config.get("fade_out_time")) if fade_out_time > 0: - step = abs(fade_out_time) / old_entry.volume + step = abs(fade_out_time) / entry.volume - self.logger.info("Starting to fading " + old_entry.type.value + " out. step is " + str(step) + "s") + msg = "Starting to fading-out '%s'. Step is %ss." % (entry.channel, str(step)) + self.logger.info(SimpleUtil.pink(msg)) - # disable logging... it is going to be enabled again after fadein and -out is finished + # Disable logging... it is going to be enabled again after fadein and -out is finished self.disable_logging = True self.client.disable_logging = True - for i in range(old_entry.volume): - self.channel_volume(old_entry.type.value, old_entry.volume-i-1) + for i in range(entry.volume): + self.channel_volume(entry.channel.value, entry.volume-i-1) time.sleep(step) - self.logger.info("Finished with fading " + old_entry.type.value + " out.") + msg = "Finished with fading-out '%s'" % entry.channel + self.logger.info(SimpleUtil.pink(msg)) - # enable logging again + # Enable logging again self.fade_out_active = False if not self.fade_in_active: self.disable_logging = False self.client.disable_logging = False + except LQConnectionError as e: self.logger.critical(str(e)) @@ -590,12 +788,10 @@ class LiquidSoapCommunicator(ExceptionLogger): Returns: (String): Message that the player is started. """ - active_entry = self.scheduler.get_active_entry() - - t = LiquidSoapInitThread(self, active_entry) + t = StartupThread(self) t.start() - return "LiquidSoapInitThread started!" + return "Engine Core startup done!" # ------------------------------------------------------------------------------------------ # @@ -616,17 +812,23 @@ class LiquidSoapCommunicator(ExceptionLogger): try: if not self.disable_logging: if namespace == "recorder": - self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args)) + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "_" + str(command) + "." + str(args)) else: if command == "": - self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + str(args)) + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + str(args)) else: - self.logger.info("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args)) + self.logger.debug("LiquidSoapCommunicator is calling " + str(namespace) + "." + str(command) + str(args)) # call wanted function ... - func = getattr(lqs_instance, namespace) - # ... and fetch the result - result = func(command, *args) + + # FIXME REFACTOR all calls in a common way + if command in ["playlist_push", "playlist_seek", "playlist_clear"]: + func = getattr(lqs_instance, command) + result = func(str(namespace), *args) + else: + func = getattr(lqs_instance, namespace) + result = func(command, *args) + if not self.disable_logging: self.logger.debug("LiquidSoapCommunicator got response " + str(result)) diff --git a/modules/communication/liquidsoap/playerclient.py b/modules/communication/liquidsoap/playerclient.py index 36830b907cd6ee84fbc6ec48b4e9fa7cb4fca05f..558775c05f3e29181713d8b5995d35ccc4f6a801 100644 --- a/modules/communication/liquidsoap/playerclient.py +++ b/modules/communication/liquidsoap/playerclient.py @@ -105,9 +105,9 @@ class LiquidSoapPlayerClient(LiquidSoapClient): Liquidsoap server response """ if channel == Channel.FILESYSTEM_A.value: - self.command(channel.value, 'clear_filesystem_0') + self.command(channel, 'clear_filesystem_0') elif channel == Channel.FILESYSTEM_B.value: - self.command(channel.value, 'clear_filesystem_1') + self.command(channel, 'clear_filesystem_1') else: return "Invalid filesystem channel '%s'" % channel diff --git a/modules/communication/redis/adapter.py b/modules/communication/redis/adapter.py index 9326f4410207667b07698692cd0efa83140dcdf5..ab53544e2a3c8438f9e98be3e8da58a1011e783a 100644 --- a/modules/communication/redis/adapter.py +++ b/modules/communication/redis/adapter.py @@ -175,18 +175,9 @@ class ServerRedisAdapter(threading.Thread, RedisMessenger): #playlist = playlist[0:len(playlist)-8] self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist) - elif item["data"].find("adapt_trackservice_title") >= 0: - source = item["data"].split("|+|+|")[1] - artist = item["data"].split("|+|+|")[2] - title = item["data"].split("|+|+|")[3] - - if not artist: - artist = "" - if not title: - title = "" - - self.execute(RedisChannel.TS_REPLY.value, self.scheduler.liquidsoapcommunicator.store_trackservice_info, source) - self.execute(RedisChannel.TS_REPLY.value, self.scheduler.adapt_trackservice_title, source, artist, title) + elif item["data"].find("on_play") >= 0: + source = item["data"].split("on_play ")[1] + self.execute(RedisChannel.TS_REPLY.value, self.scheduler.liquidsoapcommunicator.on_play, source) elif item["data"] == "recreate_db": self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database) diff --git a/modules/communication/redis/messenger.py b/modules/communication/redis/messenger.py index e6f2f7144f70da06bf647c01b6f84bfe11c8c537..aae12e4c0205dfe6b05b9288ad4fe243c5951d72 100644 --- a/modules/communication/redis/messenger.py +++ b/modules/communication/redis/messenger.py @@ -299,8 +299,8 @@ class RedisMessenger(): # return next.decode('utf-8') # ------------------------------------------------------------------------------------------ # - # def adapt_trackservice_title(self, info): - # result = self.rstore.db.get('adapt_trackservice_title') + # def on_play(self, info): + # result = self.rstore.db.get('on_play') # if result is None: # result = b"" diff --git a/modules/core/startup.py b/modules/core/startup.py index 4717de0b1c88e3582956563daa6ce3b6a9f9c75d..6787554e61c319b238a7bf49da3502e2585ab5c2 100644 --- a/modules/core/startup.py +++ b/modules/core/startup.py @@ -24,8 +24,9 @@ import datetime import threading import meta -from modules.base.enum import Channel, ChannelType -from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil +from modules.base.exceptions import NoActiveScheduleException +from modules.base.enum import Channel, ChannelType +from modules.base.utils import TerminalColors, SimpleUtil, EngineUtil class StartupThread(threading.Thread): @@ -37,99 +38,33 @@ class StartupThread(threading.Thread): logger = None active_entry = None liquidsoapcommunicator = None + scheduler = None - - def __init__(self, liquidsoapcommunicator, active_entry): + def __init__(self, liquidsoapcommunicator): """ Initialize the thread. """ threading.Thread.__init__(self) self.logger = logging.getLogger("AuraEngine") self.liquidsoapcommunicator = liquidsoapcommunicator - self.active_entry = active_entry + self.scheduler = liquidsoapcommunicator.scheduler def run(self): """ - Starts the LiquidSoap player including the current show. + Boots the soundsystem. """ try: - # Sleep needed, because the socket is created too slow by liquidsoap - time.sleep(1) - self.logger.info("Waited 1s for liquidsoap. Jez soit a si gspian") - - self.liquidsoapcommunicator.enable_transaction() - - # Wait another second. lqs really starts slow.. be prepared you liquidsoap you! - time.sleep(1) - self.logger.info(SimpleUtil.green("Engine Core ------[ connected ]-------- Liquidsoap")) - - self.set_start_parameters() + self.liquidsoapcommunicator.start() self.logger.info(EngineUtil.engine_info("Engine Core", meta.__version__)) + self.scheduler.on_ready() - # Display the current programme - programme = self.liquidsoapcommunicator.scheduler.get_ascii_programme() - self.logger.info(programme) - - # Start playing - self.set_active_show() - self.liquidsoapcommunicator.disable_transaction() - - # The rest of the system now can use liquidsoap connection - self.liquidsoapcommunicator.is_liquidsoap_running = True - - + except NoActiveScheduleException as e: + self.logger.info("Nothing scheduled at startup time. Please check if there are follow-up schedules.") except Exception as e: - self.logger.critical(TerminalColors.RED.value+"Liquidsoap connection ERROR! Restart LQ Server! Reason: "+str(e)+TerminalColors.ENDC.value) - + self.logger.error("Error while initializing the soundsystem: " + str(e)) - def set_active_show(self): - """ - Sets and resumes the show which should be playing at the time of starting - the LiquidSoap player. - """ - if self.active_entry is not None: - channel_type = self.active_entry.type - self.logger.info("Engine Startup: Play '%s' via channel type '%s'" % (str(self.active_entry), channel_type)) - - # TODO Skip active entry if not enough time left; wait and play next one instead - self.liquidsoapcommunicator.play(self.active_entry, False) - if channel_type == ChannelType.FILESYSTEM: - - - - # Have to seek? Calc how many seconds were missed - now_unix = time.mktime(datetime.datetime.now().timetuple()) - seconds_to_seek = now_unix - self.active_entry.start_unix - - # And seek these seconds forward - if seconds_to_seek > 0: - # Without plenty of timeout (10s) the seek doesn't work - seconds_to_seek += 10 - time.sleep(10) - channel = self.liquidsoapcommunicator.active_channel[ChannelType.FILESYSTEM] - response = self.liquidsoapcommunicator.playlist_seek(channel, seconds_to_seek) - self.logger.info("LiquidSoap seek response: " + response) - - else: - self.logger.warning("No active entry in the scheduler! Is a programme loaded?") - - - - - def set_start_parameters(self): - """ - Set initial parameters for the LiquidSoap player startup. - """ - self.liquidsoapcommunicator.mixer_start() - - # Setting init params like a blank file.. - install_dir = self.liquidsoapcommunicator.config.get("install_dir") - channel = self.liquidsoapcommunicator.active_channel[ChannelType.FILESYSTEM] - self.liquidsoapcommunicator.playlist_push(channel, install_dir + "/configuration/blank.flac") - # .. or the radio fro stream (it is overwritten as soon as one http overtake is planned) - #self.liquidsoapcommunicator.set_http_url("http://stream.fro.at/fro-128.ogg") diff --git a/modules/core/state.py b/modules/core/state.py new file mode 100644 index 0000000000000000000000000000000000000000..c188695eca903a67ba34ac8ff63e7bcb88d78365 --- /dev/null +++ b/modules/core/state.py @@ -0,0 +1,158 @@ + +# +# Aura Engine +# +# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at> + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +from collections import deque + +from modules.base.exceptions import NoActiveEntryException +from modules.base.utils import SimpleUtil +from libraries.database.broadcasts import SingleEntry, SingleEntryMetaData, PlaylistEntry, PlaylistEntryMetaData, TrackService + + + +class PlayerStateService: + """ + PlayerStateService keeps a short history of currently playing entries. It stores the recent + active entries to a local cache `entry_history` being able to manage concurrently playing entries. + + It also is in charge of storing relevant meta information of the currently playing entry to + the TrackService table. + """ + + config = None + logger = None + entry_history = None + + + + def __init__(self, config): + """ + Constructor + + Args: + config (AuraConfig): Holds the engine configuration + """ + self.config = config + self.logger = logging.getLogger("AuraEngine") + self.entry_history = deque([None, None, None]) + + + # + # PUBLIC METHODS + # + + + def set_active_entry(self, entry): + """ + Saves the currently playing entry to the local cache. + """ + self.entry_history.pop() + self.entry_history.appendleft(entry) + + self.logger.info("Active entry history:\n"+str(self.entry_history)) + + + + def get_active_entry(self): + """ + Retrieves the currently playing `Entry` from the local cache. + """ + return self.entry_history[0] + + + + def store_trackservice_entry(self, source): + """ + Stores the given entry the Track Service. + + Args: + source (String): The URI of the currently playing source + + Raises: + (NoActiveEntryException): In case currently nothing is playing + """ + active_entry = self.get_active_entry() + + if not active_entry: + raise NoActiveEntryException + + if active_entry.filename == source: + trackservice = TrackService(active_entry) + trackservice.store(add=True, commit=True) + + active_entry.trackservice_id = trackservice.id + active_entry.store(add=False, commit=True) + + self.logger.info("Stored active entry '%s' to TrackService as '%s'" % (active_entry, trackservice)) + else: + msg = "Active entry source '%s' != '%s' activated source." % (active_entry.filename, source) + self.logger.critical(SimpleUtil.red(msg)) + + + + + # def adapt_trackservice_title(self, filename): + # """ + # Updates the track-service entry with the info from a fallback track/playlist. + # """ + # liquidsoap_offset = int(self.config.lqs_delay_offset) + # scheduled_entry = self.get_active_entry(liquidsoap_offset) + + # entry = SingleEntry() + # meta = SingleEntryMetaData() + + # # # Validate artist and title + # # if not title: + # # title = self.config.get("fallback_title_not_available") + + # # Create Entry + # entry.filename = filename + # entry.duration = self.fallback_manager.get_track_duration(filename) + # if not entry.duration: + # self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry))) + + # # Create track service log for local station fallback (type=4) + # trackservice = TrackService(entry, 4) + # trackservice.store(add=True, commit=True) + + # entry.store(add=True, commit=True) + + # # Create Meta + # meta.artist = "----FIXME" + # meta.album = "" + # meta.title = "----TODO" + # meta.single_entry_id = entry.id + # meta.store(add=True, commit=True) + + # # Reference each other + # entry.meta_data_id = meta.id + # entry.trackservice_id = trackservice.id + # entry.store(add=False, commit=True) + + + # msg = "Track Service active track '%s' updated with fallback source '%s - %s'!" % (scheduled_entry, meta.artist, meta.title) + # self.logger.info(msg) + # return msg + + + # + # PRIVATE METHODS + # + diff --git a/modules/liquidsoap/engine.liq b/modules/liquidsoap/engine.liq index 23bf55fc748282f8077479dd5dfccae0a3d7a6c7..88324873598abeda2c696f47477166d33ad82611 100644 --- a/modules/liquidsoap/engine.liq +++ b/modules/liquidsoap/engine.liq @@ -50,14 +50,20 @@ inputs = ref [] %include "in_soundcard.liq" # fill the mixer -mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_filesystem_2, input_filesystem_3, input_filesystem_4, input_http_0, input_http_1], !inputs)) +mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_http_0, input_http_1], !inputs)) +# mixer = mix(id="mixer", list.append([input_filesystem_0, input_filesystem_1, input_filesystem_2, input_filesystem_3, input_filesystem_4, input_http_0, input_http_1], !inputs)) # output source with fallbacks stripped_stream = strip_blank(id='strip_blank', track_sensitive=false, max_blank=fallback_max_blank, min_noise=fallback_min_noise, threshold=fallback_threshold, mixer) # enable fallback -output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)]) +# output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)]) +#output_source = fallback(id="fallback", track_sensitive=false, [stripped_stream, timeslot_fallback, show_fallback, mksafe(station_fallback)]) +output_source = mksafe(stripped_stream) +ignore(timeslot_fallback) +ignore(show_fallback) +ignore(station_fallback) ################## # create outputs # diff --git a/modules/liquidsoap/fallback.liq b/modules/liquidsoap/fallback.liq index b43ee24ed3597eda6c11453b0b82349c6a641723..a160e9ed885f9e7ac664f0211170c3c55af34b2a 100644 --- a/modules/liquidsoap/fallback.liq +++ b/modules/liquidsoap/fallback.liq @@ -204,9 +204,9 @@ def fallback_create(~skip=true, name, requestor) # Tell the system when a new track is played def do_meta(meta) = filename = meta["filename"] - artist = meta["artist"] - title = meta["title"] - system('#{list.assoc(default="", "install_dir", ini)}/guru.py --adapt-trackservice-title "|+|+|#{filename}|+|+|#{artist}|+|+|#{title}|+|+|"') + # artist = meta["artist"] + # title = meta["title"] + system('#{list.assoc(default="", "install_dir", ini)}/guru.py --on_play "#{filename}"') end source = on_metadata(do_meta, source) diff --git a/modules/liquidsoap/in_filesystem.liq b/modules/liquidsoap/in_filesystem.liq index f410f92d8a29c4eb278a2fd2fa6430aa84bd4d66..80d5bdc2ad7a7299dab9eccb2242bfd08d8ae431 100644 --- a/modules/liquidsoap/in_filesystem.liq +++ b/modules/liquidsoap/in_filesystem.liq @@ -28,10 +28,21 @@ input_filesystem_1 = request.equeue(id="in_filesystem_1") # input_filesystem_3 = request.equeue(id="in_filesystem_3") # input_filesystem_4 = request.equeue(id="in_filesystem_4") - #input_fs = cue_cut(mksafe(request.equeue(id="fs"))) -#req = request.queue(id="req") + +# Update Trackservice + +def do_meta_filesystem(meta) = + filename = meta["filename"] + # artist = meta["artist"] + # title = meta["title"] + system('#{list.assoc(default="", "install_dir", ini)}/guru.py --on_play "#{filename}"') +end + +input_filesystem_0 = on_metadata(id="in_filesystem_0", do_meta_filesystem, input_filesystem_0) +input_filesystem_1 = on_metadata(id="in_filesystem_1", do_meta_filesystem, input_filesystem_1) + # def clear_queue(s) = # ret = server.execute("fs.queue") diff --git a/modules/scheduling/scheduler.py b/modules/scheduling/scheduler.py index ed78a940447bd2e78df1a99bd3de2bb6a433032a..cd336ef9fc61f28c6b45ad6d4e1a2de57d6b8963 100644 --- a/modules/scheduling/scheduler.py +++ b/modules/scheduling/scheduler.py @@ -22,12 +22,6 @@ # along with engine. If not, see <http://www.gnu.org/licenses/>. # -# Meta -__version__ = '0.0.1' -__license__ = "GNU General Public License (GPL) Version 3" -__version_info__ = (0, 0, 1) -__author__ = 'Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>' - import time import json @@ -40,13 +34,15 @@ import threading from operator import attrgetter -from modules.base.simpleutil import SimpleUtil +from libraries.database.broadcasts import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData, SingleEntry, SingleEntryMetaData, TrackService +from libraries.exceptions.exception_logger import ExceptionLogger + +from modules.base.exceptions import NoActiveScheduleException, NoActiveEntryException +from modules.base.enum import Channel, ChannelType, TimerType, TransitionType, EntryQueueState +from modules.base.utils import SimpleUtil, TerminalColors from modules.communication.redis.messenger import RedisMessenger from modules.scheduling.calendar import AuraCalendarService from modules.scheduling.fallback_manager import FallbackManager -from libraries.database.broadcasts import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData, SingleEntry, SingleEntryMetaData, TrackService -from libraries.exceptions.exception_logger import ExceptionLogger -from libraries.enum.auraenumerations import ScheduleEntryType, TimerType, TerminalColors def alchemyencoder(obj): @@ -91,14 +87,13 @@ class AuraScheduler(ExceptionLogger, threading.Thread): liquidsoapcommunicator = None last_successful_fetch = None programme = None - active_entry = None message_timer = [] fallback_manager = None - #schedule_entries = None client = None - def __init__(self, config): + + def __init__(self, config, liquidsoapcommunicator): """ Constructor @@ -107,11 +102,14 @@ class AuraScheduler(ExceptionLogger, threading.Thread): """ self.config = config self.logger = logging.getLogger("AuraEngine") - self.init_error_messages() + + # self.init_error_messages() self.init_database() self.fallback_manager = FallbackManager(config, self.logger, self) self.redismessenger = RedisMessenger(config) - + self.liquidsoapcommunicator = liquidsoapcommunicator + self.liquidsoapcommunicator.scheduler = self + # init threading threading.Thread.__init__(self) @@ -132,14 +130,17 @@ class AuraScheduler(ExceptionLogger, threading.Thread): 1. `self.fetch_new_programme()` periodically from the API depending on the `fetching_frequency` defined in the engine configuration. 2. Loads the latest programme from the database and sets the instance state `self.programme` with current schedules. - 3. Queues all playlists of the programm, other than the playlist currently to be played (This is triggered by Liquidsoap itself). + 3. Queues all playlists of the programm, if the soundssystem is ready to accept commands. """ while not self.exit_event.is_set(): seconds_to_wait = int(self.config.get("fetching_frequency")) next_time = datetime.datetime.now() + datetime.timedelta(seconds=seconds_to_wait) self.logger.info("Fetch new programmes every %ss. Next fetching in %ss." % (str(seconds_to_wait), str(next_time))) self.fetch_new_programme() - self.queue_programme() + + if self.liquidsoapcommunicator.is_ready(): + self.queue_programme() + self.print_message_queue() self.exit_event.wait(seconds_to_wait) @@ -149,18 +150,70 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # PUBLIC METHODS # + + def on_ready(self): + """ + Called when the soundsystem is ready. + """ + self.queue_programme() + self.logger.info(self.get_ascii_programme()) + self.play_active_entry() + + + def play_active_entry(self): + """ + Plays the entry scheduled for the current moment. + """ + sleep_offset = 10 + + active_entry = self.get_active_entry() + if not active_entry: + raise NoActiveScheduleException + + # In case of a file-system source, we need to fast-foward to the current marker as per schedule + if active_entry.type == ChannelType.FILESYSTEM: + + # Calculate the seconds we have to fast-forward + now_unix = self.get_virtual_now() + seconds_to_seek = now_unix - active_entry.start_unix + + # If the seek exceeds the length of the current track, + # there's no need to do anything - the scheduler takes care of the rest + if (seconds_to_seek + sleep_offset) > active_entry.duration: + self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.") + else: + # Play active entry + self.liquidsoapcommunicator.play(active_entry, TransitionType.FADE) + + # Check if this is the last item of the schedule + if active_entry.end_unix > active_entry.playlist.schedule.end_unix: + self.queue_end_of_schedule(active_entry, True) + else: #Otherwise only queue the regular end of the entry + #FIXME + self.logger.error(SimpleUtil.red("--FIXME-- Implement queue EOE")) + + # Fast-forward to the scheduled position + if seconds_to_seek > 0: + # Without plenty of timeout (10s) the seek doesn't work + seconds_to_seek += sleep_offset + time.sleep(sleep_offset) + self.logger.info("Going to fast-forward %s seconds" % seconds_to_seek) + self.liquidsoapcommunicator.enable_transaction() + response = self.liquidsoapcommunicator.playlist_seek(active_entry.channel, seconds_to_seek) + self.liquidsoapcommunicator.disable_transaction() + self.logger.info("LiquidSoap seek response: " + response) + + + + def get_active_entry(self): """ Retrieves the current `PlaylistEntry` which should be played as per programme. - Publically called via `LiquidSoapCommunicator`. - - Important note: This method also updates the state variable `active_entry`. Returns: (PlaylistEntry): The track which is (or should) currently being played """ - now_unix = time.mktime(datetime.datetime.now().timetuple()) - + now_unix = self.get_virtual_now() # Load programme if necessary if not self.programme: @@ -169,44 +222,56 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # Check for scheduled playlist current_schedule, current_playlist = self.get_active_playlist() + + if not current_schedule: + self.logger.warning("There's no active schedule") + return None if not current_playlist: - if not current_schedule: - self.logger.critical("There's no active playlist nor schedule. It's probably time to play some fallback...") - else: - self.logger.warning("There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule.") + self.logger.warning("There's no active playlist for a current schedule. Most likely the playlist finished before the end of the schedule.") return None - time_start = SimpleUtil.fmt_time(current_playlist.start_unix) - time_end = SimpleUtil.fmt_time(current_playlist.start_unix+current_playlist.duration) - time_now = SimpleUtil.fmt_time(now_unix) - self.logger.info("Current Playlist (%d:%d) for show '%s' scheduled to be played at %s until %s (Now: %s)" % (current_playlist.playlist_id, current_playlist.artificial_id, current_playlist.show_name, time_start, time_end, time_now)) - # Iterate over playlist entries and store the current one - time_marker = current_playlist.start_unix current_entry = None for entry in current_playlist.entries: self.logger.info(entry) - if entry.start_unix < now_unix < entry.start_unix + entry.duration: + if entry.start_unix <= now_unix and now_unix <= entry.end_unix: current_entry = entry break - time_marker += entry.duration - - if current_entry: - time_start = SimpleUtil.fmt_time(current_entry.start_unix) - time_end = SimpleUtil.fmt_time(current_entry.start_unix+current_entry.duration) - time_now = SimpleUtil.fmt_time(now_unix) - self.logger.info("Track '%s' is expected playing from %s to %s (Now: %s)" % (current_entry.filename, time_start, time_end, time_now)) - if not self.active_entry: - self.logger.warn("Activate track '%s' and [>> FFWD] to current point in time" % (current_entry.filename)) - elif self.active_entry.filename != current_entry.filename: - self.logger.critical("--- SOMETHING UNEXPECTED IS PLAYING: %s --vs-- %s" % (self.active_entry.filename, current_entry.filename)) - self.active_entry = current_entry - return (current_entry) - else: + + if not current_entry: # Nothing playing ... fallback will kick-in - self.logger.warning("There's no entry scheduled for playlist '%s'. Is currently -nothing- or a fallback playing?" % str(current_playlist)) + self.logger.warning("There's no entry scheduled for playlist '%s' at time %s" % (str(current_playlist), SimpleUtil.fmt_time(now_unix))) return None + return current_entry + + + + def get_active_playlist(self): + """ + Retrieves the schedule and playlist currently to be played as per + schedule. If the current point in time has no playlist assigned, + only the matching schedule is returned. + + Returns: + (Schedule, Playlist): The current schedule and playlist tuple. + """ + current_schedule = None + current_playlist = None + now_unix = self.get_virtual_now() + + # Iterate over all shows and playlists and find the one to be played right now + if self.programme: + for schedule in self.programme: + if schedule.start_unix <= now_unix and now_unix < schedule.end_unix: + current_schedule = schedule + for playlist in schedule.playlist: + if playlist.start_unix <= now_unix and now_unix < playlist.end_unix: + current_playlist = playlist + break + break + + return current_schedule, current_playlist @@ -256,7 +321,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # ------------------------------------------------------------------------------------------ # def set_next_file_for(self, playlistname): self.logger.critical("HAVE TO <SET> NEXT FILE FOR: " + playlistname) - self.logger.critical(str(self.get_active_entry())) + self.logger.critical(str(self.get_active_entry(0))) if playlistname == "station": file = "/home/david/Code/aura/engine2/testing/content/ernie_mayne_sugar.mp3" @@ -288,10 +353,11 @@ class AuraScheduler(ExceptionLogger, threading.Thread): Returns: (String): Absolute path to the file to be played as a fallback. """ + file = self.fallback_manager.get_fallback_for(fallbackname) if file: - self.logger.info("Got next file '%s' (type: %s)" % (file, fallbackname)) + self.logger.info("Got next file '%s' (fallback type: %s)" % (file, fallbackname)) #set_next_file_thread = SetNextFile(fallbackname, show) #set_next_file_thread.start() @@ -300,70 +366,103 @@ class AuraScheduler(ExceptionLogger, threading.Thread): - def update_track_service(self, entry): - """ - Inserts the given, currently playing `PlaylistEntry` to the track-service. - Called by LiquidSoapCommunicator when a new playlist item is going to be activated. - Args: - entry (PlaylistEntry): The item which is currently playing + def get_ascii_programme(self): """ - if not entry.duration: - self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry))) + Creates a printable version of the current programme (playlists and entries as per schedule) - trackservice = TrackService(entry) - trackservice.store(add=True, commit=True) + Returns: + (String): A ASCII representation of the programme + """ + active_schedule, active_playlist = self.get_active_playlist() + playlists = self.get_next_playlists() - entry.trackservice_id = trackservice.id - entry.store(add=False, commit=True) + s = "\n\n PLAYING NOW:" + s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" + if active_schedule: + s += "\n│ Playing schedule %s " % active_schedule + if active_playlist: + s += "\n│ └── Playlist %s " % active_playlist + + # active_entry = active_playlist.current_entry + active_entry = self.get_active_entry() - self.logger.info("Stored track-service entry %s" % trackservice) + # Finished entries + for entry in active_playlist.entries: + if active_entry == entry: + break + else: + s += self.build_entry_string("\n│ └── ", entry, True) + # Entry currently being played + if active_entry: + s += "\n│ └── Entry %s | %s " % \ + (str(entry.entry_num+1), SimpleUtil.green("PLAYING > "+str(active_entry))) + # Open entries for current playlist + rest_of_playlist = active_entry.get_next_entries(False) + entries = self.preprocess_entries(rest_of_playlist, False) + s += self.build_playlist_string(entries) + + else: + s += "\n│ └── %s" % (SimpleUtil.red("No playlist active. Did it finish before the end of the schedule?")) + else: + s += "\n│ Nothing. " + s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────" - def adapt_trackservice_title(self, filename, artist, title): - """ - Updates the track-service entry with the info from a fallback track/playlist. - """ - # FIXME - scheduled_entry = self.get_active_entry() + s += "\n PLAYING NEXT:" + s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" - entry = SingleEntry() - meta = SingleEntryMetaData() + if not playlists: + s += "\n│ Nothing. " + else: + for next_playlist in playlists: + s += "\n│ Queued schedule %s " % next_playlist.schedule + s += "\n│ └── Playlist %s " % next_playlist + if next_playlist.end_unix > next_playlist.schedule.end_unix: + s += "\n│ %s! " % \ + (SimpleUtil.red("↑↑↑ Playlist #%s ends after Schedule #%s!" % (next_playlist.playlist_id, next_playlist.schedule.schedule_id))) + + entries = self.preprocess_entries(next_playlist.entries, False) + s += self.build_playlist_string(entries) - # Validate artist and title - if not title: - title = self.config.get("fallback_title_not_available") + s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────\n\n" + return s - # Create Entry - entry.filename = filename - entry.duration = self.fallback_manager.get_track_duration(filename) - if not entry.duration: - self.logger.critical("Entry %s has no duration! This may cause malfunction of some engine services." % (str(entry))) - # Create track service log for local station fallback (type=4) - trackservice = TrackService(entry, 4) - trackservice.store(add=True, commit=True) + def build_playlist_string(self, entries): + """ + Returns a stringified list of entries + """ + s = "" + is_out_of_schedule = False - entry.store(add=True, commit=True) + for entry in entries: + if entry.queue_state == EntryQueueState.OUT_OF_SCHEDULE and not is_out_of_schedule: + s += "\n│ %s" % \ + SimpleUtil.red("↓↓↓ These entries won't be played because they are out of schedule.") + is_out_of_schedule = True - # Create Meta - meta.artist = artist - meta.album = "" - meta.title = title - meta.single_entry_id = entry.id - meta.store(add=True, commit=True) + s += self.build_entry_string("\n│ └── ", entry, is_out_of_schedule) - # Reference each other - entry.meta_data_id = meta.id - entry.trackservice_id = trackservice.id - entry.store(add=False, commit=True) + return s - msg = "Track Service active track '%s' updated with fallback source '%s - %s'!" % (scheduled_entry, artist, title) - self.logger.info(msg) - return msg + def build_entry_string(self, prefix, entry, strike): + """ + Returns an stringified entry. + """ + s = "" + if entry.queue_state == EntryQueueState.CUT: + s = "\n│ %s" % SimpleUtil.red("↓↓↓ This entry is going to be cut.") + if strike: + entry_str = SimpleUtil.strike(entry) + else: + entry_str = str(entry) + + entry_line = "%sEntry %s | %s" % (prefix, str(entry.entry_num+1), entry_str) + return s + entry_line @@ -371,33 +470,17 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # PRIVATE METHODS # - - - def get_active_playlist(self): + def get_virtual_now(self): """ - Retrieves the schedule and playlist currently to be played as per - schedule. If the current point in time has no playlist assigned, - only the matching schedule is returned. + Liquidsoap is slow in executing commands, therefore it's needed to schedule + actions by (n) seconds in advance, as defined in the configuration file by + the property `lqs_delay_offset`. Returns: - (Schedule, Playlist): The current schedule and playlist tuple. + (Integer): the Unix epoch timestamp including the offset """ - now_unix = time.mktime(datetime.datetime.now().timetuple()) - current_schedule = None - current_playlist = None - - # Iterate over all shows and playlists and find the one to be played right now - if self.programme: - for schedule in self.programme: - if schedule.start_unix < now_unix < schedule.end_unix: - current_schedule = schedule - for playlist in schedule.playlist: - if playlist.start_unix < now_unix < playlist.end_unix: - current_playlist = playlist - break - break - - return (current_schedule, current_playlist) + time_offset = int(self.config.lqs_delay_offset) + return SimpleUtil.timestamp() + time_offset @@ -408,7 +491,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): Returns: ([Playlist]): The next playlists """ - now_unix = time.mktime(datetime.datetime.now().timetuple()) + now_unix = self.get_virtual_now() next_playlists = [] for schedule in self.programme: @@ -421,146 +504,148 @@ class AuraScheduler(ExceptionLogger, threading.Thread): - def get_next_entry(self): - """ - Retrieves the playlist entry to be played next. - - Returns: - (Playlist): The next playlist track - """ - next_entry = None - current_schedule, current_playlist = self.get_active_playlist() - - if not self.active_entry: - self.logger.warn("For some reason there is no active playlist entry set... Fetching it now!") - self.get_active_entry() - if not self.active_entry: - self.logger.warn("Looks like nothing is currently scheduled...") - return None - - # Check if there is a next entry in the current playlist - for i, entry in enumerate(self.active_entry.playlist.entries): - if entry is self.active_entry: - if i+1 < len(self.active_entry.playlist.entries): - next_entry = self.active_entry.playlist.entries[i+1] - break - - # It might be in the next playlist... - if not next_entry: - next_playlist = None - found_current = False - # Iterate over all schedule and playlists and find the one to be played next - for schedule in self.programme: - for playlist in schedule.playlist: - if playlist is current_playlist: - found_current = True - elif found_current: - next_playlist = playlist - break - - if next_playlist: - next_entry = next_playlist.entries[0] - - if not next_entry: - self.logger.fatal("There is no next playlist-entry in the programme!") - - return next_entry - - - def queue_programme(self): """ Queues the current programme (playlists as per schedule) by creating timed commands to Liquidsoap to enable the individual tracks of playlists. """ - active_schedule, active_playlist = self.get_active_playlist() + current_schedule, current_playlist = self.get_active_playlist() playlists = self.get_next_playlists() - s = "\n\n PLAYING NOW:" - s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" - if active_schedule: - s += "\n│ Playing schedule %s " % active_schedule - if active_playlist: - s += "\n│ └── Playlist %s " % active_playlist - - active_entry = active_playlist.current_entry - # Finished entries - for entry in active_playlist.entries: - if active_entry == entry: - break - else: - s += "\n│ └── Entry %s " % SimpleUtil.strike(str(entry)) - - # Entry currently being played - if active_entry: - s += "\n│ └── Entry %s " % (TerminalColors.GREEN.value+"PLAYING > "+str(active_entry)+TerminalColors.ENDC.value) - - # Open entries for current playlist - rest_of_playlist = active_entry.get_next_entries() - s += self.queue_playlist_entries(rest_of_playlist) - - else: - s += "\n│ └── %s No Playlist active. Did it finish before the end of the schedule? %s" % (TerminalColors.ORANGE.value, TerminalColors.ENDC.value) - else: - s += "\n│ Nothing. " - s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────" + if current_schedule and current_playlist: + active_entry = self.get_active_entry() + # Finished entries + for entry in current_playlist.entries: + if active_entry == entry: + break - s += "\n PLAYING NEXT:" - s += "\n┌──────────────────────────────────────────────────────────────────────────────────────────────────────" + # Entry currently being played + if active_entry: + # Open entries for current playlist + rest_of_playlist = active_entry.get_next_entries(True) + self.queue_playlist_entries(rest_of_playlist, True, True) - if not playlists: - s += "\n│ Nothing. " - else: + if playlists: for next_playlist in playlists: - s += "\n│ Queued schedule %s " % next_playlist.schedule - s += "\n│ └── Playlist %s " % next_playlist - if next_playlist.end_unix > next_playlist.schedule.end_unix: - s += "\n│ %s ↑↑↑ Playlist #%s ends after Schedule #%s!%s " % (TerminalColors.RED.value, next_playlist.playlist_id, next_playlist.schedule.schedule_id, TerminalColors.ENDC.value) - s += self.queue_playlist_entries(next_playlist.entries) + self.queue_playlist_entries(next_playlist.entries, True, True) - s += "\n└──────────────────────────────────────────────────────────────────────────────────────────────────────\n\n" - self.logger.info(s) + self.logger.info(SimpleUtil.green("Finished queuing programme!")) - def queue_playlist_entries(self, entries): + + def queue_playlist_entries(self, entries, fade_in, fade_out): """ Creates Liquidsoap player commands for all playlist items to be executed at the scheduled time. Args: - entries([PlaylistEntry]): The playlist entries to be scheduled for playout + entries ([PlaylistEntry]): The playlist entries to be scheduled for playout + fade_in (Boolean): Fade-in at the beginning of the set of entries + fade_out (Boolean): Fade-out at the end of the set of entries Returns: - (String): Formatted string to display playlist entries in log - """ - msg = "" + (String): Formatted string to display playlist entries in log + """ + + # Play function to be called by timer + def do_play(entry): + self.logger.info(SimpleUtil.cyan("=== play('%s') ===" % entry)) + transition_type = TransitionType.INSTANT + if fade_in: + transition_type = TransitionType.FADE + self.liquidsoapcommunicator.play(entry, transition_type) + self.logger.info(self.get_ascii_programme()) - for entry in entries: - - # Function to be called by timer - def func(entry): - self.logger.info("=== Executing timed LQS command: activate('%s') ===" % entry) - self.liquidsoapcommunicator.activate(entry) + + # Mark entries which start after the end of their schedule or are cut + clean_entries = self.preprocess_entries(entries, True) + # Schedule function calls + for entry in clean_entries: planned_timer = self.is_something_planned_at_time(entry.start_unix) - now_unix = SimpleUtil.timestamp() + now_unix = self.get_virtual_now() diff = entry.start_unix - now_unix - msg += "\n│ └── Entry %s " % entry if planned_timer: - # Check if the playlist_id's are different + # Check if the Playlist IDs are different if planned_timer.entry.entry_id != entry.entry_id: # If not, stop and remove the old timer, create a new one - self.stop_timer(planned_timer) - entry.switchtimer = self.create_timer(diff, func, [entry], switcher=True) + self.stop_timer(planned_timer) else: # If the playlists do not differ => reuse the old timer and do nothing - self.logger.info("Playlist Entry %s is already scheduled - No new timer created!" % entry) + self.logger.info("Playlist Entry %s is already scheduled - no new timer created!" % entry) + continue + + # If nothing is planned at given time, create a new timer + entry.switchtimer = self.create_timer(diff, do_play, [entry], switcher=True) + + # Check if it's the last item, which needs special handling + if entry == clean_entries[-1]: + # The end of schedule is the actual end of the track + self.queue_end_of_schedule(entry, fade_out) + + + + def preprocess_entries(self, entries, cut_oos): + """ + Analyses and marks entries which are going to be cut or excluded. + + Args: + entries ([PlaylistEntry]): The playlist entries to be scheduled for playout + cut_oos (Bollean): If `True` entries which are 'out of schedule' are not returned + + Returns: + ([PlaylistEntry]): The list of processed playlist entries + """ + clean_entries = [] + + for entry in entries: + + if entry.entry_start >= entry.playlist.schedule.schedule_end: + msg = "Filtered entry (%s) after end-of schedule (%s) ... SKIPPED" % (entry, entry.playlist.schedule) + self.logger.warn(SimpleUtil.red(msg)) + entry.queue_state = EntryQueueState.OUT_OF_SCHEDULE + elif entry.end_unix > entry.playlist.schedule.end_unix: + entry.queue_state = EntryQueueState.CUT else: - # If nothing is planned at given time, create a new timer - entry.switchtimer = self.create_timer(diff, func, [entry], switcher=True) + entry.queue_state = EntryQueueState.OKAY + + if not entry.queue_state == EntryQueueState.OUT_OF_SCHEDULE or not cut_oos: + clean_entries.append(entry) - return msg + return clean_entries + + + + def queue_end_of_schedule(self, entry, fade_out): + """ + Queues a soundsystem action to stop/fade-out the given schedule + + Args: + entry (PlaylistEntry): The last entry of the schedule + fade_out (Boolean): If the entry should be faded-out + """ + schedule_end = entry.playlist.schedule.schedule_end + schedule_end_unix = entry.playlist.schedule.end_unix + now_unix = self.get_virtual_now() + fade_out_time = 0 + + # Stop function to be called when schedule ends + def do_stop(entry): + self.logger.info(SimpleUtil.cyan("=== stop('%s') ===" % entry)) + transition_type = TransitionType.INSTANT + if fade_out: + transition_type = TransitionType.FADE + self.liquidsoapcommunicator.stop(entry, transition_type) + + if fade_out == True: + fade_out_time = int(round(float(self.config.get("fade_out_time")))) #FIXME Use float + + start_fade_out = schedule_end_unix - now_unix - fade_out_time + entry.fadeouttimer = self.create_timer(start_fade_out, do_stop, [entry], fadeout=True) + + self.logger.info("Fading out schedule in %s seconds at %s | Last entry: %s" % \ + (str(start_fade_out), str(schedule_end), entry)) @@ -588,7 +673,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): self.programme = response if self.programme is not None and len(self.programme) > 0: self.last_successful_fetch = datetime.datetime.now() - self.logger.info("+++ Successfully fetched current programme from API +++") + self.logger.info(SimpleUtil.green("Finished fetching current programme from API")) if len(self.programme) == 0: self.logger.critical("Programme fetched from Steering/Tank has no entries!") elif response.startswith("fetching_aborted"): @@ -599,7 +684,7 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # Always load latest programme from the database self.last_successful_fetch = lsf self.load_programme_from_db() - self.logger.info("Finished loading current programme from database (%s schedules)" % str(len(self.programme))) + self.logger.info(SimpleUtil.green("Finished loading current programme from database (%s schedules)" % str(len(self.programme)))) for schedule in self.programme: self.logger.debug("\tSchedule %s with Playlist %s" % (str(schedule), str(schedule.playlist))) @@ -621,124 +706,129 @@ class AuraScheduler(ExceptionLogger, threading.Thread): # FIXME Still needed? - def enable_entries(self, playlist): - """ - Iterates over all playlist entries and assigs their start time. - Additionally timers for fadings are created. + # def enable_entries(self, playlist): + # """ + # Iterates over all playlist entries and assigs their start time. + # Additionally timers for fadings are created. - Args: - playlist(Playlist): The playlist to be scheduled for playout - """ - now_unix = time.mktime(datetime.datetime.now().timetuple()) - time_marker = playlist.start_unix + # Args: + # playlist(Playlist): The playlist to be scheduled for playout + # """ + # now_unix = time.mktime(datetime.datetime.now().timetuple()) + # time_marker = playlist.start_unix - # Old entry for fading out - # FIXME retrieve active entry from previous playlist - old_entry = None + # # Old entry for fading out + # # FIXME retrieve active entry from previous playlist + # old_entry = None - for entry in playlist.entries: - diff=3 - entry.start_unix = time_marker - self.enable_timer(diff, entry, old_entry) - old_entry = entry - # time_marker += 1 # FIXME ??? + # for entry in playlist.entries: + # diff=3 + # entry.start_unix = time_marker + # self.enable_timer(diff, entry, old_entry) + # old_entry = entry + # # time_marker += 1 # FIXME ??? - # # Since we also get entries from the past, filter these out - # if time_marker > now_unix: + # # # Since we also get entries from the past, filter these out + # # if time_marker > now_unix: - # # when do we have to start? - # diff = time_marker - now_unix - # diff = 3 # FIXME test - # entry.start_unix = time_marker + # # # when do we have to start? + # # diff = time_marker - now_unix + # # diff = 3 # FIXME test + # # entry.start_unix = time_marker - # # enable the three timer - # self.enable_timer(diff, entry, old_entry) - # old_entry = entry + # # # enable the three timer + # # self.enable_timer(diff, entry, old_entry) + # # old_entry = entry - # ------------------------------------------------------------------------------------------ # - def enable_timer(self, diff, entry, old_entry): - """ - Create threads to send track-activation messages to LiquidSoap. - Those tracks can be delayed by `diff` seconds. + # # ------------------------------------------------------------------------------------------ # + # def enable_timer(self, diff, entry, old_entry): + # """ + # Create threads to send track-activation messages to LiquidSoap. + # Those tracks can be delayed by `diff` seconds. - Args: - diff (Integer): seconds after tracks should be activated - """ + # Args: + # diff (Integer): seconds after tracks should be activated + # """ - self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry)) - entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.activate, [entry]) + # self.logger.critical("ENABLING SWITCHTIMER FOR " + str(entry)) + # entry.switchtimer = self.add_or_update_timer(diff, self.liquidsoapcommunicator.play, [entry]) - # FIXME Fade In/Out logic: Not sure if that's functional - #self.enable_fading(diff, entry, old_entry) + # # FIXME Fade In/Out logic: Not sure if that's functional + # #self.enable_fading(diff, entry, old_entry) # ------------------------------------------------------------------------------------------ # - def enable_fading(self, diff, new_entry, old_entry): - # fading times - fade_out_time = float(self.config.get("fade_out_time")) - - # enable fading when entry types are different - if old_entry is not None: - if old_entry.type != new_entry.type: - #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry]) - old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True) - self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry)) - - # same for fadein except old_entry can be None - else: - #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry]) - new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True) - self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry)) + # def enable_fading(self, diff, new_entry, old_entry): + # # fading times + # fade_out_time = float(self.config.get("fade_out_time")) + + # # enable fading when entry types are different + # if old_entry is not None: + # if old_entry.type != new_entry.type: + # #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_out, [old_entry]) + # old_entry.fadeouttimer = self.create_timer(diff-fade_out_time, self.liquidsoapcommunicator.fade_out, [old_entry], fadeout=True) + # self.logger.critical("ENABLING FADEOUTTIMER FOR " + str(old_entry)) + + # # same for fadein except old_entry can be None + # else: + # #self.add_or_update_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry]) + # new_entry.fadeintimer = self.create_timer(diff, self.liquidsoapcommunicator.fade_in, [new_entry], fadein=True) + # self.logger.critical("ENABLING FADEINTIMER FOR " + str(new_entry)) # ------------------------------------------------------------------------------------------ # - def add_or_update_timer(self, diff, func, parameters): - timer = None - entry = parameters[0] - planned_timer = self.is_something_planned_at_time(entry.start_unix) + # def add_or_update_timer(self, diff, func, parameters): + # timer = None + # entry = parameters[0] + # planned_timer = self.is_something_planned_at_time(entry.start_unix) - # if something is planned on entry.entry_start - #FIXME - #if 1==0: - if planned_timer: - planned_entry = planned_timer.entry + # # if something is planned on entry.entry_start + # #FIXME + # #if 1==0: + # if planned_timer: + # planned_entry = planned_timer.entry - # check if the playlist_id's are different - if planned_entry.playlist.playlist_id != entry.playlist.playlist_id: - # if not stop the old timer and remove it from the list - self.stop_timer(planned_timer) + # # check if the playlist_id's are different + # if planned_entry.playlist.playlist_id != entry.playlist.playlist_id: + # # if not stop the old timer and remove it from the list + # self.stop_timer(planned_timer) - # and create a new one - timer = self.create_timer(diff, func, parameters, switcher=True) + # # and create a new one + # timer = self.create_timer(diff, func, parameters, switcher=True) - # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same + # # if the playlist id's do not differ => reuse the old timer and do nothing, they are the same - # if nothing is planned at given time, create a new timer - else: - timer = self.create_timer(diff, func, parameters, switcher=True) + # # if nothing is planned at given time, create a new timer + # else: + # timer = self.create_timer(diff, func, parameters, switcher=True) - if timer is None: - return planned_timer - return timer + # if timer is None: + # return planned_timer + # return timer - # ------------------------------------------------------------------------------------------ # - def stop_timer(self, timer): - # stop timer - timer.cancel() - if timer.entry.fadeintimer is not None: - timer.entry.fadeintimer.cancel() - self.message_timer.remove(timer.entry.fadeintimer) - if timer.entry.fadeouttimer is not None: - timer.entry.fadeouttimer.cancel() - self.message_timer.remove(timer.entry.fadeouttimer) - # and remove it from message queue - self.message_timer.remove(timer) - self.logger.critical("REMOVED TIMER for " + str(timer.entry)) + def is_something_planned_at_time(self, given_time): + """ + Checks for existing timers at the given time. + """ + for t in self.message_timer: + if t.entry.start_unix == given_time: + return t + return False + + - # ------------------------------------------------------------------------------------------ # def create_timer(self, diff, func, parameters, fadein=False, fadeout=False, switcher=False): + """ + Creates a new timer for timed execution of mixer commands. + + Args: + diff (Integer): The difference in seconds from now, when the call should happen + func (Function): The function to call + parameters ([]): The function parameters + + """ if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher: raise Exception("You have to call me with either fadein=true, fadeout=true or switcher=True") @@ -747,25 +837,42 @@ class AuraScheduler(ExceptionLogger, threading.Thread): t.start() return t - # ------------------------------------------------------------------------------------------ # - def is_something_planned_at_time(self, given_time): - for t in self.message_timer: - if t.entry.start_unix == given_time: - return t - return False - def init_error_messages(self): + def stop_timer(self, timer): """ - Load error messages + Stops the given timer. + + Args: + timer (Timer): The timer to stop. """ - error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js" - f = open(error_file) - self.error_data = json.load(f) - f.close() + timer.cancel() + + if timer.entry.fadeintimer is not None: + timer.entry.fadeintimer.cancel() + self.message_timer.remove(timer.entry.fadeintimer) + + if timer.entry.fadeouttimer is not None: + timer.entry.fadeouttimer.cancel() + self.message_timer.remove(timer.entry.fadeouttimer) + + # Remove it from message queue + self.message_timer.remove(timer) + self.logger.info("Removed timer for " + str(timer.entry)) + + + # def init_error_messages(self): + # """ + # Load error messages + # """ + # error_file = self.config.get("install_dir") + "/errormessages/scheduler_error.js" + # f = open(error_file) + # self.error_data = json.load(f) + # f.close() + # FIXME Move to adequate module def init_database(self): """ Initializes the database. @@ -783,8 +890,8 @@ class AuraScheduler(ExceptionLogger, threading.Thread): errcode = e.orig.args[0] if errcode == 1146: # Error for no such table - x = AuraDatabaseModel() - x.recreate_db() + model = AuraDatabaseModel() + model.recreate_db() else: raise @@ -837,9 +944,8 @@ class CallFunctionTimer(threading.Timer): self.logger = logging.getLogger("AuraEngine") self.logger.debug("CallFunctionTimer: Executing LiquidSoap command '%s' in %s seconds..." % (str(func.__name__), str(diff))) - threading.Timer.__init__(self, diff, func, param) + threading.Timer.__init__(self, diff, func, args=param) - # TODO Review usage of the fading-attributes: if not fadein and not fadeout and not switcher or fadein and fadeout or fadein and switcher or fadeout and switcher: raise Exception("You have to create me with either fadein=true, fadeout=true or switcher=True")