Commit db0c0bae authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Merge branch 'user-guide' into 'master'

New User Guide

See merge request !22
parents 1a11a53a 56b19bf0
Pipeline #39111 passed with stages
in 4 minutes and 59 seconds
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
__path__ = extend_path(__path__, __name__)
\ No newline at end of file
......@@ -164,57 +164,64 @@ class SGEIdiapCluster(JobQueueCluster):
tag: Mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html)
Example
-------
Below follow a vanilla-example that will create a set of jobs on all.q
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster()
>>> cluster.scale_up(10)
>>> client = Client(cluster)
Below follows a vanilla example that creates a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster # doctest: +SKIP
>>> from dask.distributed import Client # doctest: +SKIP
>>> cluster = SGEIdiapCluster() # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
It's possible to demand a resource specification yourself:
>>> Q_1DAY_IO_BIG_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
}
}
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
>>> cluster.scale_up(10)
>>> client = Client(cluster)
... "default": {
... "queue": "q_1day",
... "memory": "8GB",
... "io_big": True,
... "resource_spec": "",
... "resources": "",
... }
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
More than one job spec can be set:
>>> Q_1DAY_GPU_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
},
"gpu": {
"queue": "q_gpu",
"memory": "12GB",
"io_big": False,
"resource_spec": "",
"resources": {"GPU":1},
},
}
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.scale_up(10)
>>> cluster.scale_up(1, sge_job_spec_key="gpu")
>>> client = Client(cluster)
>>> Q_1DAY_GPU_SPEC = {
... "default": {
... "queue": "q_1day",
... "memory": "8GB",
... "io_big": True,
... "resource_spec": "",
... "resources": "",
... },
... "gpu": {
... "queue": "q_gpu",
... "memory": "12GB",
... "io_big": False,
... "resource_spec": "",
... "resources": {"GPU":1},
... },
... }
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.scale_up(10) # doctest: +SKIP
>>> cluster.scale_up(1, sge_job_spec_key="gpu") # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
Adaptive job allocation can also be used via the `AdaptiveIdiap` extension:
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
Adaptive job allocation can also be used via `AdaptiveIdiap` extension
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10)
>>> client = Client(cluster)
"""
......@@ -321,7 +328,7 @@ class SGEIdiapCluster(JobQueueCluster):
Quantity of jobs to scale
sge_job_spec_key: str
One of the specs :py:attr:`SGEIdiapCluster.sge_job_spec`
One of the specs `SGEIdiapCluster.sge_job_spec`
"""
if n_jobs == 0:
......@@ -359,8 +366,8 @@ class AdaptiveIdiap(Adaptive):
"""
Custom mechanism to adaptively allocate workers based on scheduler load
This custom implementation extends the :py:meth:`Adaptive.recommendations` by looking
at the :py:meth:`distributed.scheduler.TaskState.resource_restrictions`.
This custom implementation extends the `Adaptive.recommendations` by looking
at the `distributed.scheduler.TaskState.resource_restrictions`.
The heuristic is:
......@@ -449,7 +456,7 @@ class SchedulerIdiap(Scheduler):
"""
Idiap extended distributed scheduler
This scheduler extends :py:class:`Scheduler` by just adding a handler
This scheduler extends `Scheduler` by just adding a handler
that fetches, at every scheduler cycle, the resource restrictions of
a task that has status `no-worker`
"""
......@@ -473,4 +480,4 @@ class SchedulerIdiap(Scheduler):
):
resource_restrictions.append(self.tasks[k].resource_restrictions)
return resource_restrictions
return resource_restrictions
......@@ -21,11 +21,11 @@ class DelayedSample:
Parameters
----------
load : function
load:
A Python function that can be called without parameters to load the
sample in question from whatever medium
parent : :py:class:`DelayedSample`, :py:class:`Sample`, None
parent : :py:class:`bob.pipelines.sample.DelayedSample`, :py:class:`bob.pipelines.sample.Sample`, None
If passed, consider this as a parent of this sample, to copy
information
......
......@@ -2,3 +2,6 @@
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
def is_picklable(obj):
"""
Test if an object is picklable or not
......
......@@ -25,15 +25,17 @@ requirements:
host:
- python {{ python }}
- setuptools {{ setuptools }}
- numpy {{ numpy }}
- bob.extension
- bob.io.base
run:
- python
- setuptools
- numpy
- dask
- dask-jobqueue
- numpy {{ numpy }}
- scikit-learn >=0.22
- distributed
- scikit-learn
test:
imports:
......
.. _checkpoint:
=======================
Checkpointing Samples
=======================
This mechanism allows checkpointing of :py:class:`bob.pipelines.sample.Sample` objects to `HDF5 <https://www.hdfgroup.org/solutions/hdf5/>`_ files during the processing of a :py:class:`sklearn.pipeline.Pipeline`.
Very often when processing a :py:class:`sklearn.pipeline.Pipeline` with big chunks of data, it is useful to checkpoint some steps of the pipeline to disk.
This is useful for several purposes:
- Reusing samples that are expensive to re-compute
- Inspecting algorithms
Scikit-learn has a caching mechanism that allows the caching of `estimators <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline>`_ that can be used for this purpose.
Although useful, such a structure is not user friendly.
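For reference, the snippet below is a minimal sketch of that built-in scikit-learn caching (the `./sklearn_cache` directory and the scaler/PCA steps are just placeholders); note that it caches fitted estimators, not individual samples:
.. code:: python
>>> import numpy
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.decomposition import PCA
>>> from sklearn.pipeline import make_pipeline
>>> X = numpy.random.rand(10, 5)
>>> # Fitted transformers are cached on disk and reused when the same
>>> # step is re-fitted with the same parameters and data
>>> pipeline = make_pipeline(StandardScaler(), PCA(n_components=2), memory="./sklearn_cache")
>>> X_transformed = pipeline.fit_transform(X)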
As in :ref:`sample`, this can be approached with the :py:class:`bob.pipelines.mixins.CheckpointMixin` mixin, where a new class can be created either dynamically with the :py:func:`bob.pipelines.mixins.mix_me_up` function:
.. code:: python
>>> from bob.pipelines.mixins import CheckpointMixin, mix_me_up
>>> MyTransformerCheckpoint = mix_me_up((CheckpointMixin,), MyTransformer)
or explicitly:
.. code:: python
>>> from bob.pipelines.mixins import CheckpointMixin
>>> class MyTransformerCheckpoint(CheckpointMixin, MyTransformer):
...     pass
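In the full examples below, the checkpoint mixin is combined with :py:class:`bob.pipelines.mixins.SampleMixin` so that keyed samples can be written to disk; the snippet below is a minimal usage sketch of that combination (the `./checkpoint_example` directory is just a placeholder):
.. code:: python
>>> import numpy
>>> from bob.pipelines.sample import Sample
>>> from bob.pipelines.mixins import CheckpointMixin, SampleMixin, mix_me_up
>>> # Mix both Sample handling and checkpointing into MyTransformer
>>> MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
>>> # The `key` identifies the sample inside `features_dir`
>>> sample = Sample(numpy.zeros((2, 2)), key="1", metadata=1)
>>> transformer = MyBoostedTransformer(
...     transform_extra_arguments=(("metadata", "metadata"),),
...     features_dir="./checkpoint_example",
... )
>>> X_transformed = transformer.transform([sample])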
Checkpointing a transformer
---------------------------
The code below is a repetition of the example from :ref:`sample`, but now `MyTransformer` checkpoints its output once `MyTransformer.transform` is executed.
.. literalinclude:: ./python/pipeline_example_boosted_checkpoint.py
:linenos:
:emphasize-lines: 23, 28, 34, 38
.. warning::
In line 28, samples are created with the keyword argument `key`. The :py:class:`bob.pipelines.mixins.CheckpointMixin` uses this information for saving.
The keyword argument `features_dir` defined in lines 34 and 38 sets the absolute path where those samples will be saved.
.. _checkpoint_statefull:
Checkpointing stateful transformers
-----------------------------------
Stateful transformers are transformers that implement both the `fit` and `transform` methods.
These can be checkpointed too, as shown in the example below.
.. literalinclude:: ./python/pipeline_example_boosted_checkpoint_estimator.py
:linenos:
:emphasize-lines: 52-55
The keyword arguments `features_dir` and `model_path` defined in lines 52 to 55 set the absolute paths where the samples and the model trained after `fit` will be saved.
.. note::
SampleSets can be checkpointed in the exact same way as Samples.
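For example, reusing the checkpointed `pipeline` from the stateful example above, a rough sketch (this assumes :py:class:`bob.pipelines.sample.SampleSet` wraps a list of samples and accepts arbitrary metadata keyword arguments; the `key` values are placeholders):
.. code:: python
>>> import numpy
>>> from bob.pipelines.sample import Sample, SampleSet
>>> # Each inner sample keeps its own `key`; the set groups them together
>>> samples = [Sample(numpy.zeros((2, 2)), key=str(i), metadata=1) for i in range(3)]
>>> sample_set = SampleSet(samples, key="set_1")
>>> X_transformed = pipeline.transform([sample_set])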
......@@ -48,7 +48,7 @@ todo_include_todos = True
autosummary_generate = True
# Create numbers on figures with captions
numfig = True
numfig = False
# If we are on OSX, the 'dvipng' path maybe different
dvipng_osx = "/Library/TeX/texbin/dvipng"
......
.. _dask:
========================================
Dask: Scale your scikit-learn pipelines
========================================
As its documentation states, :doc:`Dask <dask:index>` is "a flexible library for parallel computing in Python".
The purpose of this guide is not to describe how dask works; for that, refer to its documentation.
Moreover, there are plenty of tutorials online.
For instance, there is `this official one <https://github.com/dask/dask-tutorial>`_; a nice overview was presented at `AnacondaCon 2018 <https://www.youtube.com/watch?v=tQBovBvSDvA>`_ and there is even one crafted for `Idiap <https://github.com/tiagofrepereira2012/tam-dask>`_.
The purpose of this guide is to describe:
1. The integration of dask with scikit-learn pipelines and samples
2. The specificities of `Dask` under the Idiap SGE
From Scikit Learn pipelines to Dask Task Graphs
-----------------------------------------------
The purpose of :doc:`scikit-learn pipelines <modules/generated/sklearn.pipeline.Pipeline>` is to assemble several :doc:`scikit-learn estimators <modules/generated/sklearn.base.BaseEstimator>` into one final estimator.
Then, it is possible to use the methods `fit` and `transform` to create models and transform your data, respectively.
Any :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` can be transformed into a :doc:`Dask Graph <graphs>` to be further executed by any :doc:`Dask Client <client>`.
This is carried out via the :py:func:`bob.pipelines.mixins.estimator_dask_it` function.
This function does two things:
1. Edit the current :py:class:`sklearn.pipeline.Pipeline` by adding a new first step, where input samples are transformed into a :doc:`Dask Bag <bag>`. This allows the usage of :py:func:`dask.bag.map` for further transformations.
2. Mix all :doc:`estimators <modules/generated/sklearn.base.BaseEstimator>` in the pipeline with the :py:class:`bob.pipelines.mixins.DaskEstimatorMixin`. This mixin is responsible for creating the task graph for the methods `.fit` and `.transform`.
The code snippet below enables this feature for an arbitrary :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>`.
.. code:: python
>>> from bob.pipelines.mixins import estimator_dask_it
>>> dask_pipeline = estimator_dask_it(make_pipeline(...)) # Create a dask graph
>>> dask_pipeline.fit_transform(....).compute() # Run the task graph using the default client
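For a fully local run with an explicit client, a minimal sketch (the ellipses stand for whatever steps and samples you assembled; the local `Client` is plain dask.distributed, not something specific to this package):
.. code:: python
>>> from dask.distributed import Client
>>> from sklearn.pipeline import make_pipeline
>>> from bob.pipelines.mixins import estimator_dask_it
>>> # A Client with no arguments spawns local workers and serves the
>>> # diagnostic dashboard at http://localhost:8787
>>> client = Client()
>>> dask_pipeline = estimator_dask_it(make_pipeline(...))
>>> dask_pipeline.fit_transform(...).compute(scheduler=client)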
The code below is the same as the one presented in :ref:`checkpoint example <checkpoint_statefull>`.
However, lines 59-63 convert this pipeline into a :doc:`Dask Graph <graphs>` and run it on the local computer.
.. literalinclude:: ./python/pipeline_example_dask.py
:linenos:
:emphasize-lines: 59-63
This code generates the following graph.
.. figure:: python/dask_graph.png
This graph can be seen by browsing to `http://localhost:8787` during execution.
Dask + Idiap SGE
----------------
Dask allows the deployment and parallelization of graphs either locally or on complex job queuing systems, such as PBS and SGE.
This is achieved via :doc:`Dask-Jobqueue <dask-jobqueue:index>`.
Below follows a nice video explaining what :doc:`Dask-Jobqueue <dask-jobqueue:index>` is, some of its features, and how to use it to run :doc:`dask graphs <graphs>`.
.. raw:: html
<iframe width="560" height="315" src="https://www.youtube.com/embed/FXsgmwpRExM" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
The snippet below shows how to deploy the exact same pipeline from the previous section on the Idiap SGE cluster.
.. code:: python
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster() # Creates the SGE launcher that launches jobs in q_all
>>> cluster.scale(10) # Submits 10 jobs to q_all
>>> client = Client(cluster) # Creates the scheduler and attaches it to the SGE job queue system
>>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE
That's it, you just ran a scikit-learn pipeline on the Idiap SGE grid :-)
Dask provides a generic :doc:`deployment <dask-jobqueue:examples>` mechanism for SGE systems, but it has the following limitations:
1. It assumes that a :doc:`dask graph <dask:graphs>` runs in a homogeneous grid setup. For instance, if parts of your graph need a specific resource that is only available in other SGE queues (e.g. q_gpu, q_long_gpu, IO_BIG), the scheduler is not able to request those resources on the fly.
2. As a result of 1., the mechanism of :doc:`adaptive deployment <dask:setup/adaptive>` is not able to handle job submissions to two or more queues.
For this reason, the generic SGE launcher was extended into :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster`. The next subsections present some code samples using this launcher for the most common cases you will probably find in your daily work.
Launching jobs in different SGE queues
======================================
SGE queue specs are defined in a Python dictionary as in the example below, where the root keys are the labels of the SGE queues and the inner keys represent:
1. **queue**: The real name of the SGE queue
2. **memory**: The amount of memory required for the job
3. **io_big**: Submit jobs with IO_BIG=TRUE
4. **resource_spec**: Any other key used in `qsub -l`
5. **resources**: Reference label used to tag :doc:`dask delayed <dask:delayed>` so it runs in a specific queue. This is a very important feature that will be discussed in the next section.
.. code:: python
>>> Q_1DAY_GPU_SPEC = {
... "default": {
... "queue": "q_1day",
... "memory": "8GB",
... "io_big": True,
... "resource_spec": "",
... "resources": "",
... },
... "gpu": {
... "queue": "q_gpu",
... "memory": "12GB",
... "io_big": False,
... "resource_spec": "",
... "resources": {"GPU":1},
... },
... }
Now that the queue specifications are set, let's trigger some jobs.
.. code:: python
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.scale(1, sge_job_spec_key="gpu") # Submitting 1 job in the q_gpu queue
>>> cluster.scale(10) # Submitting 9 jobs in the q_1day queue
>>> client = Client(cluster) # Creating the scheduler
.. note::
To check if the jobs were actually submitted, always run `qstat`::
$ qstat
Running estimator operations in specific SGE queues
===================================================
Sometimes it's necessary to run parts of a :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` in specific SGE queues (e.g. q_1day IO_BIG or q_gpu).
The example below shows how this is approached (lines 78 to 88).
In this example, the `fit` method of `MyBoostedFitTransformer` runs on `q_gpu`.
.. literalinclude:: ./python/pipeline_example_dask_sge.py
:linenos:
:emphasize-lines: 78-88
Adaptive SGE: Scale up/down SGE cluster according to the graph complexity
=========================================================================
One note about the code from the last section.
Every time `cluster.scale` is executed to increase the number of SGE jobs available to run a :doc:`dask graph <graphs>`, those resources stay allocated until the end of its execution.
Note that in `MyBoostedFitTransformer.fit` a delay of `120s` was introduced to fake "processing" in the GPU queue.
During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, the other resources are idle, which is a waste (imagine a CNN training of 2 days instead of the 2 minutes from our example).
For this reason, :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster` provides the `adapt` method, which adjusts the number of available SGE jobs according to the needs of a :doc:`dask graph <graphs>`.
Its usage is pretty simple.
The code below specifies that, to run a :doc:`dask graph <graphs>`, the :py:class:`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
.. code:: python
>>> cluster.adapt(minimum=2, maximum=10)
The code below shows the same example, but with an adaptive cluster.
Note line 83.
.. literalinclude:: ./python/pipeline_example_dask_sge_adaptive.py
:linenos:
:emphasize-lines: 83
py:meth dask.distributed.Adaptive
py:class dask_jobqueue.core.JobQueueCluster
py:class distributed.deploy.adaptive.Adaptive
py:class dask_jobqueue.core.Job
py:class sklearn.preprocessing._function_transformer.FunctionTransformer
\ No newline at end of file
============================
Python API for bob.pipelines
============================
Samples
-----------------
.. automodule:: bob.pipelines.sample
Mixins
-----------------
.. automodule:: bob.pipelines.mixins
Idiap SGE Support
-----------------
.. automodule:: bob.pipelines.distributed.sge
.. automodule:: bob.pipelines.distributed.local
Transformers
------------
.. automodule:: bob.pipelines.transformers
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m["offset"] for x, m in zip(X, metadata)]
def fit(self, X):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Creating X
X = numpy.zeros((2, 2))
# Building an arbitrary pipeline
pipeline = make_pipeline(MyTransformer(), MyTransformer())
X_transformed = pipeline.transform([X])
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mixing up MyTransformer with the capabilities of handling Samples
MyBoostedTransformer = mix_me_up((SampleMixin,), MyTransformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
)
X_transformed = pipeline.transform([X_as_sample])
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, key="1", metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_1",
),
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_2",
),
)
X_transformed = pipeline.transform([X_as_sample])
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
class MyFitTransformer(TransformerMixin, BaseEstimator):
def __init__(self, *args, **kwargs):
self._fit_model = None
super().__init__(*args, **kwargs)
def transform(self, X):
# Transform `X`
return [x @ self._fit_model for x in X]
def fit(self, X):
self._fit_model = numpy.array([[1,2],[3,4]])
return self
# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTransformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, key="1", metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(
transform_extra_arguments=(("metadata", "metadata"),),
features_dir="./checkpoint_1",
),
MyBoostedFitTransformer(
features_dir="./checkpoint_2",
model_path="./checkpoint_2/model.pkl",
),
)
X_transformed = pipeline.fit_transform([X_as_sample])
from bob.pipelines.sample import DelayedSample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
import pickle
import functools
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mixing up MyTransformer with the capabilities of handling Samples
MyBoostedTransformer = mix_me_up((SampleMixin,), MyTransformer)
# X is stored in the disk
X = open("delayed_sample.pkl", "rb")
# Wrapping X with Samples
X_as_sample = DelayedSample(functools.partial(pickle.load, X), metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
)
X_transformed = pipeline.transform([X_as_sample])
\ No newline at end of file
from bob.pipelines.sample import Sample, SampleSet
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy