Commit bae48780 authored by David Trattnig's avatar David Trattnig
Browse files

Scheduling of Streams.

parent 9ba4d763
......@@ -50,8 +50,8 @@ class Channel(Enum):
"""
FILESYSTEM_A = "in_filesystem_0"
FILESYSTEM_B = "in_filesystem_1"
STREAM_A = "http_1"
STREAM_B = "http_2"
HTTP_A = "in_http_0"
HTTP_B = "in_http_1"
LIVE_0 = "aura_linein_0"
LIVE_1 = "aura_linein_1"
LIVE_2 = "aura_linein_2"
......@@ -70,9 +70,9 @@ class ChannelType(Enum):
"id": "fs",
"channels": [Channel.FILESYSTEM_A, Channel.FILESYSTEM_B]
}
STREAM = {
HTTP = {
"id": "http",
"channels": [Channel.STREAM_A, Channel.STREAM_B]
"channels": [Channel.HTTP_A, Channel.HTTP_A]
}
LIVE = {
"id": "live",
......@@ -118,3 +118,17 @@ class EntryQueueState(Enum):
OKAY = "ok"
CUT = "cut"
OUT_OF_SCHEDULE = "oos"
class EntryPlayState(Enum):
UNKNOWN = "unknown"
LOADING = "loading"
READY = "ready_to_play"
PLAYING = "playing"
FINISHED = "finished"
class LiquidsoapResponse(Enum):
SUCCESS = "Done"
STREAM_STATUS_POLLING = "polling"
STREAM_STATUS_STOPPED = "stopped"
STREAM_STATUS_CONNECTED = "connected"
\ No newline at end of file
......@@ -59,6 +59,9 @@ class MailingException(Exception):
class LQConnectionError(Exception):
pass
class LQStreamException(Exception):
pass
class RedisConnectionException(Exception):
pass
\ No newline at end of file
pass
......@@ -48,7 +48,7 @@ class EngineUtil:
"""
if uri.startswith("http"):
return ChannelType.STREAM
return ChannelType.HTTP
if uri.startswith("pool") or uri.startswith("playlist") or uri.startswith("file"):
return ChannelType.FILESYSTEM
if uri.startswith("live") or uri.startswith("linein"):
......
......@@ -22,7 +22,7 @@
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
from modules.base.enum import Channel, ChannelType
from modules.base.enum import Channel
from modules.communication.liquidsoap.client import LiquidSoapClient
......@@ -58,14 +58,11 @@ class LiquidSoapPlayerClient(LiquidSoapClient):
return "LiquidSoapPlayerClient does not understand mixer." + command + str(args)
# ------------------------------------------------------------------------------------------ #
def http(self, command, *args):
if command == "url":
return self.set_http_url(*args)
return "LiquidSoapPlayerClient does not understand http." + command + str(args)
# ------------------------------------------------------------------------------------------ #
#
# Playlist
#
def playlist_push(self, channel, uri):
"""
......@@ -114,6 +111,42 @@ class LiquidSoapPlayerClient(LiquidSoapClient):
return self.message
#
# Stream
#
def http_set_url(self, channel, url):
"""
Sets the URL on the given HTTP channel.
"""
self.command(channel, 'url', url)
return self.message
def http_start(self, channel):
"""
Starts the HTTP stream set with `stream_set_url` on the given channel.
"""
self.command(channel, 'start')
return self.message
def http_stop(self, channel):
"""
Stops the HTTP stream on the given channel.
"""
self.command(channel, 'stop')
return self.message
def http_status(self, channel):
"""
Returns the status of the HTTP stream on the given channel.
"""
self.command(channel, 'status')
return self.message
def uptime(self, command=""): # no command will come
"""
Retrieves how long the engine is running already.
......@@ -145,10 +178,7 @@ class LiquidSoapPlayerClient(LiquidSoapClient):
self.command('auraengine', 'state')
return self.message
# ------------------------------------------------------------------------------------------ #
def set_http_url(self, uri):
self.command('http', 'url', uri)
return self.message
# ------------------------------------------------------------------------------------------ #
def mixerinputs(self):
......
......@@ -33,9 +33,9 @@ from modules.core.state import PlayerStateService
from modules.core.monitor import Monitoring
from modules.communication.mail import AuraMailer
from modules.base.enum import ChannelType, Channel, TransitionType
from modules.base.enum import ChannelType, Channel, TransitionType, LiquidsoapResponse, EntryPlayState
from modules.base.utils import TerminalColors, SimpleUtil
from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, EngineMalfunctionException
from modules.base.exceptions import LQConnectionError, InvalidChannelException, NoActiveEntryException, EngineMalfunctionException, LQStreamException
class SoundSystem():
......@@ -86,7 +86,7 @@ class SoundSystem():
# Initialize Default Channels
self.active_channel = {
ChannelType.FILESYSTEM: Channel.FILESYSTEM_A,
ChannelType.STREAM: Channel.STREAM_A,
ChannelType.HTTP: Channel.HTTP_A,
ChannelType.LIVE: Channel.LIVE_0
}
# self.active_entries = {}
......@@ -213,21 +213,42 @@ class SoundSystem():
#
# FIXME Currently not used, except for test class
# def get_active_channel(self):
# """
# Retrieves the active channel from programme.
def load(self, entry):
"""
Preloads the entry. This is required before the actual `play(..)` can happen.
# Returns:
# (String): The channel type, empty string if no channel is active.
# """
# active_entry = self.scheduler.get_active_entry()
# if active_entry is None:
# return ""
# return active_entry.channel
Note his method is blocking until loading has finished. If this method is called
asynchroniously, the progress on the preloading state can be looked up in `entry.state`.
"""
entry.status = EntryPlayState.LOADING
self.logger.info("Loading entry '%s'" % entry)
# ------------------------------------------------------------------------------------------ #
self.enable_transaction()
self.player_state.set_active_entry(entry)
entry.channel = self.channel_swap(entry.type)
self.disable_transaction()
# PLAYLIST
if entry.type == ChannelType.FILESYSTEM:
self.playlist_push(entry.channel, entry.source)
# STREAM
elif entry.type == ChannelType.HTTP:
self.http_load(entry.channel, entry.source)
time.sleep(1)
while not self.http_is_ready(entry.channel, entry.source):
self.logger.info("Loading Stream ...")
time.sleep(1)
entry.status = EntryPlayState.READY
# LIVE
else:
# TODO Select correct LINE-OUT channels as per entry
pass
def play(self, entry, transition):
......@@ -236,68 +257,25 @@ class SoundSystem():
a clean channel is selected and transitions between old and new channel is performed.
Args:
entry (PlaylistEntry): The audio source to be played
entry (PlaylistEntry): The audio source to be played
transition (TransitionType): The type of transition to use e.g. fade-out.
queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so;
queue (Boolean): If `True` the entry is queued if the `ChannelType` does allow so;
otherwise a new channel of the same type is activated
Raises:
(LQConnectionError): In case connecting to LiquidSoap isn't possible
"""
try:
self.enable_transaction()
# channel = self.active_channel[entry.type]
# prev_channel = channel
# already_active = False
#FIXME
# queue=False
# if self.active_channel_type == entry.type:
# msg = SimpleUtil.pink("Channel type %s already active!" % str(entry.type))
# self.logger.info(msg)
# already_active = True
self.player_state.set_active_entry(entry)
entry.channel = self.channel_swap(entry.type)
# entry.channel = channel
# PLAYLIST
if entry.type == ChannelType.FILESYSTEM:
# if not queue:
self.playlist_push(entry.channel, entry.filename)
# STREAM
elif entry.type == ChannelType.STREAM:
self.set_http_url(entry.channel, entry.source)
self.http_start_stop(entry.channel, True)
# LIVE
else:
# TODO Select correct LINE-OUT channels as per entry
pass
# if not already_active:
# self.channel_transition(prev_channel, channel, entry.volume, 0)
# Assign selected channel
# Move channel volume all the way up
self.enable_transaction()
if transition == TransitionType.FADE:
self.fade_in(entry)
else:
self.channel_volume(entry.channel, entry.volume)
self.disable_transaction()
# Update active channel and type
#self.active_channel_type = entry.type
self.active_channel[entry.type] = entry.channel
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__,
......@@ -352,27 +330,16 @@ class SoundSystem():
pass
# def channel_transition(self, source_channel, target_channel, target_volume=100, transition_type=0):
# # Default: target_channel = 100% volume, source_channel = 0% volume
# if transition_type == 0:
# # Set volume of channel
# self.channel_volume(target_channel, target_volume)
# # Mute source channel
# if target_channel != source_channel:
# self.channel_volume(source_channel, 0)
# # Set other channels to zero volume
# # others = self.all_inputs_but(target_channel)
# # self.logger.info("Setting Volume=0 for channels: %s" % str(others))
# # for o in others:
# # self.channel_volume(o, 0)
def channel_swap(self, channel_type):
"""
Returns the currently in-active channel for a given type. For example if the currently some
file on channel FILESYSTEM_A is playing, the channel FILESYSTEM B is returned for being used
to queue new entries.
Args:
channel_type (ChannelType): The channel type such es filesystem, stream or live channel
"""
active_channel = self.active_channel[channel_type]
channel = None
msg = None
......@@ -385,16 +352,19 @@ class SoundSystem():
channel = Channel.FILESYSTEM_A
msg = "Swapped filesystem channel from B > A"
elif channel_type == ChannelType.STREAM:
if active_channel == Channel.STREAM_A:
channel = Channel.STREAM_B
# TODO Clear old channel
elif channel_type == ChannelType.HTTP:
if active_channel == Channel.HTTP_A:
channel = Channel.HTTP_B
msg = "Swapped stream channel from A > B"
else:
channel = Channel.STREAM_A
channel = Channel.HTTP_A
msg = "Swapped stream channel from B > A"
if msg: self.logger.info(SimpleUtil.pink(msg))
# self.active_channel[channel_type] = channel
return channel
......@@ -488,46 +458,63 @@ class SoundSystem():
# Channel Type - Stream
#
def http_load(self, channel, url):
"""
Preloads the stream URL on the given channel.
"""
result = None
def stream_start(self, url):
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", "url", url)
self.__send_lqc_command__(self.client, "http", "start")
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
self.enable_transaction()
result = self.__send_lqc_command__(self.client, channel, "http_stop")
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("stream.stop result: " + result)
raise LQStreamException("Error while stopping stream!")
result = self.__send_lqc_command__(self.client, channel, "http_set_url", url)
def stream_stop(self, url):
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", "start")
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
if result != LiquidsoapResponse.SUCCESS.value:
self.logger.error("stream.set_url result: " + result)
raise LQStreamException("Error while setting stream URL!")
# Liquidsoap ignores commands sent without a certain timeout
time.sleep(2)
def http_start_stop(self, start):
if start:
cmd = "start"
else:
cmd = "stop"
result = self.__send_lqc_command__(self.client, channel, "http_start")
self.logger.info("stream.start result: " + result)
try:
self.enable_transaction()
self.__send_lqc_command__(self.client, "http", cmd)
self.disable_transaction()
except LQConnectionError:
# we already caught and handled this error in __send_lqc_command__, but we do not want to execute this function further
pass
self.disable_transaction()
return result
def http_is_ready(self, channel, url):
"""
Checks if the stream on the given channel is ready to play.
"""
result = None
self.enable_transaction()
result = self.__send_lqc_command__(self.client, channel, "http_status")
self.logger.info("stream.status result: " + result)
if not result.startswith(LiquidsoapResponse.STREAM_STATUS_CONNECTED.value):
return False
lqs_url = result.split(" ")[1]
if not url == lqs_url:
self.logger.error("Wrong URL '%s' set for channel '%s', expected: '%s'." % (lqs_url, channel, url))
return False
self.disable_transaction()
# Wait another 10 (!) seconds, because even now the old source might *still* be playing
self.logger.info("Ready to play stream, Liquidsoap wants you to wait another 10secs though...")
time.sleep(10)
return True
# ------------------------------------------------------------------------------------------ #
def set_http_url(self, uri):
return self.__send_lqc_command__(self.client, "http", "url", uri)
#
......@@ -830,7 +817,7 @@ class SoundSystem():
# call wanted function ...
# FIXME REFACTOR all calls in a common way
if command in ["playlist_push", "playlist_seek", "playlist_clear"]:
if command in ["playlist_push", "playlist_seek", "playlist_clear", "http_set_url", "http_start", "http_stop", "http_status"]:
func = getattr(lqs_instance, command)
result = func(str(namespace), *args)
else:
......
......@@ -744,8 +744,8 @@ class SingleEntry(DB.Model, AuraDatabaseModel):
type = EngineUtil.get_channel_type(self.uri)
if type == ChannelType.FILESYSTEM:
return Channel.FILESYSTEM_A
elif type == ChannelType.STREAM:
return Channel.STREAM_A
elif type == ChannelType.HTTP:
return Channel.HTTP_A
else:
return "foo:bar"
#FIXME Extend & finalize!!
......@@ -943,7 +943,7 @@ class SingleEntryMetaData(DB.Model, AuraDatabaseModel):
#
# def set_entry_type(self):
# if self.uri.startswith("http"):
# self.type = ScheduleEntryType.STREAM
# self.type = ScheduleEntryType.HTTP
# if self.uri.startswith("pool") or self.uri.startswith("playlist") or self.uri.startswith("file"):
# self.type = ScheduleEntryType.FILESYSTEM
# if self.uri.startswith("live") or self.uri.startswith("linein"):
......@@ -1052,7 +1052,7 @@ class SingleEntryMetaData(DB.Model, AuraDatabaseModel):
# if self.type == self.type.LIVE_0 or self.type == self.type.LIVE_1 or self.type == self.type.LIVE_2 or self.type == self.type.LIVE_3 or self.type == self.type.LIVE_4:
# return "aura_linein_"+self.cleansource # .cleanprotocol[8]
#
# if self.type == self.type.STREAM:
# if self.type == self.type.HTTP:
# return "http"
#
#
......
......@@ -37,7 +37,7 @@ from operator import attrgetter
from modules.database.model import AuraDatabaseModel, Schedule, Playlist, PlaylistEntry, PlaylistEntryMetaData, SingleEntry, SingleEntryMetaData, TrackService
from modules.base.exceptions import NoActiveScheduleException, NoActiveEntryException
from modules.base.enum import Channel, ChannelType, TimerType, TransitionType, EntryQueueState
from modules.base.enum import Channel, ChannelType, TimerType, TransitionType, EntryQueueState, EntryPlayState
from modules.base.utils import SimpleUtil, TerminalColors
from modules.communication.redis.messenger import RedisMessenger
from modules.scheduling.calendar import AuraCalendarService
......@@ -196,7 +196,8 @@ class AuraScheduler(threading.Thread):
if (seconds_to_seek + sleep_offset) > active_entry.duration:
self.logger.info("The FFWD [>>] range exceeds the length of the entry. Drink some tea and wait for the sound of the next entry.")
else:
# Play active entry
# Load and play active entry
self.soundsystem.load(active_entry)
self.soundsystem.play(active_entry, TransitionType.FADE)
# Check if this is the last item of the schedule
......@@ -213,6 +214,18 @@ class AuraScheduler(threading.Thread):
response = self.soundsystem.playlist_seek(active_entry.channel, seconds_to_seek)
self.soundsystem.disable_transaction()
self.logger.info("LiquidSoap seek response: " + response)
elif active_entry.type == ChannelType.HTTP:
# Load and play active entry
self.soundsystem.load(active_entry)
self.soundsystem.play(active_entry, TransitionType.FADE)
self.queue_end_of_schedule(active_entry, True)
elif active_entry.type == ChannelType.LIVE:
self.logger.warn("LIVE ENTRIES ARE NOT YET IMPLEMENTED!")
else:
self.logger.critical("Unknown Entry Type: %s" % active_entry)
......@@ -534,11 +547,11 @@ class AuraScheduler(threading.Thread):
if active_entry:
# Open entries for current playlist
rest_of_playlist = active_entry.get_next_entries(True)
self.queue_playlist_entries(rest_of_playlist, True, True)
self.queue_playlist_entries(rest_of_playlist, False, True)
if playlists:
for next_playlist in playlists:
self.queue_playlist_entries(next_playlist.entries, True, True)
self.queue_playlist_entries(next_playlist.entries, False, True)
self.logger.info(SimpleUtil.green("Finished queuing programme!"))
......@@ -558,37 +571,13 @@ class AuraScheduler(threading.Thread):
(String): Formatted string to display playlist entries in log
"""
# Play function to be called by timer
def do_play(entry):
self.logger.info(SimpleUtil.cyan("=== play('%s') ===" % entry))
transition_type = TransitionType.INSTANT
if fade_in:
transition_type = TransitionType.FADE
self.soundsystem.play(entry, transition_type)
self.logger.info(self.get_ascii_programme())
# Mark entries which start after the end of their schedule or are cut
clean_entries = self.preprocess_entries(entries, True)
# Schedule function calls
for entry in clean_entries:
planned_timer = self.is_something_planned_at_time(entry.start_unix)
now_unix = self.get_virtual_now()
diff = entry.start_unix - now_unix
if planned_timer:
# Check if the Playlist IDs are different
if planned_timer.entry.entry_id != entry.entry_id:
# If not, stop and remove the old timer, create a new one
self.stop_timer(planned_timer)
else:
# If the playlists do not differ => reuse the old timer and do nothing
self.logger.info("Playlist Entry %s is already scheduled - no new timer created!" % entry)
continue
# If nothing is planned at given time, create a new timer
entry.switchtimer = self.create_timer(diff, do_play, [entry], switcher=True)
self.set_entry_timer(entry, fade_in, fade_out)
# Check if it's the last item, which needs special handling
if entry == clean_entries[-1]:
......@@ -597,13 +586,52 @@ class AuraScheduler(threading.Thread):
def set_entry_timer(self, entry, fade_in, fade_out):
"""
Creates timer for loading and playing an entry. Existing times are
updated.
"""
play_timer = self.is_something_planned_at_time(entry.start_unix)
now_unix = self.get_virtual_now()
diff = entry.start_unix - now_unix
# Play function to be called by timer
def do_play(entry):
self.logger.info(SimpleUtil.cyan("=== play('%s') ===" % entry))
transition_type = TransitionType.INSTANT
if fade_in:
transition_type = TransitionType.FADE
if entry.status != EntryPlayState.READY:
self.logger.critical(SimpleUtil.red("PLAY: For some reason the entry is not yet ready or could not be loaded (Entry: %s)" % str(entry)))
# TODO Pro-active fallback handling here
self.soundsystem.play(entry, transition_type)
self.logger.info(self.get_ascii_programme())
if play_timer:
# Check if the Playlist IDs are different
if play_timer.entry.entry_id != entry.entry_id:
# If not, stop and remove the old timer, create a new one
self.stop_timer(play_timer)
else:
# If the playlists do not differ => reuse the old timer and do nothing
self.logger.info("Playlist Entry %s is already scheduled - no new timer created!" % entry)
return
# If nothing is planned at given time, create a new timer
(entry.switchtimer, entry.loadtimer) = self.create_timer(diff, do_play, [entry], switcher=True)
def preprocess_entries(self, entries, cut_oos):
"""