From 606acd12d673d6784a96b2962c41fe336ee88f55 Mon Sep 17 00:00:00 2001
From: David Trattnig <david.trattnig@o94.at>
Date: Thu, 7 May 2020 14:02:59 +0200
Subject: [PATCH] Heartbeat Monitoring.

---
 configuration/sample-development.engine.ini |  10 +-
 configuration/sample-docker.engine.ini      |  10 +-
 configuration/sample-production.engine.ini  |  10 +-
 contrib/heartbeat-monitor/PyHeartBeat.py    | 141 ++++++++++++++++++
 modules/core/monitor.py                     | 150 +++++++++++++++-----
 modules/core/startup.py                     |   3 +-
 6 files changed, 281 insertions(+), 43 deletions(-)
 create mode 100644 contrib/heartbeat-monitor/PyHeartBeat.py

diff --git a/configuration/sample-development.engine.ini b/configuration/sample-development.engine.ini
index c211cffe..c327e567 100644
--- a/configuration/sample-development.engine.ini
+++ b/configuration/sample-development.engine.ini
@@ -26,12 +26,18 @@ mail_server=""
 mail_server_port="587"
 mail_user="aura@subsquare.at"
 mail_pass="---SECRET--PASSWORD---"
-# if you want to send multiple adminmails, make them space separated
+# If you want to send multiple adminmails, make them space separated
 admin_mail="david@subsquare.at"
-# with from mailadress should be used
+# Which from mailadress should be used
 from_mail="monitoring@aura.engine"
 # The beginning of the subject. With that you can easily apply filter rules using a mail client
 mailsubject_prefix="[Aura Engine]"
+# Server where heartbeat info is sent to
+hearbeat_server = '127.0.0.1'   
+# Some UDP port
+heartbeat_port = 43334 
+# Seconds how often the vitality of the Engine should be checked (0 = disabled)
+heartbeat_frequency = 1
 
 [api]
 
diff --git a/configuration/sample-docker.engine.ini b/configuration/sample-docker.engine.ini
index 4eeee717..ebcb2970 100644
--- a/configuration/sample-docker.engine.ini
+++ b/configuration/sample-docker.engine.ini
@@ -26,12 +26,18 @@ mail_server="w00fdabd.kasserver.com"
 mail_server_port="587"
 mail_user="aura@subsquare.at"
 mail_pass="---SECRET--PASSWORD---"
-# if you want to send multiple adminmails, make them space separated
+# If you want to send multiple adminmails, make them space separated
 admin_mail="david@subsquare.at"
-# with from mailadress should be used
+# Which from mailadress should be used
 from_mail="monitoring@aura.engine"
 # The beginning of the subject. With that you can easily apply filter rules using a mail client
 mailsubject_prefix="[Aura Engine]"
+# Server where heartbeat info is sent to
+hearbeat_server = '172.17.0.1'   
+# Some UDP port
+heartbeat_port = 43334 
+# Seconds how often the vitality of the Engine should be checked (0 = disabled)
+heartbeat_frequency = 1
 
 [api]
 
diff --git a/configuration/sample-production.engine.ini b/configuration/sample-production.engine.ini
index 989cd285..9dd9f9b3 100644
--- a/configuration/sample-production.engine.ini
+++ b/configuration/sample-production.engine.ini
@@ -26,12 +26,18 @@ mail_server="w00fdabd.kasserver.com"
 mail_server_port="587"
 mail_user="aura@subsquare.at"
 mail_pass="---SECRET--PASSWORD---"
-# if you want to send multiple adminmails, make them space separated
+# If you want to send multiple adminmails, make them space separated
 admin_mail="david@subsquare.at"
-# with from mailadress should be used
+# Which from mailadress should be used
 from_mail="monitoring@aura.engine"
 # The beginning of the subject. With that you can easily apply filter rules using a mail client
 mailsubject_prefix="[Aura Engine]"
+# Server where heartbeat info is sent to
+hearbeat_server = '127.0.0.1'   
+# Some UDP port
+heartbeat_port = 43334 
+# Seconds how often the vitality of the Engine should be checked (0 = disabled)
+heartbeat_frequency = 1
 
 [api]
 
diff --git a/contrib/heartbeat-monitor/PyHeartBeat.py b/contrib/heartbeat-monitor/PyHeartBeat.py
new file mode 100644
index 00000000..5b3e495d
--- /dev/null
+++ b/contrib/heartbeat-monitor/PyHeartBeat.py
@@ -0,0 +1,141 @@
+#!/usr/bin/env python2.7
+
+# Copyright (c) 2001, Nicola Larosa
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without 
+# modification, are permitted provided that the following conditions 
+# are met:
+#    * Redistributions of source code must retain the above copyright 
+#      notice, this list of conditions and the following disclaimer. 
+#    * Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials provided
+#      with the distribution. 
+#    * Neither the name of the <ORGANIZATION> nor the names of its 
+#      contributors may be used to endorse or promote products derived 
+#      from this software without specific prior written permission. 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS
+# OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+""" PyHeartBeat server: receives and tracks UDP packets from all clients.
+
+While the BeatLog thread logs each UDP packet in a dictionary, the main
+thread periodically scans the dictionary and prints the IP addresses of the
+clients that sent at least one packet during the run, but have
+not sent any packet since a time longer than the definition of the timeout.
+
+Adjust the constant parameters as needed, or call as:
+    PyHBServer.py [timeout [udpport]]
+
+https://www.oreilly.com/library/view/python-cookbook/0596001673/ch10s13.html
+"""
+
+HBPORT = 43334
+CHECKWAIT = 10
+
+from socket import socket, gethostbyname, AF_INET, SOCK_DGRAM
+from threading import Lock, Thread, Event
+from time import time, ctime, sleep
+import sys
+
+class BeatDict:
+    "Manage heartbeat dictionary"
+
+    def __init__(self):
+        self.beatDict = {}
+        if __debug__:
+            self.beatDict['127.0.0.1'] = time(  )
+        self.dictLock = Lock(  )
+
+    def __repr__(self):
+        list = ''
+        self.dictLock.acquire(  )
+        for key in self.beatDict.keys(  ):
+            list = "%sIP address: %s - Last time: %s\n" % (
+                list, key, ctime(self.beatDict[key]))
+        self.dictLock.release(  )
+        return list
+
+    def update(self, entry):
+        "Create or update a dictionary entry"
+        self.dictLock.acquire(  )
+        self.beatDict[entry] = time(  )
+        self.dictLock.release(  )
+
+    def extractSilent(self, howPast):
+        "Returns a list of entries older than howPast"
+        silent = []
+        when = time(  ) - howPast
+        self.dictLock.acquire(  )
+        for key in self.beatDict.keys(  ):
+            if self.beatDict[key] < when:
+                silent.append(key)
+        self.dictLock.release(  )
+        return silent
+
+class BeatRec(Thread):
+    "Receive UDP packets, log them in heartbeat dictionary"
+
+    def __init__(self, goOnEvent, updateDictFunc, port):
+        Thread.__init__(self)
+        self.goOnEvent = goOnEvent
+        self.updateDictFunc = updateDictFunc
+        self.port = port
+        self.recSocket = socket(AF_INET, SOCK_DGRAM)
+        self.recSocket.bind(('', port))
+
+    def __repr__(self):
+        return "Heartbeat Server on port: %d\n" % self.port
+
+    def run(self):
+        while self.goOnEvent.isSet(  ):
+            if __debug__:
+                print "Waiting to receive..."
+            data, addr = self.recSocket.recvfrom(6)
+            if __debug__:
+                print "Received packet from " + `addr`
+            self.updateDictFunc(addr[0])
+
+def main(  ):
+    "Listen to the heartbeats and detect inactive clients"
+    global HBPORT, CHECKWAIT
+    if len(sys.argv)>1:
+        HBPORT=sys.argv[1]
+    if len(sys.argv)>2:
+        CHECKWAIT=sys.argv[2]
+
+    beatRecGoOnEvent = Event(  )
+    beatRecGoOnEvent.set(  )
+    beatDictObject = BeatDict(  )
+    beatRecThread = BeatRec(beatRecGoOnEvent, beatDictObject.update, HBPORT)
+    if __debug__:
+        print beatRecThread
+    beatRecThread.start(  )
+    print "PyHeartBeat server listening on port %d" % HBPORT
+    print "\n*** Press Ctrl-C to stop ***\n"
+    while 1:
+        try:
+            if __debug__:
+                print "Beat Dictionary"
+                print `beatDictObject`
+            silent = beatDictObject.extractSilent(CHECKWAIT)
+            if silent:
+                print "Silent clients"
+                print `silent`
+            sleep(CHECKWAIT)
+        except KeyboardInterrupt:
+            print "Exiting."
+            beatRecGoOnEvent.clear(  )
+            beatRecThread.join(  )
+
+if __name__ == '__main__':
+    main(  )
\ No newline at end of file
diff --git a/modules/core/monitor.py b/modules/core/monitor.py
index 7b95a155..7fbc753a 100644
--- a/modules/core/monitor.py
+++ b/modules/core/monitor.py
@@ -17,18 +17,24 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
+
+import sys
 import urllib
 import logging
 import json
 import os.path
+import threading
 
 from os import path
 from enum import Enum
+from socket import socket, AF_INET, SOCK_DGRAM
+from time import time, ctime, sleep
 
 import meta
 from modules.communication.redis.adapter import ClientRedisAdapter
 
 
+
 class MonitorResponseCode(Enum):
     OK = "OK"
     INVALID_STATE = "INVALID-STATE"
@@ -37,12 +43,23 @@ class MonitorResponseCode(Enum):
 
 class Monitoring:
     """
