Commit 2c3e754c authored by Amir MOHAMMADI

Fix checkpointing guide

parent a17eb7c0
Pipeline #39281 failed with stage
in 8 minutes and 26 seconds
......@@ -2,6 +2,7 @@ from . import utils
from .sample import Sample, DelayedSample, SampleSet
from .wrappers import BaseWrapper, DelayedSamplesCall, SampleWrapper, CheckpointWrapper, DaskWrapper, ToDaskBag, wrap, dask_tags
from . import distributed
from . import transformers
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
......@@ -28,3 +28,24 @@ def test_sampleset_collection():
# Testing iterator
for i in sampleset:
    assert isinstance(i, mario.Sample)


import tempfile
import os
import numpy as np

with tempfile.TemporaryDirectory() as dir_name:
    checkpointing_transformer = mario.CheckpointWrapper(features_dir=dir_name)

    # now let's create some samples with ``key`` metadata
    # Creating X: 3 samples, 2 features
    X = np.zeros((3, 2))
    # 3 offsets: one for each sample
    offsets = np.arange(3).reshape((3, 1))
    # key values must be string because they will be used to create file names.
    samples = [mario.Sample(x, offset=o, key=str(i)) for i, (x, o) in enumerate(zip(X, offsets))]
    samples[0]

    # let's transform them
    transformed_samples = checkpointing_transformer.transform(samples)

    # Let's check the features directory
    os.listdir(dir_name)
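    # hypothetical follow-up check (not part of this commit): assuming the default
    # ``.h5`` extension, one file per ``sample.key`` is expected in the directory
    assert sorted(os.listdir(dir_name)) == ["0.h5", "1.h5", "2.h5"]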
......@@ -263,7 +263,13 @@ class CheckpointWrapper(BaseWrapper, TransformerMixin):
        if self.features_dir is None:
            return None
        return os.path.join(self.features_dir, str(sample.key) + self.extension)
        key = str(sample.key)
        if key.startswith(os.sep) or ".." in key:
            raise ValueError(
                "Sample.key values should be relative paths with no "
                f"reference to upper folders. Got: {key}"
            )
        return os.path.join(self.features_dir, key + self.extension)

    def save(self, sample):
        path = self.make_path(sample)
......
.. _bob.pipelines.checkpoint:
=======================
Checkpointing Samples
=======================
Here, we detail a mechanism that allows saving :any:`bob.pipelines.Sample` objects to
disk during the processing of estimators.
=============
Checkpointing
=============
Very often during the processing of :any:`sklearn.pipeline.Pipeline` with big chunks of
data, it is useful to have checkpoints of some steps of the pipeline on disk. This is
useful for several purposes:
- Reusing samples that are expensive to re-compute
- Inspecting algorithms
Scikit-learn has a caching mechanism that allows the caching of `estimators <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline>`_ and can be used for this purpose.
Although useful, this mechanism is not very user friendly.
As in :ref:`bob.pipelines.sample`, this can be approached with the :any:`bob.pipelines.mixins.CheckpointMixin` mixin, where a new class can be created either dynamically with the :py:func:`bob.pipelines.mixins.mix_me_up` function:
.. code:: python
>>> from bob.pipelines.mixins import CheckpointMixin, mix_me_up
>>> MyTransformerCheckpoint = mix_me_up((CheckpointMixin,), MyTransformer)
or explicitly:
.. code:: python
>>> from bob.pipelines.mixins import CheckpointMixin
>>> class MyTransformerCheckpoint(CheckpointMixin, MyTransformer):
...     pass
Checkpointing a transformer
---------------------------
data, it is useful to have checkpoints of some steps of the pipeline on disk. This
is useful for several purposes, such as:
- Reusing samples that are expensive to re-compute.
- Inspecting algorithms.
Scikit-learn has a caching mechanism for :any:`sklearn.pipeline.Pipeline` that can be
used for this purpose. Although useful, this mechanism is not very user friendly.
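For reference, here is a minimal sketch of that scikit-learn caching (the ``memory``
argument of :any:`sklearn.pipeline.Pipeline`), which caches fitted transformers rather
than individual samples:

.. code:: python

    >>> from sklearn.pipeline import make_pipeline
    >>> from sklearn.decomposition import PCA
    >>> # fitted transformers of this pipeline are cached in ./sklearn_cache
    >>> cached_pipeline = make_pipeline(PCA(n_components=2), memory="./sklearn_cache")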
As we detailed in :ref:`bob.pipelines.sample`, sklearn estimators can be extended to
handle samples with metadata. One piece of metadata can be a unique identifier for each
sample. We will refer to this unique identifier as ``sample.key``. If our samples carry
it, we can use that identifier to save and load samples from disk. This is what we call
checkpointing; to do it, all you need is to wrap your estimator with
:any:`CheckpointWrapper` and make sure your samples have the ``.key`` metadata.
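In short (a complete, runnable walk-through follows below), and assuming
``sample_transformer`` is an estimator already wrapped with :any:`SampleWrapper`:

.. code:: python

    >>> # sketch only: each sample must carry a ``key`` so its checkpoint file can be named
    >>> checkpointer = mario.CheckpointWrapper(sample_transformer, features_dir="./checkpoints")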
Checkpointing samples
---------------------
Below, you will see an example of how checkpointing works. First, let's make a
transformer.
.. doctest::
>>> # by convention, we import bob.pipelines as mario, because mario works with pipes ;)
>>> import bob.pipelines as mario
>>> import numpy as np
>>> from sklearn.base import TransformerMixin, BaseEstimator
>>>
>>> class MyTransformer(TransformerMixin, BaseEstimator):
... def transform(self, X, sample_specific_offsets):
... print(f"Transforming {len(X)} samples ...")
... return np.array(X) + np.array(sample_specific_offsets)
...
... def fit(self, X):
... print("Fit was called!")
... return self
All checkpointing transformers must be able to handle :any:`Sample`'s.
For that, we can use :any:`SampleWrapper`:
.. doctest::
>>> transform_extra_arguments=[("sample_specific_offsets", "offset")]
>>> sample_transformer = mario.SampleWrapper(MyTransformer(), transform_extra_arguments)
Then, we wrap it with :any:`CheckpointWrapper`:
.. doctest::
>>> # create some samples with ``key`` metadata
>>> # Creating X: 3 samples, 2 features
>>> X = np.zeros((3, 2))
>>> # 3 offsets: one for each sample
>>> offsets = np.arange(3).reshape((3, 1))
>>> # key values must be string because they will be used to create file names.
>>> samples = [mario.Sample(x, offset=o, key=str(i)) for i, (x, o) in enumerate(zip(X, offsets))]
>>> samples[0]
Sample(data=array([0., 0.]), offset=array([0]), key='0')
>>> import tempfile
>>> import os
>>> # create a temporary directory to save checkpoints
>>> with tempfile.TemporaryDirectory() as dir_name:
... checkpointing_transformer = mario.CheckpointWrapper(
... sample_transformer, features_dir=dir_name)
...
... # transform samples
... transformed_samples = checkpointing_transformer.transform(samples)
...
... # Let's check the features directory
... list(sorted(os.listdir(dir_name)))
Transforming 3 samples ...
['0.h5', '1.h5', '2.h5']
The code below is a repetition of the example from :ref:`bob.pipelines.sample`, but now the results of `MyTransformer` are checkpointed once `MyTransformer.transform` is executed.
.. literalinclude:: ./python/pipeline_example_boosted_checkpoint.py
:linenos:
:emphasize-lines: 23, 28, 34, 38
.. warning::
In line 28, samples are created with the keyword argument ``key``. The :any:`bob.pipelines.mixins.CheckpointMixin` uses this information for saving.
The keyword argument ``features_dir``, defined in lines 34 and 38, sets the absolute path where those samples will be saved.
.. _checkpoint_statefull:
Checkpointing stateful transformers
---------------------------------------
.. note::
By default, :any:`CheckpointWrapper` saves samples inside HDF5 files
but you can change that. Refer to its documentation to see how.
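As an illustration only, here is a hedged sketch of switching to ``numpy`` files; the
parameter names ``extension``, ``save_func`` and ``load_func`` are assumptions here, so
verify them against the :any:`CheckpointWrapper` documentation:

.. code:: python

    >>> # assumed parameter names and call conventions; check the CheckpointWrapper API
    >>> npy_checkpointer = mario.CheckpointWrapper(
    ...     sample_transformer,
    ...     features_dir="./checkpoints",
    ...     extension=".npy",
    ...     save_func=np.save,
    ...     load_func=np.load,
    ... )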
If a checkpoint for a sample already exists, the sample will not be recomputed but
loaded from disk:
.. doctest::
>>> # create a temporary directory to save checkpoints
>>> with tempfile.TemporaryDirectory() as dir_name:
... checkpointing_transformer = mario.CheckpointWrapper(
... sample_transformer, features_dir=dir_name)
...
... # transform samples for the first time, it should print transforming 3 samples
... transformed_samples1 = checkpointing_transformer.transform(samples)
...
... # transform samples again. This time it should not print transforming 3
... # samples
... transformed_samples2 = checkpointing_transformer.transform(samples)
...
... # It should print True
... print(np.allclose(transformed_samples1[1].data, transformed_samples2[1].data))
Transforming 3 samples ...
True
Stateful transformers are transformers that implement both the ``fit`` and ``transform`` methods.
Those can be checkpointed too, as can be observed in the example below.
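For illustration, a minimal sketch of such a stateful transformer (it mirrors the
fit/transform transformer used in the example files):

.. code:: python

    >>> class MyStatefulTransformer(TransformerMixin, BaseEstimator):
    ...     def fit(self, X, y=None):
    ...         # ``fit`` learns a model that ``transform`` reuses later
    ...         self.model_ = np.array([[1.0, 2.0], [3.0, 4.0]])
    ...         return self
    ...
    ...     def transform(self, X):
    ...         return [x @ self.model_ for x in X]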
.. note::
.. literalinclude:: ./python/pipeline_example_boosted_checkpoint_estimator.py
:linenos:
:emphasize-lines: 52-55
:any:`SampleSet`'s can be checkpointed as well. The samples inside them
should have the ``.key`` metadata.
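A brief sketch, reusing the ``samples`` and ``checkpointing_transformer`` objects from
the examples above:

.. code:: python

    >>> # each inner sample keeps its own ``key``; the set is transformed as a whole
    >>> sample_set = mario.SampleSet(samples, class_name="A")
    >>> transformed_sets = checkpointing_transformer.transform([sample_set])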
The keyword arguments ``features_dir`` and ``model_path``, defined in lines 52 to 55, set the absolute paths where samples and the model trained after ``fit`` will be saved.
Checkpointing estimators
------------------------
We can also checkpoint estimators after their training (``estimator.fit``). This allows
us to load the estimator from disk instead of training it if ``.fit`` is called and a
checkpoint exists.
.. note::
SampleSets can be checkpointed in the exact same way as Samples.
.. doctest::
>>> # create a temporary directory to save checkpoints
>>> with tempfile.NamedTemporaryFile(prefix="model", suffix=".pkl") as f:
... f.close()
... checkpointing_transformer = mario.CheckpointWrapper(
... sample_transformer, model_path=f.name)
...
... # call .fit for the first time, it should print Fit was called!
... __ = checkpointing_transformer.fit(samples)
...
... # call .fit again. This time it should not print anything
... __ = checkpointing_transformer.fit(samples)
Fit was called!
.. _dask:
.. _bob.pipelines.dask:
========================================
Dask: Scale your scikit-learn pipelines
......@@ -12,7 +12,7 @@ Moreover, there are plenty of tutorials online.
For instance, `this official one <https://github.com/dask/dask-tutorial>`_; a nice overview was presented in `AnacondaCon 2018 <https://www.youtube.com/watch?v=tQBovBvSDvA>`_ and there's even one crafted for `Idiap <https://github.com/tiagofrepereira2012/tam-dask>`_.
The purpose of this guide is to describe:
1. The integration of dask with scikit-learn pipelines and samples
2. The specificities of `Dask` under the Idiap SGE
......@@ -67,9 +67,9 @@ This is achieved via :doc:`Dask-Jobqueue <dask-jobqueue:index>`.
Below follows a nice video explaining what :doc:`Dask-Jobqueue <dask-jobqueue:index>` is, some of its features, and how to use it to run :doc:`dask graphs <graphs>`.
.. raw:: html
<iframe width="560" height="315" src="https://www.youtube.com/embed/FXsgmwpRExM" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
The snippet below shows how to deploy the exact same pipeline from the previous section in the Idiap SGE cluster
......@@ -78,7 +78,7 @@ The snippet below shows how to deploy the exact same pipeline from the previous
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day
>>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day
>>> client = Client(cluster) # Creates the scheduler and attaches it to the SGE job queue system
>>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE
......@@ -91,7 +91,7 @@ Dask provides generic :doc:`deployment <dask-jobqueue:examples>` mechanism for S
2. As a result of 1., the mechanism of :doc:`adaptive deployment <dask:setup/adaptive>` is not able to handle job submissions of two or more queues.
For this reason, the generic SGE launcher was extended into :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. The next subsections present some code samples using this launcher in the most common cases you will probably find in your daily job.
For this reason, the generic SGE launcher was extended into :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. The next subsections present some code samples using this launcher in the most common cases you will probably find in your daily job.
Launching jobs in different SGE queues
......@@ -130,7 +130,7 @@ SGE queue specs are defined in python dictionary as in the example below, where,
Now that the queue specifications are set, let's trigger some jobs.
.. code:: python
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
......@@ -144,7 +144,7 @@ Now that the queue specifications are set, let's trigger some jobs.
Running estimator operations in specific SGE queues
Running estimator operations in specific SGE queues
===================================================
Sometimes it's necessary to run parts of a :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` in specific SGE queues (e.g. q_1day IO_BIG or q_gpu).
......
......@@ -3,19 +3,12 @@
Python API for bob.pipelines
============================
Samples
-----------------
.. automodule:: bob.pipelines.sample
Mixins
-----------------
.. automodule:: bob.pipelines.mixins
Main module
-----------
.. automodule:: bob.pipelines
Idiap SGE Support
Heterogeneous SGE
-----------------
.. automodule:: bob.pipelines.distributed.sge
......
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        if metadata is None:
            return X
        return [x + m for x, m in zip(X, metadata)]

    def fit(self, X):
        pass

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)

# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, key="1", metadata=1)

# Building an arbitrary pipeline
pipeline = make_pipeline(
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_1",
    ),
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_2",
    ),
)

X_transformed = pipeline.transform([X_as_sample])
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, CheckpointMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy


class MyTransformer(TransformerMixin, BaseEstimator):
    def transform(self, X, metadata=None):
        # Transform `X` with metadata
        if metadata is None:
            return X
        return [x + m for x, m in zip(X, metadata)]

    def fit(self, X):
        pass

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}


class MyFitTranformer(TransformerMixin, BaseEstimator):
    def __init__(self, *args, **kwargs):
        self._fit_model = None
        super().__init__(*args, **kwargs)

    def transform(self, X):
        # Transform `X`
        return [x @ self._fit_model for x in X]

    def fit(self, X):
        self._fit_model = numpy.array([[1, 2], [3, 4]])
        return self


# Mixing up MyTransformer with the capabilities of handling Samples AND checkpointing
MyBoostedTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyTransformer)
MyBoostedFitTransformer = mix_me_up((CheckpointMixin, SampleMixin), MyFitTranformer)

# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, key="1", metadata=1)

# Building an arbitrary pipeline
pipeline = make_pipeline(
    MyBoostedTransformer(
        transform_extra_arguments=(("metadata", "metadata"),),
        features_dir="./checkpoint_1",
    ),
    MyBoostedFitTransformer(
        features_dir="./checkpoint_2",
        model_path="./checkpoint_2/model.pkl",
    ),
)

X_transformed = pipeline.fit_transform([X_as_sample])
.. _bob.pipelines.sample:
==============================================================
=========================================================
Samples, a way to enhance scikit pipelines with metadata
==============================================================
=========================================================
Some tasks in pattern recognition demand the usage of metadata to support some processing (e.g. face cropping, audio segmentation).
To support scikit-learn based estimators with such requirements, this package provides two mechanisms:
1. A layer called :any:`Sample` that wraps your input data and allows you to append some metadata to it.
2. A wrapper class (:any:`bob.pipelines.SampleWrapper`) that handles the interplay between :any:`bob.pipelines.Sample` and your estimator.
2. A wrapper class (:any:`SampleWrapper`) that handles the interplay between :any:`Sample` and your estimator.
What is a Sample?
------------------
A :any:`bob.pipelines.Sample` is a simple container that wraps a data-point.
A :any:`Sample` is a simple container that wraps a data-point.
The example below shows how this can be used to wrap a :any:`numpy.array`.
.. doctest::
>>> # by convention, we import bob.pipelines as mario, because mario worked with pipes ;)
>>> # by convention, we import bob.pipelines as mario, because mario works with pipes ;)
>>> import bob.pipelines as mario
>>> import numpy as np
>>> data = np.array([1, 3])
......@@ -33,7 +33,7 @@ The example below shows how this can be used to wrap a :any:`numpy.array`.
Sample and metadata
-------------------
Metadata can be added as keyword arguments in :any:`bob.pipelines.Sample`, like:
Metadata can be added as keyword arguments in :any:`Sample`, like:
.. doctest::
......@@ -88,7 +88,7 @@ While this transformer works well by itself, it can't be used by
...
TypeError: _transform() takes 2 positional arguments but 3 were given
To approach this issue, :any:`bob.pipelines.SampleWrapper` can be used. This class wraps
To approach this issue, :any:`SampleWrapper` can be used. This class wraps
other estimators, accepts samples as input, and passes the data with metadata inside
the samples to the wrapped estimator:
......@@ -99,7 +99,7 @@ samples to the wrapped estimator:
>>> samples[1]
Sample(data=array([0., 0.]), offset=array([1]))
Now we need to tell :any:`bob.pipelines.SampleWrapper` to pass the ``offset`` inside
Now we need to tell :any:`SampleWrapper` to pass the ``offset`` inside
samples as an extra argument to our transformer as ``sample_specific_offsets``. This is
accommodated by the ``transform_extra_arguments`` parameter. It accepts a list of tuples
that maps sample metadata to arguments of the transformer:
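A short sketch of that mapping (the original doctest is elided in this diff); each tuple
is ``(transformer argument name, sample attribute name)``:

.. code:: python

    >>> transform_extra_arguments = [("sample_specific_offsets", "offset")]
    >>> sample_transformer = mario.SampleWrapper(MyTransformer(), transform_extra_arguments)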
......@@ -141,14 +141,14 @@ Delayed Sample
Sometimes keeping several samples in memory and transferring them over the network can
be very memory- and bandwidth-demanding. For these cases, there is
:any:`bob.pipelines.DelayedSample`.
:any:`DelayedSample`.
A :any:`bob.pipelines.DelayedSample` acts like a :any:`bob.pipelines.Sample`, but its `data` attribute is implemented as a
A :any:`DelayedSample` acts like a :any:`Sample`, but its `data` attribute is implemented as a
function that can load the respective data from its permanent storage representation. To
create a :any:`bob.pipelines.DelayedSample`, you pass a ``load()`` function that, when called without any
create a :any:`DelayedSample`, you pass a ``load()`` function that, when called without any
parameters, loads and returns the required data.
Below follows an example on how to use :any:`bob.pipelines.DelayedSample`.
Below follows an example on how to use :any:`DelayedSample`.
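A minimal sketch of creating one (the original doctest is elided in this diff), assuming
``np`` and ``mario`` are imported as above:

.. code:: python

    >>> def load():
    ...     # pretend this reads the data from permanent storage
    ...     print("Loading data from disk!")
    ...     return np.zeros((2,))
    >>> delayed_sample = mario.DelayedSample(load, metadata=1)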
.. doctest::
......@@ -168,7 +168,7 @@ As soon as you access the ``.data`` attribute, the data is loaded and kept in me
Loading data from disk!
array([0., 0.])
:any:`bob.pipelines.DelayedSample` can be used instead of :any:`bob.pipelines.Sample`
:any:`DelayedSample` can be used instead of :any:`Sample`
transparently:
.. doctest::
......@@ -183,18 +183,20 @@ transparently:
[2., 2.],
[4., 4.]])
Actually, :any:`bob.pipelines.SampleWrapper` always returns
:any:`bob.pipelines.DelayedSample`s. This becomes useful when the data in the returned
samples is not used, as we will see happen in :ref:`bob.pipelines.checkpoint`.
.. note::
Actually, :any:`SampleWrapper` always returns
:any:`DelayedSample`'s. This becomes useful when the data in the returned
samples is not used, as we will see happen in :ref:`bob.pipelines.checkpoint`.
Sample Set
----------
A :any:`bob.pipelines.SampleSet`, as the name suggests, represents a set of samples.
A :any:`SampleSet`, as the name suggests, represents a set of samples.
Such a set can, for example, contain the samples that belong to a class.
Below follows a snippet on how to use :any:`bob.pipelines.SampleSet`.
Below follows a snippet on how to use :any:`SampleSet`.
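A brief sketch (the doctest body is elided in this diff), reusing the ``samples`` list
from earlier:

.. code:: python

    >>> sample_sets = [mario.SampleSet(samples, class_name="A")]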
.. doctest::
......@@ -205,9 +207,9 @@ Below, follow an snippet on how to use :any:`bob.pipelines.SampleSet`.
>>> sample_sets[0]
SampleSet(samples=[Sample(data=array([0., 0.]), offset=array([0])), Sample(data=array([0., 0.]), offset=array([1])), Sample(data=array([0., 0.]), offset=array([2]))], class_name='A')
:any:`bob.pipelines.SampleWrapper` works transparently with
:any:`bob.pipelines.SampleSet`s as well. It will transform each sample inside and
return the same SampleSets with new data.
:any:`SampleWrapper` works transparently with :any:`SampleSet`'s as well. It will
transform each sample inside and return the same SampleSets with new data.
.. doctest::
......