#!/usr/bin/env python3

# Copyright (c) 2001, Nicola Larosa
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials provided
#      with the distribution.
#    * Neither the name of the <ORGANIZATION> nor the names of its
#      contributors may be used to endorse or promote products derived
#      from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS
# OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

""" PyHeartBeat server: receives and tracks UDP packets from all clients.

While the BeatRec thread logs each UDP packet in a dictionary, the main
thread periodically scans the dictionary and prints the IP addresses of the
clients that sent at least one packet during the run, but have not sent
any packet for longer than the configured timeout.

Adjust the constant parameters as needed, or call as:

    PyHeartBeat.py [udpport [timeout]]

Set the environment variable "DEBUG" to "1" in order to emit more detailed
debug messages.
In debug mode "127.0.0.1" is additionally marked as a previously active peer.


Manual heartbeat messages can be easily sent via "netcat":

    echo foo | nc -q 1 -u localhost 43334


https://www.oreilly.com/library/view/python-cookbook/0596001673/ch10s13.html
"""

53
54
55
import os
import socket
import sys
David Trattnig's avatar
David Trattnig committed
56
57
from threading import Lock, Thread, Event
from time import time, ctime, sleep
58
59
60
61
62
63


# UDP port the server listens on when no port is given on the command line.
DEFAULT_HEARTBEAT_PORT = 43334
# Default, in seconds, for both the silence timeout and the pause between
# two scans of the heartbeat dictionary (main() uses one value for both).
DEFAULT_WAIT_PERIOD = 10
# True only when the environment variable DEBUG is exactly "1".
DEBUG_ENABLED = os.getenv("DEBUG", "0") == "1"

David Trattnig's avatar
David Trattnig committed
64
65
66
67
68
69

class BeatDict:
    """Lock-protected map from client IP address to its last heartbeat time.

    ``beatDict`` maps a key (the sender's IP address) to the ``time()`` value
    of the most recent heartbeat. Every method that touches ``beatDict``
    holds ``dictLock`` for the duration of the access, so one thread may call
    ``update`` while another scans the dictionary.
    """

    def __init__(self):
        self.beatDict = {}
        if DEBUG_ENABLED:
            # Debug aid: pretend localhost has already sent a heartbeat.
            self.beatDict["127.0.0.1"] = time()
        self.dictLock = Lock()

    def __repr__(self):
        """Return one "IP address: ... - Last time: ..." line per entry."""
        # "with" guarantees the lock is released even if formatting a
        # timestamp raises; a manual acquire()/release() pair would leave
        # the lock held forever in that case.
        with self.dictLock:
            return "".join(
                "IP address: %s - Last time: %s\n" % (address, ctime(last_seen))
                for address, last_seen in self.beatDict.items()
            )

    def update(self, entry):
        """Create or refresh the entry for ``entry`` with the current time."""
        with self.dictLock:
            self.beatDict[entry] = time()

    def extractSilent(self, howPast):
        """Return a list of the keys last updated more than ``howPast`` seconds ago.

        Entries are only reported, never removed, so a silent key keeps
        being returned until it is updated again.
        """
        when = time() - howPast
        with self.dictLock:
            return [
                address
                for address, last_seen in self.beatDict.items()
                if last_seen < when
            ]

102

David Trattnig's avatar
David Trattnig committed
103
104
105
106
107
108
109
110
class BeatRec(Thread):
    """Receiver thread: report the sender of every incoming UDP packet.

    Parameters
    ----------
    goOnEvent : threading.Event
        The receive loop runs while this event is set; clear it to stop
        the thread.
    updateDictFunc : callable
        Called with the sender's IP address (a string) for each packet.
    port : int
        UDP port to bind on all local interfaces.
    """

    def __init__(self, goOnEvent, updateDictFunc, port):
        super().__init__()
        self.goOnEvent = goOnEvent
        self.updateDictFunc = updateDictFunc
        self.port = port
        self.recSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Short timeout so run() can re-check goOnEvent regularly
            # instead of blocking forever in recvfrom().
            self.recSocket.settimeout(0.2)
            self.recSocket.bind(("", port))
        except Exception:
            # Do not leak the socket when the port cannot be bound.
            self.recSocket.close()
            raise

    def __repr__(self):
        return f"Heartbeat Server on port: {self.port}"

    def run(self):
        """Receive packets until ``goOnEvent`` is cleared, then close the socket."""
        try:
            while self.goOnEvent.is_set():
                if DEBUG_ENABLED:
                    print("Waiting to receive...")
                try:
                    # The payload is irrelevant; only the sender address is used.
                    data, addr = self.recSocket.recvfrom(6)
                except socket.timeout:
                    # no incoming message -> no timestamp update -> check again
                    pass
                else:
                    if DEBUG_ENABLED:
                        print(f"Received packet from {addr}")
                    self.updateDictFunc(addr[0])
        finally:
            # Release the UDP port as soon as the receiver stops.
            self.recSocket.close()


def main():
    """Listen to the heartbeats and detect inactive clients.

    Command line: ``[udpport [timeout]]``. ``udpport`` is parsed with
    ``int`` and ``timeout`` with ``float``; missing values fall back to
    DEFAULT_HEARTBEAT_PORT and DEFAULT_WAIT_PERIOD. The timeout is used both
    as the silence threshold and as the pause between two scans.

    Runs until interrupted with Ctrl-C.
    """
    if len(sys.argv) > 1:
        heartbeat_port = int(sys.argv[1])
    else:
        heartbeat_port = DEFAULT_HEARTBEAT_PORT
    if len(sys.argv) > 2:
        wait_period = float(sys.argv[2])
    else:
        wait_period = DEFAULT_WAIT_PERIOD

    beatRecGoOnEvent = Event()
    beatRecGoOnEvent.set()
    beatDictObject = BeatDict()
    beatRecThread = BeatRec(beatRecGoOnEvent, beatDictObject.update, heartbeat_port)
    if DEBUG_ENABLED:
        print(beatRecThread)
    beatRecThread.start()
    try:
        print(f"PyHeartBeat server listening on port {heartbeat_port}")
        print("\n*** Press Ctrl-C to stop ***\n")
        while True:
            if DEBUG_ENABLED:
                print(f"Beat Dictionary: {beatDictObject}")
            silent = beatDictObject.extractSilent(wait_period)
            if silent:
                print(f"Silent clients: {' '.join(silent)}")
            sleep(wait_period)
    except KeyboardInterrupt:
        print("Exiting.")
    finally:
        # Always stop the (non-daemon) receiver thread; otherwise any error
        # in the loop above would leave it running and the process hanging.
        beatRecGoOnEvent.clear()
        beatRecThread.join()

David Trattnig's avatar
David Trattnig committed
167

168
169
# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()