Skip to content
Snippets Groups Projects
Commit be04794b authored by David Trattnig's avatar David Trattnig
Browse files

Parent timer init and synchronized storage. #62

parent 6c7d6af8
No related branches found
No related tags found
No related merge requests found
......@@ -23,7 +23,7 @@ import time
import json
from threading import Thread, Timer
from threading import Thread, Timer, Lock
from datetime import datetime, timedelta
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
......@@ -184,6 +184,7 @@ class EngineExecutor(Timer):
Primarily used for automations performed by the scheduler.
"""
_lock = None
logger = logging.getLogger("AuraEngine")
timer_store = {}
parent_timer = None
......@@ -206,12 +207,17 @@ class EngineExecutor(Timer):
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
"""
self._lock = Lock()
from src.core.engine import Engine
now_unix = Engine.engine_time()
now_unix = Engine.engine_time()
# Init parent-child relation
self.parent_timer = parent_timer
if self.parent_timer:
self.parent_timer.child_timer = self
# Init meta data
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
......@@ -228,7 +234,7 @@ class EngineExecutor(Timer):
is_stored = self.update_store()
if not is_stored:
self.logger.info(SU.red("Timer '{self.timer_id}' omitted because it's already existing but dead"))
self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead"))
else:
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
......@@ -246,7 +252,10 @@ class EngineExecutor(Timer):
"""
Child timers are dependend on their parents. So let's wait until parents are done with their stuff.
"""
if self.parent_timer and self.parent_timer.is_alive():
if self.parent_timer:
# Wait a bit to allow any parent to complete initialization, in case child & parent are instantiated at the 'same' time
# Required to avoid "Thread.__init__() not called" exceptions on the parent
time.sleep(0.1)
while self.parent_timer.is_alive():
self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish")
time.sleep(0.2)
......@@ -260,6 +269,7 @@ class EngineExecutor(Timer):
self.wait_for_parent()
thread = Thread(target = self.func, args = (self.param,))
thread.start()
self.cancel()
def exec_timed(self):
......@@ -267,10 +277,10 @@ class EngineExecutor(Timer):
Timed execution in a thread.
"""
def wrapper_func(param=None):
self.wait_for_parent()
if param: self.func(param,)
else: self.func()
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
self.wait_for_parent()
if param: self.func(param,)
else: self.func()
super().__init__(self.diff, wrapper_func, (self.param,))
def update_store(self):
......@@ -283,27 +293,29 @@ class EngineExecutor(Timer):
Returns:
(Boolean): True if the timer has been added to the store
"""
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
with self._lock:
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
if existing_command:
# Check if existing timer has been executed already -> don't update
if not existing_command.is_alive():
self.logger.debug(f"Existing previous timer with ID: {self.timer_id}. Don't update.")
return False
# Check if existing timer has been executed already -> don't update
if not existing_command.is_alive():
self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.")
self.cancel()
return False
# Still waiting for execution -> update
else:
self.logger.debug(f"Cancelling previous timer with ID: {self.timer_id}")
existing_command.cancel()
if existing_command.child_timer:
self.logger.debug(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}")
# Still waiting for execution -> update
else:
self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}")
existing_command.cancel()
if existing_command.child_timer:
self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}")
EngineExecutor.timer_store[self.timer_id] = self
self.logger.debug(f"Created command timer with ID: {self.timer_id}")
return True
EngineExecutor.timer_store[self.timer_id] = self
self.logger.debug(f"Created command timer with ID: {self.timer_id}")
return True
def is_alive(self):
......@@ -325,29 +337,29 @@ class EngineExecutor(Timer):
@staticmethod
def remove_stale_timers():
"""
Removes timers from store which have been executed and are older than 5 hours.
Removes timers from store which have been executed and are older than 3 hours.
"""
timers = EngineExecutor.timer_store.values()
del_keys = []
for timer in timers:
if timer.dt < datetime.now() - timedelta(hours=5):
if timer.dt < datetime.now() - timedelta(hours=3):
if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()):
timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}")
del_keys.append(timer.timer_id)
for timer_id in del_keys:
del EngineExecutor.timer_store[timer_id]
del EngineExecutor.timer_store[timer_id]
@staticmethod
def log_commands():
"""
Prints a list of active timers and inactive timer not older than one hour.
Prints a list of recent active and inactive timers.
"""
timers = EngineExecutor.timer_store.values()
msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n")
EngineExecutor.remove_stale_timers()
timers = EngineExecutor.timer_store.values()
if not timers:
msg += "\nNone available!\n"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment