Commit 5cbfa61f authored by Amir MOHAMMADI

Merge branch 'sge-demanding' into 'master'

[hotfix] [Dask] Add configuration to access multithreaded queue

See merge request !94
parents 3a4fae54 d37efc3e
Pipeline #62113 passed
# New Dask client configuration added by this merge request (presumably
# bob/pipelines/config/distributed/sge_demanding.py, matching the entry point
# registered in setup.py below).
from dask.distributed import Client

from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs
from bob.pipelines.distributed.sge_queues import QUEUE_MTH

min_jobs = 1
max_jobs = get_max_jobs(QUEUE_MTH)

cluster = SGEMultipleQueuesCluster(min_jobs=min_jobs, sge_job_spec=QUEUE_MTH)
cluster.scale(max_jobs)

# Adapt between min_jobs and max_jobs workers.
# interval: milliseconds between checks from the scheduler
# wait_count: number of consecutive times that a worker should be suggested for
#             removal before we remove it
cluster.adapt(
    minimum=min_jobs,
    maximum=max_jobs,
    wait_count=5,
    interval=10,
    target_duration="10s",
)

dask_client = Client(cluster)
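
Not part of the commit: a quick smoke test of the dask_client defined above. It assumes the SGE scheduler is reachable from the submitting host and that at least one worker job gets scheduled; square is a placeholder task.

# Hedged smoke test, not part of this commit.
def square(x):
    return x * x

futures = dask_client.map(square, range(10))
print(dask_client.gather(futures))  # [0, 1, 4, ..., 81]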
--- a/bob/pipelines/distributed/sge.py
+++ b/bob/pipelines/distributed/sge.py
@@ -291,10 +291,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
     def _get_worker_spec_options(self, job_spec):
         """Craft a dask worker_spec to be used in the qsub command."""
 
-        def _get_key_from_spec(spec, key):
-            return spec[key] if key in spec else ""
-
-        new_resource_spec = _get_key_from_spec(job_spec, "resource_spec")
+        new_resource_spec = job_spec.get("resource_spec", "")
 
         # IO_BIG
         new_resource_spec += (
@@ -303,10 +300,10 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
             else ""
         )
 
-        memory = _get_key_from_spec(job_spec, "memory")[:-1]
+        memory = job_spec.get("memory", "")[:-1]
         new_resource_spec += f"mem_free={memory},"
 
-        queue = _get_key_from_spec(job_spec, "queue")
+        queue = job_spec.get("queue", "")
         if queue != "all.q":
             new_resource_spec += f"{queue}=TRUE"
@@ -317,7 +314,8 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
         return {
             "queue": queue,
             "project": self.project,
-            "memory": _get_key_from_spec(job_spec, "memory"),
+            "memory": job_spec.get("memory", ""),
+            "job_extra": job_spec.get("job_extra", None),
             "cores": 1,
             "processes": 1,
             "log_directory": self.log_directory,
@@ -326,7 +324,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
             "interface": None,
             "protocol": self.protocol,
             "security": None,
-            "resources": _get_key_from_spec(job_spec, "resources"),
+            "resources": job_spec.get("resources", ""),
             "env_extra": self.env_extra,
         }
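
The refactor above replaces the local _get_key_from_spec helper with the built-in dict.get, which provides the same missing-key fallback. A small sketch of the pattern, using example values modeled on the QUEUE_MTH spec added below:

# Sketch only, not part of the diff: dict.get() returns a default instead of
# raising KeyError, matching what the removed _get_key_from_spec helper did.
job_spec = {"queue": "q_1day_mth", "memory": "8GB"}  # example values
memory = job_spec.get("memory", "")[:-1]             # "8G" -> "mem_free=8G,"
resources = job_spec.get("resources", "")            # "" when the key is absent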
--- a/bob/pipelines/distributed/sge_queues.py
+++ b/bob/pipelines/distributed/sge_queues.py
@@ -44,6 +44,48 @@ QUEUE_DEFAULT = {
     },
 }
 
+QUEUE_MTH = {
+    "default": {
+        "queue": "q_1day_mth",
+        "memory": "8GB",
+        "io_big": False,
+        "job_extra": ["-pe pe_mth 2"],
+        "resource_spec": "",
+        "max_jobs": 70,
+        "resources": {"default": 1},
+    },
+    "q_1week": {
+        "queue": "q_1week",
+        "memory": "4GB",
+        "io_big": True,
+        "resource_spec": "",
+        "max_jobs": 24,
+        "resources": {"q_1week": 1},
+    },
+    "q_long_gpu": {
+        "queue": "q_long_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_long_gpu": 1},
+    },
+    "q_gpu": {
+        "queue": "q_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_gpu": 1},
+    },
+    "q_short_gpu": {
+        "queue": "q_short_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "max_jobs": 45,
+        "resources": {"q_short_gpu": 1},
+    },
+}
+
 QUEUE_IOBIG = {
     "default": {
         "queue": "q_1day",
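
Each resources entry in QUEUE_MTH above is exposed as a Dask worker resource, so work can be pinned to workers spawned on a specific SGE queue. A hedged sketch reusing the dask_client from the configuration at the top; double is a placeholder task, not a name from this merge request:

# Sketch only: pin a task to workers from the q_short_gpu queue by requesting
# the worker resource of the same name defined in QUEUE_MTH.
def double(x):  # placeholder task
    return 2 * x

future = dask_client.submit(double, 21, resources={"q_short_gpu": 1})
print(future.result())  # 42, computed on a q_short_gpu worker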
--- a/setup.py
+++ b/setup.py
@@ -44,6 +44,7 @@ setup(
             "local-p16 = bob.pipelines.config.distributed.local_p16:dask_client",
             "local-p32 = bob.pipelines.config.distributed.local_p32:dask_client",
             "sge = bob.pipelines.config.distributed.sge_default:dask_client",
+            "sge-demanding = bob.pipelines.config.distributed.sge_demanding:dask_client",
             "sge-io-big = bob.pipelines.config.distributed.sge_io_big:dask_client",
             "sge-io-big-non-adaptive = bob.pipelines.config.distributed.sge_io_big_non_adaptive:dask_client",
             "sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client",
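
The new entry point spells out the module path, so besides selecting the configuration by its sge-demanding resource name, the client can also be imported directly. Hedged sketch; this only makes sense on a host that can submit jobs to the SGE grid:

# Direct import of the new configuration module; not part of the diff.
from bob.pipelines.config.distributed.sge_demanding import dask_client

print(dask_client)  # dask.distributed Client backed by an SGEMultipleQueuesCluster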