Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • aura/engine
  • hermannschwaerzler/engine
  • sumpfralle/aura-engine
3 results
Show changes
Showing
with 2364 additions and 0 deletions
; supervisor config file
[unix_http_server]
file=/opt/aura/engine/tmp/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)
chown=engineuser:engineuser
[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/opt/aura/engine/tmp/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///opt/aura/engine/tmp/supervisor.sock ; use a unix:// URL for a unix socket
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.
[include]
; files = /etc/supervisor/conf.d/*.conf
files = /opt/aura/engine/config/supervisor/*.conf
import os
import sys
PROJECT_PATH = os.getcwd()
SOURCE_PATH = os.path.join(PROJECT_PATH, "src")
sys.path.append(SOURCE_PATH)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from enum import StrEnum, auto
from time import sleep
from urllib.parse import urlparse
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import synchronized
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.core.channels import ChannelName, PlayoutStatusResponse
from aura_engine.core.client import CoreClient
from aura_engine.core.mixer import Mixer
from aura_engine.events import EngineEventDispatcher
# enums reflecting namespace, action and reponses
class Namespace(StrEnum):
MIXER = auto()
CHANNEL = auto()
@classmethod
def _missing_(cls, value):
# map 'mixer' to .MIXER and values of ChannelName to .CHANNEL
if isinstance(value, Namespace):
return cls(value)
elif value in ChannelName._value2member_map_.values():
return cls.CHANNEL
else:
raise TypeError
class MixerAction(StrEnum):
INPUTS = auto()
OUTPUTS = auto()
STATUS = auto()
VOLUME = auto()
SELECT = auto()
ACTIVATE = auto()
class ChannelAction(StrEnum):
PUSH = auto()
LOAD = auto()
URL = auto()
STATUS = auto()
START = auto()
STOP = auto()
CLEAR = auto()
ROLL = auto()
SET_TRACK_METADATA = auto()
class CoreClientMock(CoreClient):
"""
Suppress unwanted behavior by subclassing and overriding necessary functions.
"""
instance = None
connection = None
event_dispatcher = None
conn = None
stream_url = None
stream_status = PlayoutStatusResponse.STREAM_STATUS_CONNECTED
resource_id = -1
volumes = [0, 0, 0, 0, 0, 0]
selections = [0, 0, 0, 0, 0, 0]
def __init__(self, event_dispatcher: EngineEventDispatcher):
"""
Initialize the client.
"""
self.config = AuraConfig.instance.config
self.event_dispatcher = event_dispatcher
@staticmethod
def get_instance(event_dispatcher: EngineEventDispatcher):
"""
Get an instance of the client singleton.
"""
if not CoreClientMock.instance:
CoreClientMock.instance = CoreClientMock(event_dispatcher)
return CoreClientMock.instance
def _validate_url(self, url: str) -> bool:
"""
Check url validity.
"""
parse = urlparse(url)
return all([parse.scheme, parse.netloc])
def _get_resource_id(self) -> int:
"""
Increment and return index starting with 0.
"""
self.resource_id += 1
return self.resource_id
@synchronized
def connect(self):
sleep(0.5)
pass
@synchronized
def disconnect(self):
sleep(0.5)
pass
@synchronized
def exec(self, namespace: str, action: str, args: str = "") -> str:
log = print
# log(f"Core mock request 'ns: {namespace}', action: '{action}', args: '{args}'")
response: str = None
ns = Namespace(namespace)
if ns is None:
raise TypeError
match ns:
case Namespace.MIXER:
act = MixerAction(action)
if act is None:
raise TypeError
match act:
case MixerAction.INPUTS:
response = f"{ChannelName.QUEUE_A} {ChannelName.QUEUE_B} {ChannelName.HTTP_A} {ChannelName.HTTP_B} {ChannelName.LIVE_0} {ChannelName.LIVE_1}"
case MixerAction.OUTPUTS:
response = '["aura_engine_line_out_0", "out_http_0"]' # no abstraction yet
case MixerAction.STATUS:
chn = int(args)
vol = self.volumes[chn]
response = (
f"ready=true selected=true single=false volume={vol}% remaining=0"
)
case MixerAction.VOLUME:
argv = args.split(" ")
chn = int(argv[0])
vol = int(float(argv[1]) * 100.0)
self.volumes[chn] = vol
response = f"volume={vol}% remaining=0"
case MixerAction.SELECT:
argv = args.split(" ")
chn = int(argv[0])
vol = self.volumes[chn]
sel = 1 if argv[1] == "true" else 0
self.selections[chn] = sel
response = f"volume={vol}% remaining=0 selected={sel}"
case MixerAction.ACTIVATE:
response = PlayoutStatusResponse.SUCCESS
case Namespace.CHANNEL:
act = ChannelAction(action)
if act is None:
raise TypeError
match act:
case ChannelAction.PUSH:
sleep(5)
resid = self._get_resource_id()
response = str(resid)
case ChannelAction.LOAD:
sleep(3)
resid = self._get_resource_id()
response = str(resid)
case ChannelAction.URL:
sleep(0.5)
self.stream_url = args
response = PlayoutStatusResponse.SUCCESS
case ChannelAction.STATUS:
if self._validate_url(self.stream_url):
response = self.stream_status
if response == PlayoutStatusResponse.STREAM_STATUS_CONNECTED:
response += " " + self.stream_url
else:
# simulate polling state by supplying invalid url
response = PlayoutStatusResponse.STREAM_STATUS_POLLING
case ChannelAction.START:
sleep(1)
response = PlayoutStatusResponse.SUCCESS
case ChannelAction.STOP:
response = PlayoutStatusResponse.SUCCESS
case ChannelAction.CLEAR:
response = PlayoutStatusResponse.SUCCESS
case ChannelAction.ROLL:
response = PlayoutStatusResponse.SUCCESS
case ChannelAction.SET_TRACK_METADATA:
response = PlayoutStatusResponse.SUCCESS
if response is not None:
# log(f"Core mock reponse '{response}'")
return response
raise Exception(f"Unhandled namespace: '{namespace}', action: '{action}', args: '{args}'")
class PlayoutClientMock(CoreClientMock):
"""
Mock PlayoutClient by subclssing and skipping unwanted.
"""
mixer: Mixer = None
def __init__(self, event_dispatcher: EngineEventDispatcher):
super().__init__(event_dispatcher)
self.mixer = Mixer("mixer", self)
@staticmethod
def get_instance(event_dispatcher: EngineEventDispatcher) -> CoreClientMock:
if not PlayoutClientMock.instance:
PlayoutClientMock.instance = PlayoutClientMock(event_dispatcher)
return PlayoutClientMock.instance
def get_mixer(self):
return self.mixer
[
{
"id": "fbea8c8f-27fb-4a1b-8fbf-4c4b5f5f62b2",
"start": "2024-07-25T18:00:10",
"end": "2024-07-25T19:00:00",
"timeslotId": 1,
"playlistId": 1,
"showId": 1,
"timeslot": {
"memo": "",
"playlistId": 1,
"repetitionOfId": 18,
"end": "2024-07-25T19:00:00",
"id": 1,
"noteId": 4425,
"scheduleId": 15684,
"showId": 1,
"start": "2024-07-25T18:00:10"
},
"show": {
"id": 1,
"name": "Show 1",
"defaultPlaylistId": 1
},
"episode": {
"id": 4425,
"title": "Episode 1"
},
"schedule": {
"id": 9,
"defaultPlaylistId": 1
}
},
{
"id": "a6f3f57d-6830-415f-bf15-9fc715cab60d",
"start": "2024-07-25T19:00:10",
"end": "2024-07-25T20:00:00",
"timeslotId": 2,
"playlistId": 2,
"showId": 2,
"timeslot": {
"memo": "",
"playlistId": 2,
"repetitionOfId": null,
"end": "2024-07-25T20:00:00",
"id": 2,
"noteId": 6,
"scheduleId": 21,
"showId": 3,
"start": "2024-07-25T19:00:10"
},
"show": {
"id": 2,
"name": "Show 2",
"defaultPlaylistId": 1
},
"episode": {
"id": 6,
"title": "Episode 6"
},
"schedule": {
"id": 21,
"defaultPlaylistId": 1
}
},
{
"id": "98a7464d-4eec-4600-a1f4-25cdad30433e",
"start": "2024-07-25T20:00:10",
"end": "2024-07-25T22:00:00",
"timeslotId": 3,
"playlistId": 3,
"showId": 3,
"timeslot": {
"memo": "",
"playlistId": 3,
"repetitionOfId": null,
"end": "2024-07-25T22:00:00",
"id": 3,
"noteId": 39,
"scheduleId": 19,
"showId": 3,
"start": "2024-07-25T20:00:10"
},
"show": {
"id": 3,
"name": "Show 3",
"defaultPlaylistId": null
},
"episode": {
"id": 39,
"title": "Episode 39"
},
"schedule": {
"id": 19,
"defaultPlaylistId": null
}
},
{
"id": "f69169dc-2e04-4504-bf9e-81db62c24d1c",
"start": "2024-07-25T22:00:10",
"end": "2024-07-25T22:30:00",
"timeslotId": null,
"playlistId": null,
"showId": 4,
"timeslot": null,
"show": {
"id": 4,
"name": "Show 4 - Fallback show with virtual timeslot",
"defaultPlaylistId": 3
},
"episode": null,
"schedule": null
},
{
"id": "cfe03dba-599d-4f6c-b87f-4d13b5be3cb7",
"start": "2024-07-25T22:30:00",
"end": "2024-07-25T23:59:59",
"timeslotId": 5,
"playlistId": 3,
"showId": 4,
"timeslot": {
"memo": "",
"playlistId": 3,
"repetitionOfId": null,
"end": "2024-07-25T22:30:00",
"id": 5,
"noteId": 3,
"scheduleId": 19,
"showId": 3,
"start": "2024-07-25T22:00:10"
},
"show": {
"id": 4,
"name": "Show 4 - Fallback Show",
"defaultPlaylistId": null
},
"episode": {
"id": 3,
"title": "Episode 3 - Virtual Timeslot Override"
},
"schedule": {
"id": 20,
"defaultPlaylistId": null
}
}
]
{
"id": 1,
"created": "2023-02-28T15:25:38.684803+01:00",
"updated": "2023-02-28T15:25:38.684803+01:00",
"description": "test",
"playoutMode": "linear",
"showName": "musikprogramm",
"entries": [
{
"uri": "file://musikprogramm/2",
"duration": 199.04,
"file": {
"id": 2,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "upload://some-audio-file.flac",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
},
"metadata": {
"artist": "Test Artist",
"title": "Test Track Title",
"album": "Test Album"
},
"size": 36496517,
"duration": 199.94
}
}
]
}
{
"id": 111,
"created": "2023-02-28T15:25:38.684803+01:00",
"updated": "2023-02-28T15:25:38.684803+01:00",
"description": "playlist 111",
"playoutMode": "shuffle",
"showName": "musikprogramm",
"entries": [
{
"uri": "file://musikprogramm/303",
"duration": 199,
"file": {
"id": 2,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "upload://303.flac",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
},
"metadata": {},
"size": 36496517,
"duration": 199
}
},
{
"uri": "file://musikprogramm/808",
"duration": 299,
"file": {
"id": 2,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "upload://808.flac",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
},
"metadata": {
"artist": "Peaches",
"title": "Rock Show",
"album": "The Teaches of Peaches"
},
"size": 66496517,
"duration": 299
}
},
{
"uri": "m3u://sample.m3u",
"file": {
"id": 2,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "m3u://sample.m3u",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
}
}
}
]
}
\ No newline at end of file
{
"id": 1,
"created": "2023-02-28T15:25:38.684803+01:00",
"updated": "2023-02-28T15:25:38.684803+01:00",
"description": "test",
"playoutMode": "linear",
"showName": "musikprogramm",
"entries": [
{
"uri": "file://musikprogramm/3",
"duration": 180,
"file": {
"id": 2,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "upload://some-audio-file.flac",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
},
"metadata": {
"artist": "Test Artist 1",
"title": "Test Track Title 1",
"album": "Test Album 1"
},
"size": 36496517,
"duration": 180
}
},
{
"uri": "file://musikprogramm/4",
"file": {
"id": 3,
"created": "2023-02-28T14:42:09.540485+01:00",
"updated": "2023-02-28T14:42:17.564099+01:00",
"showName": "musikprogramm",
"source": {
"uri": "upload://some-audio-file.flac",
"hash": "sha256:b4e1922bad633ff0e11f55611f04cb3807d15d70bb09969d2b324373af47b574",
"import": {
"state": "done"
}
},
"metadata": {
"artist": "Test Artist 2",
"title": "Test Track Title 2",
"album": "Test Album 2"
},
"size": 364.9
}
}
]
}
#EXTM3U
#EXTINF:123,Alle - Unser Lied
/media/music-lib/Unser Album/Unser Lied.flac
#EXTINF:321,Alle - Dein Lied
music-lib/Unser Album/Dein Lied.ogg
#EXTINF:231,Alle - Liedlos
../music-lib/Alle/Unser Album/Liedlos.m4a
#EXTINF:213,Alle - Euer Lied
http://www.example.org/music/Alle-Unser_Album-Euer_Lied.mp3
\ No newline at end of file
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
import time
import unittest
from aura_engine.base.lang import DotDict, private, synchronized
class TestSynchronizedAnnotation(unittest.TestCase):
"""
Testing the @synchronized annotation.
"""
value = 0
@synchronized
def increment(self):
value = self.value
value += 1
time.sleep(0.00001)
self.value = value
def worker(self):
for _ in range(1000):
self.increment()
def test_synchronized(self):
threads = []
for _ in range(10):
thread = threading.Thread(target=self.worker)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
self.assertEqual(self.value, 10000)
class TestPrivateAnnotation(unittest.TestCase):
"""
Testing the @private annotation.
"""
def test_private(self):
class Container:
@private
def priv(self, flag: bool) -> bool:
return flag
def pub(self, flag: bool) -> bool:
return self.priv(flag)
cont = Container()
self.assertTrue(cont.pub(True))
self.assertFalse(cont.pub(False))
with self.assertRaises(Exception):
cont.priv(True)
class TestDotDict(unittest.TestCase):
"""
Testing the DotDict dictionary wrapper.
"""
def test_dotdict_key_val(self):
print(self._testMethodName)
str_val = "Hello"
int_val = 8
flt_val = 1.0 / 12.0
bln_val = True
lst_val = ["VCO", "LFO", "ADSR", "VCF", "VCA"]
dct_val = {10: "Waveform", 11: "Frequency", 20: "Cutoff", 21: "Res"}
dotdict = DotDict()
dotdict.str_key = str_val
dotdict.int_key = int_val
dotdict.flt_key = flt_val
dotdict.bln_key = bln_val
dotdict.lst_key = lst_val
dotdict.dct_key = dct_val
self.assertEqual(dotdict.str_key, str_val)
self.assertEqual(dotdict.int_key, int_val)
self.assertEqual(dotdict.flt_key, flt_val)
self.assertEqual(dotdict.bln_key, bln_val)
self.assertEqual(dotdict.lst_key, lst_val)
self.assertEqual(dotdict.dct_key, dct_val)
self.assertEqual(dotdict["str_key"], str_val)
self.assertEqual(dotdict["int_key"], int_val)
self.assertEqual(dotdict["flt_key"], flt_val)
self.assertEqual(dotdict["bln_key"], bln_val)
self.assertEqual(dotdict["lst_key"], lst_val)
self.assertEqual(dotdict["dct_key"], dct_val)
dotdict["str_key"] = None
self.assertIsNone(dotdict.str_key)
self.assertIsNone(dotdict.nil)
def test_dotdict_init(self):
print(self._testMethodName)
str_val = "Hello"
int_val = 8
flt_val = 1.0 / 12.0
bln_val = True
lst_val = ["VCO", "LFO", "ADSR", "VCF", "VCA"]
dct_val = {10: "Waveform", 11: "Frequency", 20: "Cutoff", 21: "Res"}
rawdict = {
"str_key": str_val,
"int_key": int_val,
"flt_key": flt_val,
"bln_key": bln_val,
"lst_key": lst_val,
"dct_key": dct_val,
}
dotdict = DotDict(rawdict)
self.assertEqual(dotdict.str_key, str_val)
self.assertEqual(dotdict.int_key, int_val)
self.assertEqual(dotdict.flt_key, flt_val)
self.assertEqual(dotdict.bln_key, bln_val)
self.assertEqual(dotdict.lst_key, lst_val)
self.assertEqual(dotdict.dct_key, dct_val)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
from aura_engine.core.channels import (
ChannelFactory,
ChannelName,
ChannelType,
LineChannel,
PlayoutStatusResponse,
QueueChannel,
StreamChannel,
)
from aura_engine.core.mixer import Mixer
from aura_engine.events import EngineEventDispatcher
from tests.core_client_mock import PlayoutClientMock
class TestChannelFactory(unittest.TestCase):
"""
Testing the ChannelFactory.
"""
mixer: Mixer = None
factory: ChannelFactory = None
def setUp(self):
config = AuraConfig.instance.config
AuraLogger(config)
dispatcher = EngineEventDispatcher(engine=None)
client = PlayoutClientMock(event_dispatcher=dispatcher)
self.mixer = client.mixer
self.factory = ChannelFactory(self.mixer)
def create_channels(self):
queue = self.factory.create_channel(0, ChannelName.QUEUE_A, self.mixer)
stream = self.factory.create_channel(0, ChannelName.HTTP_A, self.mixer)
live = self.factory.create_channel(0, ChannelName.LIVE_0, self.mixer)
return (queue, stream, live)
def test_create_channel(self):
print(self._testMethodName)
queue, stream, live = self.create_channels()
self.assertEqual(type(queue), QueueChannel)
self.assertEqual(type(stream), StreamChannel)
self.assertEqual(type(live), LineChannel)
def test_get_type(self):
print(self._testMethodName)
queue, stream, live = self.create_channels()
self.assertEqual(queue.get_type(), ChannelType.QUEUE)
self.assertEqual(stream.get_type(), ChannelType.HTTP)
self.assertEqual(live.get_type(), ChannelType.LIVE)
self.assertEqual(queue.get_type().numeric, ChannelType.QUEUE.numeric)
self.assertEqual(stream.get_type().numeric, ChannelType.HTTP.numeric)
self.assertEqual(live.get_type().numeric, ChannelType.LIVE.numeric)
def test_get_index(self):
print(self._testMethodName)
queue = self.factory.create_channel(0, ChannelName.QUEUE_A, self.mixer)
stream = self.factory.create_channel(1, ChannelName.HTTP_A, self.mixer)
live = self.factory.create_channel(2, ChannelName.LIVE_0, self.mixer)
self.assertEqual(queue.get_index(), 0)
self.assertEqual(stream.get_index(), 1)
self.assertEqual(live.get_index(), 2)
def test_load(self):
print(self._testMethodName)
queue, stream, live = self.create_channels()
queue.load("./file.flac")
stream.load("http://stream.local")
live.load()
def test_load_metadata(self):
print(self._testMethodName)
queue, stream, live = self.create_channels()
metadata = {"title": "A Title", "artist": "An Artist", "album": "An Album"}
queue.load("./file.flac", metadata=metadata)
stream.load("http://stream.local", metadata=metadata)
live.load(metadata=metadata)
def test_fade_in_out(self):
print(self._testMethodName)
queue, stream, live = self.create_channels()
queue.fade_out()
stream.fade_out()
live.fade_out()
queue.fade_in(volume=100, instant=True)
stream.fade_in(volume=100)
queue.fade_out()
live.fade_in(volume=100)
stream.fade_out()
live.fade_out(instant=True)
def test_roll(self):
print(self._testMethodName)
queue = self.factory.create_channel(0, ChannelName.QUEUE_A, self.mixer)
res = queue.roll(300)
self.assertTrue(res)
class TestPlayoutStatusResponse(unittest.TestCase):
"""
Testing the PlayoutStatusResponse.
"""
def test_enum(self):
print(self._testMethodName)
self.assertIs(PlayoutStatusResponse("OK"), PlayoutStatusResponse.SUCCESS)
self.assertIs(PlayoutStatusResponse("Done"), PlayoutStatusResponse.SUCCESS)
self.assertIs(PlayoutStatusResponse("Done!"), PlayoutStatusResponse.SUCCESS)
self.assertIs(PlayoutStatusResponse("Donee!"), PlayoutStatusResponse.SUCCESS)
self.assertIs(
PlayoutStatusResponse("polling"), PlayoutStatusResponse.STREAM_STATUS_POLLING
)
self.assertIs(
PlayoutStatusResponse("stopped"), PlayoutStatusResponse.STREAM_STATUS_STOPPED
)
self.assertIs(
PlayoutStatusResponse("connected <additional data>"),
PlayoutStatusResponse.STREAM_STATUS_CONNECTED,
)
self.assertIs(PlayoutStatusResponse("mkay"), PlayoutStatusResponse.INVALID)
self.assertIs(PlayoutStatusResponse(""), PlayoutStatusResponse.INVALID)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
from aura_engine.core.channels import (
ChannelName,
ChannelType,
LineChannel,
QueueChannel,
StreamChannel,
)
from aura_engine.core.mixer import Mixer
from aura_engine.events import EngineEventDispatcher
from tests.core_client_mock import PlayoutClientMock
class TestMixer(unittest.TestCase):
"""
Testing the Mixer.
Some mixer functions are already covered by TestChannelFactory.
See test_core_channels.py for details.
"""
mixer: Mixer = None
def setUp(self):
config = AuraConfig.instance.config
AuraLogger(config)
dispatcher = EngineEventDispatcher(engine=None)
client = PlayoutClientMock(event_dispatcher=dispatcher)
self.mixer = client.mixer
def test_get_inputs(self):
print(self._testMethodName)
inputs = self.mixer.get_inputs()
input_state = {
"ready": True,
"selected": True,
"single": True,
"volume": 0,
"remaining": 0.0,
}
self.assertEqual(len(inputs), 6)
self.assertEqual(inputs["in_queue_0"], input_state)
self.assertEqual(inputs["in_queue_1"], input_state)
self.assertEqual(inputs["in_stream_0"], input_state)
self.assertEqual(inputs["in_stream_1"], input_state)
self.assertEqual(inputs["aura_engine_line_in_0"], input_state)
self.assertEqual(inputs["aura_engine_line_in_1"], input_state)
def test_get_outputs(self):
print(self._testMethodName)
outputs = self.mixer.get_outputs()
self.assertEqual(len(outputs), 2)
self.assertEqual(outputs[0], "aura_engine_line_out_0")
self.assertEqual(outputs[1], "out_http_0")
def test_get_free_channel(self):
print(self._testMethodName)
free = self.mixer.get_free_channel(channel_type=ChannelType.LIVE)
self.assertTrue(isinstance(free, LineChannel))
def test_get_set_active_channel(self):
active = self.mixer.get_active_channel()
self.assertIsNone(active)
queue_channel = self.mixer.get_channel(channel_name=ChannelName.QUEUE_A)
self.mixer.set_active_channel(channel=queue_channel)
active = self.mixer.get_active_channel()
self.assertTrue(isinstance(active, QueueChannel))
stream_channel = self.mixer.get_channel(channel_name=ChannelName.HTTP_A)
self.mixer.set_active_channel(channel=stream_channel)
active = self.mixer.get_active_channel()
self.assertTrue(isinstance(active, StreamChannel))
line_channel = self.mixer.get_channel(channel_name=ChannelName.LIVE_1)
self.mixer.set_active_channel(channel=line_channel)
active = self.mixer.get_active_channel()
self.assertTrue(isinstance(active, LineChannel))
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
# from aura_engine.app import EngineRunner
from aura_engine.base.config import AuraConfig
# from unittest import mock
class TestApp(unittest.TestCase):
"""
Testing the Configuration.
"""
config = None
#
# Mock
#
def mocked_engine_start(*args, **kwargs):
print(args, kwargs)
print("Mocked start finished")
#
# Setup
#
def setUp(self):
self.config = AuraConfig.instance.config
#
# Test Cases
#
# FIXME For some unknown reason, this Mock leaks into other test cases
# When this test case is enabled, these tests fail:
# - tests.test_engine_executor - test_parent_child_executors_in_order
# - tests.test_engine_executor - test_parent_child_executors_with_child_before
# - tests.test_engine_executor - test_timer_store_replacement_after_parent_execution
#
# @mock.patch("aura_engine.engine.Engine.start", side_effect=mocked_engine_start)
# def test_run_app(self, engine_start_mock):
# runner = EngineRunner()
# self.assertIsNotNone(runner)
# runner.run()
# try:
# runner.exit_gracefully(signum=1, frame=1)
# except:
# pass
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.lang import DotDict
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.plugins.clock import ClockInfoHandler
from aura_engine.scheduling.domain import Playlist, PlaylistItem, Timeslot
from aura_engine.scheduling.timetable import TimetableService
class TestClockHandler(unittest.TestCase):
"""
Testing the timetable service.
"""
config = None
engine = None
#
# Setup and teardown
#
def setUp(self):
self.config = AuraConfig.instance.config
cache_location = self.config.general.cache_dir
self.timetable = TimetableService(cache_location)
f = self.timetable.timetable_file
self.timetable.timetable_file = f.replace("timetable.json", "timetable-test.json")
self.engine = DotDict({"scheduler": DotDict({"timetable": self.timetable})})
def tearDown(self):
pass
#
# Test Cases
#
def test_clock_scheduler_cycle(self):
print(self._testMethodName)
clockHandler = ClockInfoHandler(self.engine)
# Not yet implemented, so it passes in any case
timetable = None
clockHandler.on_scheduler_cycle(timetable)
def test_clock_on_fallback_active(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 500, end=now - 400, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 400, end=now + 400, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 400, end=now + 540, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 540, end=now + 720, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("file://alpha.flac", 100, 100, None)
beta = PlaylistItem("file://beta.flac", 600, 100, None)
gamma = PlaylistItem("file://gamma.flac", 100, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
clockHandler = ClockInfoHandler(self.engine)
clockHandler.on_fallback_active(ts1)
def test_clock_on_play(self):
print(self._testMethodName)
now = SU.timestamp()
ts1 = Timeslot(
id=1, repetition_id=None, start=now - 500, end=now - 400, show=None, episode=None
)
ts2 = Timeslot(
id=1, repetition_id=None, start=now - 400, end=now + 400, show=None, episode=None
)
ts3 = Timeslot(
id=1, repetition_id=None, start=now + 400, end=now + 540, show=None, episode=None
)
ts4 = Timeslot(
id=1, repetition_id=None, start=now + 540, end=now + 720, show=None, episode=None
)
pl = Playlist("999", "Playlist X", False)
alpha = PlaylistItem("file://alpha.flac", 100, 100, None)
beta = PlaylistItem("file://beta.flac", 600, 100, None)
gamma = PlaylistItem("file://gamma.flac", 100, 100, None)
pl.add(alpha)
pl.add(beta)
pl.add(gamma)
ts2.set_playlists(pl, None, None)
self.timetable.timetable = [ts1, ts2, ts3, ts4]
clockHandler = ClockInfoHandler(self.engine)
clockHandler.on_play(beta)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import unittest
import validators
from aura_engine.base.config import AuraConfig
class TestConfig(unittest.TestCase):
"""
Testing the Configuration.
"""
config = None
def setUp(self):
self.config = AuraConfig.instance
def test_config(self):
print(self._testMethodName)
# Regex for validating directory paths; including non-existing and prefixed "../"
directory_regex = r"^.*\/(?:\.?[^./]+)$"
# Check if config is available
self.assertIsNotNone(AuraConfig.instance.config_file_path)
# Check if "install_dir" is a valid directory (is evaluated at runtime)
self.assertTrue(os.path.isdir(self.config.confuse_config["install_dir"].get()))
# Reference to confuse config
cfg = self.config.config
# general
self.assertRegex(cfg.general.socket_dir, directory_regex)
self.assertRegex(cfg.general.cache_dir, directory_regex)
# log
self.assertTrue(os.path.isdir(cfg.log.directory))
self.assertIn(cfg.log.level, ["debug", "info", "warning", "error", "critical"])
# monitoring
self.assertTrue(
cfg.monitoring.heartbeat.host == ""
or validators.domain(cfg.monitoring.heartbeat.host)
or validators.ipv4(cfg.monitoring.heartbeat.host)
)
# api
self.assertTrue(validators.url(cfg.api.steering.status))
self.assertTrue(validators.url(cfg.api.steering.calendar))
self.assertTrue(validators.url(cfg.api.tank.status))
tank_playlist_url = cfg.api.tank.playlist.replace("${ID}", "1")
self.assertTrue(validators.url(tank_playlist_url))
self.assertTrue(validators.url(cfg.api.engine.status))
self.assertTrue(validators.url(cfg.api.engine.store_playlog))
self.assertTrue(validators.url(cfg.api.engine.store_clock))
engine_health_url = cfg.api.engine.store_health.replace("${ENGINE_NUMBER}", "1")
self.assertTrue(validators.url(engine_health_url))
# scheduler
self.assertRegex(cfg.scheduler.audio.source_folder, directory_regex)
self.assertRegex(cfg.scheduler.audio.source_extension, r"\.[a-zA-Z0-9]+$")
self.assertRegex(cfg.scheduler.audio.playlist_folder, directory_regex)
self.assertIsInstance(cfg.scheduler.audio.engine_latency_offset, float)
self.assertIsInstance(cfg.scheduler.fetching_frequency, int)
self.assertIsInstance(cfg.scheduler.scheduling_window_start, int)
self.assertIsInstance(cfg.scheduler.scheduling_window_end, int)
self.assertIsInstance(cfg.scheduler.preload_offset, int)
self.assertIsInstance(cfg.scheduler.input_stream.buffer, float)
self.assertIsInstance(cfg.scheduler.fade_in_time, float)
self.assertIsInstance(cfg.scheduler.fade_out_time, float)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import inspect
import time
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.events import EngineEventDispatcher, EventBinding
from aura_engine.scheduling.domain import PlaylistItem
#
# Mocks
#
class MockedPlayer:
"""
Mocked version of engine player.
"""
def roll(channel, seconds_to_roll: int):
print(f"called pre-rolling for {channel} seconds on player channel {seconds_to_roll}")
pass
class MockedScheduler:
"""
Mocked version of scheduler.
"""
engine = None
def __init__(self, engine):
self.engine = engine
def boot(self):
print("calling mocked 'boot' in scheduler")
def on_ready(self):
"""
Call when the engine has finished booting and is ready to play.
"""
print(f"called mocked 'on_ready' in scheduler")
class MockedEngine:
"""
Mocked version of engine.
"""
scheduler: MockedScheduler
player: MockedPlayer
def __init__(self):
"""
Init.
"""
self.scheduler = None
self.player = MockedPlayer
class MockEventHandler:
engine: MockedEngine
call_stack: [] = []
def __init__(self, engine: MockedEngine) -> None:
self.engine = engine
def on_initialized(self, _):
"""
Call when the engine is initialized, just before it is ready.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_boot(self):
"""
Call when the engine is starting up.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_ready(self, _):
"""
Call when the engine has finished booting and is ready to play.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_timeslot_start(self, timeslot):
"""
Call when a timeslot starts.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_timeslot_end(self, timeslot):
"""
Call when a timeslot ends.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_play(self, item):
"""
Call by the engine when some play command to Liquidsoap is issued.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_stop(self, channel):
"""
Call when the passed channel has stopped playing.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_fallback_active(self, timeslot):
"""
Call when a fallback is activated for the given timeslot.
This means there is no proper playlist scheduled.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_queue(self, items: list):
"""
Call when one or more playlist items have been queued and are currently being pre-loaded.
Args:
items (list): List of playlist items.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_sick(self, data):
"""
Call when the engine is in some unhealthy state.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
def on_resurrect(self, data):
"""
Call when the engine turned healthy again after being sick.
"""
m = inspect.stack()[0][3]
print(f"called mocked '{m}'")
self.call_stack.append(m)
class TestEventDispatcher(unittest.TestCase):
"""
Testing the event dispatcher.
"""
config = None
def setUp(self):
self.config = AuraConfig.instance.config
def test_event_dispatcher_init(self):
print(self._testMethodName)
engine = MockedEngine()
dispatcher = EngineEventDispatcher(engine)
binding = dispatcher.attach(MockEventHandler)
self.assertIsInstance(binding, EventBinding)
binding.subscribe("on_initialized")
binding.subscribe("on_boot")
binding.subscribe("on_ready")
binding.subscribe("on_timeslot_start")
binding.subscribe("on_timeslot_end")
binding.subscribe("on_play")
binding.subscribe("on_stop")
binding.subscribe("on_fallback_active")
binding.subscribe("on_queue")
binding.subscribe("on_sick")
binding.subscribe("on_resurrect")
scheduler = MockedScheduler(MockedEngine())
item = PlaylistItem("source", 2.3, 100, PlaylistItem.Metadata)
dispatcher.initialize(scheduler)
dispatcher.on_boot()
dispatcher.on_ready()
dispatcher.on_timeslot_start(None)
dispatcher.on_timeslot_end(None)
dispatcher.on_play(item)
dispatcher.on_stop(None)
dispatcher.on_fallback_active(None)
dispatcher.on_queue(item)
dispatcher.on_sick(None)
dispatcher.on_resurrect(None)
time.sleep(4)
self.assertEqual(11, len(MockEventHandler.call_stack))
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import unittest
from aura_engine.base.utils import SimpleUtil as SU
from aura_engine.control import EngineExecutor
class TestEngineExecutor1(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_single_executor(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none"]
due_time = SU.timestamp() + 2
def f(param):
global_state[0] = param
# Before the executor is done there should be the initial value
EngineExecutor("RANDOM_NAMESPACE", None, due_time, f, "hello singularity")
self.assertEqual("none", global_state[0])
# After 3 seconds there should be the updated value
time.sleep(3)
self.assertEqual("hello singularity", global_state[0])
class TestEngineExecutor2(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_two_executors(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none"]
def f(param):
global_state[0] = param
# Before the executor 1 is done there should be the initial value
due_time1 = SU.timestamp() + 3
EngineExecutor("EXECUTOR_1", None, due_time1, f, "hello world from executor 1")
self.assertEqual("none", global_state[0])
self.assertNotEqual("hello world from executor 1", global_state[0])
# Before the executor 2 is done there should be still the initial value
due_time2 = SU.timestamp() + 1
EngineExecutor("EXECUTOR_2", None, due_time2, f, "hello world from executor 2")
self.assertEqual("none", global_state[0])
self.assertNotEqual("hello world from executor 2", global_state[0])
# After 0.3 seconds there still should be the initial value
time.sleep(0.3)
self.assertEqual("none", global_state[0])
# After 1.5 seconds max there should be the updated value from executor 2
time.sleep(1.2)
self.assertEqual("hello world from executor 2", global_state[0])
# After 5 seconds max there should be the updated value from executor 1
time.sleep(3)
self.assertEqual("hello world from executor 1", global_state[0])
class TestEngineExecutor3(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_parent_child_executors_in_order(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none"]
def f(param):
global_state[0] = param
# Before the the parent is done there should be the initial value
due_time1 = SU.timestamp() + 0.5
parent = EngineExecutor("EXECUTOR_PARENT", None, due_time1, f, "hello world from parent")
self.assertEqual("none", global_state[0])
# Before the the child is done there should be the initial value
due_time2 = SU.timestamp() + 1
EngineExecutor("EXECUTOR_CHILD", parent, due_time2, f, "hello world from child")
self.assertEqual("none", global_state[0])
# After 0.3 seconds there still should be the initial value
time.sleep(0.3)
self.assertEqual("none", global_state[0])
# After 0.6 seconds max there should be the updated value from parent executor
time.sleep(0.3)
self.assertEqual("hello world from parent", global_state[0])
# After 1.2 seconds max there should be the updated value from child executor
time.sleep(1.2)
self.assertEqual("hello world from child", global_state[0])
class TestEngineExecutor4(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_parent_child_executors_with_child_before(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none", "never called by parent"]
def f(param):
global_state[0] = param
if param == "hello world from parent":
global_state[1] = param
# Before the the parent is done there should be the initial value
due_time1 = SU.timestamp() + 0.5
parent = EngineExecutor("EXECUTOR_PARENT", None, due_time1, f, "hello world from parent")
self.assertEqual("none", global_state[0])
# Before the the child is done there should be the initial value
due_time2 = SU.timestamp() + 1.5
EngineExecutor("EXECUTOR_CHILD", parent, due_time2, f, "hello world from child")
self.assertEqual("none", global_state[0])
# After 0.2 seconds there still should be the initial value
time.sleep(0.2)
self.assertEqual("none", global_state[0])
# After 0.4 seconds max there isn't a setting from the child yet, because it is waiting for
# the parent
time.sleep(0.2)
self.assertNotEqual("hello world from child", global_state[0])
# But the parent didn't set anything either, because it is scheduled for later
self.assertNotEqual("hello world from parent", global_state[0])
self.assertEqual("none", global_state[0])
# Double check if it has ever been called by parent
self.assertEqual("never called by parent", global_state[1])
# After 2.2 seconds max there should be the updated value from parent & child
# Because the child is due before the parent, it is executed right away,
# hence overwriting the value just set by the parent
time.sleep(2.2)
self.assertNotEqual("hello world from parent", global_state[0])
self.assertEqual("hello world from child", global_state[0])
# But we do not just believe what we expect, but check if it really has ever been called by
# a parent
self.assertEqual("hello world from parent", global_state[1])
class TestEngineExecutor5(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_timer_store_replacement_after_parent_execution(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none", "none"]
def f1(param):
global_state[0] = param
def f2(param):
global_state[1] = param
# There should be a total of 0 timers
timers = EngineExecutor.command_history()
self.assertEqual(0, len(timers))
# Before the the parent is done there should be the initial value
due_time1 = SU.timestamp() + 0.5
parent = EngineExecutor("EXECUTOR_PARENT", None, due_time1, f1, "hello world from parent")
self.assertEqual("none", global_state[0])
# Before the the child is done there should be the initial value
due_time2 = SU.timestamp() + 2
EngineExecutor("EXECUTOR_CHILD", parent, due_time2, f2, "hello world from child")
self.assertEqual("none", global_state[0])
# There should be a total of 2 timers
timers = EngineExecutor.command_history()
self.assertEqual(2, len(timers))
# Replacing the parent with a new instance
parent = EngineExecutor(
"EXECUTOR_PARENT", None, due_time1, f1, "hello world from alternative parent"
)
self.assertEqual("none", global_state[0])
# Let the parent execute and do its stuff...
time.sleep(1)
# ... now there should be the updated value from the alternative parent
self.assertEqual(False, parent.is_alive())
self.assertEqual("hello world from alternative parent", global_state[0])
# Now create a replacement child
# Before the the child is done there should be the initial value
EngineExecutor(
"EXECUTOR_CHILD", parent, due_time2, f2, "hello world from alternative child"
)
self.assertEqual("none", global_state[1])
# Wait for child execution
time.sleep(2)
# The child should not have been updated, since the parent was already finished before.
self.assertEqual("hello world from child", global_state[1])
# There should be a total of 2 timers, even though 4 got instantiated
timers = EngineExecutor.command_history()
self.assertEqual(2, len(timers))
class TestEngineExecutor6(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
# Review why this test case fails from time to time:
# @See https://gitlab.servus.at/aura/engine/-/issues/122
# def test_parent_child_replacement_in_time(self):
# print(self._testMethodName)
# # Initialize state and executor params
# EngineExecutor.timer_store = {}
# global_state = ["none", "none"]
# def f1(param):
# global_state[0] = param
# def f2(param):
# global_state[1] = param
# # There should be a total of 0 timers
# timers = EngineExecutor.command_history()
# self.assertEqual(0, len(timers))
# # Before the the parent is done there should be the initial value
# due_time1 = SU.timestamp() + 2
# parent = EngineExecutor("EXECUTOR_PARENT", None, due_time1, f1, "hello world from parent")
# self.assertEqual("none", global_state[0])
# # Before the the child is done there should be the initial value
# due_time2 = SU.timestamp() + 3
# EngineExecutor("EXECUTOR_CHILD", parent, due_time2, f2, "hello world from child")
# self.assertEqual("none", global_state[1])
# # There should be a total of 2 timers
# timers = EngineExecutor.command_history()
# self.assertEqual(2, len(timers))
# time.sleep(0.1)
# # Create some new parent & child
# parent = EngineExecutor(
# "EXECUTOR_PARENT", None, due_time1, f1, "hello world from alternative parent"
# )
# child = EngineExecutor(
# "EXECUTOR_CHILD", parent, due_time2, f2, "hello world from alternative child"
# )
# # Nothing is executed yet, event after 1 seconds
# self.assertEqual("none", global_state[0])
# self.assertEqual("none", global_state[1])
# time.sleep(1)
# self.assertEqual("none", global_state[0])
# self.assertEqual("none", global_state[1])
# self.assertEqual(True, parent.is_alive())
# self.assertEqual(True, parent.is_alive())
# # Some seconds later, though...
# time.sleep(4)
# # Parent finished: There should be the updated value from the alternative parent
# self.assertEqual(False, parent.is_alive())
# self.assertEqual("hello world from alternative parent", global_state[0])
# # Child finished: There should be the updated value from the alternative child
# self.assertEqual(False, child.is_alive())
# self.assertEqual("hello world from alternative child", global_state[1])
# # There should be a total of 2 timers, even though 4 got instantiated
# timers = EngineExecutor.command_history()
# self.assertEqual(2, len(timers))
class TestEngineExecutor7(unittest.TestCase):
"""
Testing the EngineExecutor.
"""
def setUp(self):
None
def test_dead_parent_with_lively_child(self):
print(self._testMethodName)
# Initialize state and executor params
EngineExecutor.timer_store = {}
global_state = ["none", "none"]
def f1(param):
global_state[0] = param
def f2(param):
global_state[1] = param
# Before the the parent is done there should be the initial value
due_time1 = SU.timestamp() + 1
parent = EngineExecutor("EXECUTOR_PARENT1", None, due_time1, f1, "hello parent1")
self.assertEqual("none", global_state[0])
# Before the the child is done there should be the initial value
due_time2 = SU.timestamp() + 3
child = EngineExecutor("EXECUTOR_CHILD1", parent, due_time2, f2, "hello child1")
self.assertEqual("none", global_state[1])
# Wait until the parent timer got executed
time.sleep(2)
self.assertEqual("hello parent1", global_state[0])
# Parent dead - child alive
self.assertEqual(False, parent.is_alive())
self.assertEqual(True, child.is_alive())
# Replacing the parent & child with a new instance
parent = EngineExecutor("EXECUTOR_PARENT1", None, due_time1, f1, "hello parent2")
child = EngineExecutor("EXECUTOR_CHILD1", parent, due_time2, f2, "hello child2")
# New parent = dead before finished initialization already, so actually never born
self.assertEqual(False, parent.is_alive())
# Even though the late parent would be executed by now, it wasn't, because the initial
# parent was finished at instantiation time already
self.assertEqual("hello parent1", global_state[0])
# Finally when the parent is already finished, the child should never update
time.sleep(2)
self.assertEqual("hello child1", global_state[1])
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest import mock
import requests
from aura_engine.base.api import LiquidsoapUtil
from aura_engine.base.config import AuraConfig
class TestLiquidsoapUtil(unittest.TestCase):
"""
Testing the Configuration.
"""
config = None
api = None
#
# Setup
#
def setUp(self):
pass
#
# Tests
#
def test_json_to_dict(self):
print(self._testMethodName)
json_str = '+{-"key"-:-"value"-}+'
json_dict = LiquidsoapUtil.json_to_dict(json_str)
# Check if config is available
self.assertIsNotNone(json_dict)
self.assertEqual("value", json_dict.get("key"))
def test_annotate_uri(self):
print(self._testMethodName)
uri = "/some/uri"
meta = {"cue": 1, "volume": 100}
uri = LiquidsoapUtil.annotate_uri(uri, meta)
self.assertEqual('annotate:cue="1",volume="100":/some/uri', uri)
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.base.logger import AuraLogger
class TestLogger(unittest.TestCase):
"""
Testing the Logger.
"""
aura_logger = None
def setUp(self):
self.config = AuraConfig.instance.config
self.aura_logger = AuraLogger(self.config)
def test_logger(self):
print(self._testMethodName)
self.assertTrue(self.aura_logger.logger.hasHandlers())
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest import mock
from aura_engine.base.config import AuraConfig
from aura_engine.plugins.monitor import AuraMonitor, MonitorResponseCode
class TestLogger(unittest.TestCase):
"""
Testing the Logger.
"""
monitor = None
#
# Mock
#
# Mock `requests.get` in `SimpleRestApi`
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
def json(self):
return self.json_data
print(f"Calling mocked 'requests.get' with '{args[0]}'")
if args[0] == "http://aura.test.available":
return MockResponse({"foo": "bar"}, 200)
return MockResponse(None, 404)
def setUp(self):
self.config = AuraConfig.instance.config
self.monitor = AuraMonitor(None)
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_url_response_success(self, mock_get):
print(self._testMethodName)
json = self.monitor.get_url_response("http://aura.test.available")
# Success
self.assertEqual("bar", json["foo"])
@mock.patch("aura_engine.base.api.requests.get", side_effect=mocked_requests_get)
def test_get_url_response_fail(self, mock_get):
print(self._testMethodName)
invalid_status = self.monitor.get_url_response("http://aura.test.not.available")
# Success
self.assertEqual(MonitorResponseCode.INVALID_STATE.value, invalid_status)
if __name__ == "__main__":
unittest.main()
#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-now() - The Aura Engine Team.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import unittest
from aura_engine.base.config import AuraConfig
from aura_engine.core.channels import ChannelName, ChannelType
from aura_engine.resources import (
ResourceClass,
ResourceMapping,
ResourceType,
ResourceUtil,
)
from aura_engine.scheduling.domain import (
Episode,
Playlist,
PlaylistItem,
Show,
Timeslot,
)
class TestEngineResources(unittest.TestCase):
"""
Testing the engine resource utilities.
"""
config = None
def setUp(self):
self.config = AuraConfig.instance.config
def test_to_abs_path(self):
print(self._testMethodName)
path = ResourceUtil.to_abs_path("abc")
self.assertEqual("/", path[0])
def test_audio_store_path(self):
print(self._testMethodName)
path = ResourceUtil.audio_store_path()
self.assertEqual("/engine-core/audio/source", path.split("..")[1])
def test_playlist_folder_path(self):
print(self._testMethodName)
path = ResourceUtil.playlist_folder_path()
self.assertEqual("/engine-core/audio/playlist", path.split("..")[1])
def test_playlist_folder_path(self):
print(self._testMethodName)
path = ResourceUtil.source_to_filepath("file://some/file")
self.assertEqual("/engine-core/audio/source/some/file.flac", path.split("..")[1])
def test_resource_mapping(self):
print(self._testMethodName)
rm = ResourceMapping()
ct = rm.type_for_resource(ResourceType.FILE)
self.assertEqual(ChannelType.QUEUE, ct)
ct = rm.type_for_resource(ResourceType.STREAM_HTTP)
self.assertEqual(ChannelType.HTTP, ct)
ct = rm.type_for_resource(ResourceType.LINE)
self.assertEqual(ChannelType.LIVE, ct)
ct = rm.type_for_resource(ResourceType.M3U)
self.assertEqual(ChannelType.QUEUE, ct)
ct = rm.type_for_resource(ResourceType.POOL)
self.assertEqual(ChannelType.QUEUE, ct)
def test_live_channel_for_resource(self):
print(self._testMethodName)
rm = ResourceMapping()
live_channel: ChannelName = rm.live_channel_for_resource("line://0")
self.assertEqual("LIVE_0", live_channel.name)
self.assertEqual("aura_engine_line_in_0", live_channel.value)
live_channel: ChannelName = rm.live_channel_for_resource("line://1")
self.assertEqual("LIVE_1", live_channel.name)
self.assertEqual("aura_engine_line_in_1", live_channel.value)
live_channel: ChannelName = rm.live_channel_for_resource("line://2")
self.assertEqual("LIVE_2", live_channel.name)
self.assertEqual("aura_engine_line_in_2", live_channel.value)
live_channel: ChannelName = rm.live_channel_for_resource("line://3")
self.assertEqual("LIVE_3", live_channel.name)
self.assertEqual("aura_engine_line_in_3", live_channel.value)
live_channel: ChannelName = rm.live_channel_for_resource("line://4")
self.assertEqual("LIVE_4", live_channel.name)
self.assertEqual("aura_engine_line_in_4", live_channel.value)
def test_resource_class(self):
print(self._testMethodName)
rc_file = ResourceClass.FILE
rc_live = ResourceClass.LIVE
rc_stream = ResourceClass.STREAM
rc_playlist = ResourceClass.PLAYLIST
self.assertTrue(isinstance(rc_file.types, list))
self.assertEqual(ResourceType.FILE, rc_file.types[0])
self.assertEqual("fs", str(rc_file))
self.assertEqual(0, rc_file.value.get("numeric"))
self.assertEqual(rc_file.types, rc_file.value.get("types"))
self.assertTrue(isinstance(rc_stream.types, list))
self.assertEqual(ResourceType.STREAM_HTTP, rc_stream.types[0])
self.assertEqual("http", str(rc_stream))
self.assertEqual(1, rc_stream.value.get("numeric"))
self.assertEqual(rc_stream.types, rc_stream.value.get("types"))
self.assertTrue(isinstance(rc_live.types, list))
self.assertEqual(ResourceType.LINE, rc_live.types[0])
self.assertEqual("live", str(rc_live))
self.assertEqual(2, rc_live.value.get("numeric"))
self.assertEqual(rc_live.types, rc_live.value.get("types"))
self.assertTrue(isinstance(rc_playlist.types, list))
self.assertEqual(ResourceType.M3U, rc_playlist.types[0])
self.assertEqual(ResourceType.POOL, rc_playlist.types[1])
self.assertEqual("playlist", str(rc_playlist))
self.assertEqual(3, rc_playlist.value.get("numeric"))
self.assertEqual(rc_playlist.types, rc_playlist.value.get("types"))
def test_resource_util_get_content_type(self):
print(self._testMethodName)
rt = ResourceUtil.get_content_type("file://magic.resource")
self.assertEqual(ResourceType.FILE, rt)
rt = ResourceUtil.get_content_type("http://magic.resource")
self.assertEqual(ResourceType.STREAM_HTTP, rt)
rt = ResourceUtil.get_content_type("https://magic.resource")
self.assertEqual(ResourceType.STREAM_HTTP, rt)
rt = ResourceUtil.get_content_type("line://magic.resource")
self.assertEqual(ResourceType.LINE, rt)
rt = ResourceUtil.get_content_type("m3u://magic.resource")
self.assertEqual(ResourceType.M3U, rt)
rt = ResourceUtil.get_content_type("pool://magic.resource")
self.assertEqual(ResourceType.POOL, rt)
def test_resource_util_get_content_class(self):
print(self._testMethodName)
rc = ResourceUtil.get_content_class(ResourceType.FILE)
self.assertEqual(ResourceClass.FILE, rc)
rc = ResourceUtil.get_content_class(ResourceType.STREAM_HTTP)
self.assertEqual(ResourceClass.STREAM, rc)
rc = ResourceUtil.get_content_class(ResourceType.LINE)
self.assertEqual(ResourceClass.LIVE, rc)
rc = ResourceUtil.get_content_class(ResourceType.M3U)
self.assertEqual(ResourceClass.PLAYLIST, rc)
rc = ResourceUtil.get_content_class(ResourceType.POOL)
self.assertEqual(ResourceClass.PLAYLIST, rc)
def test_resource_util_gen_m3u(self):
print(self._testMethodName)
pl = Playlist(1, "some description")
e1 = PlaylistItem("file://file1", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file://file2", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file://file3", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
file_path: str = "./.cache/test.m3u"
ResourceUtil.generate_m3u_file(file_path, pl.items)
print("created M3U file:" + file_path)
self.assertTrue(pathlib.Path(file_path).is_file())
def test_resource_util_get_items_string(self):
print(self._testMethodName)
ts = Timeslot(id=42, repetition_id=None, start=100000, end=100360, show=None, episode=None)
pl = Playlist(1, "That's a real playlist with timeslot connection")
e1 = PlaylistItem("file://file1", 2, 100, PlaylistItem.Metadata("a1", "b1", "c1"))
e2 = PlaylistItem("file://file2", 3, 100, PlaylistItem.Metadata("a2", "b2", "c2"))
e3 = PlaylistItem("file://file3", 5, 100, PlaylistItem.Metadata("a3", "b3", "c3"))
pl.add(e1)
pl.add(e2)
pl.add(e3)
ts.set_playlists(pl, None, None)
items_str = ResourceUtil.get_items_string(pl.items)
expect_str = "PlaylistItem [04:46:40 - 04:46:42 | 2.00sec | Source: ...file://file1], PlaylistItem [04:46:42 - 04:46:45 | 3.00sec | Source: ...file://file2], PlaylistItem [04:46:45 - 04:46:50 | 5.00sec | Source: ...file://file3]"
print(items_str)
self.assertEqual(expect_str, items_str)
def test_generate_track_metadata(self):
print(self._testMethodName)
show = Show(id=123, name="some show")
episode = Episode(id="1432", title="SuperSonicShow 23/1", memo="")
ts = Timeslot(
id=42, repetition_id=None, start=100000, end=100360, show=show, episode=episode
)
pl = Playlist(1, "That's a real playlist with timeslot connection")
e1 = PlaylistItem(
"file://file1", 5, 100, PlaylistItem.Metadata("artist", "album", "track1")
)
e2 = PlaylistItem(
"file://file2", 5, 100, PlaylistItem.Metadata("artist", "album", "track2")
)
e3 = PlaylistItem(
"file://file3", 5, 100, PlaylistItem.Metadata("artist", "album", "track3")
)
pl.add(e1)
pl.add(e2)
pl.add(e3)
ts.set_playlists(pl, None, None)
# Test playing track without track_start assignment
e1.play.set_playing()
metadata = ResourceUtil.generate_track_metadata(e1, False)
self.assertEqual("", metadata.get("track_start"))
# Test playing track without track_start assignment
e1.play.set_playing()
e1.play.play_start = 1710445690.0
metadata = ResourceUtil.generate_track_metadata(e1, True)
self.assertEqual("1970/01/02 04:46:40", metadata.get("track_start"))
# Test other generated metadata
metadata = ResourceUtil.generate_track_metadata(e3, False)
self.assertEqual("some show", metadata.get("show_name"))
self.assertEqual(123, metadata.get("show_id"))
self.assertEqual(42, metadata.get("timeslot_id"))
self.assertEqual(1, metadata.get("playlist_id"))
self.assertEqual("3.0", metadata.get("playlist_item"))
self.assertEqual(0, metadata.get("track_type"))
self.assertEqual("artist", metadata.get("track_artist"))
self.assertEqual("album", metadata.get("track_album"))
self.assertEqual("track3", metadata.get("track_title"))
if __name__ == "__main__":
unittest.main()