Commit e18d0bdd authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[execution][docker] Implement support for loop blocks handling

parent 9c587307
......@@ -152,6 +152,91 @@ class DockerExecutor(RemoteExecutor):
self.host = host
def __create_db_container(self, datasets_uid, network_name):
    """Instantiate a docker container exposing this executor's databases.

    Dumps the databases-provider configuration to a temporary directory,
    rewrites each database's ``root_folder`` so it points inside the
    container (``/databases/<name>``), selects a suitable docker image and
    starts a ``databases_provider`` process on a free TCP port.

    Parameters:
      datasets_uid (int): UID the container process must run under.
      network_name (str): docker network the container is attached to.

    Returns:
      dict: with keys ``configuration_path`` (temporary prefix on the
      host), ``container`` (the started container object), ``address``
      (container IP) and ``port`` (port the provider listens on).

    Raises:
      RuntimeError: if no docker environment is available for the
      requested databases.
    """

    # Dump the provider configuration into a fresh temporary prefix
    databases_configuration_path = utils.temporary_directory()
    self.dump_databases_provider_configuration(databases_configuration_path)

    # Rewrite the database root folders in the dumped configuration so
    # they match the mount points used inside the container; remember the
    # original host paths so we can mount them as volumes below.
    root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases')

    database_paths = {}

    for db_name in self.databases.keys():
        json_path = os.path.join(root_folder, db_name + '.json')

        with open(json_path, 'r') as f:
            db_data = simplejson.load(f)

        database_paths[db_name] = db_data['root_folder']
        db_data['root_folder'] = os.path.join('/databases', db_name)

        with open(json_path, 'w') as f:
            simplejson.dump(db_data, f, indent=4)

    # Determine the docker image to use for the databases
    try:
        databases_environment = self.host.db2docker(database_paths.keys())
    except Exception as e:
        # Narrowed from a bare `except:` (which also caught
        # KeyboardInterrupt/SystemExit) and chained so the original
        # failure is preserved in the traceback.
        raise RuntimeError("No environment found for the databases `%s' " \
                "- available environments are %s" % (
                    ", ".join(database_paths.keys()),
                    ", ".join(self.host.db_environments.keys()))) from e

    # Creation of the container
    # Note: we only support one databases image loaded at the same time
    database_port = utils.find_free_port()

    cmd = [
        'databases_provider',
        '0.0.0.0:%d' % database_port,
        '/beat/prefix',
        '/beat/cache'
    ]

    if logger.getEffectiveLevel() <= logging.DEBUG:
        cmd.insert(1, '--debug')

    databases_info_name = "beat_db_%s" % utils.id_generator()
    databases_info = self.host.create_container(databases_environment, cmd)
    databases_info.uid = datasets_uid
    databases_info.network_name = network_name
    databases_info.set_name(databases_info_name)

    # Specify the volumes to mount inside the container
    databases_info.add_volume(databases_configuration_path, '/beat/prefix')
    databases_info.add_volume(self.cache, '/beat/cache')

    for db_name, db_path in database_paths.items():
        databases_info.add_volume(db_path, os.path.join('/databases', db_name))

    # Start the container, retrying with a new free port if the chosen
    # one was grabbed by somebody else between find_free_port() and
    # start() (classic TOCTOU race on the port).
    while True:
        try:
            databases_info.add_port(database_port, database_port, host_address=self.host.ip)
            self.host.start(databases_info)
            break
        except Exception as e:
            if str(e).find('port is already allocated') < 0:
                # NOTE(review): any other start failure is silently
                # ignored here and the method proceeds as if the
                # container were up — callers may depend on this
                # best-effort behavior, but consider re-raising.
                break

            databases_info.reset_ports()
            database_port = utils.find_free_port()
            # Patch the listen address in the command line with the new port
            cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
            databases_info.command = cmd

    database_ip = self.host.get_ipaddress(databases_info)

    retval = dict(
        configuration_path=databases_configuration_path,
        container=databases_info,
        address=database_ip,
        port=database_port
    )

    return retval
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
"""Executes the user algorithm code using an external program.
......@@ -230,96 +315,58 @@ class DockerExecutor(RemoteExecutor):
#----- (If necessary) Instantiate the docker container that provide the databases
databases_container = None
databases_configuration_path = None
datasets_uid = self.data.pop('datasets_uid', None)
network_name = self.data.pop('network_name', 'bridge')
databases_infos = {}
if len(self.databases) > 0:
databases_infos['db'] = self.__create_db_container(datasets_uid, network_name)
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
self.dump_databases_provider_configuration(databases_configuration_path)
# Modify the paths to the databases in the dumped configuration files
root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases')
database_paths = {}
for db_name in self.databases.keys():
json_path = os.path.join(root_folder, db_name + '.json')
with open(json_path, 'r') as f:
db_data = simplejson.load(f)
#----- Instantiate the algorithm container
database_paths[db_name] = db_data['root_folder']
db_data['root_folder'] = os.path.join('/databases', db_name)
# Configuration and needed files
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
# Determine the docker image to use for the databases
try:
databases_environment = self.host.db2docker(database_paths.keys())
except:
raise RuntimeError("No environment found for the databases `%s' " \
"- available environments are %s" % (
", ".join(database_paths.keys()),
", ".join(self.host.db_environments.keys())))
loop_algorithm_container = None
loop_algorithm_container_ip = None
loop_algorithm_container_port = None
# Creation of the container
# Note: we only support one databases image loaded at the same time
database_port = utils.find_free_port()
if self.loop_algorithm is not None:
if len(self.databases) > 0:
databases_infos['loop_db'] = self.__create_db_container(datasets_uid, network_name)
loop_algorithm_container_port = utils.find_free_port()
cmd = [
'databases_provider',
'0.0.0.0:%d' % database_port,
'loop_execute',
'0.0.0.0:{}'.format(loop_algorithm_container_port),
'/beat/prefix',
'/beat/cache'
]
if len(self.databases) > 0:
cmd.append('tcp://{}:{}'.format(databases_infos['loop_db']['address'],
databases_infos['loop_db']['port']))
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
databases_container_name = "beat_db_%s" % utils.id_generator()
databases_container = self.host.create_container(databases_environment, cmd)
databases_container.uid = datasets_uid
databases_container.network_name = network_name
databases_container.set_name(databases_container_name)
# Specify the volumes to mount inside the container
databases_container.add_volume(databases_configuration_path, '/beat/prefix')
databases_container.add_volume(self.cache, '/beat/cache')
loop_algorithm_container = self.host.create_container(processing_environment, cmd)
loop_algorithm_container.uid = datasets_uid
loop_algorithm_container.network_name = network_name
for db_name, db_path in database_paths.items():
databases_container.add_volume(db_path, os.path.join('/databases', db_name))
# Volumes
loop_algorithm_container.add_volume(configuration_path, '/beat/prefix')
loop_algorithm_container.add_volume(self.cache, '/beat/cache', read_only=False)
# Start the container
while True:
try:
databases_container.add_port(database_port, database_port, host_address=self.host.ip)
self.host.start(databases_container)
break
except Exception as e:
if str(e).find('port is already allocated') < 0:
break
databases_container.reset_ports()
database_port = utils.find_free_port()
cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
databases_container.command = cmd
database_ip = self.host.get_ipaddress(databases_container)
#----- Instantiate the algorithm container
# Configuration and needed files
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
self.host.start(loop_algorithm_container,
virtual_memory_in_megabytes=virtual_memory_in_megabytes,
max_cpu_percent=max_cpu_percent
)
loop_algorithm_container_ip = self.host.get_ipaddress(loop_algorithm_container)
# Command to execute
cmd = [
......@@ -330,7 +377,12 @@ class DockerExecutor(RemoteExecutor):
]
if len(self.databases) > 0:
cmd.append('tcp://' + database_ip + ':%d' % database_port)
cmd.append('tcp://%s:%d' % (databases_infos['db']['address'],
databases_infos['db']['port']))
if self.loop_algorithm is not None:
cmd.append('tcp://%s:%d' % (loop_algorithm_container_ip,
loop_algorithm_container_port))
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
......@@ -373,9 +425,11 @@ class DockerExecutor(RemoteExecutor):
status = self.host.wait(algorithm_container)
finally:
if databases_container is not None:
self.host.kill(databases_container)
self.host.wait(databases_container)
for name, databases_info in databases_infos.items():
logger.debug("Stopping database container " + name)
container = databases_info['container']
self.host.kill(container)
self.host.wait(container)
self.message_handler.stop.set()
self.message_handler.join()
......@@ -409,25 +463,29 @@ class DockerExecutor(RemoteExecutor):
self.host.rm(algorithm_container)
if databases_container is not None:
db_container_log = self.host.logs(databases_container)
for name, databases_info in databases_infos.items():
container = databases_info['container']
db_container_log = self.host.logs(container)
if logger.getEffectiveLevel() <= logging.DEBUG:
logger.debug("Log of the database container: " + db_container_log)
logger.debug("Log of the" + name + "database container: " + db_container_log)
if status != 0:
retval['stderr'] += '\n' + db_container_log
else:
retval['stdout'] += '\n' + db_container_log
self.host.rm(databases_container)
self.host.rm(container)
if loop_algorithm_container:
self.host.rm(loop_algorithm_container)
self.message_handler.destroy()
self.message_handler = None
if not self.debug:
if databases_configuration_path:
shutil.rmtree(databases_configuration_path)
for _, databases_info in databases_infos.items():
shutil.rmtree(databases_info['configuration_path'])
shutil.rmtree(configuration_path)
return retval
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment