Commit fe69527d authored by Gottfried Gaisbauer's avatar Gottfried Gaisbauer
Browse files

communication changed from zmq to redis. it is much, much faster

parent 9d536a7d
This diff is collapsed.
......@@ -3,9 +3,10 @@ import sys
import threading
from libraries.base.config import ConfigReader
from libraries.reporting.messenger import RedisMessenger
#from libraries.reporting.messenger import RedisListener
from modules.controller.controller import AuraController
from modules.communication.zmq.zmqadapter import ServerZMQAdapter
#from modules.communication.zmq.zmqadapter import ServerZMQAdapter
from modules.communication.redis.redisadapter import ServerRedisAdapter
class Aura():
......@@ -18,13 +19,15 @@ class Aura():
self.config = ConfigReader()
self.config.loadConfig()
messenger = RedisMessenger()
messenger.set_channel("aura")
server = object
self.controller = AuraController(self.config, debug=False)
self.server = ServerZMQAdapter(self.controller, self.config, debug=True)
#self.server = ServerZMQAdapter(self.controller, self.config, debug=True)
#self.redislistener =
self.messenger = ServerRedisAdapter()
self.messenger.set_controller(self.controller)
self.messenger.set_config(self.config)
def receive_signal(signum, stack):
print("received signal")
......@@ -34,7 +37,7 @@ class Aura():
def join_comm(self):
# start listener thread
self.server.join_comm()
self.messenger.start()
# # ## ## ## ## ## # #
# # ENTRY FUNCTION # #
......
......@@ -46,9 +46,13 @@ class Guru:
parser.add_argument("-vm", "--volume", action="store", dest="set_volume", default=0, metavar=("MIXERNUM","VOLUME"), nargs=2,
help="Set volume of a mixer source", type=int)
parser.add_argument("-as", "--add-source", action="store", dest="add_source", default="",
parser.add_argument("-as", "--add-source", action="store", dest="add_source", default=False,
help="Add new source to LiquidSoap mixer [Experimental]")
# redis comm. test
parser.add_argument("-rm", "--redis-message", action="store", dest="redis_message", default=False, metavar=("CHANNEL", "MESSAGE"), nargs=2,
help="Send a redis message to the Listeners")
# calls from liquidsoap
parser.add_argument("-gnft", "--get-next-for-fallback-file", action="store", dest="type", default=False,
help="For which type you wanna have a next audio file?")
......
# -*- coding: utf-8 -*-
import datetime
import time
import redis
import logging
import datetime
import threading
from libraries.reporting.statestore import RedisStateStore
from libraries.reporting.mail import AuraMailer
......@@ -10,17 +12,19 @@ from libraries.reporting.mail import AuraMailer
Meldungen an den StateStore schicken
"""
class RedisMessenger():
rstore = None
def __init__(self):
"""
Constructor
"""
self.channel = 'main'
self.section = ''
self.rstore = RedisStateStore()
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
"""
Constructor
"""
self.channel = 'main'
self.section = ''
self.rstore = RedisStateStore()
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
# ------------------------------------------------------------------------------------------ #
def set_channel(self, channel):
......@@ -260,3 +264,4 @@ class RedisMessenger():
next = b''
return next.decode('utf-8')
......@@ -6,7 +6,7 @@ import datetime
import json
import re
import uuid
import threading
class RedisStateStore(object):
......@@ -296,3 +296,14 @@ class RedisStateStore(object):
pass
return entries
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
#pb = self.db.pub
subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
if subscriber_count[1] == 0:
raise Exception("No subscriber! Is Aura daemon running?")
self.db.publish(channel, message)
......@@ -87,7 +87,7 @@ class LiquidSoapClient():
# self.client.setblocking(0)
data = '';
self.client.settimeout(2)
self.client.settimeout(3)
# recv something
# while True:
......@@ -114,7 +114,11 @@ class LiquidSoapClient():
ret = ret.splitlines()
try:
# print("pop from empty list?")
# print(ret)
ret.pop() # pop 'END' out
# print(ret)
# print("yes")
self.message = str.join('\n', ret)
# self.message = str(b'\n'.join(ret))
except Exception as e:
......
......@@ -14,10 +14,10 @@ class LiquidSoapInitThread(threading.Thread):
self.socket = socket
self.liquidsoapcommunicator = liquidsoapcommunicator
def run(self, ):
def run(self):
try:
# sleep needed, because the socket is created to slow by liquidsoap
time.sleep(1)
time.sleep(10)
self.liquidsoapcommunicator.enable_transaction()
self.liquidsoapcommunicator.open_conn(self.socket)
self.liquidsoapcommunicator.__send_lqc_command__(self.socket, "mixer", "volume", "0", "0")
......
import threading
import time
import sys
import redis
import random
import string
import simplejson
import traceback
from datetime import datetime
from threading import Event
from libraries.security.whitelist import AuraWhitelist
from libraries.security.user import AuraUser
from modules.scheduling.scheduler import AuraScheduler
from libraries.reporting.statestore import RedisStateStore
from libraries.reporting.messenger import RedisMessenger
class ServerRedisAdapter(threading.Thread, RedisMessenger):
debug = False
pubsub = None
config = None
redisdb = None
channel = ""
auracontroller = None
def __init__(self, debug=False):
threading.Thread.__init__(self)
RedisMessenger.__init__(self)
self.debug = debug
# init
threading.Thread.__init__ (self)
self.shutdown_event = Event()
self.channel = 'aura'
self.section = ''
self.rstore = RedisStateStore()
self.errnr = '00'
self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
self.fromMail = ''
self.adminMails = ''
def set_controller(self, auracontroller):
self.auracontroller = auracontroller
def set_config(self, config):
self.config = config
# ------------------------------------------------------------------------------------------ #
def run(self):
self.redisdb = redis.Redis()
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(self.channel)
print("waiting for REDIS message")
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
if item['data'] == "KILL":
break
else:
print(item)
self.work(item)
print("waiting for REDIS message")
self.pubsub.unsubscribe()
print(self, "unsubscribed from "+self.channel+" and finished")
def listen_for_one_message(self, channel):
self.redisdb = redis.Redis()
self.pubsub = self.redisdb.pubsub()
self.pubsub.subscribe(channel)
for item in self.pubsub.listen():
if item["type"] == "subscribe":
continue
item["channel"] = item["channel"].decode("utf-8")
if isinstance(item["data"], bytes):
item["data"] = item["data"].decode("utf-8")
self.pubsub.unsubscribe()
break
return item["data"]
def work(self, item):
if item["data"] == "fetch_new_programme":
self.auracontroller.fetch_new_programme()
elif item["data"] == "liquid_startup":
self.auracontroller.liquid_startup()
elif item["data"] == "get_act_programme":
self.auracontroller.get_act_programme()
else:
raise Exception("Cannot understand command: "+item["data"])
# ------------------------------------------------------------------------------------------ #
def join_comm(self):
try:
while self.is_alive():
print(str(datetime.now())+" joining")
self.join()
print("join out")
# if cnt % 30 == 0:
# print(datetime.datetime.now().isoformat())
# server.printLastMessages()
# cnt = 0
# cnt = cnt + 1
except (KeyboardInterrupt, SystemExit):
# Dem Server den Shutdown event setzen
# server.shutdown_event.set()
# Der Server wartet auf Eingabe
# Daher einen Client initiieren, der eine Nachricht schickt
self.halt()
sys.exit('Terminated')
# ------------------------------------------------------------------------------------------ #
def halt(self):
"""
Stop the server
"""
if self.shutdown_event.is_set():
return
try:
del self.auracontroller
except:
pass
self.shutdown_event.set()
result = 'failed'
try:
result = self.socket.unbind("tcp://"+self.ip+":"+self.port)
except:
pass
#self.socket.close()
# ------------------------------------------------------------------------------------------ #
def send(self, message):
"""
Send a message to the client
:param message: string
"""
if not self.can_send:
print("sending a "+str(len(message))+" long message via ZMQ.")
self.socket.send(message.encode("utf-8"))
self.can_send = False
else:
print("cannot send message via ZMQ: "+str(message))
# ------------------------------------------------------------------------------------------ #
@staticmethod
def get_accounts(self):
"""
Get accounts from redis db
:return: llist - a list of accounts
"""
accounts = AuraUser().getLogins()
db = redis.Redis()
internaccount = db.get('internAccess')
if not internaccount:
user = ''.join(random.sample(string.ascii_lowercase,10))
password = ''.join(random.sample(string.ascii_lowercase+string.ascii_uppercase+string.digits,22))
db.set('internAccess', user + ':' + password)
intern = [user, password]
else:
intern = internaccount.split(':')
accounts[intern[0]] = intern[1]
return accounts
class ClientRedisAdapter(RedisMessenger):
debug = False
def __init__(self, debug=True):
RedisMessenger.__init__(self)
self.debug = debug
# ------------------------------------------------------------------------------------------ #
def publish(self, channel, message):
self.rstore.publish(channel, message)
......@@ -24,6 +24,7 @@ from libraries.database.broadcasts import ScheduleEntry
#from modules.communication.liquidsoap.liquidcontroller import LiquidController
from modules.communication.liquidsoap.LiquidSoapCommunicator import LiquidSoapCommunicator
from modules.scheduling.scheduler import AuraScheduler
from modules.communication.redis.redisadapter import ClientRedisAdapter
......@@ -43,6 +44,7 @@ class AuraController():
config = None
sender = None
debug = False
redisclient = None
# Constructor
def __init__(self, config, debug):
......@@ -71,6 +73,8 @@ class AuraController():
self.scheduler = AuraScheduler(config, self.liquidsoapcommunicator, self.debug)
self.config = config
self.redisclient = ClientRedisAdapter()
# ------------------------------------------------------------------------------------------ #
def set_sender(self, sender):
"""
......@@ -118,6 +122,7 @@ class AuraController():
if response == "fetching_finished":
programme = acs.get_calendar_data()
self.scheduler.reload_programme()
self.redisclient.publish("fetch_new_programme_reply", simplejson.dumps(programme))
return programme
else:
print("Got an unknown response from AuraCalendarService: "+response)
......@@ -125,36 +130,28 @@ class AuraController():
# ------------------------------------------------------------------------------------------ #
def get_act_programme(self):
programme = None
try:
programme = self.scheduler.get_act_programme()
except NoProgrammeLoadedException as e:
print("WARNING: no programme in memory. i have to reload it!")
# refetch the programme from pv and importer
self.fetch_new_programme()
# is the recursion here really needed, or an additional error source?
return self.get_act_programme()
programme = self.get_act_programme()
self.success("00")
self.redisclient.publish("get_act_programme_reply", programme[0])
return programme
except Exception as e:
traceback.print_exc()
# exc_type, exc_obj, exc_tb = sys.exc_info()
# fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
# print(e.with_traceback(True)) # exc_type, fname, exc_tb.tb_lineno)
self.fatal("01")
else:
# entries = []
# for p in programme:
# entries.append(p["playlist"]["entries"])
# print(p)
# for p in programme["entries"]:
# print(p)
#retstring = json.dumps([p._asdict() for p in programme], default=alchemyencoder)
#retstring = json.dumps([p._asdict() for p in programme["entries"]], default=alchemyencoder)
#retstring = json.dumps(programme["entries"], default=alchemyencoder)
#retstring = json.dumps([p["playlist"]["entries"] for p in programme], default=alchemyencoder)
#print("WANNA SEND")
#print(retstring)
self.success("00")
self.redisclient.publish("get_act_programme_reply", programme[0])
return programme
# ------------------------------------------------------------------------------------------ #
......
......@@ -76,7 +76,8 @@ def fallback_create(~skip=true, name, requestor)
# Create the request.dynamic source
# Set conservative to true to queue
# several songs in advance
source = request.dynamic(conservative=true, length=400., id="pool_"^name, requestor, timeout=60.)
#source = request.dynamic(conservative=true, length=50., id="pool_"^name, requestor, timeout=60.)
source = request.dynamic(length=50., id="pool_"^name, requestor, timeout=60.)
# Apply normalization using replaygain information
source = amplify(1., override="replay_gain", source)
......
This diff is collapsed.
......@@ -60,6 +60,12 @@ def alchemyencoder(obj):
return obj.isoformat()
elif isinstance(obj, decimal.Decimal):
return float(obj)
elif isinstance(obj, bool):
if obj:
return "True"
return "False"
else:
return obj
"""
Comba Scheduler Class
......@@ -116,7 +122,7 @@ class AuraScheduler():
self.schedule_events = ScheduleEntry.query.filter()
print(self.schedule_events)
#print(self.schedule_events)
#errors_file = os.path.dirname(os.path.realpath(__file__)) + '/error/scheduler_error.js'
json_data = open(self.auraconfig.get("install_dir") + "/errormessages/scheduler_error.js")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment