[dask][sge] Multiqueue updates

Merged Tiago de Freitas Pereira requested to merge multiqueue into master
7 files  +140 −215
@@ -145,9 +145,9 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
     Below follows a vanilla example that will create a set of jobs on all.q:

-    >>> from bob.pipelines.distributed.sge import SGEIdiapCluster # doctest: +SKIP
+    >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster # doctest: +SKIP
     >>> from dask.distributed import Client # doctest: +SKIP
-    >>> cluster = SGEIdiapCluster() # doctest: +SKIP
+    >>> cluster = SGEMultipleQueuesCluster() # doctest: +SKIP
     >>> cluster.scale_up(10) # doctest: +SKIP
     >>> client = Client(cluster) # doctest: +SKIP
@@ -162,7 +162,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": "",
... }
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
@@ -186,7 +186,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": {"GPU":1},
... },
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> cluster.scale_up(1, sge_job_spec_key="gpu") # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
@@ -194,7 +194,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
     Adaptive job allocation can also be used via the `AdaptiveIdiap` extension:

-    >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
+    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
     >>> cluster.adapt(Adaptive=AdaptiveIdiap, minimum=2, maximum=10) # doctest: +SKIP
     >>> client = Client(cluster) # doctest: +SKIP
     """
@@ -267,6 +267,29 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
         self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)

+    def get_sge_resources(self):
+        """
+        Get the available resources from `SGEMultipleQueuesCluster.sge_job_spec`.
+
+        This is useful when it is necessary to set the resources available to
+        the `.compute` method.
+        Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.
+
+        Example
+        -------
+        >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
+        >>> client = Client(cluster) # doctest: +SKIP
+        >>> resources = cluster.get_sge_resources() # doctest: +SKIP
+        >>> my_delayed_task.compute(scheduler=client, resources=resources) # doctest: +SKIP
+        """
+        resources = [
+            list(self.sge_job_spec[k]["resources"].items())[0]
+            for k in self.sge_job_spec
+            if self.sge_job_spec[k]["resources"] != ""
+        ]
+        return dict(resources)
+
     def _get_worker_spec_options(self, job_spec):
         """Craft a dask worker_spec to be used in the qsub command."""
@@ -315,7 +338,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
             Quantity of jobs to scale

         sge_job_spec_key: str
-            One of the specs `SGEIdiapCluster.sge_job_spec`
+            One of the specs in `SGEMultipleQueuesCluster.sge_job_spec`
         """

         if n_jobs == 0:
@@ -452,7 +475,7 @@ class SchedulerResourceRestriction(Scheduler):
             allowed_failures=100
             if rc.get("bob.pipelines.sge.allowed_failures") is None
             else rc.get("bob.pipelines.sge.allowed_failures"),
-            synchronize_worker_interval="60s",
+            synchronize_worker_interval="20s",
             *args,
             **kwargs,
         )
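The allowed_failures fallback above can be exercised in isolation; a minimal sketch, assuming only that rc behaves like a plain dict (bob's global configuration object is dict-like):

>>> rc = {}  # no user override configured, the default of 100 applies
>>> 100 if rc.get("bob.pipelines.sge.allowed_failures") is None else rc.get("bob.pipelines.sge.allowed_failures")
100
>>> rc["bob.pipelines.sge.allowed_failures"] = 5  # a configured override wins
>>> 100 if rc.get("bob.pipelines.sge.allowed_failures") is None else rc.get("bob.pipelines.sge.allowed_failures")
5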