Moved forward with Dask documentation [sphinx]

parent 85f25752
......@@ -193,6 +193,7 @@ class SGEIdiapCluster(JobQueueCluster):
More than one job spec can be set:
>>> Q_1DAY_GPU_SPEC = {
... "default": {
... "queue": "q_1day",
......
......@@ -51,8 +51,11 @@ The code below is a repetition of the example from :ref:`sample`, but now `MyTra
The keyword argument `features_dir`, defined in lines 34 and 38, sets the absolute path where those samples will be saved.
.. _checkpoint_statefull:

Checkpointing stateful transformers
-----------------------------------

Stateful transformers are transformers that implement both the `fit` and `transform` methods.
These can be checkpointed as well, as shown in the example below.
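As a quick orientation, here is a minimal sketch of what that example does: a stateful transformer is boosted with :py:class:`bob.pipelines.mixins.CheckpointMixin` via :py:func:`bob.pipelines.mixins.mix_me_up`, so that both its fitted model and its transformed samples are saved to disk (the paths below are illustrative).

.. code:: python

   >>> from bob.pipelines.mixins import CheckpointMixin, SampleMixin, mix_me_up
   >>> # MyFitTranformer is any estimator implementing both `fit` and `transform`
   >>> MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)
   >>> transformer = MyBoostedFitTransformer(
   ...     features_dir="./checkpoint_2",          # where transformed samples are checkpointed
   ...     model_path="./checkpoint_2/model.pkl",  # where the fitted model is checkpointed
   ... )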
......
......@@ -5,10 +5,11 @@
========================================
"`Dask is <https://dask.org/>`_ a flexible library for parallel computing in Python.".
The purpose of this guide is not to describe how dask works, for that, go to its documentation.
"`Dask is <dask:index>`_ a flexible library for parallel computing in Python.".
The purpose of this guide is not to describe how dask works.
For that, go to its documentation.
Moreover, there are plenty of tutorials online.
For instance, `this official one <https://github.com/dask/dask-tutorial>`_, there is also a nice overview presented in `AnacondaCon 2018 <https://www.youtube.com/watch?v=tQBovBvSDvA>`_ and there's even one crafted for `Idiap <https://github.com/tiagofrepereira2012/tam-dask>`_.
For instance, `this official one <https://github.com/dask/dask-tutorial>`_; a nice overview was presented in `AnacondaCon 2018 <https://www.youtube.com/watch?v=tQBovBvSDvA>`_ and there's even one crafted for `Idiap <https://github.com/tiagofrepereira2012/tam-dask>`_.
The purpose of this guide is to describe:
......@@ -16,29 +17,125 @@ The purpose of this guide is to describe:
2. The specificities of `Dask` under the Idiap SGE
From Scikit Learn pipelines to Dask Task Graphs
-----------------------------------------------
The purpose of :doc:`scikit learn pipelines <modules/generated/sklearn.pipeline.Pipeline>` is to assemble several :doc:`scikit estimators <modules/generated/sklearn.base.BaseEstimator>` into one final estimator.
Then, it is possible to use the methods `fit` and `transform` to create models and transform your data, respectively.
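For instance, a plain scikit-learn pipeline looks like this (standard scikit-learn API; the choice of estimators here is just illustrative):

.. code:: python

   >>> import numpy
   >>> from sklearn.pipeline import make_pipeline
   >>> from sklearn.preprocessing import StandardScaler
   >>> from sklearn.decomposition import PCA

   >>> pipeline = make_pipeline(StandardScaler(), PCA(n_components=2))
   >>> X = numpy.random.rand(10, 5)
   >>> X_transformed = pipeline.fit(X).transform(X)  # fit all steps, then transform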
Any :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` can be transformed into a :doc:`Dask Graph <graphs>` to be further executed by any :doc:`Dask Client <client>`.
This is carried out via the :py:func:`bob.pipelines.mixins.estimator_dask_it` function.
This function does two things:

1. Edits the input pipeline by adding a new first step, where input samples are wrapped in `Dask Bags <https://docs.dask.org/en/latest/bag.html>`_
2. Creates a Dask graph for each step in your pipeline
The code snippet below enables this feature for an arbitrary :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>`.
.. code:: python

   >>> from sklearn.pipeline import make_pipeline
   >>> from bob.pipelines.mixins import estimator_dask_it
   >>> dask_pipeline = estimator_dask_it(make_pipeline(...))  # Create a dask graph
   >>> dask_pipeline.fit_transform(....).compute()  # Run the task graph using the default client
The code below is the same as the one presented in the :ref:`checkpoint example <checkpoint_statefull>`.
However, lines 59-63 convert this pipeline into a :doc:`Dask Graph <graphs>` and run it on the local computer.
.. literalinclude:: ./python/pipeline_example_dask.py
   :linenos:
   :emphasize-lines: 59-63
This code generates the following graph.
.. figure:: python/dask_graph.png
This graph can be seen by visiting `http://localhost:8787` while the pipeline is executing.
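Note that this dashboard is served by Dask's distributed scheduler. A minimal sketch of getting it up locally (assuming `dask.distributed` is installed; the port is Dask's default):

.. code:: python

   >>> from dask.distributed import Client
   >>> client = Client(n_workers=2)  # local cluster; also starts the dashboard
   >>> client.dashboard_link         # typically http://127.0.0.1:8787/status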
Dask + Idiap SGE
----------------
Dask allows the deployment and parallelization of graphs either locally or on complex job queuing systems, such as PBS and SGE.
This is achieved via the :doc:`Dask-Jobqueue <dask-jobqueue:index>`.
Below follows a nice video explaining what :doc:`Dask-Jobqueue <dask-jobqueue:index>` is, some of its features, and how to use it to run :doc:`dask graphs <graphs>`.
.. raw:: html

   <iframe width="560" height="315" src="https://www.youtube.com/embed/FXsgmwpRExM" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
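For reference, this is what a generic Dask-Jobqueue SGE deployment looks like (a sketch using the standard :doc:`Dask-Jobqueue <dask-jobqueue:index>` API; the queue name and resources are placeholders):

.. code:: python

   >>> from dask_jobqueue import SGECluster
   >>> from dask.distributed import Client
   >>> cluster = SGECluster(queue="q_1day", cores=1, memory="4GB")  # each SGE job runs one worker
   >>> cluster.scale(4)  # submit 4 worker jobs to the queue
   >>> client = Client(cluster)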
The snippet below shows how to deploy the pipeline from the previous section on the Idiap SGE cluster.
.. code:: python

   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
   >>> from dask.distributed import Client

   >>> cluster = SGEIdiapCluster()  # Creates the SGE launcher that launches jobs in q_all
   >>> cluster.scale_up(10)  # Submits 10 jobs to q_all
   >>> client = Client(cluster)  # Creates the scheduler and attaches it to the SGE job queue system
   >>> dask_pipeline.fit_transform(....).compute(scheduler=client)  # Runs the graph on the Idiap SGE
That's it! You just ran a scikit-learn pipeline on the Idiap SGE grid :-)
Dask provides a generic :doc:`deployment <dask-jobqueue:examples>` mechanism for SGE systems, but it has the following limitations:
1. It assumes that a :doc:`dask graph <dask:graphs>` runs in a homogeneous grid setup. For instance, if part of your graph needs a specific resource that is only available in other SGE queues (e.g. q_gpu, q_long_gpu, IO_BIG), the scheduler is not able to request those resources on the fly.
2. As a consequence of 1., the :doc:`adaptive deployment <dask:setup/adaptive>` mechanism is not able to handle job submissions across two or more queues.
For this reason, the generic SGE launcher was extended into :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster`. The next subsections present some code samples using this launcher for the most common cases you will probably encounter in your daily work.
Launching jobs in different SGE queues
======================================
SGE queue specs are defined in a Python dictionary, as in the example below, where the root keys are the labels of the SGE queues and the inner keys represent:
1. **queue**: The real name of the SGE queue
2. **memory**: The amount of memory required for the job
3. **io_big**: Submit jobs with IO_BIG=TRUE
4. **resource_spec**: Any other key used in `qsub -l`
5. **resources**: Reference label used to tag :doc:`dask delayed <dask:delayed>` so it will run in a specific queue. This is a very important feature that will be discussed in the next section.
.. code:: python

   >>> Q_1DAY_GPU_SPEC = {
   ...     "default": {
   ...         "queue": "q_1day",
   ...         "memory": "8GB",
   ...         "io_big": True,
   ...         "resource_spec": "",
   ...         "resources": "",
   ...     },
   ...     "gpu": {
   ...         "queue": "q_gpu",
   ...         "memory": "12GB",
   ...         "io_big": False,
   ...         "resource_spec": "",
   ...         "resources": {"GPU": 1},
   ...     },
   ... }
Now that the queue specifications are set, let's trigger some jobs.
.. code:: python

   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
   >>> from dask.distributed import Client

   >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
   >>> cluster.scale_up(10)  # Submitting 10 jobs to the q_1day queue
   >>> cluster.scale_up(1, sge_job_spec_key="gpu")  # Submitting 1 job to the q_gpu queue
   >>> client = Client(cluster)  # Creating the scheduler
.. todo::

   Provide code sample
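In the meantime, here is a hedged sketch of how such a resource tag is consumed on the Dask side (this uses Dask's generic `resources=` mechanism, not an API specific to this package, and reuses the `client` created above): tasks tagged with a resource only run on workers that advertise it, which is how a step can be pinned to, e.g., the q_gpu jobs.

.. code:: python

   >>> import dask
   >>> @dask.delayed
   ... def train_on_gpu(x):
   ...     return x * 2  # hypothetical GPU-bound work

   >>> task = train_on_gpu(10)
   >>> future = client.compute(task, resources={"GPU": 1})  # runs only on workers advertising a GPU resource
   >>> future.result()
   20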
......@@ -6,9 +6,8 @@
Bob Pipelines
===============
Easily boost your :doc:`Scikit Learn Pipelines <modules/generated/sklearn.pipeline.Pipeline>` with powerful features, such as:
.. figure:: img/dask.png
......@@ -31,10 +30,10 @@ Easilly boost your `Scikit Learn Pipelines <https://scikit-learn.org/stable/inde
.. warning::

   Before investigating what this package is capable of, check the scikit-learn :ref:`user guide <scikit-learn:pipeline>`. Several :ref:`tutorials <scikit-learn:tutorial_menu>` are available online.
.. warning::

   If you want to implement your own scikit-learn estimator, please check out this :doc:`link <scikit-learn:developers/develop>`.
......
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from bob.pipelines.mixins import estimator_dask_it
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        if metadata is None:
            return X
        return [x + m for x, m in zip(X, metadata)]

    def fit(self, X):
        # Stateless transformer: there is nothing to fit
        pass

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTranformer(TransformerMixin, BaseEstimator):
    def __init__(self, *args, **kwargs):
        self._fit_model = None
        super().__init__(*args, **kwargs)

    def transform(self, X):
        # Transform `X` with the fitted model
        return [x @ self._fit_model for x in X]

    def fit(self, X):
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)

# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

# Building an arbitrary pipeline
pipeline = make_pipeline(
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_1",
    ),
    MyBoostedFitTransformer(
        features_dir="./checkpoint_2",
        model_path="./checkpoint_2/model.pkl",
    ),
)

# Create a dask graph from the pipeline
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5)

# Run the task graph on the local computer in a single thread
X_transformed = dasked_pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")