Commit 606acd12 authored by David Trattnig's avatar David Trattnig
Browse files

Heartbeat Monitoring.

parent 731b1aaf
......@@ -26,12 +26,18 @@ mail_server=""
mail_server_port="587"
mail_user="aura@subsquare.at"
mail_pass="---SECRET--PASSWORD---"
# if you want to send multiple adminmails, make them space separated
# If you want to send multiple adminmails, make them space separated
admin_mail="david@subsquare.at"
# with from mailadress should be used
# Which from mailadress should be used
from_mail="monitoring@aura.engine"
# The beginning of the subject. With that you can easily apply filter rules using a mail client
mailsubject_prefix="[Aura Engine]"
# Server where heartbeat info is sent to
hearbeat_server = '127.0.0.1'
# Some UDP port
heartbeat_port = 43334
# Seconds how often the vitality of the Engine should be checked (0 = disabled)
heartbeat_frequency = 1
[api]
......
......@@ -26,12 +26,18 @@ mail_server="w00fdabd.kasserver.com"
mail_server_port="587"
mail_user="aura@subsquare.at"
mail_pass="---SECRET--PASSWORD---"
# if you want to send multiple adminmails, make them space separated
# If you want to send multiple adminmails, make them space separated
admin_mail="david@subsquare.at"
# with from mailadress should be used
# Which from mailadress should be used
from_mail="monitoring@aura.engine"
# The beginning of the subject. With that you can easily apply filter rules using a mail client
mailsubject_prefix="[Aura Engine]"
# Server where heartbeat info is sent to
hearbeat_server = '172.17.0.1'
# Some UDP port
heartbeat_port = 43334
# Seconds how often the vitality of the Engine should be checked (0 = disabled)
heartbeat_frequency = 1
[api]
......
......@@ -26,12 +26,18 @@ mail_server="w00fdabd.kasserver.com"
mail_server_port="587"
mail_user="aura@subsquare.at"
mail_pass="---SECRET--PASSWORD---"
# if you want to send multiple adminmails, make them space separated
# If you want to send multiple adminmails, make them space separated
admin_mail="david@subsquare.at"
# with from mailadress should be used
# Which from mailadress should be used
from_mail="monitoring@aura.engine"
# The beginning of the subject. With that you can easily apply filter rules using a mail client
mailsubject_prefix="[Aura Engine]"
# Server where heartbeat info is sent to
hearbeat_server = '127.0.0.1'
# Some UDP port
heartbeat_port = 43334
# Seconds how often the vitality of the Engine should be checked (0 = disabled)
heartbeat_frequency = 1
[api]
......
#!/usr/bin/env python2.7
# Copyright (c) 2001, Nicola Larosa
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS
# OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
""" PyHeartBeat server: receives and tracks UDP packets from all clients.
While the BeatLog thread logs each UDP packet in a dictionary, the main
thread periodically scans the dictionary and prints the IP addresses of the
clients that sent at least one packet during the run, but have
not sent any packet since a time longer than the definition of the timeout.
Adjust the constant parameters as needed, or call as:
PyHBServer.py [timeout [udpport]]
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch10s13.html
"""
HBPORT = 43334
CHECKWAIT = 10
from socket import socket, gethostbyname, AF_INET, SOCK_DGRAM
from threading import Lock, Thread, Event
from time import time, ctime, sleep
import sys
class BeatDict:
"Manage heartbeat dictionary"
def __init__(self):
self.beatDict = {}
if __debug__:
self.beatDict['127.0.0.1'] = time( )
self.dictLock = Lock( )
def __repr__(self):
list = ''
self.dictLock.acquire( )
for key in self.beatDict.keys( ):
list = "%sIP address: %s - Last time: %s\n" % (
list, key, ctime(self.beatDict[key]))
self.dictLock.release( )
return list
def update(self, entry):
"Create or update a dictionary entry"
self.dictLock.acquire( )
self.beatDict[entry] = time( )
self.dictLock.release( )
def extractSilent(self, howPast):
"Returns a list of entries older than howPast"
silent = []
when = time( ) - howPast
self.dictLock.acquire( )
for key in self.beatDict.keys( ):
if self.beatDict[key] < when:
silent.append(key)
self.dictLock.release( )
return silent
class BeatRec(Thread):
"Receive UDP packets, log them in heartbeat dictionary"
def __init__(self, goOnEvent, updateDictFunc, port):
Thread.__init__(self)
self.goOnEvent = goOnEvent
self.updateDictFunc = updateDictFunc
self.port = port
self.recSocket = socket(AF_INET, SOCK_DGRAM)
self.recSocket.bind(('', port))
def __repr__(self):
return "Heartbeat Server on port: %d\n" % self.port
def run(self):
while self.goOnEvent.isSet( ):
if __debug__:
print "Waiting to receive..."
data, addr = self.recSocket.recvfrom(6)
if __debug__:
print "Received packet from " + `addr`
self.updateDictFunc(addr[0])
def main( ):
"Listen to the heartbeats and detect inactive clients"
global HBPORT, CHECKWAIT
if len(sys.argv)>1:
HBPORT=sys.argv[1]
if len(sys.argv)>2:
CHECKWAIT=sys.argv[2]
beatRecGoOnEvent = Event( )
beatRecGoOnEvent.set( )
beatDictObject = BeatDict( )
beatRecThread = BeatRec(beatRecGoOnEvent, beatDictObject.update, HBPORT)
if __debug__:
print beatRecThread
beatRecThread.start( )
print "PyHeartBeat server listening on port %d" % HBPORT
print "\n*** Press Ctrl-C to stop ***\n"
while 1:
try:
if __debug__:
print "Beat Dictionary"
print `beatDictObject`
silent = beatDictObject.extractSilent(CHECKWAIT)
if silent:
print "Silent clients"
print `silent`
sleep(CHECKWAIT)
except KeyboardInterrupt:
print "Exiting."
beatRecGoOnEvent.clear( )
beatRecThread.join( )
if __name__ == '__main__':
main( )
\ No newline at end of file
......@@ -17,18 +17,24 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import urllib
import logging
import json
import os.path
import threading
from os import path
from enum import Enum
from socket import socket, AF_INET, SOCK_DGRAM
from time import time, ctime, sleep
import meta
from modules.communication.redis.adapter import ClientRedisAdapter
class MonitorResponseCode(Enum):
OK = "OK"
INVALID_STATE = "INVALID-STATE"
......@@ -37,12 +43,23 @@ class MonitorResponseCode(Enum):
class Monitoring:
"""
Engine Monitoring
Engine Monitoring is in charge of:
- Checking the overall status of all components and external API endpoints
- Checking the vital parts, which are minimal requirements for running the engine
- Sending a heartbeat to a defined server via socket
"""
logger = None
soundsystem = None
status = None
heartbeat_server = None
heartbeat_port = None
heartbeat_frequency = None
heartbeat_socket = None
heartbeat_running = None
def __init__(self, config, soundsystem):
"""
......@@ -59,16 +76,77 @@ class Monitoring:
self.status["api"]["tank"] = dict()
self.status["api"]["engine"] = dict()
# Heartbeat settings
self.heartbeat_running = False
self.heartbeat_server = config.get("heartbeat_server")
self.heartbeat_port = config.get("heartbeat_port")
self.heartbeat_frequency = config.get("heartbeat_frequency")
self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
#
# PUBLIC METHODS
#
def get_status(self):
"""
Retrieves the current monitoring status.
"""
return self.status
def has_valid_status(self, update_vitality_only):
"""
Checks if the current status is valid to run engine. By default it
does not request new status information, rather using the cached one.
To request new data either call `get_status()` before or use the
`update_vital` parameter.
Args:
update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat
"""
if update_vitality_only:
self.update_vitality_status()
else:
self.update_status()
try:
if self.status["soundsystem"]["active"] \
and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis_ready"] \
and self.status["audio_store"]["exists"]:
self.status["engine"]["status"] = MonitorResponseCode.OK.value
return True
else:
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return False
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
return False
#
# PRIVATE METHODS
#
def update_status(self):
"""
Requests the current status of all components
"""
self.status["engine"]["version"] = meta.__version__
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["version"] = self.soundsystem.version()
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.status["soundsystem"]["uptime"] = self.soundsystem.uptime()
self.status["soundsystem"]["io"] = self.get_io_state()
self.status["soundsystem"]["mixer"] = self.soundsystem.get_mixer_status()
......@@ -82,58 +160,58 @@ class Monitoring:
self.status["api"]["tank"]["available"] = self.validate_url_connection(self.config.get("api_tank_status"))
self.status["api"]["tank"]["status"] = self.get_url_response(self.config.get("api_tank_status"))
self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url")
self.status["api"]["engine"]["url"] = self.config.get("exposed_api_url")
self.status["api"]["engine"]["available"] = self.validate_url_connection(self.config.get("exposed_api_url"))
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
self.update_vitality_status()
if self.has_valid_status():
self.status["engine"]["status"] = MonitorResponseCode.OK.value
else:
self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
def get_io_state(self):
def update_vitality_status(self):
"""
Retrieves all input and outputs provided by the engine.
Refreshes the vital status info which are required for the engine to survive.
"""
ios = self.soundsystem.engine_state()
self.soundsystem.enable_transaction(self.soundsystem.client)
self.status["soundsystem"]["active"] = self.soundsystem.is_active()
self.soundsystem.disable_transaction(self.soundsystem.client)
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return MonitorResponseCode.INVALID_STATE.value
self.status["redis_ready"] = self.validate_redis_connection()
self.status["audio_store"] = self.validate_directory(self.config.get("audiofolder"))
# After first update start the Heartbeat Monitior
if not self.heartbeat_running:
self.heartbeat_running = True
if self.config.get("heartbeat_frequency") > 0:
self.heartbeat()
def get_status(self):
def heartbeat(self):
"""
Retrieves the current monitoring status.
Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
as heartbeat is sent to the configured server.
"""
self.update_status()
return self.status
if self.has_valid_status(True):
self.heartbeat_socket.sendto(b"OK", (self.heartbeat_server, self.heartbeat_port))
threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
def has_valid_status(self):
def get_io_state(self):
"""
Checks if the current status is valid to run engine
Retrieves all input and outputs provided by the engine.
"""
try:
if self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
and self.status["redis_ready"] \
and self.status["audio_store"]["exists"]:
ios = self.soundsystem.engine_state()
return True
return False
try:
ios = ios.replace('"connected":', '"connected": ""')
ios = json.loads(ios, strict=False)
return ios
except Exception as e:
self.logger.error("Exception while validating engine status: " + str(e))
return False
self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
return MonitorResponseCode.INVALID_STATE.value
def validate_url_connection(self, url):
......@@ -200,4 +278,4 @@ class Monitoring:
except (urllib.error.URLError, IOError, ValueError) as e:
self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
return MonitorResponseCode.INVALID_STATE.value
\ No newline at end of file
return MonitorResponseCode.INVALID_STATE.value
......@@ -60,9 +60,10 @@ class StartupThread(threading.Thread):
try:
self.soundsystem.start()
is_valid = self.monitoring.has_valid_status(False)
status = self.monitoring.get_status()
self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
if not status["engine"]["status"] == "OK":
if not is_valid:
self.logger.info("Engine Status: " + SimpleUtil.red(status["engine"]["status"]))
raise EngineMalfunctionException
else:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment