Commit b841f264 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[execution][local] Implement support for loop block handling

parent b08d4150
......@@ -55,6 +55,10 @@ from beat.backend.python.helpers import AccessMode
from beat.backend.python.executor import Executor
from beat.backend.python.message_handler import MessageHandler
from beat.backend.python.loop_executor import Executor as LoopExecutor
from beat.backend.python.loop_executor import LoopMessageHandler
class LocalExecutor(BaseExecutor):
"""LocalExecutor runs the code given an execution block information
......@@ -159,6 +163,39 @@ class LocalExecutor(BaseExecutor):
library_cache=library_cache,
custom_root_folders=custom_root_folders)
self.working_dir = None
self.executor = None
self.message_handler = None
self.executor_socket = None
self.loop_executor = None
self.loop_message_handler = None
self.loop_socket = None
self.zmq_context = None
def __cleanup(self):
if self.loop_executor:
self.loop_executor.wait()
for handler in [self.message_handler, self.loop_message_handler]:
if handler:
handler.kill()
handler.join()
handler.destroy()
for socket in [self.executor_socket, self.loop_socket]:
if socket:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
if self.zmq_context is not None:
self.zmq_context.destroy()
if self.working_dir is not None:
shutil.rmtree(self.working_dir)
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
......@@ -237,41 +274,69 @@ class LocalExecutor(BaseExecutor):
raise RuntimeError("execution information is bogus:\n * %s" % \
'\n * '.join(self.errors))
message_handler = MessageHandler('127.0.0.1')
message_handler.start()
self.message_handler = MessageHandler('127.0.0.1')
self.message_handler.start()
zmq_context = zmq.Context()
executor_socket = zmq_context.socket(zmq.PAIR)
executor_socket.connect(message_handler.address)
self.zmq_context = zmq.Context()
self.executor_socket = self.zmq_context.socket(zmq.PAIR)
self.executor_socket.connect(self.message_handler.address)
working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(working_dir, 'prefix')
self.working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(self.working_dir, 'prefix')
self.dump_runner_configuration(working_dir)
self.dump_runner_configuration(self.working_dir)
self.algorithm.export(working_prefix)
executor = Executor(executor_socket,
working_dir,
database_cache=self.databases,
cache_root=self.cache)
if self.loop_algorithm:
self.loop_algorithm.export(working_prefix)
self.loop_message_handler = LoopMessageHandler('127.0.0.1')
self.loop_socket = self.zmq_context.socket(zmq.PAIR)
self.loop_socket.connect(self.loop_message_handler.address)
self.loop_executor = LoopExecutor(self.loop_message_handler,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache)
retval = self.loop_executor.setup()
if not retval:
self.__cleanup()
raise RuntimeError("Loop algorithm setup failed")
prepared = self.loop_executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Loop algorithm prepare failed")
self.loop_executor.process()
self.executor = Executor(self.executor_socket,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache,
loop_socket=self.loop_socket)
retval = executor.setup()
retval = self.executor.setup()
if not retval:
self.__cleanup()
raise RuntimeError("Algorithm setup failed")
prepared = executor.prepare()
prepared = self.executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Algorithm prepare failed")
_start = time.time()
try:
processed = executor.process()
processed = self.executor.process()
except Exception as e:
message = _process_exception(e, self.prefix, 'databases')
self.__cleanup()
return _create_result(1, message)
if not processed:
self.__cleanup()
raise RuntimeError("Algorithm process failed")
proc_time = time.time() - _start
......@@ -279,12 +344,6 @@ class LocalExecutor(BaseExecutor):
# some local information
logger.debug("Total processing time was %.3f seconds" , proc_time)
message_handler.kill()
message_handler.join()
message_handler.destroy()
executor_socket.setsockopt(zmq.LINGER, 0)
executor_socket.close()
zmq_context.destroy()
shutil.rmtree(working_dir)
self.__cleanup()
return _create_result(0)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment