From 5cfc996078f671045550b329f3beb97bc0fac288 Mon Sep 17 00:00:00 2001 From: David Trattnig <david.trattnig@o94.at> Date: Mon, 14 Dec 2020 21:50:31 +0100 Subject: [PATCH] Parent timer init and synchronized storage. #62 --- src/core/control.py | 74 ++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/src/core/control.py b/src/core/control.py index 7ca70e6a..e099aa18 100644 --- a/src/core/control.py +++ b/src/core/control.py @@ -23,7 +23,7 @@ import time import json -from threading import Thread, Timer +from threading import Thread, Timer, Lock from datetime import datetime, timedelta from http_parser.http import HttpStream from http_parser.reader import SocketReader @@ -184,6 +184,7 @@ class EngineExecutor(Timer): Primarily used for automations performed by the scheduler. """ + _lock = None logger = logging.getLogger("AuraEngine") timer_store = {} parent_timer = None @@ -206,12 +207,17 @@ class EngineExecutor(Timer): due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way func (function): The function to be called param (object): Parameter passt to the function - """ + """ + self._lock = Lock() from src.core.engine import Engine - now_unix = Engine.engine_time() + now_unix = Engine.engine_time() + + # Init parent-child relation self.parent_timer = parent_timer if self.parent_timer: self.parent_timer.child_timer = self + + # Init meta data self.direct_exec = False self.timer_type = timer_type self.timer_id = f"{timer_type}:{func.__name__}:{due_time}" @@ -228,7 +234,7 @@ class EngineExecutor(Timer): is_stored = self.update_store() if not is_stored: - self.logger.info(SU.red("Timer '{self.timer_id}' omitted because it's already existing but dead")) + self.logger.info(SU.red(f"Timer '{self.timer_id}' omitted because it's already existing but dead")) else: if diff < 0: msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..." @@ -246,7 +252,10 @@ class EngineExecutor(Timer): """ Child timers are dependend on their parents. So let's wait until parents are done with their stuff. """ - if self.parent_timer and self.parent_timer.is_alive(): + if self.parent_timer: + # Wait a bit to allow any parent to complete initialization, in case child & parent are instantiated at the 'same' time + # Required to avoid "Thread.__init__() not called" exceptions on the parent + time.sleep(0.1) while self.parent_timer.is_alive(): self.logger.info(f"Timer '{self.timer_id}' is waiting for parent timer '{self.parent_timer.timer_id}' to finish") time.sleep(0.2) @@ -260,6 +269,7 @@ class EngineExecutor(Timer): self.wait_for_parent() thread = Thread(target = self.func, args = (self.param,)) thread.start() + self.cancel() def exec_timed(self): @@ -267,10 +277,10 @@ class EngineExecutor(Timer): Timed execution in a thread. """ def wrapper_func(param=None): - self.wait_for_parent() - if param: self.func(param,) - else: self.func() - Timer.__init__(self, self.diff, wrapper_func, (self.param,)) + self.wait_for_parent() + if param: self.func(param,) + else: self.func() + super().__init__(self.diff, wrapper_func, (self.param,)) def update_store(self): @@ -283,27 +293,29 @@ class EngineExecutor(Timer): Returns: (Boolean): True if the timer has been added to the store """ - existing_command = None - if self.timer_id in EngineExecutor.timer_store: - existing_command = EngineExecutor.timer_store[self.timer_id] + with self._lock: + existing_command = None + if self.timer_id in EngineExecutor.timer_store: + existing_command = EngineExecutor.timer_store[self.timer_id] - if existing_command: + if existing_command: - # Check if existing timer has been executed already -> don't update - if not existing_command.is_alive(): - self.logger.debug(f"Existing previous timer with ID: {self.timer_id}. Don't update.") - return False + # Check if existing timer has been executed already -> don't update + if not existing_command.is_alive(): + self.logger.debug(f"Existing dead timer with ID: {self.timer_id}. Don't update.") + self.cancel() + return False - # Still waiting for execution -> update - else: - self.logger.debug(f"Cancelling previous timer with ID: {self.timer_id}") - existing_command.cancel() - if existing_command.child_timer: - self.logger.debug(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}") + # Still waiting for execution -> update + else: + self.logger.debug(f"Cancelling existingTimer with ID: {self.timer_id}") + existing_command.cancel() + if existing_command.child_timer: + self.logger.debug(f"Cancelling existingTimer:childTimer with ID: {existing_command.child_timer.timer_id}") - EngineExecutor.timer_store[self.timer_id] = self - self.logger.debug(f"Created command timer with ID: {self.timer_id}") - return True + EngineExecutor.timer_store[self.timer_id] = self + self.logger.debug(f"Created command timer with ID: {self.timer_id}") + return True def is_alive(self): @@ -325,29 +337,29 @@ class EngineExecutor(Timer): @staticmethod def remove_stale_timers(): """ - Removes timers from store which have been executed and are older than 5 hours. + Removes timers from store which have been executed and are older than 3 hours. """ timers = EngineExecutor.timer_store.values() del_keys = [] for timer in timers: - if timer.dt < datetime.now() - timedelta(hours=5): + if timer.dt < datetime.now() - timedelta(hours=3): if not timer.child_timer or (timer.child_timer and not timer.child_timer.is_alive()): timer.logger.debug(f"Removing already executed timer with ID: {timer.timer_id}") del_keys.append(timer.timer_id) for timer_id in del_keys: - del EngineExecutor.timer_store[timer_id] + del EngineExecutor.timer_store[timer_id] @staticmethod def log_commands(): """ - Prints a list of active timers and inactive timer not older than one hour. + Prints a list of recent active and inactive timers. """ - timers = EngineExecutor.timer_store.values() msg = SU.blue("\n [ ENGINE COMMAND QUEUE ]\n") EngineExecutor.remove_stale_timers() + timers = EngineExecutor.timer_store.values() if not timers: msg += "\nNone available!\n" -- GitLab