Skip to content
Snippets Groups Projects

Fixed multiqueue

Merged Tiago de Freitas Pereira requested to merge multi into master
1 unresolved thread
Files
4
@@ -107,6 +107,34 @@ def get_max_jobs(queue_dict):
)
def get_resource_requirements(pipeline):
    """
    Get the resource requirements to execute a graph.

    This is useful when it's necessary to get the dictionary mapping the dask
    delayed keys with specific resource restrictions.
    Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections
    for more information.

    Parameters
    ----------
    pipeline: :py:class:`sklearn.pipeline.Pipeline`
        A :py:class:`sklearn.pipeline.Pipeline` wrapped with
        :any:`bob.pipelines.DaskWrapper`

    Returns
    -------
    dict
        The merged ``resource_tags`` of every step of ``pipeline`` that has
        such an attribute. When the same key appears in several steps, the
        value from the later step is kept.

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> from bob.pipelines.sge import get_resource_requirements  # doctest: +SKIP
    >>> resources = get_resource_requirements(pipeline)  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """
    resources = dict()
    for step in pipeline:
        # Steps that carry no ``resource_tags`` contribute nothing.
        if hasattr(step, "resource_tags"):
            resources.update(step.resource_tags)
    return resources
Please register or sign in to reply
class SGEMultipleQueuesCluster(JobQueueCluster):
"""Launch Dask jobs in the SGE cluster allowing the request of multiple
queues.
@@ -267,28 +295,6 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)
def get_sge_resources(self):
    """
    Get the available resources from `SGEMultipleQueuesCluster.sge_job_spec`.

    This is useful when it's necessary to set the resources available for the
    `.compute` method.
    Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections
    for more information.

    Returns
    -------
    dict
        The merged ``"resources"`` entries of every queue in
        ``self.sge_job_spec``. Queues whose ``"resources"`` entry is empty
        (``""`` or an empty mapping) or absent are skipped.

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> resources = cluster.get_sge_resources()  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """
    resources = dict()
    for queue_spec in self.sge_job_spec.values():
        queue_resources = queue_spec.get("resources")
        # An empty value ("" or {}) means the queue declares no resources;
        # indexing its first item would raise IndexError for {}.
        if queue_resources:
            # Keep every declared resource, not only the first one.
            resources.update(queue_resources)
    return resources
def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command."""
@@ -475,7 +481,7 @@ class SchedulerResourceRestriction(Scheduler):
allowed_failures=100
if rc.get("bob.pipelines.sge.allowed_failures") is None
else rc.get("bob.pipelines.sge.allowed_failures"),
synchronize_worker_interval="20s",
synchronize_worker_interval="10s",
*args,
**kwargs,
)
Loading