# Forked from
# AURA / engine
# 1704 commits behind, 181 commits ahead of the upstream repository.
#
# Christian Pointner authored
# statestore.py 11.54 KiB
#
# engine
#
# Playout Daemon for autoradio project
#
#
# Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
# This file is part of engine.
#
# engine is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# engine is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with engine. If not, see <http://www.gnu.org/licenses/>.
#
import ast
import datetime
import json
import re
import time
import uuid

import redis
class RedisStateStore(object):
    """Store and retrieve states, events and report entries in Redis."""

    def __init__(self, config, **redis_kwargs):
        """
        Connect to Redis and initialise the default key components.

        @type config: object with a get(name) method
        @param config: Supplies "redis_host", "redis_port" and "redis_db"
        @param redis_kwargs: Accepted for backward compatibility; not forwarded to redis.Redis
        """
        self.db = redis.Redis(
            host=config.get("redis_host"),
            port=config.get("redis_port"),
            db=config.get("redis_db"),
        )
        # '*' acts as a wildcard when these values end up in key patterns
        self.channel = '*'
        self.section = '*'
        self.separator = '_'
        # NOTE(review): this flag currently has no effect on __get_keys__
        self.daily = False

    # ------------------------------------------------------------------------------------------ #
    def set_channel(self, channel):
        """
        Set the default channel
        @type channel: string
        @param channel: Channel
        """
        self.channel = channel

    # ------------------------------------------------------------------------------------------ #
    def set_section(self, section):
        """
        Set the default section
        @type section: string
        @param section: Section
        """
        self.section = section

    # ------------------------------------------------------------------------------------------ #
    def set_alive_state(self):
        """
        Alive function - meant to be called every 20 seconds to report being alive.
        The state expires after 21 seconds unless it is refreshed.
        """
        self.set_state('alive', 'Hi', 21)

    # ------------------------------------------------------------------------------------------ #
    def get_alive_state(self, channel):
        """
        Get the alive state of a channel
        @type channel: string
        @param channel: The channel
        @rtype: string/None
        @return: The stored value, or None if there is none
        """
        return self.get_state('alive', channel)

    # ------------------------------------------------------------------------------------------ #
    def set_state(self, name, value, expires=None, channel=None):
        """
        Set a state and publish the change on "<channel>Publish".
        An empty string as value deletes the state instead (nothing is published).
        @type name: string
        @param name: Name of the state
        @type value: string
        @param value: Value
        @type expires: int
        @param expires: Lifetime of the state in seconds (optional, no expiry if omitted)
        @type channel: string
        @param channel: Channel (optional, defaults to the channel of this store)
        """
        if not channel:
            channel = self.channel
        key = self.__create_key__(channel + 'State', name)
        if value == "":
            self.db.delete(key)
            return
        # publish on channel
        message = json.dumps({'eventname': name, 'value': value})
        self.db.publish(channel + 'Publish', message)
        # store in database
        self.db.set(key, value)
        if expires:
            # honour the requested lifetime instead of a fixed 21 seconds
            self.db.expire(key, expires)

    # ------------------------------------------------------------------------------------------ #
    def get_state(self, name, channel=None):
        """
        Get a state
        @type name: string
        @param name: Name of the state
        @type channel: string
        @param channel: Channel (optional, defaults to the channel of this store)
        @return: Whatever Redis holds for the state key, None if it does not exist
        """
        if not channel:
            channel = self.channel
        key = self.__create_key__(channel + 'State', name)
        return self.db.get(key)

    # ------------------------------------------------------------------------------------------ #
    def queue_add_event(self, eventtime, name, value, channel=None):
        """
        Announce an upcoming event
        @type eventtime: string
        @param eventtime: Date and time of the event, starting with "YYYY-MM-DDTHH:MM"
        @type name: string
        @param name: Name of the event
        @type value: dict
        @param value: Values
        @type channel: string
        @param channel: Channel (optional)
        """
        timeevent = datetime.datetime.strptime(eventtime[0:16], "%Y-%m-%dT%H:%M")
        # keep the queued event until one minute after its start time
        expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
        self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)

    # ------------------------------------------------------------------------------------------ #
    def queue_remove_events(self, name=None, channel=None):
        """
        Delete queued events
        @type name: string
        @param name: Name of the events (optional, all names if omitted)
        @type channel: string
        @param channel: Channel (optional, all channels if omitted)
        """
        keys = self.db.keys(self.__event_pattern__('Evqueue', name, channel))
        for delkey in keys:
            self.db.delete(delkey)

    # ------------------------------------------------------------------------------------------ #
    def fire_event(self, name, value, channel=None):
        """
        Fire an event stamped with the current minute; it is kept for 60 seconds
        @type name: string
        @param name: Name of the event
        @type value: dict
        @param value: Values
        @type channel: string
        @param channel: Channel (optional)
        """
        eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
        self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)

    # ------------------------------------------------------------------------------------------ #
    def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
        """
        Write an event hash and set its expiry
        @type name: string
        @param name: Name of the event
        @type eventtime: string
        @param eventtime: Date and time of the event, starting with "YYYY-MM-DDTHH:MM"
        @type value: dict
        @param value: Values (left unmodified, a copy is stored)
        @type type: string
        @param type: Key suffix appended to the channel ('Event' or 'Evqueue')
        @type namespace: string
        @param namespace: Hash field the event is stored under
        @type expire: int
        @param expire: Lifetime of the key in seconds
        @type channel: string
        @param channel: Channel (optional, defaults to the channel of this store)
        """
        if not channel:
            channel = self.channel
        # parsed only for validation: a malformed eventtime raises ValueError
        datetime.datetime.strptime(eventtime[0:16], "%Y-%m-%dT%H:%M")
        key = self.__create_key__(channel + type, eventtime, name)
        payload = dict(value)
        payload['starts'] = eventtime[0:16]
        payload['eventchannel'] = channel
        payload['eventname'] = name
        # serialise explicitly: newer redis-py releases refuse dict values,
        # older ones applied str() implicitly, so the stored format is unchanged
        self.db.hset(key, namespace, str(payload))
        self.db.expire(key, expire)

    # ------------------------------------------------------------------------------------------ #
    def get_event_queue(self, name=None, channel=None):
        """
        Get the queued events of a channel
        @type name: string
        @param name: Name of the events (optional, all names if omitted)
        @type channel: string
        @param channel: Channel (optional, all channels if omitted)
        @rtype: list
        @return: List of events, ordered by key
        """
        keys = sorted(self.db.keys(self.__event_pattern__('Evqueue', name, channel)))
        return self.__get_entries__(keys, 'evqueue')

    # ------------------------------------------------------------------------------------------ #
    def get_events(self, name=None, channel=None):
        """
        Get the fired events of a channel
        @type name: string
        @param name: Name of the events (optional, all names if omitted)
        @type channel: string
        @param channel: Channel (optional, all channels if omitted)
        @rtype: list
        @return: List of events, ordered by key
        """
        keys = sorted(self.db.keys(self.__event_pattern__('Event', name, channel)))
        return self.__get_entries__(keys, 'events')

    # ------------------------------------------------------------------------------------------ #
    def get_next_event(self, name=None, channel=None):
        """
        Get the first event of the event queue
        @type name: string
        @param name: Name of the events (optional, all names if omitted)
        @type channel: string
        @param channel: Channel (optional, all channels if omitted)
        @rtype: dict/boolean
        @return: An event, or False if the queue is empty
        """
        events = self.get_event_queue(name, channel)
        return events[0] if events else False

    # ------------------------------------------------------------------------------------------ #
    def store(self, level, value):
        """
        Store a report hash; the key expires after 864000 seconds (ten days)
        @type level: string
        @param level: The error level
        @type value: dict
        @param value: Values as dict (left unmodified, a copy is stored)
        """
        microtime = str(time.time())
        payload = dict(value)
        payload['microtime'] = microtime
        payload['level'] = level
        # the uuid keeps keys unique even when two reports share a timestamp
        key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
        # explicit str(): see __set_event__ for why the dict is not passed as-is
        self.db.hset(key, self.channel, str(payload))
        self.db.expire(key, 864000)

    # ------------------------------------------------------------------------------------------ #
    def __get_keys__(self, level='*'):
        """
        Find the Redis keys of the current channel and section
        @type level: string
        @param level: Filter by error level ('*' matches every level)
        @rtype: list
        @return: The keys matching the search pattern
        """
        key = self.__create_key__(self.channel, self.section, level)
        return self.db.keys(key + self.separator + '*')

    # ------------------------------------------------------------------------------------------ #
    def __create_key__(self, *args):
        """
        Build a key from any number of string components
        @rtype: string
        @return: The components joined by the separator
        """
        return self.separator.join(args)

    # ------------------------------------------------------------------------------------------ #
    def __event_pattern__(self, kind, name, channel):
        """
        Build the key pattern shared by all event lookups
        @type kind: string
        @param kind: Key suffix appended to the channel ('Event' or 'Evqueue')
        @type name: string
        @param name: Name of the events, or None for all names
        @type channel: string
        @param channel: Channel, or None for all channels
        @rtype: string
        @return: Pattern of the form "<channel><kind>_*_<name>"
        """
        prefix = (channel if channel else '*') + kind + self.separator
        return prefix + '*' + self.separator + (name if name else '*')

    # ------------------------------------------------------------------------------------------ #
    def get_entries(self, level='*'):
        """
        Get the stored report hashes of the current channel and section
        @type level: string
        @param level: Filter by error level ('*' matches every level)
        @rtype: list
        @return: The decoded hashes, newest first
        """
        def stamp(entry):
            # compare numerically; entries without a usable timestamp sort last
            try:
                return float(entry.get('microtime'))
            except (TypeError, ValueError):
                return 0.0

        entries = self.__get_entries__(self.__get_keys__(level), self.channel)
        return sorted(entries, key=stamp, reverse=True)

    # ------------------------------------------------------------------------------------------ #
    def __get_entries__(self, keys, channel):
        """
        Load and decode one hash field for each of the given keys
        @type keys: list
        @param keys: Redis keys to read
        @type channel: string
        @param channel: Hash field to read from every key
        @rtype: list
        @return: One dict per readable entry, with the Redis key added as 'key'.
                 Missing fields and undecodable values are skipped.
        """
        entries = []
        for key in keys:
            raw = self.db.hget(key, channel)
            if raw is None:
                # the key expired or never had this field
                continue
            entry = self.__decode_entry__(raw)
            if entry is None:
                continue
            entry['key'] = self.__to_text__(key)
            entries.append(entry)
        return entries

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def __to_text__(raw):
        """
        Decode a bytes value as UTF-8; any other value is returned unchanged
        """
        if isinstance(raw, bytes):
            return raw.decode('utf-8')
        return raw

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def __decode_entry__(raw):
        """
        Turn a stored hash value back into a dict
        @param raw: Value as returned by Redis, either JSON or the str() of a dict
        @rtype: dict/None
        @return: The decoded dict, or None if the value is not a readable dict
        """
        text = RedisStateStore.__to_text__(raw)
        try:
            entry = json.loads(text)
        except (TypeError, ValueError):
            try:
                # values written via str(dict); literal_eval only accepts literals
                entry = ast.literal_eval(text)
            except (TypeError, ValueError, SyntaxError):
                return None
        return entry if isinstance(entry, dict) else None

    # ------------------------------------------------------------------------------------------ #
    def publish(self, channel, message):
        """
        Publish a message, making sure someone is listening
        @type channel: string
        @param channel: Redis channel to publish on
        @type message: string
        @param message: The message
        @raise Exception: If the channel has no subscriber and its name does not contain "reply"
        """
        # PUBSUB NUMSUB answers with [channel, subscriber count]
        subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)
        if "reply" not in channel.lower() and subscriber_count[1] == 0:
            raise Exception("No subscriber! Is Aura daemon running?")
        self.db.publish(channel, message)