Commit 8f9761b8 authored by Lars Kruse's avatar Lars Kruse
Browse files

fix(heartbeat-monitor): port the "PyHeartBeat" script to Python3

In addition to porting it to python3, a number of documentation and code
issues were solved:

* doc: proper order of arguments
* doc: mention debug mode (previously via "__debug__", now "DEBUG=1")
* doc: provide example for manual heartbeat packets
* code: specify timeout for "socket.recvfrom" (avoid hanging)
* code: handle parameters properly (cast to int/float)
* style: format with "black"
parent d041f86c
Pipeline #1110 passed with stage
in 2 minutes and 3 seconds
#!/usr/bin/env python2.7
#!/usr/bin/env python3
# Copyright (c) 2001, Nicola Larosa
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# with the distribution.
# * Neither the name of the <ORGANIZATION> nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
......@@ -34,54 +34,72 @@ clients that sent at least one packet during the run, but have
not sent any packet since a time longer than the definition of the timeout.
Adjust the constant parameters as needed, or call as:
PyHBServer.py [timeout [udpport]]
PyHeartBeat.py [udpport [timeout]]
Set the environment variable "DEBUG" to "1" in order to emit more detailed
debug messages.
In addition "127.0.0.1" is marked as a previously active peer.
Manual heartbeat messages can be easily sent via "netcat":
echo foo | nc -q 1 -u localhost 43334
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch10s13.html
"""
HBPORT = 43334
CHECKWAIT = 10
from socket import socket, gethostbyname, AF_INET, SOCK_DGRAM
import os
import socket
import sys
from threading import Lock, Thread, Event
from time import time, ctime, sleep
import sys
DEFAULT_HEARTBEAT_PORT = 43334
DEFAULT_WAIT_PERIOD = 10
DEBUG_ENABLED = os.getenv("DEBUG", "0") == "1"
class BeatDict:
"Manage heartbeat dictionary"
def __init__(self):
self.beatDict = {}
if __debug__:
self.beatDict['127.0.0.1'] = time( )
self.dictLock = Lock( )
if DEBUG_ENABLED:
self.beatDict["127.0.0.1"] = time()
self.dictLock = Lock()
def __repr__(self):
list = ''
self.dictLock.acquire( )
for key in self.beatDict.keys( ):
list = "%sIP address: %s - Last time: %s\n" % (
list, key, ctime(self.beatDict[key]))
self.dictLock.release( )
return list
result = ""
self.dictLock.acquire()
for key in self.beatDict.keys():
result += "IP address: %s - Last time: %s\n" % (
key,
ctime(self.beatDict[key]),
)
self.dictLock.release()
return result
def update(self, entry):
"Create or update a dictionary entry"
self.dictLock.acquire( )
self.beatDict[entry] = time( )
self.dictLock.release( )
self.dictLock.acquire()
self.beatDict[entry] = time()
self.dictLock.release()
def extractSilent(self, howPast):
"Returns a list of entries older than howPast"
silent = []
when = time( ) - howPast
self.dictLock.acquire( )
for key in self.beatDict.keys( ):
when = time() - howPast
self.dictLock.acquire()
for key in self.beatDict.keys():
if self.beatDict[key] < when:
silent.append(key)
self.dictLock.release( )
self.dictLock.release()
return silent
class BeatRec(Thread):
"Receive UDP packets, log them in heartbeat dictionary"
......@@ -90,52 +108,62 @@ class BeatRec(Thread):
self.goOnEvent = goOnEvent
self.updateDictFunc = updateDictFunc
self.port = port
self.recSocket = socket(AF_INET, SOCK_DGRAM)
self.recSocket.bind(('', port))
self.recSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.recSocket.settimeout(0.2)
self.recSocket.bind(("", port))
def __repr__(self):
return "Heartbeat Server on port: %d\n" % self.port
return f"Heartbeat Server on port: {self.port}"
def run(self):
while self.goOnEvent.isSet( ):
if __debug__:
print "Waiting to receive..."
data, addr = self.recSocket.recvfrom(6)
if __debug__:
print "Received packet from " + `addr`
self.updateDictFunc(addr[0])
def main( ):
while self.goOnEvent.isSet():
if DEBUG_ENABLED:
print("Waiting to receive...")
try:
data, addr = self.recSocket.recvfrom(6)
except socket.timeout:
# no incoming message -> no timestamp update -> check again
pass
else:
if DEBUG_ENABLED:
print(f"Received packet from {addr}")
self.updateDictFunc(addr[0])
def main():
"Listen to the heartbeats and detect inactive clients"
global HBPORT, CHECKWAIT
if len(sys.argv)>1:
HBPORT=sys.argv[1]
if len(sys.argv)>2:
CHECKWAIT=sys.argv[2]
beatRecGoOnEvent = Event( )
beatRecGoOnEvent.set( )
beatDictObject = BeatDict( )
beatRecThread = BeatRec(beatRecGoOnEvent, beatDictObject.update, HBPORT)
if __debug__:
print beatRecThread
beatRecThread.start( )
print "PyHeartBeat server listening on port %d" % HBPORT
print "\n*** Press Ctrl-C to stop ***\n"
while 1:
if len(sys.argv) > 1:
heartbeat_port = int(sys.argv[1])
else:
heartbeat_port = DEFAULT_HEARTBEAT_PORT
if len(sys.argv) > 2:
wait_period = float(sys.argv[2])
else:
wait_period = DEFAULT_WAIT_PERIOD
beatRecGoOnEvent = Event()
beatRecGoOnEvent.set()
beatDictObject = BeatDict()
beatRecThread = BeatRec(beatRecGoOnEvent, beatDictObject.update, heartbeat_port)
if DEBUG_ENABLED:
print(beatRecThread)
beatRecThread.start()
print(f"PyHeartBeat server listening on port {heartbeat_port}")
print("\n*** Press Ctrl-C to stop ***\n")
while True:
try:
if __debug__:
print "Beat Dictionary"
print `beatDictObject`
silent = beatDictObject.extractSilent(CHECKWAIT)
if DEBUG_ENABLED:
print(f"Beat Dictionary: {beatDictObject}")
silent = beatDictObject.extractSilent(wait_period)
if silent:
print "Silent clients"
print `silent`
sleep(CHECKWAIT)
print(f"Silent clients: {' '.join(silent)}")
sleep(wait_period)
except KeyboardInterrupt:
print "Exiting."
beatRecGoOnEvent.clear( )
beatRecThread.join( )
print("Exiting.")
beatRecGoOnEvent.clear()
beatRecThread.join()
break
if __name__ == '__main__':
main( )
\ No newline at end of file
if __name__ == "__main__":
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment