Newer
Older
#
# Aura Engine
#
# Copyright (C) 2020 David Trattnig <david.trattnig@subsquare.at>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from socket import socket, AF_INET, SOCK_DGRAM
from time import time, ctime, sleep
from modules.communication.redis.adapter import ClientRedisAdapter
class MonitorResponseCode(Enum):
    """
    Response codes used by the monitoring to describe the engine state.
    """

    # Stored as engine status when all vital checks have passed
    OK = "OK"

    # Stored as engine status when a vital check failed or could not be evaluated
    INVALID_STATE = "INVALID-STATE"
Engine Monitoring is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
# Logger instance; assigned in `__init__` via `logging.getLogger("AuraEngine")`
logger = None
# Sound system object handed to the constructor; queried for status information
soundsystem = None
# Dictionary holding the collected status information
status = None
# Heartbeat target host, read from the `heartbeat_server` config value
heartbeat_server = None
# Heartbeat target port, read from the `heartbeat_port` config value
heartbeat_port = None
# Heartbeat interval, read from the `heartbeat_frequency` config value
heartbeat_frequency = None
# Datagram socket (AF_INET / SOCK_DGRAM) used for sending the heartbeat
heartbeat_socket = None
# Flag telling whether the heartbeat has already been started
heartbeat_running = None
def __init__(self, config, soundsystem):
    """
    Initialize Monitoring

    Args:
        config:         Configuration object; values are read using `config.get(<key>)`
        soundsystem:    Sound system object which is queried for status information
    """
    self.logger = logging.getLogger("AuraEngine")
    self.config = config
    self.soundsystem = soundsystem

    # Create every nested container before it is written to. Assigning to
    # `status["api"]["steering"]` without an existing `status["api"]` raises
    # a KeyError; the same holds for the "engine" and "soundsystem" sections
    # which are filled by the status update methods.
    self.status = dict()
    self.status["engine"] = dict()
    self.status["soundsystem"] = dict()
    self.status["api"] = dict()
    self.status["api"]["steering"] = dict()
    self.status["api"]["tank"] = dict()
    self.status["api"]["engine"] = dict()

    # Heartbeat settings
    self.heartbeat_running = False
    self.heartbeat_server = config.get("heartbeat_server")
    self.heartbeat_port = config.get("heartbeat_port")
    self.heartbeat_frequency = config.get("heartbeat_frequency")
    self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
#
# PUBLIC METHODS
#
def get_status(self):
    """
    Hands back the status dictionary currently held by the monitoring.
    No refresh is triggered by this call.

    Returns:
        (dict): The status dictionary
    """
    current_status = self.status
    return current_status
def has_valid_status(self, update_vitality_only):
    """
    Refreshes the status and checks if it is valid to run the engine.

    The status is always refreshed before it is evaluated; the parameter
    only selects how much of it is refreshed. The result is also stored
    in `status["engine"]["status"]`.

    Args:
        update_vitality_only (Boolean): If `True` only `update_vitality_status()` is
            called, otherwise `update_status()` is called

    Returns:
        (Boolean): `True` if the sound system is active, the mixer reports
            `in_filesystem_0`, Redis is ready and the audio store exists
    """
    if update_vitality_only:
        self.update_vitality_status()
    else:
        self.update_status()

    # Make sure the target container exists, otherwise storing the result
    # would raise a KeyError from within the error handling below.
    engine_status = self.status.setdefault("engine", dict())

    try:
        is_valid = bool(
            self.status["soundsystem"]["active"]
            and self.status["soundsystem"]["mixer"]["in_filesystem_0"]
            and self.status["redis_ready"]
            and self.status["audio_store"]["exists"]
        )
    except Exception as e:
        # Missing or malformed status information counts as an invalid state
        self.logger.error("Exception while validating engine status: " + str(e))
        is_valid = False

    if is_valid:
        engine_status["status"] = MonitorResponseCode.OK.value
    else:
        engine_status["status"] = MonitorResponseCode.INVALID_STATE.value
    return is_valid
#
# PRIVATE METHODS
#
def update_status(self):
    """
    Requests the current status of all components

    Fills the "engine", "soundsystem" and "api" sections of the status
    dictionary. An exception raised by one of the sound system queries is
    passed on to the caller after the transaction has been disabled again.
    """
    self.status["engine"]["version"] = meta.__version__

    soundsystem_status = self.status["soundsystem"]
    self.soundsystem.enable_transaction(self.soundsystem.client)
    try:
        soundsystem_status["version"] = self.soundsystem.version()
        soundsystem_status["uptime"] = self.soundsystem.uptime()
        soundsystem_status["io"] = self.get_io_state()
        soundsystem_status["mixer"] = self.soundsystem.get_mixer_status()
        #soundsystem_status["recorder"] = self.soundsystem.get_recorder_status()
    finally:
        # Always pair `enable_transaction` with `disable_transaction`, even
        # when one of the queries above fails.
        self.soundsystem.disable_transaction(self.soundsystem.client)

    api_status = self.status["api"]

    steering_url = self.config.get("api_steering_status")
    api_status["steering"]["url"] = steering_url
    api_status["steering"]["available"] = self.validate_url_connection(steering_url)

    tank_url = self.config.get("api_tank_status")
    api_status["tank"]["url"] = tank_url
    api_status["tank"]["available"] = self.validate_url_connection(tank_url)
    api_status["tank"]["status"] = self.get_url_response(tank_url)

    engine_url = self.config.get("exposed_api_url")
    api_status["engine"]["url"] = engine_url
    api_status["engine"]["available"] = self.validate_url_connection(engine_url)
Refreshes the vital status information which is required for the engine to survive.
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.soundsystem.disable_transaction(self.soundsystem.client)
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# After the first update start the Heartbeat Monitor
if not self.heartbeat_running:
self.heartbeat_running = True
if self.config.get("heartbeat_frequency") > 0:
self.heartbeat()
Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
a heartbeat is sent to the configured server.
if self.has_valid_status(True):
self.heartbeat_socket.sendto(b"OK", (self.heartbeat_server, self.heartbeat_port))
threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
Retrieves all inputs and outputs provided by the engine.
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return MonitorResponseCode.INVALID_STATE.value
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def validate_url_connection(self, url):
    """
    Checks if connection to passed URL is successful.

    Args:
        url (String): The URL to connect to

    Returns:
        (Boolean): `True` if the URL could be opened and read, `False` on any error
    """
    try:
        request = urllib.request.Request(url)
        # The context manager closes the response, even if reading fails
        with urllib.request.urlopen(request) as response:
            response.read()
    except Exception:
        return False
    return True
def validate_redis_connection(self):
    """
    Checks if the connection to Redis is successful.

    Returns:
        (Boolean): `True` if a message could be published, `False` on any error
    """
    try:
        cra = ClientRedisAdapter(self.config)
        cra.publish("aura", "status")
    except Exception:
        # `Exception` instead of a bare `except`, so that KeyboardInterrupt
        # and SystemExit are not swallowed by the check.
        return False
    return True
def validate_directory(self, dir_path):
    """
    Checks if a given directory is existing and holds content

    Args:
        dir_path (String): Path of the directory to check; `None` or an empty
            string is treated as a non-existing directory

    Returns:
        (dict): Dictionary with the Boolean entries `exists` and `has_content`
    """
    status = dict()
    # `isdir` is already `False` for non-existing paths
    status["exists"] = bool(dir_path) and os.path.isdir(dir_path)
    status["has_content"] = False

    if status["exists"]:
        # Stops at the first entry; the context manager closes the iterator
        with os.scandir(dir_path) as entries:
            status["has_content"] = any(True for _ in entries)

    return status
def get_url_response(self, url):
"""
Fetches JSON data from the given URL.
Args:
url (String): The API endpoint to call
Returns:
(dict[]): A Python object representing the JSON structure
"""
data = None
try:
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))