Skip to content
Snippets Groups Projects
Commit ca02e5d6 authored by Chris Pastl's avatar Chris Pastl
Browse files

refactor: use enum for core mock functions

parent 8c09f36a
No related branches found
No related tags found
1 merge request!38Test cases for "src/aura_engine/core"
Pipeline #7977 failed
......@@ -18,6 +18,7 @@
import logging
from enum import StrEnum, auto
from time import sleep
from urllib.parse import urlparse
......@@ -30,9 +31,54 @@ from aura_engine.core.mixer import Mixer
from aura_engine.events import EngineEventDispatcher
# enums reflecting namespace, action and reponses
class Namespace(StrEnum):
MIXER = auto()
CHANNEL = auto()
@classmethod
def _missing_(cls, value):
# map 'mixer' to .MIXER and values of ChannelName to .CHANNEL
if isinstance(value, Namespace):
return cls(value)
elif value in ChannelName._value2member_map_.values():
return cls.CHANNEL
else:
raise TypeError
class MixerAction(StrEnum):
INPUTS = auto()
STATUS = auto()
VOLUME = auto()
SELECT = auto()
ACTIVATE = auto()
class ChannelAction(StrEnum):
PUSH = auto()
LOAD = auto()
URL = auto()
STATUS = auto()
START = auto()
STOP = auto()
CLEAR = auto()
ROLL = auto()
SET_TRACK_METADATA = auto()
class Response(StrEnum):
"""
Using custom reponse type since PlayoutStatusResponse.SUCCESS is list instead of single string.
"""
SUCCESS = "OK" # PlayoutStatusResponse.SUCCESS
INVALID_URL = "Invalid URL" # some arbitrary string which is not "OK" signaling failure
class CoreClientMock(CoreClient):
"""
Suppress unwanted behavior by subclassing and overriding.
Suppress unwanted behavior by subclassing and overriding necessary functions.
"""
instance = None
......@@ -41,6 +87,7 @@ class CoreClientMock(CoreClient):
event_dispatcher = None
conn = None
stream_url = None
resource_id = -1
volumes = [0, 0, 0, 0]
selections = [0, 0, 0, 0]
......@@ -61,70 +108,109 @@ class CoreClientMock(CoreClient):
CoreClientMock.instance = CoreClientMock(event_dispatcher)
return CoreClientMock.instance
def _validate_url(self, url: str) -> bool:
"""
Check url validity.
"""
parse = urlparse(url)
return all([parse.scheme, parse.netloc])
def _get_resource_id(self) -> int:
"""
Increment and return index starting with 0.
"""
self.resource_id += 1
return self.resource_id
@synchronized
def connect(self):
sleep(0.5)
pass
@synchronized
def disconnect(self):
sleep(0.5)
pass
@synchronized
def exec(self, namespace: str, action: str, args: str = "") -> str:
self.logger.debug(
f"Core mock namespace: '{namespace}', action: '{action}', args: '{args}'"
)
if namespace == "mixer":
if action == "inputs":
return f"{ChannelName.QUEUE_A} {ChannelName.QUEUE_B}"
elif action == "status":
chn = int(args)
vol = self.volumes[chn]
return f"ready=true selected=true single=false volume={vol}% remaining=0"
elif action == "volume":
argv = args.split(" ")
chn = int(argv[0])
vol = self.volumes[chn]
self.volumes[chn] = int(float(argv[1]) / 100.0)
resp = f"volume={vol}% remaining=0"
# print(resp)
return resp
elif action == "select":
argv = args.split(" ")
chn = int(argv[0])
vol = self.volumes[chn]
sel = 1 if argv[1] == "true" else 0
self.selections[chn] = sel
return f"volume={vol}% remaining=0 selected={sel}"
elif action == "activate":
return "OK"
elif namespace in ChannelName._value2member_map_.values():
if action == "push":
sleep(5) # simulate some loading time
return 1 # res id
elif action == "load":
sleep(3) # simulate some loading time
return 2 # res id
elif action == "url":
parse = urlparse(args)
if all([parse.scheme, parse.netloc]):
self.stream_url = args
return "OK"
else:
return "Invalid URL"
elif action == "status":
return PlayoutStatusResponse.STREAM_STATUS_CONNECTED.value + " " + self.stream_url
elif action == "start":
sleep(1) # simulate small start delay
return "OK"
elif action == "stop":
return "OK"
elif action == "clear":
return "OK"
elif action == "roll":
return "OK"
elif action == "set_track_metadata":
return "OK"
log = self.logger.debug
log(f"Core mock request 'ns: {namespace}', action: '{action}', args: '{args}'")
response: str = None
ns = Namespace(namespace)
if ns is None:
raise TypeError
match ns:
case Namespace.MIXER:
act = MixerAction(action)
if act is None:
raise TypeError
match act:
case MixerAction.INPUTS:
response = f"{ChannelName.QUEUE_A} {ChannelName.QUEUE_B}"
case MixerAction.STATUS:
chn = int(args)
vol = self.volumes[chn]
response = (
f"ready=true selected=true single=false volume={vol}% remaining=0"
)
case MixerAction.VOLUME:
argv = args.split(" ")
chn = int(argv[0])
vol = self.volumes[chn]
self.volumes[chn] = int(float(argv[1]) / 100.0)
response = f"volume={vol}% remaining=0"
case MixerAction.SELECT:
argv = args.split(" ")
chn = int(argv[0])
vol = self.volumes[chn]
sel = 1 if argv[1] == "true" else 0
self.selections[chn] = sel
response = f"volume={vol}% remaining=0 selected={sel}"
case MixerAction.ACTIVATE:
response = Response.SUCCESS
case Namespace.CHANNEL:
act = ChannelAction(action)
if act is None:
raise TypeError
match act:
case ChannelAction.PUSH:
sleep(5)
resid = self._get_resource_id()
response = str(resid)
case ChannelAction.LOAD:
sleep(3)
resid = self._get_resource_id()
response = str(resid)
case ChannelAction.URL:
if self._validate_url(args):
self.stream_url = args
response = Response.SUCCESS
else:
response = Response.INVALID_URL
case ChannelAction.STATUS:
response = (
PlayoutStatusResponse.STREAM_STATUS_CONNECTED.value
+ " "
+ self.stream_url
)
case ChannelAction.START:
sleep(1)
response = Response.SUCCESS
case ChannelAction.STOP:
response = Response.SUCCESS
case ChannelAction.CLEAR:
response = Response.SUCCESS
case ChannelAction.ROLL:
response = Response.SUCCESS
case ChannelAction.SET_TRACK_METADATA:
response = Response.SUCCESS
if response is not None:
log(f"Core mock reponse '{response}'")
return response
raise Exception(f"Unhandled namespace: '{namespace}', action: '{action}', args: '{args}'")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment