From 6c88c93fa0036cc4ab74e2039621f250a9a7cd72 Mon Sep 17 00:00:00 2001
From: David Trattnig <david.trattnig@o94.at>
Date: Tue, 8 Dec 2020 18:39:28 +0100
Subject: [PATCH] Processing of M3U Playlists via Tank. #53

---
 config/sample-development.engine.ini |  1 +
 config/sample-docker.engine.ini      |  1 +
 config/sample-production.engine.ini  |  1 +
 src/core/engine.py                   | 14 ++---
 src/core/events.py                   |  2 -
 src/core/resources.py                | 19 +++++--
 src/plugins/trackservice.py          |  2 +-
 src/scheduling/fallback.py           | 11 ++--
 src/scheduling/models.py             |  3 +-
 src/scheduling/programme.py          | 41 ++++++++-------
 src/scheduling/utils.py              | 78 +++++++++++++++++++++++++++-
 11 files changed, 132 insertions(+), 41 deletions(-)

diff --git a/config/sample-development.engine.ini b/config/sample-development.engine.ini
index 1c0a02b2..f200f8e2 100644
--- a/config/sample-development.engine.ini
+++ b/config/sample-development.engine.ini
@@ -134,6 +134,7 @@ socketdir="/home/username/code/aura/engine/src/liquidsoap"
 [audiosources]
 audio_source_folder="var/audio/source"
 audio_source_extension=".flac"
+audio_playlist_folder="var/audio/playlist"
 
 [fallback]
 # track_sensitive => fallback_folder track sensitivity
diff --git a/config/sample-docker.engine.ini b/config/sample-docker.engine.ini
index 778de9b2..0bdb1e37 100644
--- a/config/sample-docker.engine.ini
+++ b/config/sample-docker.engine.ini
@@ -133,6 +133,7 @@ socketdir="/srv/src/liquidsoap"
 [audiosources]
 audio_source_folder="var/audio/source"
 audio_source_extension=".flac"
+audio_playlist_folder="var/audio/playlist"
 
 [fallback]
 # track_sensitive => fallback_folder track sensitivity
diff --git a/config/sample-production.engine.ini b/config/sample-production.engine.ini
index b7e10ef9..1be47ff2 100644
--- a/config/sample-production.engine.ini
+++ b/config/sample-production.engine.ini
@@ -133,6 +133,7 @@ socketdir="/opt/aura/engine/src/liquidsoap"
 [audiosources]
 audio_source_folder="var/audio/source"
 audio_source_extension=".flac"
+audio_playlist_folder="var/audio/playlist"
 
 [fallback]
 # track_sensitive => fallback_folder track sensitivity
diff --git a/src/core/engine.py b/src/core/engine.py
index 1424897f..1d0d8e57 100644
--- a/src/core/engine.py
+++ b/src/core/engine.py
@@ -449,7 +449,7 @@ class Player:
 
         Args:
             channel (Channel): The stream channel
-            uri (String):      The stream URL
+            url (String):      The stream URL
 
         Returns:
             (Boolean):  `True` if successful
@@ -487,7 +487,7 @@ class Player:
 
         Args:
             channel (Channel): The stream channel
-            uri (String):      The stream URL
+            url (String):      The stream URL
 
         Returns:
             (Boolean):  `True` if successful
@@ -521,13 +521,13 @@ class Player:
     #
 
 
