adapter.py 10.7 KB
Newer Older
1
#
David Trattnig's avatar
David Trattnig committed
2
# Aura Engine (https://gitlab.servus.at/aura/engine)
3
#
David Trattnig's avatar
David Trattnig committed
4
# Copyright (C) 2017-2020 - The Aura Engine Team.
5
#
David Trattnig's avatar
David Trattnig committed
6
7
8
9
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
10
#
David Trattnig's avatar
David Trattnig committed
11
12
13
14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
15
#
David Trattnig's avatar
David Trattnig committed
16
17
18
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

19

20
import sys
21
import time
David Trattnig's avatar
David Trattnig committed
22
import json
23
import redis
24
import logging
25
import threading
26
27
28

from datetime import datetime
from threading import Event
29

30
from modules.communication.redis.messenger import RedisMessenger
David Trattnig's avatar
David Trattnig committed
31
from modules.communication.redis.statestore import RedisStateStore
32
# from modules.communication.connection_tester import ConnectionTester
33
from modules.base.exceptions import RedisConnectionException
34
35
from modules.base.enum import RedisChannel, FallbackType
from modules.base.utils import TerminalColors
36
37


38
# ------------------------------------------------------------------------------------------ #
39
40
41
42
43
44
class ServerRedisAdapter(threading.Thread, RedisMessenger):
    debug = False
    pubsub = None
    config = None
    redisdb = None
    channel = ""
45
    scheduler = None
46
    redisclient = None
47
#    connection_tester = None
David Trattnig's avatar
David Trattnig committed
48
    soundsystem = None
49
50
    socket = None
    
51
    # ------------------------------------------------------------------------------------------ #
52
    def __init__(self, config):
53
        threading.Thread.__init__(self)
54
        RedisMessenger.__init__(self, config)
55
56

        # init
57
58
        #threading.Thread.__init__ (self)
        self.config = config
59
60
        self.shutdown_event = Event()

61
        self.channel = RedisChannel.STANDARD.value
62
        self.section = ''
63
        self.rstore = RedisStateStore(config)
64
65
66
67
        self.errnr = '00'
        self.components = {'controller':'01', 'scheduling':'02', 'playd':'03', 'recorder':'04', 'helpers':'09'}
        self.fromMail = ''
        self.adminMails = ''
68
        self.can_send = None
69
        self.redisclient = ClientRedisAdapter(config)
70
#        self.connection_tester = ConnectionTester()
71

72
73
    # ------------------------------------------------------------------------------------------ #
    def run(self):
74
        self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"))
75
76
77
        self.pubsub = self.redisdb.pubsub()
        self.pubsub.subscribe(self.channel)

78
        self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
79

80
        # listener loop
81
82
83
84
        for item in self.pubsub.listen():
            if item["type"] == "subscribe":
                continue

85
            self.logger.debug(TerminalColors.YELLOW.value + "received REDIS message: " + TerminalColors.ENDC.value + str(item))
86

87
88
            item["channel"] = self.decode_if_needed(item["channel"])
            item["data"] = self.decode_if_needed(item["data"])
89
90

            try:
91
                self.work(item)
92
            except RedisConnectionException as rce:
93
                self.logger.error(str(rce))
94

95
            if not self.shutdown_event.is_set():
96
                self.logger.debug(TerminalColors.YELLOW.value + "waiting for REDIS message on channel " + self.channel + TerminalColors.ENDC.value)
97
98

        self.pubsub.unsubscribe()
99
100
101

        if not self.shutdown_event.is_set():
            self.logger.warning("unsubscribed from " + self.channel + " and finished")
102

103
104
105
106
107
108
    # ------------------------------------------------------------------------------------------ #
    def decode_if_needed(self, val):
        if isinstance(val, bytes):
            return val.decode("utf-8")
        return val

109
110
    # ------------------------------------------------------------------------------------------ #
    def listen_for_one_message(self, channel, socket_timeout=2):
111
        self.redisdb = redis.Redis(host=self.config.get("redis_host"), port=self.config.get("redis_port"), db=self.config.get("redis_db"), socket_timeout=socket_timeout)
112
113
114
        self.pubsub = self.redisdb.pubsub()
        self.pubsub.subscribe(channel)

115
        try:
116
            self.logger.debug("I am listening on channel '"+channel+"' for "+str(socket_timeout)+" seconds")
117

118
119
120
121
122
123
            for item in self.pubsub.listen():
                it = self.receive_message(item)
                if it is not None:
                    break
        except redis.exceptions.TimeoutError as te:
            raise te
124
125
126

        return item["data"]

127
128
129
    # ------------------------------------------------------------------------------------------ #
    def receive_message(self, item):
        if item["type"] == "subscribe":
130
            self.logger.info("i am subscribed to channel " + item["channel"].decode("utf-8"))
131
132
133
134
135
136
137
138
139
140
            return None

        item["channel"] = item["channel"].decode("utf-8")
        if isinstance(item["data"], bytes):
            item["data"] = item["data"].decode("utf-8")

        self.pubsub.unsubscribe()
        return item

    # ------------------------------------------------------------------------------------------ #
141
142
    def work(self, item):
        if item["data"] == "fetch_new_programme":
143
144
            #self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.fetch_new_programme)
            self.execute(RedisChannel.FNP_REPLY.value, self.scheduler.get_act_programme_as_string)
145

146
        elif item["data"] == "shutdown":
