Commit 0d8bbd71 authored by Philip ABBET

Refactoring of the beat.core.dock module

parent cb0c182d
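
In short: container lifecycle management moves off the per-container dock.Popen object and onto the Host (create_container/start/wait/kill/rm), and the runner configuration is now copied into the container with copy_path instead of being shipped as an in-memory archive. A minimal sketch of the new call sequence, assembled from the changed lines below; the surrounding setup (environment resolution, command construction) is assumed:

# Sketch of the new Host-centric lifecycle (names taken from the diff below;
# environment, cmd and configuration_path are assumed to be prepared as before)
host = Host(raise_on_errors=False)                       # was: Host() + host.setup(...)

container = host.create_container(environment, cmd)      # was: dock.Popen(...)
container.copy_path(configuration_path, '/tmp')          # was: configuration_archive=...
container.add_volume(cache, '/cache', read_only=False)   # was: volumes={...} dict

host.start(container,
           virtual_memory_in_megabytes=virtual_memory_in_megabytes,
           max_cpu_percent=max_cpu_percent)

status = host.wait(container, timeout)                   # was: container.wait(timeout)
stdout = host.stdout(container)                          # was: container.stdout
stats = host.statistics(container)                       # was: message_handler.last_statistics
host.rm(container)                                       # was: container.rm()
host.teardown()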
......@@ -164,9 +164,9 @@ class DockerExecutor(BaseExecutor):
if envkey not in self.host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, self.host,
", ".join(self.host.environments.keys())))
", ".join(self.host.processing_environments.keys())))
self.proxy_mode = 'direct_access' not in self.host.environments[envkey].get('capabilities', [])
self.proxy_mode = 'direct_access' not in self.host.processing_environments[envkey].get('capabilities', [])
def __enter__(self):
......@@ -277,16 +277,31 @@ class DockerExecutor(BaseExecutor):
'\n * '.join(self.errors))
# Creates an in-memory archive containing all the configuration and files
# needed by the processing container
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
processing_archive = self.host.create_archive(configuration_path)
# Determine the docker image to use for the processing
processing_environment = '%(name)s (%(version)s)' % self.data['environment']
if processing_environment not in self.host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (processing_environment,
self.host, ", ".join(self.host.processing_environments.keys())))
# Creates the message handler
algorithm_container = None
def _kill():
self.host.kill(algorithm_container)
self.message_handler = message_handler.ProxyMessageHandler(
self.input_list, self.output_list, self.host.ip, kill_callback=_kill)
#----- (If necessary) Instantiate the docker container that provides the databases
databases_container = None
# (If necessary) Creates an in-memory archive containing all the configuration
# and files needed by the databases container
if self.db_address is not None:
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
self.dump_databases_provider_configuration(databases_configuration_path)
......@@ -308,27 +323,6 @@ class DockerExecutor(BaseExecutor):
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
databases_archive = self.host.create_archive(databases_configuration_path)
# Creates the message handler
self.message_handler = message_handler.ProxyMessageHandler(
self.input_list, self.output_list, self.host.ip)
# Determine the docker image to use for the processing
processing_environment = '%(name)s (%(version)s)' % self.data['environment']
if processing_environment not in self.host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (processing_environment,
self.host, ", ".join(self.host.environments.keys())))
# (If necessary) Instantiate the docker container that provides the databases
databases_container = None
if self.db_address is not None:
# Determine the docker image to use for the databases
try:
databases_environment = self.host.db2docker(database_paths.keys())
......@@ -338,22 +332,7 @@ class DockerExecutor(BaseExecutor):
", ".join(database_paths.keys()),
", ".join(self.host.db_environments.keys())))
# Specify the volumes to mount inside the container
volumes = {}
if not self.data.has_key('datasets_root_path'):
for db_name, db_path in database_paths.items():
volumes[db_path] = {
'bind': os.path.join('/databases', db_name),
'mode': 'ro',
}
else:
volumes[self.data['datasets_root_path']] = {
'bind': self.data['datasets_root_path'],
'mode': 'ro',
}
# Instantiate the container
# Creation of the container
# Note: we only support one databases image loaded at a time
cmd = [
'databases_provider',
......@@ -361,25 +340,28 @@ class DockerExecutor(BaseExecutor):
os.path.join('/tmp', os.path.basename(databases_configuration_path))
]
databases_container = dock.Popen(
self.host,
databases_environment,
command=cmd,
configuration_archive=databases_archive,
volumes=volumes
)
databases_container = self.host.create_container(databases_environment, cmd)
databases_container.copy_path(databases_configuration_path, '/tmp')
# Specify the volumes to mount inside the container
if not self.data.has_key('datasets_root_path'):
for db_name, db_path in database_paths.items():
databases_container.add_volume(db_path, os.path.join('/databases', db_name))
else:
databases_container.add_volume(self.data['datasets_root_path'],
self.data['datasets_root_path'])
# Start the container
self.host.start(databases_container)
# Specify the volumes to mount inside the algorithm container
volumes = {}
if not self.proxy_mode:
volumes[self.cache] = {
'bind': '/cache',
'mode': 'rw',
}
#----- Instantiate the algorithm container
# Instantiate the algorithm container
# Configuration and needed files
configuration_path = utils.temporary_directory()
self.dump_runner_configuration(configuration_path)
# Command to execute
cmd = [
'execute',
self.message_handler.address,
......@@ -389,46 +371,49 @@ class DockerExecutor(BaseExecutor):
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
algorithm_container = dock.Popen(
self.host,
processing_environment,
command=cmd,
configuration_archive=processing_archive,
virtual_memory_in_megabytes=virtual_memory_in_megabytes,
max_cpu_percent=max_cpu_percent,
volumes=volumes
# Creation of the container
algorithm_container = self.host.create_container(processing_environment, cmd)
algorithm_container.copy_path(configuration_path, '/tmp')
# Volumes
if not self.proxy_mode:
algorithm_container.add_volume(self.cache, '/cache', read_only=False)
# Start the container
self.host.start(algorithm_container,
virtual_memory_in_megabytes=virtual_memory_in_megabytes,
max_cpu_percent=max_cpu_percent
)
# Process the messages until the container is done
self.message_handler.set_process(algorithm_container)
self.message_handler.start()
timed_out = False
try:
timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None
status = algorithm_container.wait(timeout)
status = self.host.wait(algorithm_container, timeout)
except requests.exceptions.ReadTimeout:
logger.warn("user process has timed out after %d minutes", timeout_in_minutes)
algorithm_container.kill()
status = algorithm_container.wait()
self.host.kill(algorithm_container)
status = self.host.wait(algorithm_container)
if databases_container is not None:
databases_container.kill()
databases_container.wait()
self.host.kill(databases_container)
self.host.wait(databases_container)
timed_out = True
except KeyboardInterrupt: # Developer pushed CTRL-C
logger.info("stopping user process on CTRL-C console request")
algorithm_container.kill()
status = algorithm_container.wait()
self.host.kill(algorithm_container)
status = self.host.wait(algorithm_container)
if databases_container is not None:
databases_container.kill()
databases_container.wait()
self.host.kill(databases_container)
self.host.wait(databases_container)
finally:
self.message_handler.stop.set()
......@@ -436,27 +421,24 @@ class DockerExecutor(BaseExecutor):
# Collects final information and returns to caller
retval = dict(
stdout = algorithm_container.stdout,
stderr = algorithm_container.stderr,
stdout = self.host.stdout(algorithm_container),
stderr = self.host.stderr(algorithm_container),
status = status,
timed_out = timed_out,
statistics = self.message_handler.last_statistics,
statistics = self.host.statistics(algorithm_container),
system_error = self.message_handler.system_error,
user_error = self.message_handler.user_error,
)
if 'data' in retval['statistics']:
stats.update(retval['statistics']['data'], self.io_statistics)
else:
logger.warn("cannot find 'data' entry on returned stats, " \
"therefore not appending I/O info either")
retval['statistics']['data'] = self.message_handler.statistics
stats.update(retval['statistics']['data'], self.io_statistics)
algorithm_container.rm()
self.host.rm(algorithm_container)
if databases_container is not None:
retval['stdout'] += '\n' + databases_container.stdout
retval['stderr'] += '\n' + databases_container.stderr
databases_container.rm()
retval['stdout'] += '\n' + self.host.stdout(databases_container)
retval['stderr'] += '\n' + self.host.stderr(databases_container)
self.host.rm(databases_container)
self.message_handler.destroy()
self.message_handler = None
......
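
A behavioural note on the statistics block above: resource usage now comes from the Docker host via host.statistics(container), while per-I/O counters still come from the message handler and are grafted in under the 'data' key before merging. Roughly, with the dict shape inferred from the keys used above rather than confirmed:

# Inferred flow, not authoritative: two statistics sources are combined
retval['statistics'] = self.host.statistics(algorithm_container)  # cpu/memory usage
retval['statistics']['data'] = self.message_handler.statistics    # per-input/output I/O
stats.update(retval['statistics']['data'], self.io_statistics)    # merge cached I/O counts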
......@@ -45,7 +45,7 @@ class ProxyMessageHandler(MessageHandler):
Add support for output-related messages.
'''
def __init__(self, input_list, output_list, host_address):
def __init__(self, input_list, output_list, host_address, kill_callback=None):
# Starts our 0MQ server
self.context = zmq.Context()
......@@ -56,7 +56,8 @@ class ProxyMessageHandler(MessageHandler):
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
super(ProxyMessageHandler, self).__init__(input_list, self.context, self.socket)
super(ProxyMessageHandler, self).__init__(input_list, self.context, self.socket,
kill_callback=kill_callback)
self.output_list = output_list
......
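
One detail worth noting in the kill_callback wiring: in the executor, _kill closes over algorithm_container while it is still None. This works because Python resolves the closed-over name when the callback fires, not when the function is defined. A standalone illustration of that late binding (handler and host here are stand-ins, not part of this diff):

# Illustration of the late binding the executor relies on; ProxyMessageHandler
# and host are placeholders for the real objects
algorithm_container = None

def _kill():
    # 'algorithm_container' is looked up at call time, so it sees the
    # container created after the handler was constructed
    host.kill(algorithm_container)

handler = ProxyMessageHandler(input_list, output_list, host.ip,
                              kill_callback=_kill)
algorithm_container = host.create_container(environment, cmd)
# if the handler invokes _kill() from now on, it kills the right container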
......@@ -270,8 +270,7 @@ class TestDockerExecution(TestExecution):
@classmethod
def setUpClass(cls):
cls.host = Host()
cls.host.setup(raise_on_errors=False)
cls.host = Host(raise_on_errors=False)
@classmethod
......@@ -285,6 +284,11 @@ class TestDockerExecution(TestExecution):
self.proxy_mode = True
def tearDown(self):
super(TestDockerExecution, self).tearDown()
self.host.teardown()
def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
database_cache, algorithm_cache):
return DockerExecutor(self.host, prefix, configuration, tmp_prefix,
......
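
The test change mirrors the Host API change: the two-step Host() plus setup() construction collapses into the constructor, and cleanup becomes an explicit per-test teardown(). Assuming setup simply moved into __init__ (the Host side is not shown in this diff), the two forms are equivalent:

# Old style (removed above): construction and setup were separate steps
host = Host()
host.setup(raise_on_errors=False)

# New style (added above): setup presumably runs inside __init__
host = Host(raise_on_errors=False)
# ... exercise the executor ...
host.teardown()   # paired with the new tearDown() override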