-    def queue_push(self, channel, uri):
+    def queue_push(self, channel, source):
         """
         Adds an filesystem URI to the given `ChannelType.QUEUE` channel.
 
         Args:
             channel (Channel): The channel to push the file to
-            uri (String):      The URI of the file
+            source (String):   The URI of the file
 
         Returns:
             (Boolean):  `True` if successful
@@ -535,12 +535,12 @@ class Player:
         if channel not in ChannelType.QUEUE.channels and \
             channel not in ChannelType.FALLBACK_QUEUE.channels:
                 raise InvalidChannelException
-        self.logger.info(SU.pink("queue.push('%s', '%s'" % (channel, uri)))
-
+      
         self.connector.enable_transaction()
         audio_store = self.config.get("audio_source_folder")
         extension = self.config.get("audio_source_extension")
-        filepath = ResourceUtil.uri_to_filepath(audio_store, uri, extension)
+        filepath = ResourceUtil.source_to_filepath(audio_store, source, extension)
+        self.logger.info(SU.pink(f"{channel}.queue_push('{filepath}')"))
         result = self.connector.send_lqc_command(channel, "queue_push", filepath)
         self.logger.info("%s.queue_push result: %s" % (channel, result))
         self.connector.disable_transaction()
diff --git a/src/core/events.py b/src/core/events.py
index 6f2e20bf..74aca24b 100644
--- a/src/core/events.py
+++ b/src/core/events.py
@@ -27,7 +27,6 @@ from src.plugins.mailer         import AuraMailer
 from src.plugins.monitor        import AuraMonitor
 from src.plugins.trackservice   import TrackServiceHandler
 from src.scheduling.fallback    import FallbackManager
-from src.scheduling.models      import DB
 
 
 class EventBinding():
@@ -304,7 +303,6 @@ class EngineEventDispatcher():
             self.logger.debug("on_fallback_active(..)")
             self.call_event("on_fallback_active", timeslot, fallback_type)
 
-        DB.session.expunge(timeslot)
         thread = Thread(target = func, args = (self, timeslot, fallback_type))
         thread.start() 
 
diff --git a/src/core/resources.py b/src/core/resources.py
index f4e9527a..3797f8d8 100644
--- a/src/core/resources.py
+++ b/src/core/resources.py
@@ -137,7 +137,7 @@ class ResourceUtil(Enum):
 
         for entry in entries:     
             if ResourceUtil.get_content_type(entry.source) == ResourceType.FILE:
-                path = ResourceUtil.uri_to_filepath(audio_store_path, entry.source, entry_extension)
+                path = ResourceUtil.source_to_filepath(audio_store_path, entry.source, entry_extension)
                 fb.append(f"#EXTINF:{entry.duration},{entry.meta_data.artist} - {entry.meta_data.title}")
                 fb.append(path)
 
@@ -146,19 +146,28 @@ class ResourceUtil(Enum):
 
 
     @staticmethod
-    def uri_to_filepath(base_dir, uri, source_extension):
+    def source_to_filepath(base_dir, source, source_extension):
         """
-        Converts a file-system URI to an actual, absolute path to the file.
+        Converts a file-system URI starting with "file://" to an actual, 
+        absolute path to the file, appending the extension as provided
+        in "source_extension".
+
+        If the path starts with an "/" it indicates that it's already an
+        absolute path including a valid extension.
 
         Args:
             basi_dir (String):          The location of the audio store.
-            uri (String):               The URI of the file
+            source (String):             The URI of the file
             source_extension (String):  The file extension of audio sources
 
         Returns:
             path (String):  Absolute file path
         """
-        return base_dir + "/" + uri[7:] + source_extension
+        path = source[7:]
+        if path.startswith("/"):
+            return path
+        else:            
+            return base_dir + "/" + path + source_extension
 
 
     @staticmethod
diff --git a/src/plugins/trackservice.py b/src/plugins/trackservice.py
index f91e3d00..cbf79b2d 100644
--- a/src/plugins/trackservice.py
+++ b/src/plugins/trackservice.py
@@ -409,7 +409,7 @@ class Playlog:
                 if entry.get_content_type() in ResourceClass.FILE.types:
                     base_dir = self.config.get("audio_source_folder")
                     extension = self.config.get("audio_source_extension")
-                    entry_source = ResourceUtil.uri_to_filepath(base_dir, entry.source, extension)
+                    entry_source = ResourceUtil.source_to_filepath(base_dir, entry.source, extension)
                 if entry_source == uri:
                     self.logger.info("Resolved '%s' entry '%s' for URI '%s'" % (entry.get_content_type(), entry, uri))
                     result = entry
diff --git a/src/scheduling/fallback.py b/src/scheduling/fallback.py
index 2f4e7c67..faf721f0 100644
--- a/src/scheduling/fallback.py
+++ b/src/scheduling/fallback.py
@@ -29,7 +29,7 @@ from src.base.utils         import SimpleUtil as SU
 from src.core.resources     import ResourceClass, ResourceUtil
 from src.core.channels      import Channel
 from src.core.control       import EngineExecutor
-
+from src.scheduling.models  import DB
 
 
 class FallbackType(Enum):
@@ -95,7 +95,7 @@ class FallbackManager:
     def on_timeslot_start(self, timeslot=None):
         """
         Some new timeslot has just started.
-        """        
+        """
         self.state["timeslot"] = timeslot
 
 
