#
# Aura Engine (https://gitlab.servus.at/aura/engine)
#
# Copyright (C) 2017-2020 - The Aura Engine Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import logging
import os.path
import platform
import sys
import threading
import urllib
import urllib.error
import urllib.request

from enum import Enum
from os import path
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST
from time import time, ctime, sleep

import meta
from modules.communication.redis.adapter import ClientRedisAdapter
from modules.base.utils import SimpleUtil


class MonitorResponseCode(Enum):
    """
    Status codes written into the monitoring status (e.g. `status["engine"]["status"]`)
    and returned by status helpers when a component could not be evaluated.
    """
    # Everything required for running the engine is in place
    OK = "OK"
    # Some component is unavailable or returned unusable data
    INVALID_STATE = "INVALID-STATE"


class Monitoring:
    """
    Engine Monitoring is in charge of:

    - Checking the overall status of all components and external API endpoints
    - Checking the vital parts, which are minimal requirements for running the engine
    - Sending a heartbeat to a defined server via socket

    The collected information is kept in the nested `status` dictionary. It is
    refreshed completely by `update_status()` or partially (vital parts only)
    by `update_vitality_status()`.
    """
    logger = None
    soundsystem = None
    mailer = None
    status = None
    already_invalid = None
    engine_id = None

    heartbeat_server = None
    heartbeat_port = None
    heartbeat_frequency = None
    heartbeat_socket = None
    heartbeat_running = None


    def __init__(self, config, soundsystem, mailer):
        """
        Initialize Monitoring

        Args:
            config:         Engine configuration, read via `config.get(key)`
            soundsystem:    The soundsystem whose state is monitored
            mailer:         Used to send admin mails on state changes
        """
        self.logger = logging.getLogger("AuraEngine")
        self.config = config
        self.soundsystem = soundsystem
        self.mailer = mailer
        self.status = dict()
        self.status["engine"] = dict()
        self.status["soundsystem"] = dict()
        self.status["redis"] = dict()
        self.status["api"] = dict()
        self.status["api"]["steering"] = dict()
        self.status["api"]["tank"] = dict()
        self.status["api"]["engine"] = dict()
        self.already_invalid = False

        # Heartbeat settings
        self.heartbeat_running = False
        self.heartbeat_server = config.get("heartbeat_server")
        self.heartbeat_port = config.get("heartbeat_port")
        self.heartbeat_frequency = config.get("heartbeat_frequency")
        self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)

        self.engine_id = self.get_engine_id()


    #
    # PUBLIC METHODS
    #


    def get_status(self):
        """
        Retrieves the current (cached) monitoring status. No new status
        information is requested.
        """
        return self.status



    def has_valid_status(self, update_vitality_only):
        """
        Refreshes the status and checks if it is valid to run the engine.

        The engine is valid when the soundsystem is active, the mixer input
        `in_filesystem_0` is truthy, Redis is reachable and the audio store
        directory exists. The outcome is stored in `status["engine"]["status"]`
        as a `MonitorResponseCode` value.

        Note that the mixer state is only refreshed by `update_status()`; when
        `update_vitality_only` is set, the previously cached mixer state is used
        (a missing mixer state results in an invalid status).

        Args:
            update_vitality_only (Boolean): If `True`, refreshes only the vital parts
                required for the heartbeat, otherwise refreshes the complete status

        Returns:
            (Boolean): `True` if the engine is in a valid state
        """
        is_valid = False

        if update_vitality_only:
            self.update_vitality_status()
        else:
            self.update_status()

        try:
            if self.status["soundsystem"]["active"] \
                and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
                and self.status["redis"]["active"] \
                and self.status["audio_store"]["exists"]:

                self.status["engine"]["status"] = MonitorResponseCode.OK.value
                is_valid = True
            else:
                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

        except Exception as e:
            self.logger.error("Exception while validating engine status: " + str(e))
            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value

        return is_valid



    #
    # PRIVATE METHODS
    #


    def update_status(self):
        """
        Requests the current status of all components: engine version, soundsystem
        details, availability of the Steering, Tank and Engine APIs and finally the
        vital parts (see `update_vitality_status()`).
        """
        self.status["engine"]["version"] = meta.__version__

        self.soundsystem.enable_transaction(self.soundsystem.client)
        try:
            self.status["soundsystem"]["version"]   = self.soundsystem.version()
            self.status["soundsystem"]["uptime"]    = self.soundsystem.uptime()
            self.status["soundsystem"]["io"]        = self.get_io_state()
            self.status["soundsystem"]["mixer"]     = self.soundsystem.mixer_status()
            #self.status["soundsystem"]["recorder"] = self.soundsystem.get_recorder_status()
        finally:
            # Always close the transaction, even if one of the queries failed
            self.soundsystem.disable_transaction(self.soundsystem.client)

        steering_url = self.config.get("api_steering_status")
        self.status["api"]["steering"]["url"]       = steering_url
        self.status["api"]["steering"]["available"] = self.validate_url_connection(steering_url)

        tank_url = self.config.get("api_tank_status")
        self.status["api"]["tank"]["url"]           = tank_url
        self.status["api"]["tank"]["available"]     = self.validate_url_connection(tank_url)
        self.status["api"]["tank"]["status"]        = self.get_url_response(tank_url)

        engine_url = self.config.get("exposed_api_url")
        self.status["api"]["engine"]["url"]         = engine_url
        self.status["api"]["engine"]["available"]   = self.validate_url_connection(engine_url)

        self.update_vitality_status()



    def update_vitality_status(self):
        """
        Refreshes the vital status info which are required for the engine to survive:
        soundsystem activity, Redis connectivity and the audio store directory.

        On the first call this also starts the heartbeat, if `heartbeat_frequency`
        is configured to a value greater than zero.
        """
        self.soundsystem.enable_transaction(self.soundsystem.client)
        try:
            self.status["soundsystem"]["active"]  = self.soundsystem.is_active()
        finally:
            # Always close the transaction, even if the query failed
            self.soundsystem.disable_transaction(self.soundsystem.client)

        self.status["redis"]["active"] = self.validate_redis_connection()
        self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))

        # After first update start the Heartbeat Monitor
        if not self.heartbeat_running:
            self.heartbeat_running = True
            if self.config.get("heartbeat_frequency") > 0:
                self.heartbeat()



    def heartbeat(self):
        """
        Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
        a heartbeat is sent to the configured server. Transitions between a valid and an invalid
        state are logged and reported to the admins via mail.

        The next check is always scheduled (in `finally`), so a failing status check,
        heartbeat send or admin mail does not stop the heartbeat permanently.
        """
        try:
            if self.has_valid_status(True):
                try:
                    self.heartbeat_socket.sendto(str.encode("OK"), (self.heartbeat_server, self.heartbeat_port))
                except OSError as e:
                    # E.g. unresolvable host or unreachable network - keep monitoring anyway
                    self.logger.error("Error while sending heartbeat to '%s:%s' - %s" % (self.heartbeat_server, self.heartbeat_port, e))

                # Engine resurrected into normal state
                if self.already_invalid:
                    self.already_invalid = False
                    status = json.dumps(self.get_status())
                    self.logger.info(SimpleUtil.green("OK - Engine turned back into some healthy state!")+"\n"+str(status))
                    self.mailer.send_admin_mail( \
                        "OK - Engine turned back into some HEALTHY STATE!", \
                        "Things seem fine again at '%s':\n\n%s" % (self.engine_id, status))
            else:
                # Engine turned into invalid state
                if not self.already_invalid:
                    self.already_invalid = True
                    status = json.dumps(self.get_status())
                    self.logger.critical(SimpleUtil.red("Engine turned into some INVALID STATE!")+"\n"+str(status))
                    self.mailer.send_admin_mail( \
                        "ERROR - Engine turned into some INVALID STATE!", \
                        "There's an issue with Aura Engine '%s':\n\n%s" % (self.engine_id, status))
        finally:
            threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()



    def get_io_state(self):
        """
        Retrieves all input and outputs provided by the engine.

        Returns:
            The parsed JSON of the soundsystem's engine state, or
            `MonitorResponseCode.INVALID_STATE.value` if it could not be parsed
        """
        ios = self.soundsystem.engine_state()

        try:
            # Presumably works around the soundsystem emitting `"connected":` without a
            # value (invalid JSON). NOTE(review): breaks responses that do carry a value.
            ios = ios.replace('"connected":', '"connected": ""')
            ios = json.loads(ios, strict=False)
            return ios
        except Exception as e:
            self.logger.warning("Got invalid JSON from soundsystem - " + str(e))
            return MonitorResponseCode.INVALID_STATE.value



    def validate_url_connection(self, url):
        """
        Checks if connection to passed URL is successful.

        Args:
            url (String):   The URL to request

        Returns:
            (Boolean): `True` if the URL could be opened and read, `False` on any error
        """
        try:
            request = urllib.request.Request(url)
            # Close the response right away to not leak the connection
            with urllib.request.urlopen(request) as response:
                response.read()
        except Exception:
            return False

        return True



    def validate_redis_connection(self):
        """
        Checks if the connection to Redis is successful by publishing a
        "status" message on the "aura" channel.

        Returns:
            (Boolean): `True` if publishing succeeded
        """
        try:
            cra = ClientRedisAdapter(self.config)
            cra.publish("aura", "status")
        except Exception:
            return False

        return True



    def validate_directory(self, dir_path):
        """
        Checks if a given directory is existing and holds content.

        Args:
            dir_path (String):  Path of the directory. `None` (e.g. not configured)
                                is reported as not existing.

        Returns:
            (dict): `{"exists": Boolean, "has_content": Boolean}`
        """
        status = dict()
        status["exists"] = dir_path is not None and path.isdir(dir_path)
        status["has_content"] = False

        if status["exists"]:
            try:
                # A single entry is enough; the iterator is closed right after
                with os.scandir(dir_path) as entries:
                    status["has_content"] = next(entries, None) is not None
            except OSError as e:
                self.logger.error("Cannot read directory '%s' - %s" % (dir_path, e))

        return status



    def get_url_response(self, url):
        """
        Fetches JSON data from the given URL.

        Args:
            url (String):       The API endpoint to call

        Returns:
            (dict[]):           A Python object representing the JSON structure, or
                                `MonitorResponseCode.INVALID_STATE.value` on error
        """
        try:
            request = urllib.request.Request(url)
            # Close the response right away to not leak the connection
            with urllib.request.urlopen(request) as response:
                data = response.read()
            return json.loads(data, strict=False)
        except (urllib.error.URLError, IOError, ValueError) as e:
            self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))

        return MonitorResponseCode.INVALID_STATE.value



    def get_engine_id(self):
        """
        Retrieves a String identifier consisting of IP and Hostname to differentiate
        the engine in mails and status broadcasts.
        """
        host = platform.node()
        return "%s (%s)" % (self.get_ip(), host)



    def get_ip(self):
        """
        Returns the IP of the Engine instance, determined as the local address of a
        UDP socket connected to `<broadcast>`, or "<UNKNOWN NETWORK>" on failure.
        """
        try:
            # The probe socket is closed when leaving the `with` block
            with socket(AF_INET, SOCK_DGRAM) as s:
                s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
                s.connect(('<broadcast>', 0))
                return s.getsockname()[0]
        except OSError:
            self.logger.critical(SimpleUtil.red("Error while accessing network via <broadcast>!"))
            return "<UNKNOWN NETWORK>"