Commit 56b19bf0 authored by Tiago de Freitas Pereira

Finished dask documentation

parent 9eb960a2
1 merge request: !22 New User Guide
Pipeline #39106 passed
@@ -63,7 +63,7 @@ Dask + Idiap SGE
----------------
Dask allows the deployment and parallelization of graphs either locally or on complex job queuing systems, such as PBS and SGE.
-This is achieved via the :doc:`Dask-Jobqueue <dask-jobqueue:index>`.
+This is achieved via :doc:`Dask-Jobqueue <dask-jobqueue:index>`.
The video below explains what :doc:`Dask-Jobqueue <dask-jobqueue:index>` is, some of its features, and how to use it to run :doc:`dask graphs <graphs>`.

.. raw:: html
@@ -71,7 +71,7 @@ Below follow a nice video explaining what is the :doc:`Dask-Jobqueue <dask-jobqu
   <iframe width="560" height="315" src="https://www.youtube.com/embed/FXsgmwpRExM" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>

-The snippet below show how to deploy a the pipeline from the previous section in the Idiap SGE cluster
+The snippet below shows how to deploy the exact same pipeline from the previous section on the Idiap SGE cluster:

.. code:: python
@@ -79,7 +79,7 @@ The snippet below show how to deploy a the pipeline from the previous section in
   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
   >>> from dask.distributed import Client
   >>> cluster = SGEIdiapCluster()  # Creates the SGE launcher that launches jobs in the q_all queue
-   >>> cluster.scale_up(10) # Submite 10 jobs in q_all
+   >>> cluster.scale(10)  # Submits 10 jobs to q_all
   >>> client = Client(cluster)  # Creates the scheduler and attaches it to the SGE job queue system
   >>> dask_pipeline.fit_transform(....).compute(scheduler=client)  # Runs the graph on the Idiap SGE
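
For reference, the plain :doc:`Dask-Jobqueue <dask-jobqueue:index>` equivalent of the snippet above looks similar (a minimal sketch; the queue name, core count, and memory below are placeholder assumptions, not Idiap defaults):

.. code:: python

   >>> from dask_jobqueue import SGECluster
   >>> from dask.distributed import Client
   >>> # Assumed resources; adjust to your SGE installation
   >>> cluster = SGECluster(queue="q_all", cores=1, memory="4GB")
   >>> cluster.scale(10)  # Submits 10 worker jobs
   >>> client = Client(cluster)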
@@ -108,22 +108,22 @@ SGE queue specs are defined in python dictionary as in the example below, where,

.. code:: python
   >>> Q_1DAY_GPU_SPEC = {
   ...     "default": {
   ...         "queue": "q_1day",
   ...         "memory": "8GB",
   ...         "io_big": True,
   ...         "resource_spec": "",
   ...         "resources": "",
   ...     },
   ...     "gpu": {
   ...         "queue": "q_gpu",
   ...         "memory": "12GB",
   ...         "io_big": False,
   ...         "resource_spec": "",
   ...         "resources": {"GPU": 1},
   ...     },
   ... }
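
Each top-level key of this dictionary names a job profile: `default` is used when no profile is requested, and other keys (here `gpu`) are selected via `sge_job_spec_key`. A minimal sketch of how such a spec can be inspected (`pick_spec` is a hypothetical helper for illustration, not part of bob.pipelines):

.. code:: python

   >>> def pick_spec(sge_job_spec, key="default"):
   ...     entry = sge_job_spec[key]
   ...     return entry["queue"], entry["memory"], entry["resources"]
   >>> pick_spec(Q_1DAY_GPU_SPEC)
   ('q_1day', '8GB', '')
   >>> pick_spec(Q_1DAY_GPU_SPEC, "gpu")
   ('q_gpu', '12GB', {'GPU': 1})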
Now that the queue specifications are set, let's trigger some jobs.
@@ -133,9 +133,55 @@ Now that the queue specifications are set, let's trigger some jobs.
   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
   >>> from dask.distributed import Client
   >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
-   >>> cluster.scale_up(10) # Submitting 10 jobs in the q_1day queue
-   >>> cluster.scale_up(1, sge_job_spec_key="gpu") # Submitting 1 job in the q_gpu queue
+   >>> cluster.scale(1, sge_job_spec_key="gpu")  # Submitting 1 job to the q_gpu queue
+   >>> cluster.scale(10)  # Scaling to 10 jobs in total: 9 more go to the q_1day queue
   >>> client = Client(cluster)  # Creating the scheduler

.. note::

   To check that the jobs were actually submitted, you can always run `qstat`::

      $ qstat
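
Besides `qstat`, you can also ask the Dask client which workers have actually connected to the scheduler (this uses only standard :py:class:`distributed.Client` API):

.. code:: python

   >>> # One entry per worker that has joined the cluster
   >>> client.scheduler_info()["workers"]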
Running estimator operations in specific SGE queues
===================================================
Sometimes it's necessary to run parts of a :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` in specific SGE queues (e.g. q_1day with io_big, or q_gpu).
The example below shows how this is approached (lines 78 to 88).
In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_gpu`.

.. literalinclude:: ./python/pipeline_example_dask_sge.py
   :linenos:
   :emphasize-lines: 78-88
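
For quick reference, the queue pinning in that file boils down to the following lines (same objects as in the full listing; `fit_tag=[(1, "gpu")]` ties the `fit` of the estimator at index 1 to the `gpu` job profile):

.. code:: python

   >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
   >>> cluster.scale(1, sge_job_spec_key="gpu")  # 1 job in q_gpu
   >>> cluster.scale(10)  # 9 more jobs in q_1day
   >>> client = Client(cluster)
   >>> dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
   >>> dasked_pipeline.fit(X_as_sample)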
Adaptive SGE: Scale up/down SGE cluster according to the graph complexity
=========================================================================
One note about the code from the last section.
Every time `cluster.scale` is executed to increase the number of SGE jobs available to run a :doc:`dask graph <graphs>`, those resources stay allocated until the end of its execution.
Note that in `MyBoostedFitTransformer.fit` a delay of `120s` was introduced to fake "processing" in the GPU queue.
While `MyBoostedFitTransformer.fit` executes in `q_gpu`, all other resources sit idle, which is a waste (imagine a CNN training that takes 2 days instead of the 2 minutes in our example).
For this reason, :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster` provides the method `adapt`, which adjusts the number of SGE jobs according to the needs of a :doc:`dask graph <graphs>`.
Its usage is pretty simple.
The code below determines that, to run a :doc:`dask graph <graphs>`, the :py:class:`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.

.. code:: python

   >>> cluster.adapt(minimum=2, maximum=10)
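
In context, the cluster is created as before and `adapt` is called instead of (or in addition to) fixed `scale` calls (a sketch reusing the objects defined above):

.. code:: python

   >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
   >>> cluster.adapt(minimum=2, maximum=10)  # The scheduler grows/shrinks the cluster within these bounds
   >>> client = Client(cluster)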
The code below shows the same example, but with an adaptive cluster.
Note line 83.

.. literalinclude:: ./python/pipeline_example_dask_sge_adaptive.py
   :linenos:
   :emphasize-lines: 83

python/pipeline_example_dask_sge.py

from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
from bob.pipelines.mixins import estimator_dask_it
import numpy
import time


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        if metadata is None:
            return X
        return [x + m for x, m in zip(X, metadata)]

    def fit(self, X):
        return self

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTransformer(TransformerMixin, BaseEstimator):
    def __init__(self, *args, **kwargs):
        self._fit_model = None
        super().__init__(*args, **kwargs)

    def transform(self, X):
        # Transform `X`
        return [x @ self._fit_model for x in X]

    def fit(self, X):
        # Faking big processing
        time.sleep(120)
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTransformer)

# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

# Building an arbitrary pipeline
pipeline = make_pipeline(
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_1",
    ),
    MyBoostedFitTransformer(
        features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
    ),
)

# Setting up SGE
Q_1DAY_GPU_SPEC = {
    "default": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": "",
    },
    "gpu": {
        "queue": "q_gpu",
        "memory": "12GB",
        "io_big": False,
        "resource_spec": "",
        "resources": {"gpu": 1},
    },
}

from bob.pipelines.distributed.sge import SGEIdiapCluster
from dask.distributed import Client

cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
cluster.scale(1, sge_job_spec_key="gpu")  # Submitting 1 job to the q_gpu queue
cluster.scale(10)  # Scaling to 10 jobs in total: 9 more in the q_1day queue
client = Client(cluster)  # Creating the scheduler

# Create a dask graph from the pipeline,
# tagging the fit method of the second estimator to run on the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])

dasked_pipeline.fit(X_as_sample)  # Create the dask graph for fitting
X_transformed = dasked_pipeline.transform(X_as_sample)  # Create the dask graph for transform; returns a dask bag
X_transformed = X_transformed.compute(scheduler=client)  # Run the graph
client.shutdown()
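
After the script finishes, `X_transformed` holds the transformed samples. A quick (hypothetical) way to inspect them, assuming the wrapped :py:class:`bob.pipelines.sample.Sample` objects expose their payload via `.data`:

.. code:: python

   >>> for sample in X_transformed:
   ...     print(sample.key, sample.data)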

python/pipeline_example_dask_sge_adaptive.py

from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
from bob.pipelines.mixins import estimator_dask_it
import numpy
import time


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        if metadata is None:
            return X
        return [x + m for x, m in zip(X, metadata)]

    def fit(self, X):
        return self

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTransformer(TransformerMixin, BaseEstimator):
    def __init__(self, *args, **kwargs):
        self._fit_model = None
        super().__init__(*args, **kwargs)

    def transform(self, X):
        # Transform `X`
        return [x @ self._fit_model for x in X]

    def fit(self, X):
        # Faking big processing
        time.sleep(120)
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTransformer)

# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

# Building an arbitrary pipeline
pipeline = make_pipeline(
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_1",
    ),
    MyBoostedFitTransformer(
        features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
    ),
)

# Setting up SGE
Q_1DAY_GPU_SPEC = {
    "default": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": "",
    },
    "gpu": {
        "queue": "q_gpu",
        "memory": "12GB",
        "io_big": False,
        "resource_spec": "",
        "resources": {"gpu": 1},
    },
}

from bob.pipelines.distributed.sge import SGEIdiapCluster
from dask.distributed import Client

cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
cluster.scale(1)  # Submitting 1 job to the default q_1day queue
cluster.adapt(minimum=1, maximum=10)  # Letting the scheduler scale between 1 and 10 SGE jobs
client = Client(cluster)  # Creating the scheduler

# Create a dask graph from the pipeline,
# tagging the fit method of the second estimator to run on the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])

dasked_pipeline.fit(X_as_sample)  # Create the dask graph for fitting
X_transformed = dasked_pipeline.transform(X_as_sample)  # Create the dask graph for transform; returns a dask bag
X_transformed = X_transformed.compute(scheduler=client)  # Run the graph
client.shutdown()