@@ -103,7 +103,7 @@ class FallbackManager:
         """
         The timeslot has ended and the state is updated. The method ensures that any intermediate state 
         update doesn't get overwritten.
-        """        
+        """
         if self.state["timeslot"] == timeslot:
             self.state["timeslot"] = None
 
@@ -139,7 +139,10 @@ class FallbackManager:
         if fallback_type is not FallbackType.NONE:
             # Only trigger the event the upon first state change
             if fallback_type != self.state.get("previous_fallback_type"):
-                self.engine.event_dispatcher.on_fallback_active(self.state["timeslot"], fallback_type)
+                timeslot = self.state["timeslot"]
+                if timeslot:
+                    DB.session.merge(timeslot)
+                self.engine.event_dispatcher.on_fallback_active(timeslot, fallback_type)
 
 
 
diff --git a/src/scheduling/models.py b/src/scheduling/models.py
index f06c3131..ae70bfb1 100644
--- a/src/scheduling/models.py
+++ b/src/scheduling/models.py
@@ -480,7 +480,6 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
 
     # Data
     entry_num = Column(Integer)
-    uri = Column(String(1024))
     duration = Column(BigInteger)
     volume = Column(Integer, ColumnDefault(100))
     source = Column(String(1024))
@@ -533,7 +532,7 @@ class PlaylistEntry(DB.Model, AuraDatabaseModel):
 
 
     def get_content_type(self):
-        return ResourceUtil.get_content_type(self.uri)
+        return ResourceUtil.get_content_type(self.source)
 
 
     def get_prev_entries(self):
diff --git a/src/scheduling/programme.py b/src/scheduling/programme.py
index 109900b5..51df9337 100644
--- a/src/scheduling/programme.py
+++ b/src/scheduling/programme.py
@@ -28,7 +28,7 @@ from datetime import datetime
 from src.base.config import AuraConfig
 from src.base.utils import SimpleUtil as SU
 from src.core.engine import Engine
-
+from src.scheduling.utils import M3UPlaylistProcessor
 from src.scheduling.models import Timeslot, Playlist, PlaylistEntry, PlaylistEntryMetaData
 from src.scheduling.api import ApiFetcher
 
@@ -63,7 +63,7 @@ class ProgrammeService():
         """
 
         # Fetch programme from API endpoints
-        self.logger.debug("Trying to fetch new programe from API endpoints...")
+        self.logger.debug("Trying to fetch new programme from API endpoints...")
 
         # Create a fetching thread and wait until it's done
         self.api_fetcher = ApiFetcher(self.config)
@@ -244,7 +244,7 @@ class ProgrammeStore():
     """
     config = None
     logger = None
-
+    m3u_processor = None
 
     def __init__(self):
         """
@@ -252,7 +252,7 @@ class ProgrammeStore():
         """
         self.config = AuraConfig.config()
         self.logger = logging.getLogger("AuraEngine")
-
+        self.m3u_processor = M3UPlaylistProcessor()
 
 
     def load_timeslots(self):
@@ -399,7 +399,7 @@ class ProgrammeStore():
         if not playlist_id or not fetched_playlist:
             self.logger.debug(f"Playlist ID#{playlist_id} is not available!")
             return
-            
+        
         playlist_db = Playlist.select_playlist_for_timeslot(timeslot_db.timeslot_start, playlist_id)
         havetoadd = False
 
@@ -431,11 +431,17 @@ class ProgrammeStore():
         """
         entry_num = 0
         time_marker = playlist_db.start_unix
+        entries = fetched_playlist["entries"]
+
+        # "Hidden Functionality" to feed engine with M3U playlists via Tank's "Stream" playlist entry type
+        # See https://gitlab.servus.at/aura/engine/-/issues/53
+        # In the future this is to be replaced by generic music pool feature.
+        entries = self.m3u_processor.spread(entries)
 
-        self.expand_entry_duration(timeslot_db, fetched_playlist)
-        self.delete_orphaned_entries(playlist_db, fetched_playlist)  
+        self.expand_entry_duration(timeslot_db, entries)        
+        self.delete_orphaned_entries(playlist_db, entries)  
 
-        for entry in fetched_playlist["entries"]:
+        for entry in entries:
             entry_db = PlaylistEntry.select_playlistentry_for_playlist(playlist_db.artificial_id, entry_num)
             havetoadd = False
             if not entry_db:
@@ -447,10 +453,7 @@ class ProgrammeStore():
             entry_db.entry_num = entry_num
             entry_db.duration = SU.nano_to_seconds(entry["duration"])
 
-            # FIXME Refactor mix of uri/filename/file/source
-
             if "uri" in entry:
-                entry_db.uri = entry["uri"]
                 entry_db.source = entry["uri"]
             if "filename" in entry:
                 entry_db.source = entry["filename"]
@@ -465,13 +468,13 @@ class ProgrammeStore():
 
 
 
-    def delete_orphaned_entries(self, playlist_db, fetched_playlist):
+    def delete_orphaned_entries(self, playlist_db, entries):
         """
         Deletes all playlist entries which are beyond the current playlist's `entry_count`.
         Such entries might be existing due to a remotely changed playlist, which now has
         less entries than before.
         """
