#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import ast
import datetime
import json
import re
import time
import uuid

import redis


class RedisStateStore(object):
    """
    Store and retrieve states, events and report entries in Redis.

    Keys are built by joining their parts with ``self.separator``.
    Hash payloads are written as JSON text; payloads found in the legacy
    ``repr(dict)`` form are still readable.
    """

    # Events are addressed with minute resolution; only the first 16
    # characters of an event time ("YYYY-mm-ddTHH:MM") are significant.
    _EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M"
    _EVENT_TIME_LENGTH = 16

    def __init__(self, config, **redis_kwargs):
        """
        Open the Redis connection described by ``config``.

        @type    config: object providing ``get(key)``
        @param   config: Source of the ``redis_host``, ``redis_port`` and ``redis_db`` settings
        @param   redis_kwargs: Accepted for backward compatibility
        """
        # NOTE(review): ``redis_kwargs`` is accepted but not forwarded to
        # ``redis.Redis`` -- confirm whether callers expect it to be.
        self.db = redis.Redis(
            host=config.get("redis_host"),
            port=config.get("redis_port"),
            db=config.get("redis_db"),
        )
        self.channel = '*'
        self.section = '*'
        self.separator = '_'
        self.daily = False

    # ------------------------------------------------------------------------------------------ #
    def set_channel(self, channel):
        """
        Set the channel used when a method is called without one.

        @type    channel: string
        @param   channel: Channel
        """
        self.channel = channel

    # ------------------------------------------------------------------------------------------ #
    def set_section(self, section):
        """
        Set the section used in the keys of stored report entries.

        @type    section: string
        @param   section: Section
        """
        self.section = section

    # ------------------------------------------------------------------------------------------ #
    def set_alive_state(self):
        """
        Announce that this process is still alive.

        The 'alive' state expires after 21 seconds, so it has to be
        refreshed about every 20 seconds to stay visible.
        """
        self.set_state('alive', 'Hi', 21)

    # ------------------------------------------------------------------------------------------ #
    def get_alive_state(self, channel):
        """
        Get the alive state of a channel.

        @type    channel: string
        @param   channel: The channel
        @rtype:  string/None
        @return: The stored value, or None if there is none
        """
        return self.get_state('alive', channel)

    # ------------------------------------------------------------------------------------------ #
    def set_state(self, name, value, expires=None, channel=None):
        """
        Set a state and publish the change on ``<channel>Publish``.

        An empty string as value deletes the state without publishing.

        @type    name: string
        @param   name: Name of the state
        @type    value: string
        @param   value: Value
        @type    expires: int
        @param   expires: Time to live in seconds (optional)
        @type    channel: string
        @param   channel: Channel (optional, defaults to the store's channel)
        """
        if not channel:
            channel = self.channel

        key = self.__create_key__(channel + 'State', name)

        if value == "":
            self.db.delete(key)
            return

        # publish on channel
        message = json.dumps({'eventname': name, 'value': value})
        self.db.publish(channel + 'Publish', message)

        # store in database
        self.db.set(key, value)
        if expires:
            # Honour the requested lifetime instead of a hard-coded one.
            self.db.expire(key, expires)

    # ------------------------------------------------------------------------------------------ #
    def get_state(self, name, channel=None):
        """
        Get a state.

        @type    name: string
        @param   name: Name of the state
        @type    channel: string
        @param   channel: Channel (optional, defaults to the store's channel)
        @return: Whatever Redis holds under the state key, or None
        """
        if not channel:
            channel = self.channel

        key = self.__create_key__(channel + 'State', name)
        return self.db.get(key)

    # ------------------------------------------------------------------------------------------ #
    def queue_add_event(self, eventtime, name, value, channel=None):
        """
        Announce a future event.

        The queue entry expires 60 seconds after the event time.

        @type    eventtime: string
        @param   eventtime: Date and time of the event, starting with "YYYY-mm-ddTHH:MM"
        @type    name: string
        @param   name: Name of the event
        @type    value: dict
        @param   value: Values
        @type    channel: string
        @param   channel: Channel (optional)
        """
        timeevent = datetime.datetime.strptime(
            eventtime[0:self._EVENT_TIME_LENGTH], self._EVENT_TIME_FORMAT)
        expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
        self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)

    # ------------------------------------------------------------------------------------------ #
    def queue_remove_events(self, name=None, channel=None):
        """
        Delete queued events.

        @type    name: string
        @param   name: Name of the events (optional, all names if omitted)
        @type    channel: string
        @param   channel: Channel (optional, all channels if omitted)
        """
        keys = self.db.keys(self._event_pattern('Evqueue', name, channel))
        if keys:
            # One DEL for all matches instead of one round trip per key.
            self.db.delete(*keys)

    # ------------------------------------------------------------------------------------------ #
    def fire_event(self, name, value, channel=None):
        """
        Fire an event stamped with the current time; it is kept for 60 seconds.

        @type    name: string
        @param   name: Name of the event
        @type    value: dict
        @param   value: Values
        @type    channel: string
        @param   channel: Channel (optional)
        """
        eventtime = datetime.datetime.now().strftime(self._EVENT_TIME_FORMAT)
        self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)

    # ------------------------------------------------------------------------------------------ #
    def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
        """
        Write an event hash and set its time to live.

        @type    name: string
        @param   name: Name of the event
        @type    eventtime: string
        @param   eventtime: Date and time of the event, starting with "YYYY-mm-ddTHH:MM"
        @type    value: dict
        @param   value: Values; the caller's dict is left untouched
        @type    type: string
        @param   type: Key suffix appended to the channel ('Event' or 'Evqueue')
        @type    namespace: string
        @param   namespace: Hash field the payload is written to
        @type    expire: int
        @param   expire: Time to live in seconds
        @type    channel: string
        @param   channel: Channel (optional)
        @raise   ValueError: If eventtime does not start with a valid timestamp
        """
        if not channel:
            channel = self.channel

        starts = eventtime[0:self._EVENT_TIME_LENGTH]
        # Parsed only for validation: reject malformed timestamps before writing.
        datetime.datetime.strptime(starts, self._EVENT_TIME_FORMAT)

        key = self.__create_key__(channel + type, eventtime, name)

        # Work on a copy so the bookkeeping fields do not leak into the caller's dict.
        payload = dict(value)
        payload['starts'] = starts
        payload['eventchannel'] = channel
        payload['eventname'] = name

        self.db.hset(key, namespace, self._serialize(payload))
        self.db.expire(key, expire)

    # ------------------------------------------------------------------------------------------ #
    def get_event_queue(self, name=None, channel=None):
        """
        Get the queued events.

        @type    name: string
        @param   name: Name of the events (optional)
        @type    channel: string
        @param   channel: Channel (optional)
        @rtype:  list
        @return: List of events, ordered by key
        """
        keys = self.db.keys(self._event_pattern('Evqueue', name, channel))
        keys.sort()
        return self.__get_entries__(keys, 'evqueue')

    # ------------------------------------------------------------------------------------------ #
    def get_events(self, name=None, channel=None):
        """
        Get the fired events.

        @type    name: string
        @param   name: Name of the events (optional)
        @type    channel: string
        @param   channel: Channel (optional)
        @rtype:  list
        @return: List of events, ordered by key
        """
        keys = self.db.keys(self._event_pattern('Event', name, channel))
        keys.sort()
        return self.__get_entries__(keys, 'events')

    # ------------------------------------------------------------------------------------------ #
    def get_next_event(self, name=None, channel=None):
        """
        Get the first event of the queue.

        @type    name: string
        @param   name: Name of the events (optional)
        @type    channel: string
        @param   channel: Channel (optional)
        @rtype:  dict/boolean
        @return: An event, or False if the queue is empty
        """
        events = self.get_event_queue(name, channel)
        return events[0] if events else False

    # ------------------------------------------------------------------------------------------ #
    def store(self, level, value):
        """
        Store a report entry as a hash that expires after 864000 seconds (ten days).

        @type    level: string
        @param   level: The error level
        @type    value: dict
        @param   value: Values; the caller's dict is left untouched
        """
        microtime = str(time.time())

        payload = dict(value)
        payload['microtime'] = microtime
        payload['level'] = level

        # The uuid keeps keys unique when two entries share a timestamp.
        key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
        self.db.hset(key, self.channel, self._serialize(payload))
        self.db.expire(key, 864000)

    # ------------------------------------------------------------------------------------------ #
    def __get_keys__(self, level='*'):
        """
        Find the Redis keys of the report entries for the current channel and section.

        @type    level: string
        @param   level: Error level to filter by
        @rtype:  list
        @return: The keys matching the search pattern
        """
        # NOTE(review): ``self.daily`` is not applied to the pattern;
        # confirm whether a date filter was intended here.
        key = self.__create_key__(self.channel, self.section, level)
        return self.db.keys(key + self.separator + '*')

    # ------------------------------------------------------------------------------------------ #
    def __create_key__(self, *args):
        """
        Build a key from any number of string parts.

        @rtype:  string
        @return: The key
        """
        return self.separator.join(args)

    # ------------------------------------------------------------------------------------------ #
    def get_entries(self, level='*'):
        """
        Get the stored report entries, newest first.

        @type    level: string
        @param   level: Error level to filter by
        @rtype:  list
        @return: The entries as dicts
        """
        keys = self.__get_keys__(level)
        keys.sort(key=self._key_timestamp)
        entries = self.__get_entries__(keys, self.channel)
        entries.sort(key=lambda entry: entry['microtime'], reverse=True)
        return entries

    # ------------------------------------------------------------------------------------------ #
    def __get_entries__(self, keys, channel):
        """
        Load and decode one hash field for each of the given keys.

        Keys whose field is missing or does not decode to a dict are skipped.

        @type    keys: list
        @param   keys: Redis keys to read
        @type    channel: string
        @param   channel: Hash field holding the payload
        @rtype:  list
        @return: The decoded entries, each extended by its 'key'
        """
        entries = []
        for key in keys:
            entry = self._parse_entry(self.db.hget(key, channel))
            if entry is None:
                continue
            entry['key'] = self._to_text(key)
            entries.append(entry)
        return entries

    # ------------------------------------------------------------------------------------------ #
    def publish(self, channel, message):
        """
        Publish a message, insisting on a listener unless it is a reply channel.

        @type    channel: string
        @param   channel: Channel to publish on
        @type    message: string
        @param   message: The message
        @raise   Exception: If nobody is subscribed to a channel whose name does not contain "reply"
        """
        subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)

        # The second element of the reply is read as the subscriber count.
        if "reply" not in channel.lower() and subscriber_count[1] == 0:
            raise Exception("No subscriber! Is Aura daemon running?")

        self.db.publish(channel, message)

    # ------------------------------------------------------------------------------------------ #
    def _event_pattern(self, suffix, name=None, channel=None):
        """Build the key pattern '<channel><suffix>_<time>_<name>' with '*' for omitted parts."""
        prefix = (channel if channel else '*') + suffix
        return self.separator.join((prefix, '*', name if name else '*'))

    # ------------------------------------------------------------------------------------------ #
    def _key_timestamp(self, key):
        """Return the timestamp held in the fourth part of an entry key, or 0.0 if there is none."""
        parts = self._to_text(key).split(self.separator, 4)
        try:
            return float(parts[3])
        except (IndexError, ValueError):
            return 0.0

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def _to_text(raw):
        """Decode bytes as UTF-8; return any other value unchanged."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8', 'replace')
        return raw

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def _serialize(payload):
        """Turn a payload dict into the JSON text written to Redis."""
        # default=str is deliberate: values JSON cannot represent are stored
        # as their string form instead of making the write fail.
        return json.dumps(payload, default=str)

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def _parse_entry(raw):
        """Decode a stored payload into a dict; return None if that is not possible."""
        if raw is None:
            return None

        text = RedisStateStore._to_text(raw)
        if not isinstance(text, str):
            return None

        try:
            entry = json.loads(text)
        except ValueError:
            # Not JSON: try the legacy repr(dict) form.
            try:
                entry = ast.literal_eval(text)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                return None

        return entry if isinstance(entry, dict) else None