diff --git a/modules/core/control.py b/modules/core/control.py index 39867a0a5416472361314b3202bcfcf8cef0b471..c58572050050ead78fcad1d7f810b81ae65e8ca9 100644 --- a/modules/core/control.py +++ b/modules/core/control.py @@ -23,12 +23,13 @@ import time import json -from threading import Thread -from http_parser.http import HttpStream -from http_parser.reader import SocketReader +from threading import Thread, Timer +from datetime import datetime, timedelta +from http_parser.http import HttpStream +from http_parser.reader import SocketReader -from modules.base.config import AuraConfig -from modules.base.utils import SimpleUtil as SU +from modules.base.config import AuraConfig +from modules.base.utils import SimpleUtil as SU @@ -175,4 +176,149 @@ class SocketControlInterface: def terminate(self): SocketControlInterface.instance = None self.server.close() - self.logger.info(SU.yellow("[ECI] Shutting down...")) \ No newline at end of file + self.logger.info(SU.yellow("[ECI] Shutting down...")) + + + + + +class EngineExecutor(Timer): + """ + Base class for timed or threaded execution of Engine commands. + + Primarly used for automation by the scheduler. + """ + logger = logging.getLogger("AuraEngine") + timer_store = {} + child_timer = None + direct_exec = None + timer_id = None + timer_type = None + param = None + diff = None + dt = None + + + def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None): + """ + Constructor + + Args: + timer_type (String): Prefix used for the `timer_id` to make it unique + child_timer (EngineExeuctor): Child action which is bound to this timer + due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way + func (function): The function to be called + param (object): Parameter passt to the function + """ + from modules.core.engine import Engine + now_unix = Engine.engine_time() + self.child_timer = child_timer + self.direct_exec = False + self.timer_type = timer_type + self.timer_id = f"{timer_type}:{func.__name__}:{due_time}" + + if not due_time: + diff = 0 + else: + diff = due_time - now_unix + + self.diff = diff + self.dt = datetime.now() + timedelta(seconds=diff) + self.func = func + self.param = param + + if diff < 0: + msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..." + self.logger.error(SU.red(msg)) + self.exec_now() + elif diff == 0: + self.logger.info(f"Timer '{self.timer_id}' to be executed immediately") + self.exec_now() + else: + self.exec_timed() + self.start() + + self.update_store() + + + + def exec_now(self): + """ + Immediate execution within a thread. It's not stored in the timer store. + """ + self.direct_exec = True + thread = Thread(target = self.func, args = (self.param,)) + thread.start() + + + + def exec_timed(self): + """ + Timed execution in a thread. + """ + def wrapper_func(param=None): + + # Remove from store + self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}")) + del EngineExecutor.timer_store[self.timer_id] + + # Call actual function + if param: self.func(param,) + else: self.func() + + Timer.__init__(self, self.diff, wrapper_func, (self.param,)) + + + def update_store(self): + """ + Adds the instance to the store and cancels any previously existing commands. + """ + existing_command = None + if self.timer_id in EngineExecutor.timer_store: + existing_command = EngineExecutor.timer_store[self.timer_id] + if existing_command: + self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}")) + existing_command.cancel() + if existing_command.child_timer: + self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}")) + + EngineExecutor.timer_store[self.timer_id] = self + self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}")) + + + def is_alive(self): + """ + Returns true if the command is still due to be executed. + """ + if self.direct_exec == True: + return False + return super().is_alive() + + + def __str__(self): + """ + String represenation of the timer. + """ + return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})" + + + @staticmethod + def log_commands(): + """ + Prints a list of active timers to the log. + """ + timers = EngineExecutor.timer_store.values() + msg = "\n [ ENGINE COMMAND QUEUE ]\n" + + if not timers: + msg += "None available!\n" + else: + for timer in timers: + msg += f" => {str(timer)}\n" + if timer.child_timer: + msg += f" => {str(timer.child_timer)}\n" + + EngineExecutor.logger.info(msg + "\n") + + + diff --git a/modules/scheduling/fallback_manager.py b/modules/scheduling/fallback.py similarity index 61% rename from modules/scheduling/fallback_manager.py rename to modules/scheduling/fallback.py index 535f76d721ecff31be5ebd82aac8b9a9fbebe391..7b9b8657285899447f141196d897b373f8992429 100644 --- a/modules/scheduling/fallback_manager.py +++ b/modules/scheduling/fallback.py @@ -22,15 +22,13 @@ import logging from enum import Enum -from threading import Thread, Timer -from datetime import datetime, timedelta from modules.base.config import AuraConfig from modules.base.utils import SimpleUtil as SU from modules.base.mail import AuraMailer from modules.core.resources import ResourceClass from modules.core.channels import Channel - +from modules.core.control import EngineExecutor @@ -194,144 +192,6 @@ class FallbackManager: -class EngineExecutor(Timer): - """ - Base class for timed or threaded execution of Engine commands. - """ - logger = logging.getLogger("AuraEngine") - timer_store = {} - child_timer = None - direct_exec = None - timer_id = None - timer_type = None - param = None - diff = None - dt = None - - - def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None): - """ - Constructor - - Args: - timer_type (String): Prefix used for the `timer_id` to make it unique - child_timer (EngineExeuctor): Child action which is bound to this timer - due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way - func (function): The function to be called - param (object): Parameter passt to the function - """ - from modules.core.engine import Engine - now_unix = Engine.engine_time() - self.child_timer = child_timer - self.direct_exec = False - self.timer_type = timer_type - self.timer_id = f"{timer_type}:{func.__name__}:{due_time}" - - if not due_time: - diff = 0 - else: - diff = due_time - now_unix - - self.diff = diff - self.dt = datetime.now() + timedelta(seconds=diff) - self.func = func - self.param = param - - if diff < 0: - msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..." - self.logger.error(SU.red(msg)) - self.exec_now() - elif diff == 0: - self.logger.info(f"Timer '{self.timer_id}' to be executed immediately") - self.exec_now() - else: - self.exec_timed() - self.start() - - self.update_store() - - - - def exec_now(self): - """ - Immediate execution within a thread. It's not stored in the timer store. - """ - self.direct_exec = True - thread = Thread(target = self.func, args = (self.param,)) - thread.start() - - - - def exec_timed(self): - """ - Timed execution in a thread. - """ - def wrapper_func(param=None): - - # Remove from store - self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}")) - del EngineExecutor.timer_store[self.timer_id] - - # Call actual function - if param: self.func(param,) - else: self.func() - - Timer.__init__(self, self.diff, wrapper_func, (self.param,)) - - - def update_store(self): - """ - Adds the instance to the store and cancels any previously existing commands. - """ - existing_command = None - if self.timer_id in EngineExecutor.timer_store: - existing_command = EngineExecutor.timer_store[self.timer_id] - if existing_command: - self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}")) - existing_command.cancel() - if existing_command.child_timer: - self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}")) - - EngineExecutor.timer_store[self.timer_id] = self - self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}")) - - - def is_alive(self): - """ - Returns true if the command is still due to be executed. - """ - if self.direct_exec == True: - return False - return super().is_alive() - - - def __str__(self): - """ - String represenation of the timer. - """ - return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})" - - - @staticmethod - def log_commands(): - """ - Prints a list of active timers to the log. - """ - timers = EngineExecutor.timer_store.values() - msg = "\n [ ENGINE COMMAND QUEUE ]\n" - - if not timers: - msg += "None available!\n" - else: - for timer in timers: - msg += f" => {str(timer)}\n" - if timer.child_timer: - msg += f" => {str(timer.child_timer)}\n" - - EngineExecutor.logger.info(msg + "\n") - - - class FallbackCommand(EngineExecutor): """