-        new_last_idx = len(fetched_playlist["entries"])
+        new_last_idx = len(entries)
         existing_last_idx = PlaylistEntry.count_entries(playlist_db.artificial_id)-1
 
         if existing_last_idx < new_last_idx:
@@ -484,7 +487,7 @@ class ProgrammeStore():
 
 
 
-    def expand_entry_duration(self, timeslot_db, fetched_playlist):
+    def expand_entry_duration(self, timeslot_db, entries):
         """
         If some playlist entry doesn't have a duration assigned, its duration is expanded to the
         remaining duration of the playlist (= timeslot duration minus playlist entries with duration).
@@ -496,7 +499,7 @@ class ProgrammeStore():
         missing_duration = []
         idx = 0
 
-        for entry in fetched_playlist["entries"]:
+        for entry in entries:
             if not "duration" in entry:
                 missing_duration.append(idx)
             else:
@@ -504,15 +507,15 @@ class ProgrammeStore():
             idx += 1
                 
         if len(missing_duration) == 1:
-            fetched_playlist["entries"][missing_duration[0]]["duration"] = total_duration - actual_duration
-            self.logger.info("Expanded duration of playlist entry #%s:%s" % (fetched_playlist["id"], missing_duration[0]))
+            entries[missing_duration[0]]["duration"] = total_duration - actual_duration
+            self.logger.info(f"Expanded duration of playlist entry #{missing_duration[0]}")
 
         elif len(missing_duration) > 1:
             # This case should actually never happen, as TANK doesn't allow more than one entry w/o duration anymore
             for i in reversed(missing_duration[1:-1]):
                 self.logger.error(SU.red("Deleted Playlist Entry without duration: %s" % \
-                    str(fetched_playlist["entries"][i])))
-                del fetched_playlist["entries"][i]
+                    str(entries[i])))
+                del entries[i]
 
 
 
diff --git a/src/scheduling/utils.py b/src/scheduling/utils.py
index fbf55c9d..f4064420 100644
--- a/src/scheduling/utils.py
+++ b/src/scheduling/utils.py
@@ -24,7 +24,7 @@ from enum import Enum
 from datetime import datetime
 
 from src.base.utils import SimpleUtil as SU
-
+from src.base.config import AuraConfig
 
 
 
@@ -93,6 +93,82 @@ class TimeslotFilter():
 
 
 
+class M3UPlaylistProcessor():
+    """
+    Renders a M3U Playlist as a engine compatible playlist dictionary.
+    """
+    config = None
+    logging = None
+    playlist_folder = None
+
+
+    def __init__(self):
+        """
+        Constructor
+        """
+        self.config = AuraConfig.config()
+        self.logger = logging.getLogger("AuraEngine")
+        self.playlist_folder = self.config.get("audio_playlist_folder")
+        if not self.playlist_folder.endswith("/"):
+            self.playlist_folder += "/"
+
+
+
+    def spread(self, entries):
+        """
+        Converts a playlist with M3U entries and renders them as individual playlist entries.
+        """
+        if not self.playlist_folder:
+            return entries
+
+        result = []
+        m3u_entries = None
+
+        for entry in entries:
+            # It's a M3U Playlist which needs to be spread
+            if "uri" in entry and entry["uri"].startswith("playlist://"):
+                
+                playlist_name = entry["uri"].split("playlist://")[1]
+                self.logger.info(f"Spreading entries of M3U playlist '{playlist_name}'")
+                m3u_entries = self.read_m3u_file(self.playlist_folder + playlist_name)
+                result += m3u_entries
+            # It's an ordinary entry to be taken as it is
+            else:
+                result.append(entry)
+
+        return result
+
+
+
+    def read_m3u_file(self, source_file):
+        """
+        Read entries from an M3U file.
+        """
+        file = open(source_file, "r")
+        lines = file.readlines()
+        entries = []
+
+        for i in range(0, len(lines)):
+            if lines[i].startswith("#EXTINF:"):
+                metadata = lines[i].split("#EXTINF:")[1].split(",")
+                entry = {
+                    "file": {
+                        "metadata": {
+                            "artist": metadata[1].split(" - ")[0],
+                            "album": "",
+                            "title": metadata[1].split(" - ")[1]
+                        }
+                    },
+                    "duration": SU.seconds_to_nano(int(metadata[0])),
+                    "uri": "file://" + lines[i+1]
+                }
+                entries.append(entry)
+
+        file.close()
+        return entries
+
+
+
 class TimeslotRenderer:
     """
     Displays current and next timeslots in ASCII for maintainence and debugging.
-- 
GitLab