147
            self.terminate()
148

149
        elif item["data"] == "init_player":
David Trattnig's avatar
David Trattnig committed
150
            self.execute(RedisChannel.IP_REPLY.value, self.soundsystem.init_player)
151
152

        elif item["data"] == "get_act_programme":
153
            self.execute(RedisChannel.GAP_REPLY.value, self.scheduler.get_act_programme_as_string)
154

David Trattnig's avatar
David Trattnig committed
155
156
157
        elif item["data"] == "get_status":
            def get_status_string():
                status = self.soundsystem.monitoring.get_status()
David Trattnig's avatar
David Trattnig committed
158
                return json.dumps(status)
David Trattnig's avatar
David Trattnig committed
159
160
161

            self.execute(RedisChannel.GS_REPLY.value, get_status_string)

162
163
#        elif item["data"] == "get_connection_status":
#            self.execute(RedisChannel.GCS_REPLY.value, self.connection_tester.get_connection_status)
164

165
        elif item["data"] == "print_message_queue":
166
            self.execute(RedisChannel.PMQ_REPLY.value, self.scheduler.print_message_queue)
167

168
169
        elif item["data"].find("set_next_file") >= 0:
            playlist = item["data"].split()[1]
170
            playlist = playlist[0:len(playlist)-8]
171
            self.execute(RedisChannel.SNF_REPLY.value, self.scheduler.set_next_file_for, playlist)
172

173
174
175
176
177
        elif item["data"].find("get_next_file") >= 0:
            playlist = item["data"].split()[1]
            #playlist = playlist[0:len(playlist)-8]
            self.execute(RedisChannel.GNF_REPLY.value, self.scheduler.get_next_file_for, playlist)

178
179
        elif item["data"].find("on_play") >= 0:
            source = item["data"].split("on_play ")[1]
David Trattnig's avatar
David Trattnig committed
180
            self.execute(RedisChannel.TS_REPLY.value, self.scheduler.soundsystem.on_play, source)
181

182
183
184
        elif item["data"] == "recreate_db":
            self.execute(RedisChannel.RDB_REPLY.value, self.scheduler.recreate_database)

David Trattnig's avatar
David Trattnig committed
185
186
        elif item["data"] == "status":
            return True
187

188
        else:
189
190
191
            raise RedisConnectionException("ServerRedisAdapter Cannot understand command: " + item["data"])

    # ------------------------------------------------------------------------------------------ #
192
193
194
195
196
197
198
    def execute(self, channel, f, param1=None, param2=None, param3=None):
        if param1 != None:
            if param2 != None:
                if param3 != None:
                    reply = f(param1, param2, param3)
                else:
                    reply = f(param1, param2)
199
200
            else:
                reply = f(param1)
201
202
        else:
            reply = f()
203
204
205
206

        if reply is None:
            reply = ""

207
        # sometimes the sender is faster than the receiver. redis messages would be lost
208
209
        time.sleep(0.1)

210
        self.logger.debug(TerminalColors.YELLOW.value + "replying REDIS message " + TerminalColors.ENDC.value + reply + TerminalColors.YELLOW.value + " on channel " + channel + TerminalColors.ENDC.value)
211

212
213
        # publish
        self.redisclient.publish(channel, reply)
214
215
216
217
218

    # ------------------------------------------------------------------------------------------ #
    def join_comm(self):
        try:
            while self.is_alive():
219
                self.logger.debug(str(datetime.now())+" joining")
220
                self.join()
221
                self.logger.warning("join out")
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239

        except (KeyboardInterrupt, SystemExit):
            # Dem Server den Shutdown event setzen
            # server.shutdown_event.set()
            # Der Server wartet auf Eingabe
            # Daher einen Client initiieren, der eine Nachricht schickt
            self.halt()
            sys.exit('Terminated')

    # ------------------------------------------------------------------------------------------ #
    def halt(self):
        """
        Stop the server
        """
        if self.shutdown_event.is_set():
            return
        self.shutdown_event.set()
        try:
David Trattnig's avatar
David Trattnig committed
240
            self.socket.unbind("tcp://"+self.ip+":"+self.port)
241
242
        except:
            pass
243

244
        # self.socket.close()
245
246
247
248
249
250
251

    # ------------------------------------------------------------------------------------------ #
    def send(self, message):
        """
        Send a message to the client
        :param message: string
        """
252
253

        # FIXME Review logic
254
        if not self.can_send:
255
            self.logger.debug("sending a "+str(len(message))+" long message via REDIS.")
256
257
258
            self.socket.send(message.encode("utf-8"))
            self.can_send = False
        else:
259
            self.logger.warning("cannot send message via REDIS: "+str(message))
260

261

262
263
264
265
266
267
268
269
270
271
272

    def terminate(self):
        """
        Called when thread is stopped or a signal to terminate is received.
        """
        self.shutdown_event.set()
        self.scheduler.terminate()
        self.pubsub.close()
        self.logger.info("Shutdown event received. Bye bye ...")


273
# ------------------------------------------------------------------------------------------ #
274
275
class ClientRedisAdapter(RedisMessenger):

276
277
    def __init__(self, config):
        RedisMessenger.__init__(self, config)
278
279
280

    # ------------------------------------------------------------------------------------------ #
    def publish(self, channel, message):
281
        if type(channel) == RedisChannel:
282
283
            channel = channel.value

284
        self.rstore.publish(channel, message)