-    Engine Monitoring
+    Engine Monitoring is in charge of:
+
+    - Checking the overall status of all components and external API endpoints
+    - Checking the vital parts, which are minimal requirements for running the engine
+    - Sending a heartbeat to a defined server via socket
+
     """
     logger = None
     soundsystem = None
     status = None
 
+    heartbeat_server = None
+    heartbeat_port = None
+    heartbeat_frequency = None
+    heartbeat_socket = None
+    heartbeat_running = None
+
 
     def __init__(self, config, soundsystem):
         """
@@ -59,16 +76,77 @@ class Monitoring:
         self.status["api"]["tank"] = dict()
         self.status["api"]["engine"] = dict()
 
+        # Heartbeat settings
+        self.heartbeat_running = False
+        self.heartbeat_server = config.get("heartbeat_server")
+        self.heartbeat_port = config.get("heartbeat_port")
+        self.heartbeat_frequency = config.get("heartbeat_frequency")
+        self.heartbeat_socket = socket(AF_INET, SOCK_DGRAM)
+
+
+
+    #
+    # PUBLIC METHODS
+    #
+
+
+    def get_status(self):
+        """
+        Retrieves the current monitoring status.
+        """
+        return self.status
+
+
+
+    def has_valid_status(self, update_vitality_only):
+        """
+        Checks if the current status is valid to run engine. By default it
+        does not request new status information, rather using the cached one.
+        To request new data either call `get_status()` before or use the
+        `update_vital` parameter.
+
+        Args:
+            update_vitality_only (Boolean): Refreshes only the vital parts required for the heartbeat
+        """
+                
+        if update_vitality_only:
+            self.update_vitality_status()
+        else:
+            self.update_status()
+            
+        try:
+            if self.status["soundsystem"]["active"] \
+                and self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
+                and self.status["redis_ready"] \
+                and self.status["audio_store"]["exists"]:
+
+                self.status["engine"]["status"] = MonitorResponseCode.OK.value
+                return True
+            else:
+                self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
+                return False
+
+        except Exception as e:
+            self.logger.error("Exception while validating engine status: " + str(e))
+            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
+        
+        return False
+
+
+
+    #
+    # PRIVATE METHODS
+    #
+
 
     def update_status(self):
         """
         Requests the current status of all components
         """
         self.status["engine"]["version"] = meta.__version__
-        
+
         self.soundsystem.enable_transaction(self.soundsystem.client)
         self.status["soundsystem"]["version"]   = self.soundsystem.version()
-        self.status["soundsystem"]["active"]    = self.soundsystem.is_active()
         self.status["soundsystem"]["uptime"]    = self.soundsystem.uptime()
         self.status["soundsystem"]["io"]        = self.get_io_state()
         self.status["soundsystem"]["mixer"]     = self.soundsystem.get_mixer_status()
@@ -82,58 +160,58 @@ class Monitoring:
         self.status["api"]["tank"]["available"]     = self.validate_url_connection(self.config.get("api_tank_status"))
         self.status["api"]["tank"]["status"]        = self.get_url_response(self.config.get("api_tank_status"))
 
-        self.status["api"]["engine"]["url"]          = self.config.get("exposed_api_url")
+        self.status["api"]["engine"]["url"]         = self.config.get("exposed_api_url")
         self.status["api"]["engine"]["available"]   = self.validate_url_connection(self.config.get("exposed_api_url"))
 
-        self.status["redis_ready"]                  = self.validate_redis_connection()
-        self.status["audio_store"]                  = self.validate_directory(self.config.get("audiofolder"))
+        self.update_vitality_status()
 
-        if self.has_valid_status():
-            self.status["engine"]["status"] = MonitorResponseCode.OK.value
-        else:
-            self.status["engine"]["status"] = MonitorResponseCode.INVALID_STATE.value
 
 
-
-    def get_io_state(self):
+    def update_vitality_status(self):
         """
-        Retrieves all input and outputs provided by the engine.
+        Refreshes the vital status info which are required for the engine to survive.
         """
-        ios = self.soundsystem.engine_state()
+        self.soundsystem.enable_transaction(self.soundsystem.client)
+        self.status["soundsystem"]["active"]    = self.soundsystem.is_active()
+        self.soundsystem.disable_transaction(self.soundsystem.client)
 
-        try:
-            ios = ios.replace('"connected":', '"connected": ""')
-            ios = json.loads(ios, strict=False)
-            return ios
-        except Exception as e:
-            self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
-            return MonitorResponseCode.INVALID_STATE.value
+        self.status["redis_ready"]              = self.validate_redis_connection()
+        self.status["audio_store"]              = self.validate_directory(self.config.get("audiofolder"))
 
+        # After first update start the Heartbeat Monitior
+        if not self.heartbeat_running:
+            self.heartbeat_running = True
+            if self.config.get("heartbeat_frequency") > 0:
+                self.heartbeat()
 
 
-    def get_status(self):
+
+    def heartbeat(self):
         """
-        Retrieves the current monitoring status.
+        Every `heartbeat_frequency` seconds the current vitality status is checked. If it's okay,
+        as heartbeat is sent to the configured server.
         """
-        self.update_status()
-        return self.status
+        if self.has_valid_status(True):
+            self.heartbeat_socket.sendto(b"OK", (self.heartbeat_server, self.heartbeat_port))
+
+        threading.Timer(self.config.get("heartbeat_frequency"), self.heartbeat).start()
 
 
 
-    def has_valid_status(self):
+    def get_io_state(self):
         """
-        Checks if the current status is valid to run engine
+        Retrieves all input and outputs provided by the engine.
         """
-        try:
-            if self.status["soundsystem"]["mixer"]["in_filesystem_0"] \
-                and self.status["redis_ready"] \
-                and self.status["audio_store"]["exists"]:
+        ios = self.soundsystem.engine_state()
 
-                return True
-            return False
+        try:
+            ios = ios.replace('"connected":', '"connected": ""')
+            ios = json.loads(ios, strict=False)
+            return ios
         except Exception as e:
-            self.logger.error("Exception while validating engine status: " + str(e))
-            return False
+            self.logger.warn("Got invalid JSON from soundsystem - " + str(e))
+            return MonitorResponseCode.INVALID_STATE.value
+
 
 
     def validate_url_connection(self, url):
@@ -200,4 +278,4 @@ class Monitoring:
         except (urllib.error.URLError, IOError, ValueError) as e:
             self.logger.error("Error while connecting to URL '%s' - %s" % (url, e))
 
-        return MonitorResponseCode.INVALID_STATE.value
\ No newline at end of file
+        return MonitorResponseCode.INVALID_STATE.value
diff --git a/modules/core/startup.py b/modules/core/startup.py
index f4873dc3..5c3de48e 100644
--- a/modules/core/startup.py
+++ b/modules/core/startup.py
@@ -60,9 +60,10 @@ class StartupThread(threading.Thread):
         try:
             self.soundsystem.start()
 
+            is_valid = self.monitoring.has_valid_status(False)
             status = self.monitoring.get_status()
             self.logger.info("Status Monitor:\n%s" % json.dumps(status, indent=4))
-            if not status["engine"]["status"] == "OK":
+            if not is_valid:
                 self.logger.info("Engine Status: " + SimpleUtil.red(status["engine"]["status"]))
                 raise EngineMalfunctionException
             else:
-- 
GitLab