Commit 3dd85627 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[execution][subprocess] Implement support for loop blocks handling

parent f00077a6
......@@ -145,6 +145,7 @@ class BaseExecutor(object):
self.prefix = prefix
self.cache = cache or os.path.join(self.prefix, 'cache')
self.algorithm = None
self.loop_algorithm = None
self.databases = {}
self.input_list = None
self.data_loaders = None
......@@ -185,14 +186,34 @@ class BaseExecutor(object):
if self.data['algorithm'] in algorithm_cache:
self.algorithm = algorithm_cache[self.data['algorithm']]
else:
self.algorithm = algorithm.Algorithm(self.prefix, self.data['algorithm'],
dataformat_cache, library_cache)
self.algorithm = algorithm.Algorithm(self.prefix,
self.data['algorithm'],
dataformat_cache,
library_cache)
algorithm_cache[self.algorithm.name] = self.algorithm
if not self.algorithm.valid:
self.errors += self.algorithm.errors
return
if 'loop' in self.data:
loop = self.data['loop']
if loop['algorithm'] in algorithm_cache:
self.loop_algorithm = algorithm_cache[loop['algorithm']]
else:
self.loop_algorithm = algorithm.Algorithm(self.prefix,
loop['algorithm'],
dataformat_cache,
library_cache)
algorithm_cache[self.loop_algorithm.name] = self.loop_algorithm
if len(loop['inputs']) != len(self.loop_algorithm.input_map):
self.errors.append("The number of inputs of the loop algorithm doesn't correspond")
for name in self.data['inputs'].keys():
if name not in self.algorithm.input_map.keys():
self.errors.append("The input '%s' doesn't exist in the loop algorithm" % name)
# Check that the mapping in coherent
if len(self.data['inputs']) != len(self.algorithm.input_map):
self.errors.append("The number of inputs of the algorithm doesn't correspond")
......@@ -209,31 +230,19 @@ class BaseExecutor(object):
if name not in self.algorithm.output_map.keys():
self.errors.append("The output '%s' doesn't exist in the algorithm" % name)
if 'loop' in self.data:
for name in ['request', 'answer']:
if name not in self.algorithm.loop_map.keys():
self.errors.append("The loop '%s' doesn't exist in the algorithm" % name)
if self.errors:
return
# Load the databases (if any is required)
for name, details in self.data['inputs'].items():
if 'database' in details:
if details['database'] not in self.databases:
if details['database'] in database_cache:
db = database_cache[details['database']]
else:
db = database.Database(self.prefix, details['database'],
dataformat_cache)
name = "database/%s" % db.name
if name in custom_root_folders:
db.data['root_folder'] = custom_root_folders[name]
database_cache[db.name] = db
self.databases[db.name] = db
self._update_db_cache(self.data['inputs'], database_cache)
if not db.valid:
self.errors += db.errors
if 'loop' in self.data:
self._update_db_cache(self.data['loop']['inputs'], database_cache)
def __enter__(self):
......@@ -247,9 +256,6 @@ class BaseExecutor(object):
logger.info("Start the execution of '%s'", self.algorithm.name)
# self._prepare_inputs()
# self._prepare_outputs()
return self
......@@ -269,6 +275,32 @@ class BaseExecutor(object):
self.data_sinks = []
def _update_db_cache(self, inputs, database_cache, dataformat_cache=None,
                     custom_root_folders=None):
    """Update the database cache based on the input list given.

    Parameters:
      inputs (dict): Mapping of input name to its configuration details;
        entries carrying a ``'database'`` key trigger a database load.
      database_cache (dict): Shared cache of already-loaded databases,
        keyed by database name. Updated in place with newly loaded ones.
      dataformat_cache (dict): Optional shared dataformat cache forwarded
        to ``database.Database``. NOTE(review): the extracted method
        originally referenced this name without defining it, which raised
        ``NameError`` on the uncached path; it is now an explicit,
        backward-compatible parameter.
      custom_root_folders (dict): Optional mapping of ``"database/<name>"``
        to a root-folder override applied to freshly loaded databases.
        Same ``NameError`` fix as above; defaults to no overrides.

    Side effects: populates ``self.databases`` and extends ``self.errors``
    with the errors of any invalid database.
    """

    if custom_root_folders is None:
        custom_root_folders = {}

    for name, details in inputs.items():
        if 'database' not in details:
            continue

        if details['database'] in self.databases:
            # Already loaded for this executor; nothing to do
            continue

        if details['database'] in database_cache:
            db = database_cache[details['database']]
        else:
            db = database.Database(self.prefix, details['database'],
                                   dataformat_cache)

            # Use a distinct variable instead of clobbering the loop
            # variable ``name`` (the original shadowed it here)
            key = "database/%s" % db.name
            if key in custom_root_folders:
                db.data['root_folder'] = custom_root_folders[key]

            database_cache[db.name] = db

        self.databases[db.name] = db

        if not db.valid:
            self.errors += db.errors
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
......@@ -380,6 +412,9 @@ class BaseExecutor(object):
self.algorithm.export(tmp_prefix)
if self.loop_algorithm:
self.loop_algorithm.export(tmp_prefix)
def dump_databases_provider_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
......
......@@ -165,6 +165,57 @@ class SubprocessExecutor(RemoteExecutor):
library_cache=library_cache,
custom_root_folders=custom_root_folders)
# We need three apps to run this function: databases_provider and execute
self.EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]),
'execute'))
self.LOOP_EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]),
'loop_execute'))
self.DBPROVIDER_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]),
'databases_provider'))
def __create_db_process(self):
    """Start the databases_provider subprocess and return its bookkeeping.

    Picks a free TCP port, dumps the databases-provider configuration
    into a temporary directory and launches ``self.DBPROVIDER_BIN``
    pointed at that configuration.

    Returns:
      dict: with keys ``configuration_path`` (temporary directory holding
      the dumped configuration), ``process`` (the ``Popen`` handle),
      ``port`` (TCP port the provider serves on) and ``stdout``/``stderr``
      (named temporary files capturing the subprocess output).
    """

    database_port = utils.find_free_port()

    # Configuration and needed files
    databases_configuration_path = utils.temporary_directory()
    self.dump_databases_provider_configuration(databases_configuration_path)

    # Creation of the subprocess
    # Note: we only support one databases image loaded at the same time
    cmd = [
        self.DBPROVIDER_BIN,
        self.ip_address + (':%d' % database_port),
        databases_configuration_path,
        self.cache,
    ]

    if logger.getEffectiveLevel() <= logging.DEBUG:
        cmd.insert(1, '--debug')

    # delete=False: the files must outlive this call; the caller reads
    # them back (and removes them) once the subprocess has terminated
    databases_process_stdout = tempfile.NamedTemporaryFile(delete=False)
    databases_process_stderr = tempfile.NamedTemporaryFile(delete=False)

    # Lazy %-args: the message is only built if DEBUG logging is enabled
    logger.debug("Starting database process with: %s", " ".join(cmd))

    databases_process = sp.Popen(cmd,
                                 stdout=databases_process_stdout,
                                 stderr=databases_process_stderr)

    return dict(
        configuration_path=databases_configuration_path,
        process=databases_process,
        port=database_port,
        stdout=databases_process_stdout,
        stderr=databases_process_stderr,
    )
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
......@@ -215,19 +266,16 @@ class SubprocessExecutor(RemoteExecutor):
'\n * '.join(self.errors))
# We need two apps to run this function: databases_provider and execute
EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]),
'execute'))
DBPROVIDER_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]),
'databases_provider'))
missing_scripts = []
if (len(self.databases) > 0) and DBPROVIDER_BIN is None:
if (len(self.databases) > 0) and self.DBPROVIDER_BIN is None:
missing_scripts.append('databases_provider')
if EXECUTE_BIN is None:
if self.EXECUTE_BIN is None:
missing_scripts.append('execute')
if self.LOOP_EXECUTE_BIN is None:
missing_scripts.append('loop_execute')
if missing_scripts:
raise RuntimeError("Scripts not found at PATH (%s): %s" % \
(os.environ.get('PATH', ''), ', '.join(missing_scripts)))
......@@ -245,52 +293,69 @@ class SubprocessExecutor(RemoteExecutor):
#----- (If necessary) Instantiate the subprocess that provide the databases
databases_process = None
databases_configuration_path = None
database_port = utils.find_free_port()
database_infos = {}
if len(self.databases) > 0:
database_infos['db'] = self.__create_db_process()
# Configuration and needed files
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
self.dump_databases_provider_configuration(databases_configuration_path)
# import ipdb;ipdb.set_trace()
# Creation of the subprocess
# Note: we only support one databases image loaded at the same time
#----- Instantiate the algorithm subprocess
loop_algorithm_port = None
loop_algorithm_process = None
if self.loop_algorithm is not None:
if len(self.databases) > 0:
database_infos['loop_db'] = self.__create_db_process()
loop_algorithm_port = utils.find_free_port()
cmd = [
DBPROVIDER_BIN,
self.ip_address + (':%d' % database_port),
databases_configuration_path,
self.cache,
self.LOOP_EXECUTE_BIN,
self.ip_address + (':%d' % loop_algorithm_port),
configuration_path,
self.cache
]
if len(self.databases) > 0:
cmd.append('tcp://' + self.ip_address + (':%d' % database_infos['loop_db']['port']))
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
databases_process_stdout = tempfile.NamedTemporaryFile(delete=False)
databases_process_stderr = tempfile.NamedTemporaryFile(delete=False)
# Creation of the container
loop_algorithm_process_stdout = tempfile.NamedTemporaryFile(delete=False)
loop_algorithm_process_stderr = tempfile.NamedTemporaryFile(delete=False)
databases_process = sp.Popen(cmd, stdout=databases_process_stdout,
stderr=databases_process_stderr)
logger.debug("Starting loop process with: %s" % " ".join(cmd))
loop_algorithm_process = sp.Popen(cmd,
stdout=loop_algorithm_process_stdout,
stderr=loop_algorithm_process_stderr)
#----- Instantiate the algorithm subprocess
# Configuration and needed files
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
# Process the messages until the subprocess is done
# self.loop_message_handler.start()
#----- Instantiate the algorithms subprocess
# Command to execute
cmd = [
EXECUTE_BIN,
self.EXECUTE_BIN,
'--cache=%s' % self.cache,
self.message_handler.address,
configuration_path,
]
if len(self.databases) > 0:
cmd.append('tcp://' + self.ip_address + (':%d' % database_port))
cmd.append('tcp://' + self.ip_address + (':%d' % database_infos['db']['port']))
if self.loop_algorithm is not None:
cmd.append('tcp://' + self.ip_address + (':%d' % loop_algorithm_port))
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
......@@ -299,7 +364,10 @@ class SubprocessExecutor(RemoteExecutor):
algorithm_process_stdout = tempfile.NamedTemporaryFile(delete=False)
algorithm_process_stderr = tempfile.NamedTemporaryFile(delete=False)
algorithm_process = sp.Popen(cmd, stdout=algorithm_process_stdout,
logger.debug("Starting algorithm process with: %s" % " ".join(cmd))
algorithm_process = sp.Popen(cmd,
stdout=algorithm_process_stdout,
stderr=algorithm_process_stderr)
......@@ -358,23 +426,38 @@ class SubprocessExecutor(RemoteExecutor):
stats.update(retval['statistics']['data'], self.io_statistics)
if databases_process is not None:
for name, db_info in database_infos.items():
logger.debug("Stopping %s process" % name)
databases_process = db_info['process']
databases_process.terminate()
databases_process.communicate()
with open(databases_process_stdout.name, 'r') as f:
with open(db_info['stdout'].name, 'r') as f:
retval['stdout'] += '\n' + f.read()
if status != 0:
with open(db_info['stderr'].name, 'r') as f:
retval['stderr'] += '\n' + f.read()
if loop_algorithm_process is not None:
logger.debug("Stopping loop process")
loop_algorithm_process.terminate()
loop_algorithm_process.communicate()
with open(loop_algorithm_process_stdout.name, 'r') as f:
retval['stdout'] += '\n' + f.read()
if status != 0:
with open(databases_process_stderr.name, 'r') as f:
with open(loop_algorithm_process_stderr.name, 'r') as f:
retval['stderr'] += '\n' + f.read()
self.message_handler.destroy()
self.message_handler = None
if not self.debug:
if databases_configuration_path:
shutil.rmtree(databases_configuration_path)
for _, db_info in database_infos.items():
shutil.rmtree(db_info['configuration_path'])
shutil.rmtree(configuration_path)
return retval
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment