#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import ast
import datetime
import json
import logging
import re
import time
import uuid

import redis


class RedisStateStore(object):
    """
    Store and retrieve states, events and report entries in Redis.

    Keys are built by joining their parts with ``self.separator``. Dictionary
    payloads are serialised to JSON before they are written to a Redis hash;
    entries written in the older Python-literal format can still be read.
    """

    # Timestamp layout (minute precision) used for event times.
    TIME_FORMAT = "%Y-%m-%dT%H:%M"

    # Lifetime of entries written by store(): 864000 seconds = 10 days.
    ENTRY_TTL_SECONDS = 864000

    def __init__(self, config, **redis_kwargs):
        """
        Create the Redis connection and initialise the defaults.
        @type config: object providing ``get(name)``
        @param config: Supplies "redis_host", "redis_port" and "redis_db"
        @param redis_kwargs: Additional keyword arguments forwarded to
                             ``redis.Redis``; host, port and db from ``config``
                             take precedence over same-named keywords
        """
        connection_args = dict(redis_kwargs)
        connection_args.update(
            host=config.get("redis_host"),
            port=config.get("redis_port"),
            db=config.get("redis_db"),
        )
        self.db = redis.Redis(**connection_args)
        self.channel = '*'
        self.section = '*'
        self.separator = '_'
        # NOTE(review): nothing in this class reads `daily`; it is kept
        # because external code may set or inspect it.
        self.daily = False

    # ------------------------------------------------------------------------------------------ #
    def set_channel(self, channel):
        """
        Set the channel
        @type channel: string
        @param channel: Channel
        """
        self.channel = channel

    # ------------------------------------------------------------------------------------------ #
    def set_section(self, section):
        """
        Set the section
        @type section: string
        @param section: Section
        """
        self.section = section

    # ------------------------------------------------------------------------------------------ #
    def set_alive_state(self):
        """
        Alive function - stores the 'alive' state with a lifetime of 21 seconds,
        so it is meant to be refreshed roughly every 20 seconds.
        """
        self.set_state('alive', 'Hi', 21)

    # ------------------------------------------------------------------------------------------ #
    def get_alive_state(self, channel):
        """
        Determine the alive state of a channel
        @type channel:  string
        @param channel: The channel
        @return: The stored value as returned by Redis, or None if there is none
        """
        return self.get_state('alive', channel)

    # ------------------------------------------------------------------------------------------ #
    def set_state(self, name, value, expires=None, channel=None):
        """
        Set a state. An empty string as value deletes the state instead.
        @type name: string
        @param name: Name of the state
        @type value: string
        @param value: Value
        @type expires: int
        @param expires: Lifetime of the state in seconds (optional)
        @type channel: string
        @param channel: Channel (optional, defaults to the store's channel)
        """
        if not channel:
            channel = self.channel

        key = self.__create_key__(channel + 'State', name)

        if value == "":
            self.db.delete(key)
            return

        # publish on channel
        message = json.dumps({'eventname': name, 'value': value})
        self.db.publish(channel + 'Publish', message)
        # store in database
        self.db.set(key, value)
        if expires:
            # Honour the requested lifetime instead of a fixed value.
            self.db.expire(key, expires)

    # ------------------------------------------------------------------------------------------ #
    def get_state(self, name, channel=None):
        """
        Get a state
        @type name: string
        @param name: Name of the state
        @type channel: string
        @param channel: Channel (optional, defaults to the store's channel)
        @return: The stored value as returned by Redis, or None if there is none
        """
        if not channel:
            channel = self.channel

        key = self.__create_key__(channel + 'State', name)
        return self.db.get(key)

    # ------------------------------------------------------------------------------------------ #
    def queue_add_event(self, eventtime, name, value, channel=None):
        """
        Announce an event
        @type eventtime: string
        @param eventtime: Date and time of the event; the first 16 characters
                          must match "%Y-%m-%dT%H:%M"
        @type name: string
        @param name: Name of the event
        @type value: dict
        @param value: Values
        @type channel: string
        @param channel: Channel (optional)
        """
        timeevent = datetime.datetime.strptime(eventtime[0:16], self.TIME_FORMAT)
        # Keep the queued event until 60 seconds after its start time.
        expire = int(time.mktime(timeevent.timetuple()) - time.time()) + 60
        self.__set_event__(name, eventtime, value, 'Evqueue', 'evqueue', expire, channel)

    # ------------------------------------------------------------------------------------------ #
    def queue_remove_events(self, name=None, channel=None):
        """
        Delete queued events
        @type name: string
        @param name: Name of the events (optional, all names if omitted)
        @type channel: string
        @param channel: Channel (optional, all channels if omitted)
        """
        keys = self.db.keys(self._event_pattern('Evqueue', name, channel))
        if keys:
            self.db.delete(*keys)

    # ------------------------------------------------------------------------------------------ #
    def fire_event(self, name, value, channel=None):
        """
        Fire an event stamped with the current time; it is kept for 60 seconds
        @type name: string
        @param name: Name of the event
        @type value: dict
        @param value: Values
        @type channel: string
        @param channel: Channel (optional)
        """
        eventtime = datetime.datetime.now().strftime(self.TIME_FORMAT)
        self.__set_event__(name, eventtime, value, 'Event', 'events', 60, channel)

    # ------------------------------------------------------------------------------------------ #
    def __set_event__(self, name, eventtime, value, type, namespace, expire, channel=None):
        """
        Write an event hash and set its lifetime
        @type name: string
        @param name: Name of the event
        @type eventtime: string
        @param eventtime: Date and time of the event
        @type value: dict
        @param value: Values; the dict passed in is not modified
        @type type: string
        @param type: Key suffix appended to the channel ('Event' or 'Evqueue')
        @type namespace: string
        @param namespace: Hash field the payload is stored under
        @type expire: int
        @param expire: Lifetime of the key in seconds
        @type channel: string
        @param channel: Channel (optional, defaults to the store's channel)
        """
        if not channel:
            channel = self.channel

        # Parsed for validation only: a malformed timestamp raises ValueError
        # before anything is written.
        datetime.datetime.strptime(eventtime[0:16], self.TIME_FORMAT)
        key = self.__create_key__(channel + type, eventtime, name)

        # Work on a copy so the caller's dict is left untouched.
        payload = dict(value)
        payload['starts'] = eventtime[0:16]
        payload['eventchannel'] = channel
        payload['eventname'] = name
        # Redis values must be strings/bytes/numbers, so serialise explicitly.
        # default=str keeps the call best-effort for values JSON cannot encode.
        self.db.hset(key, namespace, json.dumps(payload, default=str))
        self.db.expire(key, expire)

    # ------------------------------------------------------------------------------------------ #
    def _event_pattern(self, kind, name=None, channel=None):
        """
        Build the key search pattern for events
        @type kind: string
        @param kind: Key suffix appended to the channel ('Event' or 'Evqueue')
        @type name: string
        @param name: Name of the events (optional, wildcard if omitted)
        @type channel: string
        @param channel: Channel (optional, wildcard if omitted)
        @rtype: string
        @return: The pattern
        """
        prefix = (channel if channel else '*') + kind
        return self.__create_key__(prefix, '*', name if name else '*')

    # ------------------------------------------------------------------------------------------ #
    def get_event_queue(self, name=None, channel=None):
        """
        Get the queued events of a channel, ordered by key
        @type name: string
        @param name: Name of the events (optional)
        @type channel: string
        @param channel: Channel (optional)
        @rtype: list
        @return: List of events
        """
        keys = sorted(self.db.keys(self._event_pattern('Evqueue', name, channel)))
        return self.__get_entries__(keys, 'evqueue')

    # ------------------------------------------------------------------------------------------ #
    def get_events(self, name=None, channel=None):
        """
        Get the fired events of a channel, ordered by key
        @type name: string
        @param name: Name of the events (optional)
        @type channel: string
        @param channel: Channel (optional)
        @rtype: list
        @return: List of events
        """
        keys = sorted(self.db.keys(self._event_pattern('Event', name, channel)))
        return self.__get_entries__(keys, 'events')

    # ------------------------------------------------------------------------------------------ #
    def get_next_event(self, name=None, channel=None):
        """
        Get the first event of the event queue (in key order)
        @type name: string
        @param name: Name of the events (optional)
        @type channel: string
        @param channel: Channel (optional)
        @rtype: dict/boolean
        @return: An event, or False if the queue is empty
        """
        events = self.get_event_queue(name, channel)
        return events[0] if events else False

    # ------------------------------------------------------------------------------------------ #
    def store(self, level, value):
        """
        Store a hash
        @type level: string
        @param level: The error level
        @type value: dict
        @param value: Values as dict; the dict passed in is not modified
        """
        microtime = str(time.time())
        # Work on a copy so the caller's dict is left untouched.
        entry = dict(value)
        entry['microtime'] = microtime
        entry['level'] = level
        key = self.__create_key__(self.channel, self.section, level, microtime, str(uuid.uuid1()))
        # Redis values must be strings/bytes/numbers, so serialise explicitly.
        self.db.hset(key, self.channel, json.dumps(entry, default=str))
        self.db.expire(key, self.ENTRY_TTL_SECONDS)

    # ------------------------------------------------------------------------------------------ #
    def __get_keys__(self, level ='*'):
        """
        Determine the Redis keys matching the search criterion
        @type level: string
        @param level: Filter by an error level
        @rtype: list
        @return: The keys matching the search criterion
        """
        key = self.__create_key__(self.channel, self.section, level)
        return self.db.keys(key + self.separator + '*')

    # ------------------------------------------------------------------------------------------ #
    def __create_key__(self, *args):
        """
        Create a key - any number of string arguments
        @rtype: string
        @return: The key
        """
        return self.separator.join(args)

    # ------------------------------------------------------------------------------------------ #
    def get_entries(self, level ='*'):
        """
        Get a list of hashes matching the search criterion, newest first
        @type level: string
        @param level: Filter by an error level
        @rtype: list
        @return: Redis hashes
        """
        keys = self.__get_keys__(level)
        entries = self.__get_entries__(keys, self.channel)
        # Compare timestamps numerically; as strings "99.75" would sort after "100.5".
        return sorted(entries, key=lambda entry: float(entry['microtime']), reverse=True)

    # ------------------------------------------------------------------------------------------ #
    def __get_entries__(self, keys, channel):
        """
        Load and decode the hash field of every given key
        @type keys: list
        @param keys: The Redis keys to read
        @type channel: string
        @param channel: Hash field the payload is stored under
        @rtype: list
        @return: The decoded dicts, each extended by 'key' (the key as passed in);
                 keys without that field or with an undecodable payload are skipped
        """
        entries = []
        for key in keys:
            raw = self.db.hget(key, channel)
            if raw is None:
                continue

            entry = self._decode_entry(raw)
            if entry is None:
                logging.getLogger(__name__).warning("Skipping undecodable entry stored at %r", key)
                continue

            entry['key'] = key
            entries.append(entry)

        return entries

    # ------------------------------------------------------------------------------------------ #
    @staticmethod
    def _decode_entry(raw):
        """
        Decode a stored payload into a dict
        @type raw: bytes/string
        @param raw: The payload as returned by Redis
        @rtype: dict/None
        @return: The decoded dict, or None if the payload cannot be decoded
        """
        if isinstance(raw, bytes):
            try:
                raw = raw.decode('utf-8')
            except UnicodeDecodeError:
                return None

        try:
            entry = json.loads(raw)
        except (TypeError, ValueError):
            # Fallback for payloads that were stored as a Python dict literal.
            try:
                entry = ast.literal_eval(raw)
            except (ValueError, SyntaxError, TypeError):
                return None

        return entry if isinstance(entry, dict) else None

    # ------------------------------------------------------------------------------------------ #
    def publish(self, channel, message):
        """
        Publish a message on a channel
        @type channel: string
        @param channel: The channel to publish on
        @type message: string
        @param message: The message
        @raise Exception: If the channel name does not contain "reply" and the
                          channel has no subscriber
        """
        # The second element of the reply is read as the subscriber count.
        subscriber_count = self.db.execute_command('PUBSUB', 'NUMSUB', channel)

        if "reply" not in channel.lower() and subscriber_count[1] == 0:
            raise Exception("No subscriber! Is Aura daemon running?")

        self.db.publish(channel, message)