Commit 05f6bd27 authored by David Trattnig's avatar David Trattnig
Browse files

Generic location for executor. #44

parent fe585e9f
......@@ -23,12 +23,13 @@ import time
import json
from threading import Thread
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from threading import Thread, Timer
from datetime import datetime, timedelta
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
......@@ -175,4 +176,149 @@ class SocketControlInterface:
def terminate(self):
SocketControlInterface.instance = None
self.server.close()
self.logger.info(SU.yellow("[ECI] Shutting down..."))
\ No newline at end of file
self.logger.info(SU.yellow("[ECI] Shutting down..."))
class EngineExecutor(Timer):
"""
Base class for timed or threaded execution of Engine commands.
Primarly used for automation by the scheduler.
"""
logger = logging.getLogger("AuraEngine")
timer_store = {}
child_timer = None
direct_exec = None
timer_id = None
timer_type = None
param = None
diff = None
dt = None
def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
"""
Constructor
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
child_timer (EngineExeuctor): Child action which is bound to this timer
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
from modules.core.engine import Engine
now_unix = Engine.engine_time()
self.child_timer = child_timer
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time:
diff = 0
else:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
self.exec_timed()
self.start()
self.update_store()
def exec_now(self):
"""
Immediate execution within a thread. It's not stored in the timer store.
"""
self.direct_exec = True
thread = Thread(target = self.func, args = (self.param,))
thread.start()
def exec_timed(self):
"""
Timed execution in a thread.
"""
def wrapper_func(param=None):
# Remove from store
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineExecutor.timer_store[self.timer_id]
# Call actual function
if param: self.func(param,)
else: self.func()
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
def update_store(self):
"""
Adds the instance to the store and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
if existing_command.child_timer:
self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))
EngineExecutor.timer_store[self.timer_id] = self
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
def is_alive(self):
"""
Returns true if the command is still due to be executed.
"""
if self.direct_exec == True:
return False
return super().is_alive()
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"
@staticmethod
def log_commands():
"""
Prints a list of active timers to the log.
"""
timers = EngineExecutor.timer_store.values()
msg = "\n [ ENGINE COMMAND QUEUE ]\n"
if not timers:
msg += "None available!\n"
else:
for timer in timers:
msg += f" => {str(timer)}\n"
if timer.child_timer:
msg += f" => {str(timer.child_timer)}\n"
EngineExecutor.logger.info(msg + "\n")
......@@ -22,15 +22,13 @@
import logging
from enum import Enum
from threading import Thread, Timer
from datetime import datetime, timedelta
from modules.base.config import AuraConfig
from modules.base.utils import SimpleUtil as SU
from modules.base.mail import AuraMailer
from modules.core.resources import ResourceClass
from modules.core.channels import Channel
from modules.core.control import EngineExecutor
......@@ -194,144 +192,6 @@ class FallbackManager:
class EngineExecutor(Timer):
"""
Base class for timed or threaded execution of Engine commands.
"""
logger = logging.getLogger("AuraEngine")
timer_store = {}
child_timer = None
direct_exec = None
timer_id = None
timer_type = None
param = None
diff = None
dt = None
def __init__(self, timer_type="BASE", child_timer=None, due_time=None, func=None, param=None):
"""
Constructor
Args:
timer_type (String): Prefix used for the `timer_id` to make it unique
child_timer (EngineExeuctor): Child action which is bound to this timer
due_time (Float): When timer should be executed. For values <= 0 execution happens immediately in a threaded way
func (function): The function to be called
param (object): Parameter passt to the function
"""
from modules.core.engine import Engine
now_unix = Engine.engine_time()
self.child_timer = child_timer
self.direct_exec = False
self.timer_type = timer_type
self.timer_id = f"{timer_type}:{func.__name__}:{due_time}"
if not due_time:
diff = 0
else:
diff = due_time - now_unix
self.diff = diff
self.dt = datetime.now() + timedelta(seconds=diff)
self.func = func
self.param = param
if diff < 0:
msg = f"Timer '{self.timer_id}' is due in the past. Executing immediately ..."
self.logger.error(SU.red(msg))
self.exec_now()
elif diff == 0:
self.logger.info(f"Timer '{self.timer_id}' to be executed immediately")
self.exec_now()
else:
self.exec_timed()
self.start()
self.update_store()
def exec_now(self):
"""
Immediate execution within a thread. It's not stored in the timer store.
"""
self.direct_exec = True
thread = Thread(target = self.func, args = (self.param,))
thread.start()
def exec_timed(self):
"""
Timed execution in a thread.
"""
def wrapper_func(param=None):
# Remove from store
self.logger.info(SU.green(f"Removing old timer with ID: {self.timer_id}"))
del EngineExecutor.timer_store[self.timer_id]
# Call actual function
if param: self.func(param,)
else: self.func()
Timer.__init__(self, self.diff, wrapper_func, (self.param,))
def update_store(self):
"""
Adds the instance to the store and cancels any previously existing commands.
"""
existing_command = None
if self.timer_id in EngineExecutor.timer_store:
existing_command = EngineExecutor.timer_store[self.timer_id]
if existing_command:
self.logger.info(SU.green(f"Cancelling previous timer with ID: {self.timer_id}"))
existing_command.cancel()
if existing_command.child_timer:
self.logger.info(SU.green(f"Cancelling child timer with ID: {existing_command.child_timer.timer_id}"))
EngineExecutor.timer_store[self.timer_id] = self
self.logger.info(SU.green(f"Created command timer with ID: {self.timer_id}"))
def is_alive(self):
"""
Returns true if the command is still due to be executed.
"""
if self.direct_exec == True:
return False
return super().is_alive()
def __str__(self):
"""
String represenation of the timer.
"""
return f"[{self.timer_id}] exec at {str(self.dt)} (alive: {self.is_alive()})"
@staticmethod
def log_commands():
"""
Prints a list of active timers to the log.
"""
timers = EngineExecutor.timer_store.values()
msg = "\n [ ENGINE COMMAND QUEUE ]\n"
if not timers:
msg += "None available!\n"
else:
for timer in timers:
msg += f" => {str(timer)}\n"
if timer.child_timer:
msg += f" => {str(timer.child_timer)}\n"
EngineExecutor.logger.info(msg + "\n")
class FallbackCommand(EngineExecutor):
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment