Commit 85f2ddf8 authored by Flavio TARSETTI's avatar Flavio TARSETTI
Browse files

Merge branch '94_improve_complex_setup_handling' into 'master'

improve complex setup handling

See merge request !111
parents 83d806eb 24b4b8dc
Pipeline #37580 passed with stages
in 17 minutes and 35 seconds
...@@ -41,7 +41,8 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide. ...@@ -41,7 +41,8 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide.
Usage: Usage:
%(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>] %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
[--cache=<path>] [--docker] [--docker-network=<name>] [--cache=<path>] [--docker] [--docker-network=<name>]
[--port-range=<range>] <broker_address> [--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
<broker_address>
%(prog)s (--help | -h) %(prog)s (--help | -h)
%(prog)s (--version | -V) %(prog)s (--version | -V)
...@@ -57,6 +58,7 @@ Options: ...@@ -57,6 +58,7 @@ Options:
-c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache' -c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache'
--docker-network=<name> Name of the docker network to use --docker-network=<name> Name of the docker network to use
--port-range=<range> Range of port usable for communication with containers --port-range=<range> Range of port usable for communication with containers
--cache-mount-point=<cache_mount_point> NFS mount point to use for cache setup
""" """
...@@ -107,6 +109,7 @@ def run( ...@@ -107,6 +109,7 @@ def run(
docker_network_name=None, docker_network_name=None,
docker_port_range=None, docker_port_range=None,
docker_images_cache=None, docker_images_cache=None,
docker_cache_mount_point=None,
): ):
"""Start the worker """Start the worker
...@@ -202,6 +205,8 @@ def run( ...@@ -202,6 +205,8 @@ def run(
data["network_name"] = docker_network_name data["network_name"] = docker_network_name
if docker_port_range: if docker_port_range:
data["port_range"] = docker_port_range data["port_range"] = docker_port_range
if docker_cache_mount_point:
data["cache_mount_point"] = docker_cache_mount_point
# Start the execution # Start the execution
logger.info("Running '%s' with job id #%s", data["algorithm"], job_id) logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)
...@@ -319,6 +324,14 @@ def main(argv=None): ...@@ -319,6 +324,14 @@ def main(argv=None):
return 1 return 1
logger.info("Using port range %s", docker_port_range) logger.info("Using port range %s", docker_port_range)
docker_cache_mount_point = args.get("--cache-mount-point", None)
if docker_cache_mount_point:
if not docker_cache_mount_point.startswith("nfs://"):
raise RuntimeError(
"Invalid nfs mount point: {}".format(docker_cache_mount_point)
)
logger.info("Using volume cache mount point %s", docker_cache_mount_point)
return run( return run(
broker_address, broker_address,
service_name=args.get("--name"), service_name=args.get("--name"),
...@@ -329,6 +342,7 @@ def main(argv=None): ...@@ -329,6 +342,7 @@ def main(argv=None):
docker_network_name=docker_network_name, docker_network_name=docker_network_name,
docker_port_range=docker_port_range, docker_port_range=docker_port_range,
docker_images_cache=docker_images_cache, docker_images_cache=docker_images_cache,
docker_cache_mount_point=docker_cache_mount_point,
) )
......
...@@ -279,7 +279,9 @@ class DockerExecutor(RemoteExecutor): ...@@ -279,7 +279,9 @@ class DockerExecutor(RemoteExecutor):
) )
return retval return retval
def __setup_io_volumes(self, algorithm_container, configuration): def __setup_io_volumes(
self, algorithm_container, docker_cache_mount_point, configuration
):
"""Setup all the volumes for input and output files. """Setup all the volumes for input and output files.
Parameters: Parameters:
...@@ -293,6 +295,7 @@ class DockerExecutor(RemoteExecutor): ...@@ -293,6 +295,7 @@ class DockerExecutor(RemoteExecutor):
file_path = item["path"] file_path = item["path"]
source_path = os.path.join(self.cache, file_path) source_path = os.path.join(self.cache, file_path)
if docker_cache_mount_point is None:
if os.path.isfile(source_path): if os.path.isfile(source_path):
algorithm_container.add_volume( algorithm_container.add_volume(
source_path, os.path.join(self.CONTAINER_CACHE_PATH, file_path) source_path, os.path.join(self.CONTAINER_CACHE_PATH, file_path)
...@@ -306,12 +309,21 @@ class DockerExecutor(RemoteExecutor): ...@@ -306,12 +309,21 @@ class DockerExecutor(RemoteExecutor):
self.CONTAINER_CACHE_PATH, target_path self.CONTAINER_CACHE_PATH, target_path
) )
algorithm_container.add_volume(file_, cache_path) algorithm_container.add_volume(file_, cache_path)
else:
input_folder = file_path[: file_path.rfind("/")]
source_folder = os.path.join(docker_cache_mount_point, input_folder)
target_folder = os.path.join(self.CONTAINER_CACHE_PATH, input_folder)
algorithm_container.add_volume(source_folder, target_folder)
def __add_writable_volume(file_path): def __add_writable_volume(file_path):
output_folder = file_path[: file_path.rfind("/")] output_folder = file_path[: file_path.rfind("/")]
source_folder = os.path.join(self.cache, output_folder) source_folder = os.path.join(self.cache, output_folder)
if not os.path.exists(source_folder): if not os.path.exists(source_folder):
os.makedirs(source_folder) os.makedirs(source_folder)
if docker_cache_mount_point is not None:
source_folder = os.path.join(docker_cache_mount_point, output_folder)
algorithm_container.add_volume( algorithm_container.add_volume(
source_folder, source_folder,
os.path.join(self.CONTAINER_CACHE_PATH, output_folder), os.path.join(self.CONTAINER_CACHE_PATH, output_folder),
...@@ -405,6 +417,8 @@ class DockerExecutor(RemoteExecutor): ...@@ -405,6 +417,8 @@ class DockerExecutor(RemoteExecutor):
port = utils.find_free_port_in_range(int(min_port), int(max_port)) port = utils.find_free_port_in_range(int(min_port), int(max_port))
address += ":{}".format(port) address += ":{}".format(port)
volume_cache_mount_point = self.data.pop("cache_mount_point", None)
self.message_handler = MessageHandler(address, kill_callback=_kill) self.message_handler = MessageHandler(address, kill_callback=_kill)
# ----- (If necessary) Instantiate the docker container that provide the databases # ----- (If necessary) Instantiate the docker container that provide the databases
...@@ -463,7 +477,9 @@ class DockerExecutor(RemoteExecutor): ...@@ -463,7 +477,9 @@ class DockerExecutor(RemoteExecutor):
loop_algorithm_container.add_volume( loop_algorithm_container.add_volume(
configuration_path, self.CONTAINER_PREFIX_PATH configuration_path, self.CONTAINER_PREFIX_PATH
) )
self.__setup_io_volumes(loop_algorithm_container, self.data["loop"]) self.__setup_io_volumes(
loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
)
# Start the container # Start the container
self.host.start( self.host.start(
...@@ -505,7 +521,9 @@ class DockerExecutor(RemoteExecutor): ...@@ -505,7 +521,9 @@ class DockerExecutor(RemoteExecutor):
# Volumes # Volumes
algorithm_container.add_volume(configuration_path, self.CONTAINER_PREFIX_PATH) algorithm_container.add_volume(configuration_path, self.CONTAINER_PREFIX_PATH)
self.__setup_io_volumes(algorithm_container, self.data) self.__setup_io_volumes(
algorithm_container, volume_cache_mount_point, self.data
)
# Start the container # Start the container
self.host.start( self.host.start(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment