Commit 3aeb8010 authored by Tiago de Freitas Pereira

[sphinx] Updated the documentation with the new API

parent b254c180
1 merge request: !51 [dask][sge] Multiqueue updates
Pipeline #46022 passed
@@ -49,7 +49,7 @@ The code below is an example. Especially lines 59-63 where we convert such pipel
 .. literalinclude:: ./python/pipeline_example_dask.py
    :linenos:
-   :emphasize-lines: 59-63
+   :emphasize-lines: 48-53

 Such code generates the following graph.
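
For context, the pattern those highlighted lines illustrate is the new `bob.pipelines.wrap` API: an ordinary scikit-learn pipeline is wrapped with the `"dask"` wrapper so that `fit`/`transform` build a dask task graph instead of executing eagerly. A minimal, untested sketch of that pattern (the transformer and data below are placeholders, not the documented example):

.. code:: python

    import numpy
    from sklearn.base import BaseEstimator, TransformerMixin
    from sklearn.pipeline import make_pipeline

    import bob.pipelines


    class ShiftTransformer(TransformerMixin, BaseEstimator):
        """Toy stateless transformer, used only for this sketch."""

        def transform(self, X):
            return [x + 1 for x in X]

        def fit(self, X, y=None):
            return self

        def _more_tags(self):
            return {"requires_fit": False}


    pipeline = make_pipeline(ShiftTransformer())

    # Wrap so that fit/transform are lazy and return dask collections.
    pipeline = bob.pipelines.wrap(["dask"], pipeline)

    X = [numpy.zeros((2,)) for _ in range(10)]
    # .compute runs the graph; locally here, or on a cluster by passing
    # a dask.distributed Client as the scheduler.
    X_transformed = pipeline.fit_transform(X).compute(scheduler="single-threaded")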
@@ -125,13 +125,13 @@ SGE queue specs are defined in python dictionary as in the example below, where,
 ...         "max_jobs": 48,
 ...         "resources": "",
 ...     },
-...     "gpu": {
-...         "queue": "q_gpu",
+...     "q_short_gpu": {
+...         "queue": "q_short_gpu",
 ...         "memory": "12GB",
 ...         "io_big": False,
 ...         "resource_spec": "",
 ...         "max_jobs": 48,
-...         "resources": {"GPU":1},
+...         "resources": {"q_short_gpu":1},
 ...     },
 ... }
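
Assembled in one place, a full spec of this shape as it reads after the rename (values mirror the snippets above; queue names and limits are site-specific):

.. code:: python

    from dask.distributed import Client
    from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

    # Specs are keyed by a label; "default" covers untagged tasks.
    Q_SPEC = {
        "default": {
            "queue": "q_1day",
            "memory": "8GB",
            "io_big": True,
            "resource_spec": "",
            "max_jobs": 48,
            "resources": "",
        },
        "q_short_gpu": {
            "queue": "q_short_gpu",
            "memory": "12GB",
            "io_big": False,
            "resource_spec": "",
            "max_jobs": 48,
            "resources": {"q_short_gpu": 1},
        },
    }

    cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_SPEC)
    client = Client(cluster)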
@@ -157,38 +157,11 @@ Running estimator operations in specific SGE queues
 ---------------------------------------------------

 Sometimes it's necessary to run parts of a :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` in specific SGE queues (e.g. q_1day IO_BIG or q_gpu).
-The example below shows how this is approached (lines 78 to 88).
-In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_gpu`.
+The example below shows how this is approached (lines 52 to 57).
+In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_short_gpu`.

 .. literalinclude:: ./python/pipeline_example_dask_sge.py
    :linenos:
-   :emphasize-lines: 78-88
-
-
-Adaptive SGE: Scale up/down SGE cluster according to the graph complexity
--------------------------------------------------------------------------
-
-One note about the code from the last section.
-Every time `cluster.scale` is executed to increase the number of available SGE jobs to run a :doc:`dask graph <graphs>`, such resources stay allocated until the end of its execution.
-Note that in `MyBoostedFitTransformer.fit` a delay of `120s` was introduced to fake "processing" in the GPU queue.
-During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, other resources sit idle, which is a waste (imagine a CNN training of 2 days instead of the 2 minutes of our example).
-For this reason there's the method `adapt` in :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` that adjusts the number of SGE jobs according to the needs of a :doc:`dask graph <graphs>`.
-Its usage is pretty simple.
-The code below determines that, to run a :doc:`dask graph <graphs>`, the :any:`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
-
-.. code:: python
-
-   >>> cluster.adapt(minimum=2, maximum=10)
-
-The code below shows the same example, but with an adaptive cluster.
-Note line 83.
-
-.. literalinclude:: ./python/pipeline_example_dask_sge_adaptive.py
-   :linenos:
-   :emphasize-lines: 83
+   :emphasize-lines: 52-57
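
The adaptive-scaling prose removed above still describes a live mechanism: `cluster.adapt` lets the scheduler grow and shrink the pool of SGE jobs with the demands of the graph, instead of holding jobs until the end of the run. The removed snippet, kept here as a sketch:

.. code:: python

    from dask.distributed import Client
    from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

    cluster = SGEMultipleQueuesCluster()

    # Allow at most 10 concurrent SGE jobs when the graph has work
    # pending, and keep at least 2 when it is idle.
    cluster.adapt(minimum=2, maximum=10)

    client = Client(cluster)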
@@ -4,21 +4,17 @@ from sklearn.base import BaseEstimator
 from sklearn.base import TransformerMixin
 from sklearn.pipeline import make_pipeline

-from bob.pipelines.mixins import CheckpointMixin
-from bob.pipelines.mixins import SampleMixin
-from bob.pipelines.mixins import estimator_dask_it
-from bob.pipelines.mixins import mix_me_up
-from bob.pipelines.sample import Sample
+import bob.pipelines
+import os


 class MyTransformer(TransformerMixin, BaseEstimator):
     def transform(self, X, metadata=None):
         # Transform `X` with metadata
-        if metadata is None:
-            return X
-        return [x + m for x, m in zip(X, metadata)]
+        return X

-    def fit(self, X):
+    def fit(self, X, y=None):
         pass

     def _more_tags(self):
@@ -26,9 +22,8 @@ class MyTransformer(TransformerMixin, BaseEstimator):

 class MyFitTranformer(TransformerMixin, BaseEstimator):
-    def __init__(self, *args, **kwargs):
+    def __init__(self):
         self._fit_model = None
-        super().__init__(*args, **kwargs)

     def transform(self, X):
         # Transform `X`
@@ -39,30 +34,27 @@ class MyFitTranformer(TransformerMixin, BaseEstimator):
         return self

-# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
-MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
-MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)

 # Creating X
 X = numpy.zeros((2, 2))

 # Wrapping X with Samples
-X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
+X_as_sample = [bob.pipelines.Sample(X, key=str(i), metadata=1) for i in range(10)]

-# Building an arbitrary pipeline
-pipeline = make_pipeline(
-    MyBoostedTransformer(
-        transform_extra_arguments=(("metadata", "metadata"),),
-        features_dir="./checkpoint_1",
-    ),
-    MyBoostedFitTransformer(
-        features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
-    ),
-)
+model_path = "~/dask_tmp"
+os.makedirs(model_path, exist_ok=True)
+pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

+# Wrapping with sample, checkpoint and dask
+pipeline = bob.pipelines.wrap(
+    ["sample", "checkpoint", "dask"],
+    pipeline,
+    model_path=model_path,
+    transform_extra_arguments=(("metadata", "metadata"),),
+)

-# Create a dask graph from a pipeline
-dasked_pipeline = estimator_dask_it(pipeline, npartitions=5)

 # Run the task graph in the local computer in a single thread
-X_transformed = dasked_pipeline.fit_transform(X_as_sample).compute(
-    scheduler="single-threaded"
-)
+X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")
+
+import shutil
+
+shutil.rmtree(model_path)
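
One detail of the new example worth spelling out: `transform_extra_arguments` tells the `"sample"` wrapper which sample attributes to forward into `transform` as keyword arguments, aligned element-wise with the input list. A self-contained sketch of that forwarding, with a hypothetical attribute name (`offset` is illustrative, not part of the example files):

.. code:: python

    import numpy
    from sklearn.base import BaseEstimator, TransformerMixin
    from sklearn.pipeline import make_pipeline

    import bob.pipelines


    class OffsetTransformer(TransformerMixin, BaseEstimator):
        def transform(self, X, offset=None):
            # `offset` is a list with one entry per sample.
            return [x + o for x, o in zip(X, offset)]

        def fit(self, X, y=None):
            return self


    pipeline = make_pipeline(OffsetTransformer())

    # Map the `offset` kwarg of transform to the `offset` sample attribute.
    pipeline = bob.pipelines.wrap(
        ["sample"],
        pipeline,
        transform_extra_arguments=(("offset", "offset"),),
    )

    samples = [
        bob.pipelines.Sample(numpy.zeros(2), key=str(i), offset=i) for i in range(3)
    ]
    transformed = pipeline.transform(samples)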
 import time
 import numpy
 from dask.distributed import Client
@@ -8,21 +6,18 @@ from sklearn.base import TransformerMixin
 from sklearn.pipeline import make_pipeline

 from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
-from bob.pipelines.mixins import CheckpointMixin
-from bob.pipelines.mixins import SampleMixin
-from bob.pipelines.mixins import estimator_dask_it
-from bob.pipelines.mixins import mix_me_up
-from bob.pipelines.sample import Sample
+import bob.pipelines
+import os


 class MyTransformer(TransformerMixin, BaseEstimator):
     def transform(self, X, metadata=None):
         # Transform `X` with metadata
-        if metadata is None:
-            return X
-        return [x + m for x, m in zip(X, metadata)]
+        return X

-    def fit(self, X):
+    def fit(self, X, y=None):
         pass

     def _more_tags(self):
@@ -30,73 +25,43 @@ class MyTransformer(TransformerMixin, BaseEstimator):

 class MyFitTranformer(TransformerMixin, BaseEstimator):
-    def __init__(self, *args, **kwargs):
+    def __init__(self):
         self._fit_model = None
-        super().__init__(*args, **kwargs)

     def transform(self, X):
         # Transform `X`
         return [x @ self._fit_model for x in X]

     def fit(self, X):
         # Faking big processing
         time.sleep(120)
         self._fit_model = numpy.array([[1, 2], [3, 4]])
         return self

-# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
-MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
-MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)

 # Creating X
 X = numpy.zeros((2, 2))

 # Wrapping X with Samples
-X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
+X_as_sample = [bob.pipelines.Sample(X, key=str(i), metadata=1) for i in range(10)]

-# Building an arbitrary pipeline
-pipeline = make_pipeline(
-    MyBoostedTransformer(
-        transform_extra_arguments=(("metadata", "metadata"),),
-        features_dir="./checkpoint_1",
-    ),
-    MyBoostedFitTransformer(
-        features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
-    ),
-)
+model_path = "~/dask_tmp"
+os.makedirs(model_path, exist_ok=True)
+pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

+# Wrapping with sample, checkpoint and dask
+pipeline = bob.pipelines.wrap(
+    ["sample", "checkpoint", "dask"],
+    pipeline,
+    model_path=model_path,
+    transform_extra_arguments=(("metadata", "metadata"),),
+)

 # Setting up SGE
 Q_1DAY_GPU_SPEC = {
     "default": {
         "queue": "q_1day",
         "memory": "8GB",
         "io_big": True,
         "resource_spec": "",
         "resources": "",
     },
     "gpu": {
         "queue": "q_gpu",
         "memory": "12GB",
         "io_big": False,
         "resource_spec": "",
         "resources": {"gpu": 1},
     },
 }

 # Creating my cluster obj.
 cluster = SGEMultipleQueuesCluster()
-cluster.scale(1, sge_job_spec_key="q_gpu")  # Submitting 1 job in the q_gpu queue
-cluster.scale(10)  # Submitting 9 jobs in the q_1day queue
 client = Client(cluster)  # Creating the scheduler

-# Create a dask graph from a pipeline
-# and tagging the fit method of the second estimator to run in the GPU
-dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
-dasked_pipeline.fit(X_as_sample)  # Create the dask-graph for fitting
-X_transformed = dasked_pipeline.transform(
-    X_as_sample
-)  # Create the dask graph for transform and returns a dask bag
-X_transformed = X_transformed.compute(scheduler=client)  # RUN THE GRAPH
+# Run the task graph in the SGE cluster
+X_transformed = pipeline.fit_transform(X_as_sample).compute(scheduler=client)

+import shutil
+
+client.shutdown()
+shutil.rmtree(model_path)
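
The `cluster.scale` calls dropped from this example remain available on :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` for fixed-size allocations. A sketch mirroring the removed lines, with the renamed queue key (queue names are site-specific):

.. code:: python

    from dask.distributed import Client
    from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

    cluster = SGEMultipleQueuesCluster()

    # One SGE job in the GPU queue; the key must exist in the job spec.
    cluster.scale(1, sge_job_spec_key="q_short_gpu")
    # Grow the default-queue pool, as in the removed example.
    cluster.scale(10)

    client = Client(cluster)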
 import time
 import numpy
 from dask.distributed import Client
@@ -8,21 +6,18 @@ from sklearn.base import TransformerMixin
 from sklearn.pipeline import make_pipeline

 from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
-from bob.pipelines.mixins import CheckpointMixin
-from bob.pipelines.mixins import SampleMixin
-from bob.pipelines.mixins import estimator_dask_it
-from bob.pipelines.mixins import mix_me_up
-from bob.pipelines.sample import Sample
+import bob.pipelines
+import os


 class MyTransformer(TransformerMixin, BaseEstimator):
     def transform(self, X, metadata=None):
         # Transform `X` with metadata
-        if metadata is None:
-            return X
-        return [x + m for x, m in zip(X, metadata)]
+        return X

-    def fit(self, X):
+    def fit(self, X, y=None):
         pass

     def _more_tags(self):
@@ -30,73 +25,48 @@ class MyTransformer(TransformerMixin, BaseEstimator):

 class MyFitTranformer(TransformerMixin, BaseEstimator):
-    def __init__(self, *args, **kwargs):
+    def __init__(self):
         self._fit_model = None
-        super().__init__(*args, **kwargs)

     def transform(self, X):
         # Transform `X`
         return [x @ self._fit_model for x in X]

     def fit(self, X):
         # Faking big processing
         time.sleep(120)
         self._fit_model = numpy.array([[1, 2], [3, 4]])
         return self

-# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
-MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
-MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)

 # Creating X
 X = numpy.zeros((2, 2))

 # Wrapping X with Samples
-X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]
+X_as_sample = [bob.pipelines.Sample(X, key=str(i), metadata=1) for i in range(10)]

-# Building an arbitrary pipeline
-pipeline = make_pipeline(
-    MyBoostedTransformer(
-        transform_extra_arguments=(("metadata", "metadata"),),
-        features_dir="./checkpoint_1",
-    ),
-    MyBoostedFitTransformer(
-        features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
-    ),
-)
+model_path = "~/dask_tmp"
+os.makedirs(model_path, exist_ok=True)
+pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

+# Wrapping with sample, checkpoint and dask
+# NOTE that pipeline.fit will run in `q_short_gpu`
+pipeline = bob.pipelines.wrap(
+    ["sample", "checkpoint", "dask"],
+    pipeline,
+    model_path=model_path,
+    transform_extra_arguments=(("metadata", "metadata"),),
+    fit_tag="q_short_gpu",
+)

 # Setting up SGE
 Q_1DAY_GPU_SPEC = {
     "default": {
         "queue": "q_1day",
         "memory": "8GB",
         "io_big": True,
         "resource_spec": "",
         "resources": "",
     },
     "gpu": {
         "queue": "q_gpu",
         "memory": "12GB",
         "io_big": False,
         "resource_spec": "",
         "resources": {"gpu": 1},
     },
 }

-cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
-cluster.scale(1)  # Submitting 1 job in the q_gpu queue
-cluster.adapt(minimum=1, maximum=10)
+# Creating my cluster obj.
+cluster = SGEMultipleQueuesCluster()
 client = Client(cluster)  # Creating the scheduler

-# Create a dask graph from a pipeline
-# and tagging the fit method of the second estimator to run in the GPU
-dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
-dasked_pipeline.fit(X_as_sample)  # Create the dask-graph for fitting
-X_transformed = dasked_pipeline.transform(
-    X_as_sample
-)  # Create the dask graph for transform and returns a dask bag
-X_transformed = X_transformed.compute(scheduler=client)  # RUN THE GRAPH
+# Run the task graph in the SGE cluster
+# NOTE THAT resources is set in .compute
+X_transformed = pipeline.fit_transform(X_as_sample).compute(
+    scheduler=client, resources=cluster.get_sge_resources()
+)

+import shutil
+
+client.shutdown()
+shutil.rmtree(model_path)
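
The `resources=` argument of the final `.compute` call is standard dask-distributed resource matching: a tagged task only runs on workers that declare the matching resource, which is how the `fit_tag="q_short_gpu"` annotation lands on a GPU-queue worker. A generic, SGE-free sketch of the mechanism (worker setup and tag name here are illustrative):

.. code:: python

    import dask
    from dask.distributed import Client, LocalCluster

    # One local worker declaring a custom "GPU" resource.
    cluster = LocalCluster(n_workers=1, resources={"GPU": 1})
    client = Client(cluster)


    def train(x):
        return x * 2


    # Requiring the resource at compute time restricts scheduling to
    # workers that advertise GPU >= 1.
    task = dask.delayed(train)(21)
    result = task.compute(scheduler=client, resources={"GPU": 1})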