diff --git a/bob/pipelines/config/distributed/sge_demanding.py b/bob/pipelines/config/distributed/sge_demanding.py new file mode 100644 index 0000000000000000000000000000000000000000..f9c92ef1cc1a48390f48e543640088e2e04b4f94 --- /dev/null +++ b/bob/pipelines/config/distributed/sge_demanding.py @@ -0,0 +1,23 @@ +from dask.distributed import Client + +from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs +from bob.pipelines.distributed.sge_queues import QUEUE_MTH + +min_jobs = 1 +max_jobs = get_max_jobs(QUEUE_MTH) +cluster = SGEMultipleQueuesCluster(min_jobs=min_jobs, sge_job_spec=QUEUE_MTH) +cluster.scale(max_jobs) + +# Adapting from a minimum of 1 job to a maximum of `max_jobs` jobs +# interval: Milliseconds between checks from the scheduler +# wait_count: Number of consecutive times that a worker should be suggested for +# removal before we remove it. +cluster.adapt( + minimum=min_jobs, + maximum=max_jobs, + wait_count=5, + interval=10, + target_duration="10s", +) + +dask_client = Client(cluster) diff --git a/bob/pipelines/distributed/sge.py b/bob/pipelines/distributed/sge.py index f02796069ad7e34e929be019bc50c8db90bd17d1..1b06eae79490485f36689d38a6e944e86528b03e 100644 --- a/bob/pipelines/distributed/sge.py +++ b/bob/pipelines/distributed/sge.py @@ -291,10 +291,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): def _get_worker_spec_options(self, job_spec): """Craft a dask worker_spec to be used in the qsub command.""" - def _get_key_from_spec(spec, key): - return spec[key] if key in spec else "" - - new_resource_spec = _get_key_from_spec(job_spec, "resource_spec") + new_resource_spec = job_spec.get("resource_spec", "") # IO_BIG new_resource_spec += ( @@ -303,10 +300,10 @@ class SGEMultipleQueuesCluster(JobQueueCluster): else "" ) - memory = _get_key_from_spec(job_spec, "memory")[:-1] + memory = job_spec.get("memory", "")[:-1] new_resource_spec += f"mem_free={memory}," - queue = _get_key_from_spec(job_spec, "queue") + queue = job_spec.get("queue", "") if 
queue != "all.q": new_resource_spec += f"{queue}=TRUE" @@ -317,7 +314,8 @@ class SGEMultipleQueuesCluster(JobQueueCluster): return { "queue": queue, "project": self.project, - "memory": _get_key_from_spec(job_spec, "memory"), + "memory": job_spec.get("memory", ""), + "job_extra": job_spec.get("job_extra", None), "cores": 1, "processes": 1, "log_directory": self.log_directory, @@ -326,7 +324,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster): "interface": None, "protocol": self.protocol, "security": None, - "resources": _get_key_from_spec(job_spec, "resources"), + "resources": job_spec.get("resources", ""), "env_extra": self.env_extra, } diff --git a/bob/pipelines/distributed/sge_queues.py b/bob/pipelines/distributed/sge_queues.py index fbc7f6b7b730d48bf6a55ab0bd1eb2347fc0bca8..7b13d379ca767bbd53de17ad8c8d3fcbbb965dd8 100644 --- a/bob/pipelines/distributed/sge_queues.py +++ b/bob/pipelines/distributed/sge_queues.py @@ -44,6 +44,48 @@ QUEUE_DEFAULT = { }, } +QUEUE_MTH = { + "default": { + "queue": "q_1day_mth", + "memory": "8GB", + "io_big": False, + "job_extra": ["-pe pe_mth 2"], + "resource_spec": "", + "max_jobs": 70, + "resources": {"default": 1}, + }, + "q_1week": { + "queue": "q_1week", + "memory": "4GB", + "io_big": True, + "resource_spec": "", + "max_jobs": 24, + "resources": {"q_1week": 1}, + }, + "q_long_gpu": { + "queue": "q_long_gpu", + "memory": "30GB", + "io_big": False, + "resource_spec": "", + "resources": {"q_long_gpu": 1}, + }, + "q_gpu": { + "queue": "q_gpu", + "memory": "30GB", + "io_big": False, + "resource_spec": "", + "resources": {"q_gpu": 1}, + }, + "q_short_gpu": { + "queue": "q_short_gpu", + "memory": "30GB", + "io_big": False, + "resource_spec": "", + "max_jobs": 45, + "resources": {"q_short_gpu": 1}, + }, +} + QUEUE_IOBIG = { "default": { "queue": "q_1day", diff --git a/setup.py b/setup.py index 80597926bdeabc24c5d2591b8302d1421417dad9..69393010ff8479e73b9cd633dd85ccd37e3d7e3e 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ 
setup( "local-p16 = bob.pipelines.config.distributed.local_p16:dask_client", "local-p32 = bob.pipelines.config.distributed.local_p32:dask_client", "sge = bob.pipelines.config.distributed.sge_default:dask_client", + "sge-demanding = bob.pipelines.config.distributed.sge_demanding:dask_client", "sge-io-big = bob.pipelines.config.distributed.sge_io_big:dask_client", "sge-io-big-non-adaptive = bob.pipelines.config.distributed.sge_io_big_non_adaptive:dask_client", "sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client",