Skip to content
Snippets Groups Projects
Forked from AURA / engine
1704 commits behind, 181 commits ahead of the upstream repository.
statestore.py 11.54 KiB
#
#  engine
#
#  Playout Daemon for autoradio project
#
#
#  Copyright (C) 2017-2018 Gottfried Gaisbauer <gottfried.gaisbauer@servus.at>
#
#  This file is part of engine.
#
#  engine is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  any later version.
#
#  engine is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with engine. If not, see <http://www.gnu.org/licenses/>.
#

import ast
import datetime
import json
import re
import time
import uuid

import redis


class RedisStateStore(object):

    """Store and get Reports from redis"""

    def __init__(self, config, **redis_kwargs):
        """
        Open the Redis connection and initialise the default key components
        @type config: object providing get(key)
        @param config: configuration source, read for "redis_host", "redis_port" and "redis_db"
        @param redis_kwargs: accepted by the signature but currently not passed on to redis.Redis
        """
        host = config.get("redis_host")
        port = config.get("redis_port")
        database = config.get("redis_db")
        self.db = redis.Redis(host=host, port=port, db=database)

        # channel and section default to '*'; both end up in the patterns built by __get_keys__
        self.channel = '*'
        self.section = '*'
        self.separator = '_'
        self.daily = False

    # ------------------------------------------------------------------------------------------ #
    def set_channel(self, channel):
        """
        Kanal setzen
        @type channel: string
        @param channel: Kanal
        """
        self.channel = channel

    # ------------------------------------------------------------------------------------------ #
    def set_section(self, section):
        """
        Sektion setzen
        @type section: string
        @param section: Sektion
        """
        self.section = section

    # ------------------------------------------------------------------------------------------ #
    def set_alive_state(self):
        """
        Alive Funktion - alle 20 Sekunden melden, dass man noch am Leben ist
        """
        self.set_state('alive', 'Hi', 21)

    # ------------------------------------------------------------------------------------------ #
    def get_alive_state(self, channel):
        """
        Alive Status eines Channels ermitteln
        @type channel:  string
        @param channel: der Channel
        @rtype: string/None
        @return: Ein String, oder None, bei negativem Ergebnis
        """
        return self.get_state('alive', channel)

    # ------------------------------------------------------------------------------------------ #
    def set_state(self, name, value, expires=None, channel=None):
        """
        Setzt einen Status
        @type name: string
        @param name: Name des state
        @type value: string
        @param value: Wert
        @type channel: string
        @param channel: Kanal (optional)
        """
        if not channel:
            channel = self.channel

        key = self.__create_key__(channel + 'State', name)

        if value == "":
            self.db.delete(key)
        else:
            # publish on channel
            message = json.dumps({'eventname':name, 'value': value})
            self.db.publish(channel + 'Publish', message)
            # store in database
            self.db.set(key, value)
            if(expires):
                self.db.expire(key, 21)

    # ------------------------------------------------------------------------------------------ #
    def get_state(self, name, channel):
        """
        Holt einen Status
        @type name: string
        @param name: Name des state
        @type channel: string
        @param channel: Kanal (optional)
        """
        key = self.__create_key__(channel + 'State', name)
        return self.db.get(key)

    # ------------------------------------------------------------------------------------------ #
    def queue_add_event(self, eventtime, name, value, channel=None):
        """
        Kündigt einen Event an
        @type eventtime: string
        @param eventtime: Datum und Zeit des events
        @type name: string
        @param name: Name des Events
        @type value: dict
        @param value: Werte
        @type channel: string
        @param channel: Kanal (optional)
        """
        timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
        expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
        self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)

    # ------------------------------------------------------------------------------------------ #
    def queue_remove_events(self, name=None, channel=None):
        """
        Löscht Events
        @type name: string
        @param name: Name des Events
        @type channel: string
        @param channel: Kanal (optional)
        """
        query = channel + 'Evqueue_' if channel else '*Evqueue_'
        query = query + '*_' + name  if name else query + '*_*'

        keys = self.db.keys(query)

        for delkey in keys:
            self.db.delete(delkey)

    # ------------------------------------------------------------------------------------------ #
    def fire_event(self, name, value, channel=None):
        """
        Feuert einen Event
        @type name: string
        @param name: Name des Events
        @type value: dict
        @param value: Werte
        @type channel: string
        @param channel: Kanal (optional)
        """
        eventtime = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")
        self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)

    # ------------------------------------------------------------------------------------------ #
    def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
        """
        Feuert einen Event
        @type eventtime: string
        @param eventtime: Datum und Zeit des events
        @type value: dict
        @param value: Werte
        @type channel: string
        @param channel: Kanal (optional)
        """
        if not channel:
            channel = self.channel

        timeevent = datetime.datetime.strptime(eventtime[0:16],"%Y-%m-%dT%H:%M")
        key = self.__create_key__(channel + type, eventtime, name)

        value['starts'] = eventtime[0:16]
        value['eventchannel'] = channel
        value['eventname'] = name
        self.db.hset(key, namespace, value)
        self.db.expire(key, expire)

    # ------------------------------------------------------------------------------------------ #
    def get_event_queue(self, name=None, channel=None):
        """
        Holt events eines Kanals
        @type channel: string
        @param channel: Kanal (optional)
        @rtype: list
        @return: Liste der Events
        """
        query = channel + 'Evqueue_' if channel else '*Evqueue_'
        query = query + '*_' + name  if name else query + '*_*'
        keys = self.db.keys(query)
        keys.sort()
        entries = self.__get_entries__(keys, 'evqueue')
        return entries

    # ------------------------------------------------------------------------------------------ #
    def get_events(self, name=None, channel=None):
        """
        Holt events eines Kanals
        @type channel: string
        @param channel: Kanal (optional)
        @rtype: list
        @return: Liste der Events
        """
        query = channel + 'Event_' if channel else '*Event_'
        query = query + '*_' + name  if name else query + '*_*'
        keys = self.db.keys(query)
        keys.sort()
        entries = self.__get_entries__(keys, 'events')
        return entries

    # ------------------------------------------------------------------------------------------ #
    def get_next_event(self, name=None, channel=None):
        """
        Holt den aktuellsten Event
        @type channel: string
        @param channel: Kanal (optional)
        @rtype: dict/boolean
        @return: ein Event oder False
        """
        events = self.get_event_queue(name, channel)
        if len(events) > 0:
            result = events.pop(0)
        else:
            result = False

        return result

    # ------------------------------------------------------------------------------------------ #
    def store(self, level, value):
        """
        Hash speichern
        @type level: string
        @param level: der errorlevel
        @type value: dict
        @param value: Werte als dict
        """
        microtime = str(time.time())
        value['microtime'] = microtime
        value['level'] = level
        key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
        self.db.hset(key, self.channel, value)
        self.db.expire(key, 864000)

    # ------------------------------------------------------------------------------------------ #
    def __get_keys__(self, level ='*'):
        """
        Redis-Keys nach Suchkriterium ermitteln
        @type level: string
        @param level: einen Errorlevel filtern
        @rtype: list
        @return: Die Keys auf die das Suchkriterium zutrifft
        """
        key = self.__create_key__(self.channel, self.section, level)
        microtime = str(time.time())
        search = microtime[0:4] + '*' if self.daily else '*'
        return self.db.keys(key + self.separator + '*')

    # ------------------------------------------------------------------------------------------ #
    def __create_key__(self, *args):
        """
        Key erschaffen - beliebig viele Argumente
        @rtype: string
        @return: Der key
        """
        return self.separator.join(args)

    def get_entries(self, level ='*'):
        """
        Liste von Hashs nach Suchkriterium erhalten
        @type level: string
        @param level: einen Errorlevel filtern
        @rtype: list
        @return: Redis Hashs
        """
        def tsort(x,y):

            if float(x.split('_',4)[3]) > float(y.split('_',4)[3]):
                return 1
            elif float(x.split('_',4)[3]) < float(y.split('_',4)[3]):
                return -1
            else:
                return 0

        keys = self.__get_keys__(level)

        keys.sort(tsort)
        entries = self.__get_entries__(keys, self.channel)
        entries = sorted(entries, key=lambda k: k['microtime'], reverse=True)
        return entries

    # ------------------------------------------------------------------------------------------ #
    def __get_entries__(self, keys, channel):
        entries = []
        for key in keys:
            entry = self.db.hget(key,channel)
            entry = json.dumps(entry.decode('utf-8'))

            if not (entry is None):
                try:
                    entry = entry.decode('utf-8').replace('None','"None"')
                    entry =  re.sub("########[^]]*########", lambda x:x.group(0).replace('\"','').replace('\'',''),entry.replace("\\\"","########").replace("\\'","++++++++").replace("'",'"').replace('u"','"').replace('"{','{').replace('}"','}')).replace("########","\"")
                    entry = json.loads(entry)
                    entry['key'] = key
                    entries.append(entry)
                except:
                    pass

        return entries

    # ------------------------------------------------------------------------------------------ #
    def publish(self, channel, message):
        subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)

        if channel.lower().find("reply") < 0 and subscriber_count[1] == 0:
            raise Exception("No subscriber! Is Aura daemon running?")

        self.db.publish(channel, message)