Commit 4b33d18a authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

Fixed multiqueue

parent f9ec8e30
Pipeline #46148 passed with stage
in 4 minutes and 5 seconds
......@@ -116,8 +116,8 @@ def get_resource_requirements(pipeline):
Parameters
----------
pipeline: :py:class`sklearn.pipeline.Pipeline`
A :py:class`sklearn.pipeline.Pipeline` wrapper with :any:`bob.pipelines.DaskWrapper`
pipeline: :any:`sklearn.pipeline.Pipeline`
A :any:`sklearn.pipeline.Pipeline` wrapper with :any:`bob.pipelines.DaskWrapper`
Example
-------
......@@ -295,7 +295,6 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)
def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command."""
......
......@@ -10,7 +10,7 @@ QUEUE_DEFAULT = {
"io_big": False,
"resource_spec": "",
"max_jobs": 96,
"resources": "",
"resources": {"default": 1},
},
"q_1week": {
"queue": "q_1week",
......
......@@ -138,8 +138,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
if isinstance(samples[0], SampleSet):
return [
SampleSet(
self._samples_transform(sset.samples, method_name),
parent=sset,
self._samples_transform(sset.samples, method_name), parent=sset,
)
for sset in samples
]
......@@ -416,11 +415,7 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
"""
def __init__(
self,
estimator,
fit_tag=None,
transform_tag=None,
**kwargs,
self, estimator, fit_tag=None, transform_tag=None, **kwargs,
):
super().__init__(**kwargs)
self.estimator = estimator
......@@ -483,15 +478,16 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
# change the name to have a better name in dask graphs
_fit.__name__ = f"{_frmt(self)}.fit"
self._dask_state = delayed(_fit)(
X,
y,
)
self._dask_state = delayed(_fit)(X, y)
if self.fit_tag is not None:
self.resource_tags[self._dask_state.key] = self._make_dask_resource_tag(
self.fit_tag
)
from dask import core
# If you do `delayed(_fit)(X, y)`, two tasks are generated;
# the `finlize-TASK` and `TASK`. With this, we make sure
# that the two are annotated
self.resource_tags[
tuple([f"{k}{str(self._dask_state.key)}" for k in ["", "finalize-"]])
] = self._make_dask_resource_tag(self.fit_tag)
return self
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment