Commit 99fa97c0 authored by Amir MOHAMMADI

Merge branch 'multiqueue' into 'master'

[dask][sge] Multiqueue updates

See merge request !51
parents 22b36d8c a201b15e
Pipeline #46086 passed with stages in 4 minutes and 57 seconds
......@@ -145,9 +145,9 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Below follows a vanilla example that creates a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster # doctest: +SKIP
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster # doctest: +SKIP
>>> from dask.distributed import Client # doctest: +SKIP
>>> cluster = SGEIdiapCluster() # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster() # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -162,7 +162,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": "",
... }
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -186,7 +186,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
... "resources": {"GPU":1},
... },
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> cluster.scale_up(1, sge_job_spec_key="gpu") # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
......@@ -194,7 +194,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Adaptive job allocation can also be used via the `AdaptiveIdiap` extension:
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
"""
......@@ -267,6 +267,29 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)
def get_sge_resources(self):
"""
Get the available resources from `SGEMultipleQueuesCluster.sge_job_spec`.
This is useful when it is necessary to set the resources available for
the `.compute` method.
Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.
Example
-------
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
>>> resources = cluster.get_sge_resources() # doctest: +SKIP
>>> my_delayed_task.compute(scheduler=client, resources=resources) # doctest: +SKIP
"""
resources = [
list(self.sge_job_spec[k]["resources"].items())[0]
for k in self.sge_job_spec
if self.sge_job_spec[k]["resources"] != ""
]
return dict(resources)
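For orientation, a hedged doctest-style sketch of what `get_sge_resources` yields. The two-entry spec below is illustrative only (real specs also carry "memory", "io_big", and related keys):

>>> spec = {
...     "default": {"queue": "q_1day", "resources": ""},
...     "q_short_gpu": {"queue": "q_short_gpu", "resources": {"q_short_gpu": 1}},
... }  # doctest: +SKIP
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=spec)  # doctest: +SKIP
>>> cluster.get_sge_resources()  # doctest: +SKIP
{'q_short_gpu': 1}

Entries whose "resources" value is the empty string (like "default" above) are skipped, so only tagged queues appear in the returned dict.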
def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command."""
......@@ -315,7 +338,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Quantity of jobs to scale
sge_job_spec_key: str
One of the specs `SGEIdiapCluster.sge_job_spec`
One of the specs `SGEMultipleQueuesCluster.sge_job_spec`
"""
if n_jobs == 0:
......@@ -452,7 +475,7 @@ class SchedulerResourceRestriction(Scheduler):
allowed_failures=100
if rc.get("bob.pipelines.sge.allowed_failures") is None
else rc.get("bob.pipelines.sge.allowed_failures"),
synchronize_worker_interval="60s",
synchronize_worker_interval="20s",
*args,
**kwargs,
)
......
......@@ -20,13 +20,12 @@ QUEUE_DEFAULT = {
"max_jobs": 24,
"resources": {"q_1week": 1},
},
"q_short_gpu": {
"queue": "q_short_gpu",
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
"resources": {"q_long_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
......@@ -35,20 +34,15 @@ QUEUE_DEFAULT = {
"resource_spec": "",
"resources": {"q_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"q_short_gpu": {
"queue": "q_short_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
}
"""This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big. This HAS to be the
default.
"""
QUEUE_GPU = {
"default": {
......@@ -57,7 +51,7 @@ QUEUE_GPU = {
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
"resources": "",
},
"q_1day": {
"queue": "q_1day",
......@@ -82,6 +76,13 @@ QUEUE_GPU = {
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
"memory": "30GB",
......@@ -97,6 +98,3 @@ QUEUE_GPU = {
"resources": {"q_long_gpu": 1},
},
}
"""
This queue setup uses the q_short_gpu queue of the SGE.
"""
......@@ -138,8 +138,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
if isinstance(samples[0], SampleSet):
return [
SampleSet(
self._samples_transform(sset.samples, method_name),
parent=sset,
self._samples_transform(sset.samples, method_name), parent=sset,
)
for sset in samples
]
......@@ -408,11 +407,7 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
"""
def __init__(
self,
estimator,
fit_tag=None,
transform_tag=None,
**kwargs,
self, estimator, fit_tag=None, transform_tag=None, **kwargs,
):
super().__init__(**kwargs)
self.estimator = estimator
......@@ -421,6 +416,9 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
self.fit_tag = fit_tag
self.transform_tag = transform_tag
def _make_dask_resource_tag(self, tag):
return [(1, tag)]
def _dask_transform(self, X, method_name):
graph_name = f"{_frmt(self)}.{method_name}"
logger.debug(graph_name)
......@@ -432,7 +430,9 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
_transf.__name__ = graph_name
map_partitions = X.map_partitions(_transf, self._dask_state)
if self.transform_tag is not None:
self.resource_tags[map_partitions] = self.transform_tag
self.resource_tags[map_partitions] = self._make_dask_resource_tag(
self.transform_tag
)
return map_partitions
......@@ -464,12 +464,12 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
# change the name to have a better name in dask graphs
_fit.__name__ = f"{_frmt(self)}.fit"
self._dask_state = delayed(_fit)(
X,
y,
)
self._dask_state = delayed(_fit)(X, y,)
if self.fit_tag is not None:
self.resource_tags[self._dask_state] = self.fit_tag
self.resource_tags[self._dask_state] = self._make_dask_resource_tag(
self.fit_tag
)
return self
......
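To make the tag mechanics concrete, here is a hedged doctest-style sketch following the code above (`estimator`, `X`, and `y` stand for any scikit-learn estimator and its inputs): a plain string tag such as "q_short_gpu" is translated by `_make_dask_resource_tag` into the `[(1, "q_short_gpu")]` form that dask's `compute(resources=...)` annotation understands, so the tagged `fit` task is scheduled only on workers that advertise that SGE resource.

>>> wrapped = DaskWrapper(estimator, fit_tag="q_short_gpu")  # doctest: +SKIP
>>> wrapped = wrapped.fit(X, y)  # doctest: +SKIP
>>> wrapped.resource_tags[wrapped._dask_state]  # doctest: +SKIP
[(1, 'q_short_gpu')]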
......@@ -49,7 +49,7 @@ The code below is an example. Especially lines 59-63 where we convert such pipel
.. literalinclude:: ./python/pipeline_example_dask.py
:linenos:
:emphasize-lines: 59-63
:emphasize-lines: 48-53
Such code generates the following graph.
......@@ -125,13 +125,13 @@ SGE queue specs are defined in python dictionary as in the example below, where,
... "max_jobs": 48,
... "resources": "",
... },
... "gpu": {
... "queue": "q_gpu",
... "q_short_gpu": {
... "queue": "q_short_gpu",
... "memory": "12GB",
... "io_big": False,
... "resource_spec": "",
... "max_jobs": 48,
... "resources": {"GPU":1},
... "resources": {"q_short_gpu":1},
... },
... }
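Assuming the dict above is bound to a name such as `sge_job_spec` (the variable name is not shown in this excerpt), it is handed to the cluster constructor as in the docstring examples earlier:

.. code:: python

    >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  # doctest: +SKIP
    >>> from dask.distributed import Client  # doctest: +SKIP
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=sge_job_spec)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP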
......@@ -157,38 +157,11 @@ Running estimator operations in specific SGE queues
---------------------------------------------------
Sometimes it's necessary to run parts of a :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` in specific SGE queues (e.g. q_1day IO_BIG or q_gpu).
The example below shows how this is approached (lines 78 to 88).
In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_gpu`.
The example below shows how this is approached (lines 52 to 57).
In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_short_gpu`.
.. literalinclude:: ./python/pipeline_example_dask_sge.py
:linenos:
:emphasize-lines: 78-88
Adaptive SGE: Scale up/down SGE cluster according to the graph complexity
-------------------------------------------------------------------------
One note about the code from the last section.
Every time `cluster.scale` is executed to increase the number of available SGE jobs to run a :doc:`dask graph <graphs>`, such resources remain allocated until the end of its execution.
Note that in `MyBoostedFitTransformer.fit` a delay of `120s` was introduced to fake "processing" in the GPU queue.
During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, other resources are idle, which is a waste of resources (imagine a CNN training of 2 days instead of the 2 minutes from our example).
For this reason there is the `adapt` method in :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`, which adjusts the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
Its usage is pretty simple.
The code below determines that, to run a :doc:`dask graph <graphs>`, the :any:`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set; in this case, two SGE jobs.
.. code:: python
>>> cluster.adapt(minimum=2, maximum=10)
The code below shows the same example, but with adaptive cluster.
Note line 83
.. literalinclude:: ./python/pipeline_example_dask_sge_adaptive.py
:linenos:
:emphasize-lines: 83
:emphasize-lines: 52-57
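For orientation, the core of the highlighted lines in those listings is the wrap call that tags `fit`; a sketch of just that fragment (it mirrors the full example file reproduced further down this diff):

.. code:: python

    pipeline = bob.pipelines.wrap(
        ["sample", "checkpoint", "dask"],
        pipeline,
        fit_tag="q_short_gpu",  # run .fit only on workers from the q_short_gpu queue
    )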
......@@ -4,21 +4,17 @@ from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from bob.pipelines.mixins import CheckpointMixin
from bob.pipelines.mixins import SampleMixin
from bob.pipelines.mixins import estimator_dask_it
from bob.pipelines.mixins import mix_me_up
from bob.pipelines.sample import Sample
import bob.pipelines
import os
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
return X
def fit(self, X):
def fit(self, X, y=None):
pass
def _more_tags(self):
......@@ -26,11 +22,10 @@ class MyTransformer(TransformerMixin, BaseEstimator):
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self, *args, **kwargs):
def __init__(self):
self._fit_model = None
super().__init__(*args, **kwargs)
def transform(self, X):
def transform(self, X, metadata=None):
# Transform `X`
return [x @ self._fit_model for x in X]
......@@ -39,30 +34,29 @@ class MyFitTranformer(TransformerMixin, BaseEstimator):
return self
# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_1",
),
MyBoostedFitTransformer(
features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
),
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())
# Wrapping with sample, checkpoint and dask
pipeline = bob.pipelines.wrap(
["sample", "checkpoint", "dask"],
pipeline,
model_path=os.path.join(model_path, "model.pickle"),
features_dir=model_path,
transform_extra_arguments=(("metadata", "metadata"),),
)
# Create a dask graph from a pipeline
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5)
# Run the task graph on the local machine in a single thread
X_transformed = dasked_pipeline.fit_transform(X_as_sample).compute(
scheduler="single-threaded"
)
X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")
import shutil
shutil.rmtree(model_path)
import time
import numpy
from dask.distributed import Client
......@@ -8,21 +6,18 @@ from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.pipelines.mixins import CheckpointMixin
from bob.pipelines.mixins import SampleMixin
from bob.pipelines.mixins import estimator_dask_it
from bob.pipelines.mixins import mix_me_up
from bob.pipelines.sample import Sample
import bob.pipelines
import os
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
return X
def fit(self, X):
def fit(self, X, y=None):
pass
def _more_tags(self):
......@@ -30,73 +25,45 @@ class MyTransformer(TransformerMixin, BaseEstimator):
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self, *args, **kwargs):
def __init__(self):
self._fit_model = None
super().__init__(*args, **kwargs)
def transform(self, X):
def transform(self, X, metadata=None):
# Transform `X`
return [x @ self._fit_model for x in X]
def fit(self, X):
# Faking big processing
time.sleep(120)
self._fit_model = numpy.array([[1, 2], [3, 4]])
return self
# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_1",
),
MyBoostedFitTransformer(
features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
),
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())
# Wrapping with sample, checkpoint and dask
pipeline = bob.pipelines.wrap(
["sample", "checkpoint", "dask"],
pipeline,
model_path=os.path.join(model_path, "model.pickle"),
features_dir=model_path,
transform_extra_arguments=(("metadata", "metadata"),),
)
# Setting up SGE
Q_1DAY_GPU_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
},
"gpu": {
"queue": "q_gpu",
"memory": "12GB",
"io_big": False,
"resource_spec": "",
"resources": {"gpu": 1},
},
}
# Creating my cluster obj.
cluster = SGEMultipleQueuesCluster()
cluster.scale(1, sge_job_spec_key="q_gpu") # Submitting 1 job in the q_gpu queue
cluster.scale(10) # Submitting 9 jobs in the q_1day queue
client = Client(cluster) # Creating the scheduler
# Create a dask graph from a pipeline
# and tagging the fit method of the second estimator to run on the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(
X_as_sample
) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
# Run the task graph on the cluster through the distributed scheduler
X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler=client)
import shutil
client.shutdown()
shutil.rmtree(model_path)
import time
import numpy
from dask.distributed import Client
......@@ -8,21 +6,18 @@ from sklearn.base import TransformerMixin
from sklearn.pipeline import make_pipeline
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.pipelines.mixins import CheckpointMixin
from bob.pipelines.mixins import SampleMixin
from bob.pipelines.mixins import estimator_dask_it
from bob.pipelines.mixins import mix_me_up
from bob.pipelines.sample import Sample
import bob.pipelines
import os
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
return X
def fit(self, X):
def fit(self, X, y=None):
pass
def _more_tags(self):
......@@ -30,73 +25,48 @@ class MyTransformer(TransformerMixin, BaseEstimator):
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self, *args, **kwargs):
def __init__(self):
self._fit_model = None
super().__init__(*args, **kwargs)
def transform(self, X):
def transform(self, X, metadata=None):
# Transform `X`
return [x @ self._fit_model for x in X]
def fit(self, X):
# Faking big processing
time.sleep(120)
self._fit_model = numpy.array([[1, 2], [3, 4]])
return self
# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_1",
),
MyBoostedFitTransformer(
features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
),
model_path = "./dask_tmp"
os.makedirs(model_path, exist_ok=True)
pipeline = make_pipeline(MyTransformer(), MyFitTranformer())
# Wrapping with sample, checkpoint and dask
# NOTE that pipeline.fit will run in `q_short_gpu`
pipeline = bob.pipelines.wrap(
["sample", "checkpoint", "dask"],
pipeline,
model_path=model_path,
transform_extra_arguments=(("metadata", "metadata"),),
fit_tag="q_short_gpu",
)
# Setting up SGE
Q_1DAY_GPU_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
},
"gpu": {
"queue": "q_gpu",
"memory": "12GB",
"io_big": False,
"resource_spec": "",
"resources": {"gpu": 1},
},
}
cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
cluster.scale(1) # Submitting 1 job in the q_gpu queue
cluster.adapt(minimum=1, maximum=10)
# Creating my cluster obj.
cluster = SGEMultipleQueuesCluster()
client = Client(cluster) # Creating the scheduler
# Create a dask graph from a pipeline
# and tagging the fit method of the second estimator to run on the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(
X_as_sample
) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
# Run the task graph on the cluster through the distributed scheduler
# NOTE that `resources` is set in `.compute`
X_transformed = pipeline.fit_transform(X_as_sample).compute(
scheduler=client, resources=cluster.get_sge_resources()
)
import shutil
client.shutdown()
shutil.rmtree(model_path)