[dask] Simplified the way multiqueue is set

parent 22b36d8c
......@@ -145,9 +145,9 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Below follows a vanilla example that will create a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster # doctest: +SKIP
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster # doctest: +SKIP
>>> from dask.distributed import Client # doctest: +SKIP
>>> cluster = SGEIdiapCluster() # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster() # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -162,7 +162,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": "",
... }
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -186,7 +186,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": {"GPU":1},
... },
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> cluster.scale_up(1, sge_job_spec_key="gpu") # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -194,7 +194,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Adaptive job allocation can also be used via `AdaptiveIdiap` extension:
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
"""
......@@ -267,6 +267,28 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)
def get_sge_resources(self):
    """
    Collect the dask resources declared in `SGEMultipleQueuesCluster.sge_job_spec`.

    This is useful when it's necessary to set the resources available for
    the `.compute` method.
    Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information

    Every resource declared by every job spec is included. Specs whose
    ``"resources"`` entry is empty (``""``, ``{}`` or ``None``) contribute
    nothing. When two specs declare the same resource name, the value of
    the spec that comes later in ``sge_job_spec`` wins.

    Returns
    -------
    dict
        Mapping from resource name to quantity, merged over all job specs.

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> resources = cluster.get_sge_resources()  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """
    merged = {}
    for job_spec in self.sge_job_spec.values():
        declared = job_spec["resources"]
        # An empty declaration ("" or {}) means the queue exposes no
        # resource; skipping it avoids indexing into an empty item list.
        if not declared:
            continue
        # Keep all declared resources, not only the first one.
        merged.update(declared)
    return merged
def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command."""
......@@ -315,7 +337,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Quantity of jobs to scale
sge_job_spec_key: str
One of the specs `SGEIdiapCluster.sge_job_spec`
One of the specs `SGEMultipleQueuesCluster.sge_job_spec`
"""
if n_jobs == 0:
......@@ -452,7 +474,7 @@ class SchedulerResourceRestriction(Scheduler):
allowed_failures=100
if rc.get("bob.pipelines.sge.allowed_failures") is None
else rc.get("bob.pipelines.sge.allowed_failures"),
synchronize_worker_interval="60s",
synchronize_worker_interval="20s",
*args,
**kwargs,
)
......
......@@ -20,13 +20,12 @@ QUEUE_DEFAULT = {
"max_jobs": 24,
"resources": {"q_1week": 1},
},
"q_short_gpu": {
"queue": "q_short_gpu",
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
"resources": {"q_long_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
......@@ -35,20 +34,15 @@ QUEUE_DEFAULT = {
"resource_spec": "",
"resources": {"q_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"q_short_gpu": {
"queue": "q_short_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
}
"""This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big. This HAS to be the
default.
"""
QUEUE_GPU = {
"default": {
......@@ -57,7 +51,7 @@ QUEUE_GPU = {
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
"resources": "",
},
"q_1day": {
"queue": "q_1day",
......@@ -82,6 +76,13 @@ QUEUE_GPU = {
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
"memory": "30GB",
......@@ -97,6 +98,3 @@ QUEUE_GPU = {
"resources": {"q_long_gpu": 1},
},
}
"""
This queue setup uses the q_short_gpu queue of the SGE.
"""
......@@ -138,8 +138,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
if isinstance(samples[0], SampleSet):
return [
SampleSet(
self._samples_transform(sset.samples, method_name),
parent=sset,
self._samples_transform(sset.samples, method_name), parent=sset,
)
for sset in samples
]
......@@ -408,11 +407,7 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
"""
def __init__(
self,
estimator,
fit_tag=None,
transform_tag=None,
**kwargs,
self, estimator, fit_tag=None, transform_tag=None, **kwargs,
):
super().__init__(**kwargs)
self.estimator = estimator
......@@ -421,6 +416,9 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
self.fit_tag = fit_tag
self.transform_tag = transform_tag
def _make_dask_resource_tag(self, tag):
    """Wrap ``tag`` into the value stored in ``self.resource_tags``.

    Parameters
    ----------
    tag
        Resource label, i.e. the ``fit_tag`` or ``transform_tag`` given to
        the wrapper.

    Returns
    -------
    list
        A one-element list holding the pair ``(1, tag)``.
    """
    # NOTE(review): dask usually expresses resource restrictions as
    # ``{name: amount}``; confirm that whatever consumes ``resource_tags``
    # really expects ``(amount, name)`` pairs.
    pair = (1, tag)
    return [pair]
def _dask_transform(self, X, method_name):
graph_name = f"{_frmt(self)}.{method_name}"
logger.debug(graph_name)
......@@ -432,7 +430,9 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
_transf.__name__ = graph_name
map_partitions = X.map_partitions(_transf, self._dask_state)
if self.transform_tag is not None:
self.resource_tags[map_partitions] = self.transform_tag
self.resource_tags[map_partitions] = self._make_dask_resource_tag(
self.transform_tag
)
return map_partitions
......@@ -464,12 +464,12 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
# change the name to have a better name in dask graphs
_fit.__name__ = f"{_frmt(self)}.fit"
self._dask_state = delayed(_fit)(
X,
y,
)
self._dask_state = delayed(_fit)(X, y,)
if self.fit_tag is not None:
self.resource_tags[self._dask_state] = self.fit_tag
self.resource_tags[self._dask_state] = self._make_dask_resource_tag(
self.fit_tag
)
return self
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment