Skip to content
Snippets Groups Projects
Commit 4be109b2 authored by Laurent COLBOIS
Browse files

[Dask] Add configuration to access multithreaded queue

parent 9b4716e5
No related branches found
No related tags found
1 merge request: !94 [hotfix] [Dask] Add configuration to access multithreaded queue
Pipeline #62007 passed
from dask.distributed import Client
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs
from bob.pipelines.distributed.sge_queues import QUEUE_MTH
# Dask client configuration backed by an SGE cluster that submits its workers
# using the QUEUE_MTH job specification.

# Bounds on the number of jobs; the upper bound is whatever
# get_max_jobs() reports for QUEUE_MTH rather than a value hard-coded here.
min_jobs = 1
max_jobs = get_max_jobs(QUEUE_MTH)

cluster = SGEMultipleQueuesCluster(sge_job_spec=QUEUE_MTH, min_jobs=min_jobs)

# Ask for the full allocation first; adaptive scaling is switched on right
# after and takes over between the two bounds.
cluster.scale(max_jobs)

# Adaptive scaling between min_jobs and max_jobs.
#   interval:   time between checks from the scheduler
#               NOTE(review): originally documented as milliseconds; a bare
#               number may be interpreted in seconds by dask -- confirm.
#   wait_count: number of consecutive times that a worker should be suggested
#               for removal before it is actually removed.
cluster.adapt(
    minimum=min_jobs,
    maximum=max_jobs,
    interval=10,
    wait_count=5,
    target_duration="10s",
)

# Object exposed to consumers of this configuration module.
dask_client = Client(cluster)
...@@ -291,10 +291,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): ...@@ -291,10 +291,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
def _get_worker_spec_options(self, job_spec): def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command.""" """Craft a dask worker_spec to be used in the qsub command."""
def _get_key_from_spec(spec, key): new_resource_spec = job_spec.get("resource_spec", "")
return spec[key] if key in spec else ""
new_resource_spec = _get_key_from_spec(job_spec, "resource_spec")
# IO_BIG # IO_BIG
new_resource_spec += ( new_resource_spec += (
...@@ -303,10 +300,10 @@ class SGEMultipleQueuesCluster(JobQueueCluster): ...@@ -303,10 +300,10 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
else "" else ""
) )
memory = _get_key_from_spec(job_spec, "memory")[:-1] memory = job_spec.get("memory", "")[:-1]
new_resource_spec += f"mem_free={memory}," new_resource_spec += f"mem_free={memory},"
queue = _get_key_from_spec(job_spec, "queue") queue = job_spec.get("queue", "")
if queue != "all.q": if queue != "all.q":
new_resource_spec += f"{queue}=TRUE" new_resource_spec += f"{queue}=TRUE"
...@@ -317,7 +314,8 @@ class SGEMultipleQueuesCluster(JobQueueCluster): ...@@ -317,7 +314,8 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
return { return {
"queue": queue, "queue": queue,
"project": self.project, "project": self.project,
"memory": _get_key_from_spec(job_spec, "memory"), "memory": job_spec.get("memory", ""),
"job_extra": job_spec.get("job_extra", None),
"cores": 1, "cores": 1,
"processes": 1, "processes": 1,
"log_directory": self.log_directory, "log_directory": self.log_directory,
...@@ -326,7 +324,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): ...@@ -326,7 +324,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
"interface": None, "interface": None,
"protocol": self.protocol, "protocol": self.protocol,
"security": None, "security": None,
"resources": _get_key_from_spec(job_spec, "resources"), "resources": job_spec.get("resources", ""),
"env_extra": self.env_extra, "env_extra": self.env_extra,
} }
......
...@@ -44,6 +44,18 @@ QUEUE_DEFAULT = { ...@@ -44,6 +44,18 @@ QUEUE_DEFAULT = {
}, },
} }
# Job specification for the "q_1day_mth" queue.
# NOTE(review): per the commit title this targets the multithreaded queue;
# confirm queue and parallel-environment names against the grid setup.
QUEUE_MTH = {
    "default": {
        # Name of the queue the jobs are submitted to.
        "queue": "q_1day_mth",
        # Memory requested per job.
        "memory": "8GB",
        # This specification does not ask for the io_big resource.
        "io_big": False,
        # Extra submission arguments; presumably requests the "pe_mth"
        # parallel environment with 2 slots -- TODO confirm.
        "job_extra": ["-pe pe_mth 2"],
        # No additional resource specification string.
        "resource_spec": "",
        # Upper bound on the number of jobs for this queue.
        "max_jobs": 70,
        # Resource tags attached to workers started from this specification.
        "resources": {"default": 1},
    },
}
QUEUE_IOBIG = { QUEUE_IOBIG = {
"default": { "default": {
"queue": "q_1day", "queue": "q_1day",
......
...@@ -44,6 +44,7 @@ setup( ...@@ -44,6 +44,7 @@ setup(
"local-p16 = bob.pipelines.config.distributed.local_p16:dask_client", "local-p16 = bob.pipelines.config.distributed.local_p16:dask_client",
"local-p32 = bob.pipelines.config.distributed.local_p32:dask_client", "local-p32 = bob.pipelines.config.distributed.local_p32:dask_client",
"sge = bob.pipelines.config.distributed.sge_default:dask_client", "sge = bob.pipelines.config.distributed.sge_default:dask_client",
"sge-demanding = bob.pipelines.config.distributed.sge_demanding:dask_client",
"sge-io-big = bob.pipelines.config.distributed.sge_io_big:dask_client", "sge-io-big = bob.pipelines.config.distributed.sge_io_big:dask_client",
"sge-io-big-non-adaptive = bob.pipelines.config.distributed.sge_io_big_non_adaptive:dask_client", "sge-io-big-non-adaptive = bob.pipelines.config.distributed.sge_io_big_non_adaptive:dask_client",
"sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client", "sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment