Commit 6c88c93f authored by David Trattnig's avatar David Trattnig
Browse files

Processing of M3U Playlists via Tank. #53

parent f866c0eb
......@@ -134,6 +134,7 @@ socketdir="/home/username/code/aura/engine/src/liquidsoap"
[audiosources]
audio_source_folder="var/audio/source"
audio_source_extension=".flac"
audio_playlist_folder="var/audio/playlist"
[fallback]
# track_sensitive => fallback_folder track sensitivity
......
......@@ -133,6 +133,7 @@ socketdir="/srv/src/liquidsoap"
[audiosources]
audio_source_folder="var/audio/source"
audio_source_extension=".flac"
audio_playlist_folder="var/audio/playlist"
[fallback]
# track_sensitive => fallback_folder track sensitivity
......
......@@ -133,6 +133,7 @@ socketdir="/opt/aura/engine/src/liquidsoap"
[audiosources]
audio_source_folder="var/audio/source"
audio_source_extension=".flac"
audio_playlist_folder="var/audio/playlist"
[fallback]
# track_sensitive => fallback_folder track sensitivity
......
......@@ -449,7 +449,7 @@ class Player:
Args:
channel (Channel): The stream channel
uri (String): The stream URL
url (String): The stream URL
Returns:
(Boolean): `True` if successful
......@@ -487,7 +487,7 @@ class Player:
Args:
channel (Channel): The stream channel
uri (String): The stream URL
url (String): The stream URL
Returns:
(Boolean): `True` if successful
......@@ -521,13 +521,13 @@ class Player:
#
def queue_push(self, channel, uri):
def queue_push(self, channel, source):
"""
Adds an filesystem URI to the given `ChannelType.QUEUE` channel.
Args:
channel (Channel): The channel to push the file to
uri (String): The URI of the file
source (String): The URI of the file
Returns:
(Boolean): `True` if successful
......@@ -535,12 +535,12 @@ class Player:
if channel not in ChannelType.QUEUE.channels and \
channel not in ChannelType.FALLBACK_QUEUE.channels:
raise InvalidChannelException
self.logger.info(SU.pink("queue.push('%s', '%s'" % (channel, uri)))
self.connector.enable_transaction()
audio_store = self.config.get("audio_source_folder")
extension = self.config.get("audio_source_extension")
filepath = ResourceUtil.uri_to_filepath(audio_store, uri, extension)
filepath = ResourceUtil.source_to_filepath(audio_store, source, extension)
self.logger.info(SU.pink(f"{channel}.queue_push('{filepath}')"))
result = self.connector.send_lqc_command(channel, "queue_push", filepath)
self.logger.info("%s.queue_push result: %s" % (channel, result))
self.connector.disable_transaction()
......
......@@ -27,7 +27,6 @@ from src.plugins.mailer import AuraMailer
from src.plugins.monitor import AuraMonitor
from src.plugins.trackservice import TrackServiceHandler
from src.scheduling.fallback import FallbackManager
from src.scheduling.models import DB
class EventBinding():
......@@ -304,7 +303,6 @@ class EngineEventDispatcher():
self.logger.debug("on_fallback_active(..)")
self.call_event("on_fallback_active", timeslot, fallback_type)
DB.session.expunge(timeslot)
thread = Thread(target = func, args = (self, timeslot, fallback_type))
thread.start()
......
......@@ -137,7 +137,7 @@ class ResourceUtil(Enum):
for entry in entries:
if ResourceUtil.get_content_type(entry.source) == ResourceType.FILE:
path = ResourceUtil.uri_to_filepath(audio_store_path, entry.source, entry_extension)
path = ResourceUtil.source_to_filepath(audio_store_path, entry.source, entry_extension)
fb.append(f"#EXTINF:{entry.duration},{entry.meta_data.artist} - {entry.meta_data.title}")
fb.append(path)
......@@ -146,19 +146,28 @@ class ResourceUtil(Enum):
@staticmethod
def uri_to_filepath(base_dir, uri, source_extension):
def source_to_filepath(base_dir, source, source_extension):
"""
Converts a file-system URI to an actual, absolute path to the file.
Converts a file-system URI starting with "file://" to an actual,
absolute path to the file, appending the extension as provided
in "source_extension".
If the path starts with an "/" it indicates that it's already an
absolute path including a valid extension.
Args:
basi_dir (String): The location of the audio store.
uri (String): The URI of the file
source (String): The URI of the file
source_extension (String): The file extension of audio sources
Returns:
path (String): Absolute file path
"""
return base_dir + "/" + uri[7:] + source_extension
path = source[7:]
if path.startswith("/"):
return path
else:
return base_dir + "/" + path + source_extension
@staticmethod
......
......@@ -409,7 +409,7 @@ class Playlog:
if entry.get_content_type() in ResourceClass.FILE.types:
base_dir = self.config.get("audio_source_folder")
extension = self.config.get("audio_source_extension")
entry_source = ResourceUtil.uri_to_filepath(base_dir, entry.source, extension)
entry_source = ResourceUtil.source_to_filepath(base_dir, entry.source, extension)
if entry_source == uri:
self.logger.info("Resolved '%s' entry '%s' for URI '%s'" % (entry.get_content_type(), entry, uri))
result = entry
......
......@@ -29,7 +29,7 @@ from src.base.utils import SimpleUtil as SU
from src.core.resources import ResourceClass, ResourceUtil
from src.core.channels import Channel
from src.core.control import EngineExecutor
from src.scheduling.models import DB
class FallbackType(Enum):
......@@ -95,7 +95,7 @@ class FallbackManager:
def on_timeslot_start(self, timeslot=None):
"""
Some new timeslot has just started.
"""
"""
self.state["timeslot"] = timeslot
......@@ -103,7 +103,7 @@ class FallbackManager:
"""
The timeslot has ended and the state is updated. The method ensures that any intermediate state
update doesn't get overwritten.
"""
"""
if self.state["timeslot"] == timeslot:
self.state["timeslot"] = None
......@@ -139,7 +139,10 @@ class FallbackManager:
if fallback_type is not FallbackType.NONE:
# Only trigger the event the upon first state change
if fallback_type != self.state.get("previous_fallback_type"):
self.engine.event_dispatcher.on_fallback_active(self.state["timeslot"], fallback_type)
timeslot = self.state["timeslot"]
if timeslot:
DB.session.merge(timeslot)
self.engine.event_dispatcher.on_fallback_active(timeslot, fallback_type)
......
......@@ -480,7 +480,6 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
# Data
entry_num = Column(Integer)
uri = Column(String(1024))
duration = Column(BigInteger)
volume = Column(Integer, ColumnDefault(100))
source = Column(String(1024))
......@@ -533,7 +532,7 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
def get_content_type(self):
return ResourceUtil.get_content_type(self.uri)
return ResourceUtil.get_content_type(self.source)
def get_prev_entries(self):
......
......@@ -28,7 +28,7 @@ from datetime import datetime
from src.base.config import AuraConfig
from src.base.utils import SimpleUtil as SU
from src.core.engine import Engine
from src.scheduling.utils import M3UPlaylistProcessor
from src.scheduling.models import Timeslot, Playlist, PlaylistEntry, PlaylistEntryMetaData
from src.scheduling.api import ApiFetcher
......@@ -63,7 +63,7 @@ class ProgrammeService():
"""
# Fetch programme from API endpoints
self.logger.debug("Trying to fetch new programe from API endpoints...")
self.logger.debug("Trying to fetch new programme from API endpoints...")
# Create a fetching thread and wait until it's done
self.api_fetcher = ApiFetcher(self.config)
......@@ -244,7 +244,7 @@ class ProgrammeStore():
"""
config = None
logger = None
m3u_processor = None
def __init__(self):
"""
......@@ -252,7 +252,7 @@ class ProgrammeStore():
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.m3u_processor = M3UPlaylistProcessor()
def load_timeslots(self):
......@@ -399,7 +399,7 @@ class ProgrammeStore():
if not playlist_id or not fetched_playlist:
self.logger.debug(f"Playlist ID#{playlist_id} is not available!")
return
playlist_db = Playlist.select_playlist_for_timeslot(timeslot_db.timeslot_start, playlist_id)
havetoadd = False
......@@ -431,11 +431,17 @@ class ProgrammeStore():
"""
entry_num = 0
time_marker = playlist_db.start_unix
entries = fetched_playlist["entries"]
# "Hidden Functionality" to feed engine with M3U playlists via Tank's "Stream" playlist entry type
# See https://gitlab.servus.at/aura/engine/-/issues/53
# In the future this is to be replaced by generic music pool feature.
entries = self.m3u_processor.spread(entries)
self.expand_entry_duration(timeslot_db, fetched_playlist)
self.delete_orphaned_entries(playlist_db, fetched_playlist)
self.expand_entry_duration(timeslot_db, entries)
self.delete_orphaned_entries(playlist_db, entries)
for entry in fetched_playlist["entries"]:
for entry in entries:
entry_db = PlaylistEntry.select_playlistentry_for_playlist(playlist_db.artificial_id, entry_num)
havetoadd = False
if not entry_db:
......@@ -447,10 +453,7 @@ class ProgrammeStore():
entry_db.entry_num = entry_num
entry_db.duration = SU.nano_to_seconds(entry["duration"])
# FIXME Refactor mix of uri/filename/file/source
if "uri" in entry:
entry_db.uri = entry["uri"]
entry_db.source = entry["uri"]
if "filename" in entry:
entry_db.source = entry["filename"]
......@@ -465,13 +468,13 @@ class ProgrammeStore():
def delete_orphaned_entries(self, playlist_db, fetched_playlist):
def delete_orphaned_entries(self, playlist_db, entries):
"""
Deletes all playlist entries which are beyond the current playlist's `entry_count`.
Such entries might be existing due to a remotely changed playlist, which now has
less entries than before.
"""
new_last_idx = len(fetched_playlist["entries"])
new_last_idx = len(entries)
existing_last_idx = PlaylistEntry.count_entries(playlist_db.artificial_id)-1
if existing_last_idx < new_last_idx:
......@@ -484,7 +487,7 @@ class ProgrammeStore():
def expand_entry_duration(self, timeslot_db, fetched_playlist):
def expand_entry_duration(self, timeslot_db, entries):
"""
If some playlist entry doesn't have a duration assigned, its duration is expanded to the
remaining duration of the playlist (= timeslot duration minus playlist entries with duration).
......@@ -496,7 +499,7 @@ class ProgrammeStore():
missing_duration = []
idx = 0
for entry in fetched_playlist["entries"]:
for entry in entries:
if not "duration" in entry:
missing_duration.append(idx)
else:
......@@ -504,15 +507,15 @@ class ProgrammeStore():
idx += 1
if len(missing_duration) == 1:
fetched_playlist["entries"][missing_duration[0]]["duration"] = total_duration - actual_duration
self.logger.info("Expanded duration of playlist entry #%s:%s" % (fetched_playlist["id"], missing_duration[0]))
entries[missing_duration[0]]["duration"] = total_duration - actual_duration
self.logger.info(f"Expanded duration of playlist entry #{missing_duration[0]}")
elif len(missing_duration) > 1:
# This case should actually never happen, as TANK doesn't allow more than one entry w/o duration anymore
for i in reversed(missing_duration[1:-1]):
self.logger.error(SU.red("Deleted Playlist Entry without duration: %s" % \
str(fetched_playlist["entries"][i])))
del fetched_playlist["entries"][i]
str(entries[i])))
del entries[i]
......
......@@ -24,7 +24,7 @@ from enum import Enum
from datetime import datetime
from src.base.utils import SimpleUtil as SU
from src.base.config import AuraConfig
......@@ -93,6 +93,82 @@ class TimeslotFilter():
class M3UPlaylistProcessor():
"""
Renders a M3U Playlist as a engine compatible playlist dictionary.
"""
config = None
logging = None
playlist_folder = None
def __init__(self):
"""
Constructor
"""
self.config = AuraConfig.config()
self.logger = logging.getLogger("AuraEngine")
self.playlist_folder = self.config.get("audio_playlist_folder")
if not self.playlist_folder.endswith("/"):
self.playlist_folder += "/"
def spread(self, entries):
"""
Converts a playlist with M3U entries and renders them as individual playlist entries.
"""
if not self.playlist_folder:
return entries
result = []
m3u_entries = None
for entry in entries:
# It's a M3U Playlist which needs to be spread
if "uri" in entry and entry["uri"].startswith("playlist://"):
playlist_name = entry["uri"].split("playlist://")[1]
self.logger.info(f"Spreading entries of M3U playlist '{playlist_name}'")
m3u_entries = self.read_m3u_file(self.playlist_folder + playlist_name)
result += m3u_entries
# It's an ordinary entry to be taken as it is
else:
result.append(entry)
return result
def read_m3u_file(self, source_file):
"""
Read entries from an M3U file.
"""
file = open(source_file, "r")
lines = file.readlines()
entries = []
for i in range(0, len(lines)):
if lines[i].startswith("#EXTINF:"):
metadata = lines[i].split("#EXTINF:")[1].split(",")
entry = {
"file": {
"metadata": {
"artist": metadata[1].split(" - ")[0],
"album": "",
"title": metadata[1].split(" - ")[1]
}
},
"duration": SU.seconds_to_nano(int(metadata[0])),
"uri": "file://" + lines[i+1]
}
entries.append(entry)
file.close()
return entries
class TimeslotRenderer:
"""
Displays current and next timeslots in ASCII for maintainence and debugging.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment