Commit 37c71863 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[execution][local] Refactor execution to use Executor from beat.backend.python

parent e18d0bdd
......@@ -36,10 +36,8 @@ Execution utilities
import os
import sys
import glob
import errno
import tempfile
import subprocess
import shutil
import zmq
import time
......@@ -48,20 +46,14 @@ logger = logging.getLogger(__name__)
import simplejson
from .. import schema
from .. import database
from .. import algorithm
from .. import inputs
from .. import outputs
from .. import data
from .. import stats
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import AccessMode
from beat.backend.python.executor import Executor
from beat.backend.python.message_handler import MessageHandler
class LocalExecutor(BaseExecutor):
"""LocalExecutor runs the code given an execution block information
......@@ -168,42 +160,6 @@ class LocalExecutor(BaseExecutor):
custom_root_folders=custom_root_folders)
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
super(LocalExecutor, self).__enter__()
self._prepare_inputs()
self._prepare_outputs()
return self
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
(self.input_list, self.data_loaders) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
cache_access=AccessMode.LOCAL, db_access=AccessMode.LOCAL,
databases=self.databases
)
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
(self.output_list, self.data_sinks) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
input_list=self.input_list, data_loaders=self.data_loaders
)
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
"""Executes the user algorithm code
......@@ -281,79 +237,54 @@ class LocalExecutor(BaseExecutor):
raise RuntimeError("execution information is bogus:\n * %s" % \
'\n * '.join(self.errors))
self.runner = self.algorithm.runner()
message_handler = MessageHandler('127.0.0.1')
message_handler.start()
zmq_context = zmq.Context()
executor_socket = zmq_context.socket(zmq.PAIR)
executor_socket.connect(message_handler.address)
working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(working_dir, 'prefix')
retval = self.runner.setup(self.data['parameters'])
self.dump_runner_configuration(working_dir)
self.algorithm.export(working_prefix)
executor = Executor(executor_socket,
working_dir,
database_cache=self.databases,
cache_root=self.cache)
retval = executor.setup()
if not retval:
raise RuntimeError("Algorithm setup failed")
prepared = self.runner.prepare(self.data_loaders)
prepared = executor.prepare()
if not prepared:
raise RuntimeError("Algorithm prepare failed")
if not self.input_list or not self.output_list:
raise RuntimeError("I/O for execution block has not yet been set up")
_start = time.time()
try:
if self.algorithm.type == algorithm.Algorithm.AUTONOMOUS:
if self.analysis:
result = self.runner.process(data_loaders=self.data_loaders,
output=self.output_list[0])
else:
result = self.runner.process(data_loaders=self.data_loaders,
outputs=self.output_list)
if not result:
return False
else:
main_group = self.input_list.main_group
while main_group.hasMoreData():
main_group.restricted_access = False
main_group.next()
main_group.restricted_access = True
try:
if self.algorithm.type == algorithm.Algorithm.LEGACY:
if self.analysis:
success = self.runner.process(inputs=self.input_list,
output=self.output_list[0])
else:
success = self.runner.process(inputs=self.input_list,
outputs=self.output_list)
elif self.algorithm.type == algorithm.Algorithm.SEQUENTIAL:
if self.analysis:
success = self.runner.process(inputs=self.input_list,
data_loaders=self.data_loaders,
output=self.output_list[0])
else:
success = self.runner.process(inputs=self.input_list,
data_loaders=self.data_loaders,
outputs=self.output_list)
if not success:
return _create_result(1, "The algorithm returned 'False'")
except Exception as e:
message = _process_exception(e, self.prefix, 'algorithms')
return _create_result(1, message)
processed = executor.process()
except Exception as e:
message = _process_exception(e, self.prefix, 'databases')
return _create_result(1, message)
missing_data_outputs = [x for x in self.output_list if x.isDataMissing()]
if not processed:
raise RuntimeError("Algorithm process failed")
proc_time = time.time() - _start
if missing_data_outputs:
raise RuntimeError("Missing data on the following output(s): %s" % \
', '.join([x.name for x in missing_data_outputs]))
# some local information
logger.debug("Total processing time was %.3f seconds" , proc_time)
message_handler.kill()
message_handler.join()
message_handler.destroy()
executor_socket.setsockopt(zmq.LINGER, 0)
executor_socket.close()
zmq_context.destroy()
shutil.rmtree(working_dir)
return _create_result(0)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment