Commit a17eb7c0 authored by Amir MOHAMMADI

Fixed the sample guide

parent 705306ae
Pipeline #39264 failed with stage in 6 minutes and 16 seconds
-from bob.pipelines.sample import Sample, SampleSet, DelayedSample
+import bob.pipelines as mario
import numpy
import copy
@@ -8,13 +8,13 @@ def test_sampleset_collection():
n_samples = 10
X = numpy.ones(shape=(n_samples, 2), dtype=int)
-sampleset = SampleSet(
-    [Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
+sampleset = mario.SampleSet(
+    [mario.Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
)
assert len(sampleset) == n_samples
# Testing insert
-sample = Sample(X, key=100)
+sample = mario.Sample(X, key=100)
sampleset.insert(1, sample)
assert len(sampleset) == n_samples + 1
@@ -27,4 +27,4 @@ def test_sampleset_collection():
# Testing iterator
for i in sampleset:
-assert isinstance(i, Sample)
+assert isinstance(i, mario.Sample)
import nose
import numpy as np
import os
-import bob.pipelines as skblocks
+import bob.pipelines as mario
from tempfile import NamedTemporaryFile
@@ -50,7 +50,7 @@ def test_io_vstack():
reader_same_size_C2,
reader_same_size_F2,
]:
-np.all(skblocks.utils.vstack_features(reader, paths) == oracle(reader, paths))
+np.all(mario.utils.vstack_features(reader, paths) == oracle(reader, paths))
# when same_size is True
for reader in [
@@ -60,11 +60,11 @@ def test_io_vstack():
reader_same_size_F2,
]:
np.all(
-    skblocks.utils.vstack_features(reader, paths, True) == oracle(reader, paths)
+    mario.utils.vstack_features(reader, paths, True) == oracle(reader, paths)
)
with nose.tools.assert_raises(AssertionError):
-    skblocks.utils.vstack_features(reader_wrong_size, paths)
+    mario.utils.vstack_features(reader_wrong_size, paths)
# test actual files
suffix = ".npy"
@@ -88,12 +88,12 @@ def test_io_vstack():
np.save(path, reader(i + 1), allow_pickle=False)
# test when all data is present
reference = oracle(np.load, paths)
-np.all(skblocks.utils.vstack_features(np.load, paths) == reference)
+np.all(mario.utils.vstack_features(np.load, paths) == reference)
try:
os.remove(paths[0])
# Check if FileNotFoundError is raised when one of the files is missing
with nose.tools.assert_raises(FileNotFoundError):
-    skblocks.utils.vstack_features(np.load, paths)
+    mario.utils.vstack_features(np.load, paths)
finally:
# create the file back so NamedTemporaryFile does not complain
np.save(paths[0], reader(i + 1))
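For readers unfamiliar with the API under test: ``vstack_features`` takes a reader callable and a list of paths and stacks the per-file features into one array, which is what the oracle comparisons above exercise. A minimal usage sketch; the file names are illustrative and the stacked shape assumes ``numpy.vstack`` semantics, which is what the tests compare against:

.. code:: python

    import numpy as np
    import bob.pipelines as mario

    # One feature array per file; any reader callable works, here numpy.load.
    paths = ["f0.npy", "f1.npy"]
    for i, path in enumerate(paths):
        np.save(path, np.full((2, 3), i), allow_pickle=False)

    # Stack the features of all files into a single 2D array.
    stacked = mario.utils.vstack_features(np.load, paths)
    assert stacked.shape == (4, 3)  # two files, two rows each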
@@ -9,8 +9,7 @@ from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
-# import skpipes as skp
-import bob.pipelines as skp
+import bob.pipelines as mario
def _offset_add_func(X, offset=1):
@@ -89,9 +88,9 @@ def test_sklearn_compatible_estimator():
def test_function_sample_transfomer():
X = np.zeros(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data) for data in X]
+samples = [mario.Sample(data) for data in X]
-transformer = skp.wrap(
+transformer = mario.wrap(
[FunctionTransformer, "sample"],
func=_offset_add_func,
kw_args=dict(offset=3),
@@ -108,10 +107,10 @@ def test_function_sample_transfomer():
def test_fittable_sample_transformer():
X = np.ones(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data) for data in X]
+samples = [mario.Sample(data) for data in X]
# Mixing up with an object
-transformer = skp.wrap([DummyWithFit, "sample"])
+transformer = mario.wrap([DummyWithFit, "sample"])
features = transformer.fit(samples).transform(samples)
_assert_all_close_numpy_array(X + 1, [s.data for s in features])
@@ -132,13 +131,13 @@ def _assert_checkpoints(features, oracle, model_path, features_dir, stateless):
def _assert_delayed_samples(samples):
for s in samples:
-assert isinstance(s, skp.DelayedSample)
+assert isinstance(s, mario.DelayedSample)
def test_checkpoint_function_sample_transfomer():
X = np.arange(20, dtype=int).reshape(10, 2)
-samples = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
offset = 3
oracle = X + offset
@@ -146,7 +145,7 @@ def test_checkpoint_function_sample_transfomer():
model_path = os.path.join(d, "model.pkl")
features_dir = os.path.join(d, "features")
-transformer = skp.wrap(
+transformer = mario.wrap(
[FunctionTransformer, "sample", "checkpoint"],
func=_offset_add_func,
kw_args=dict(offset=offset),
@@ -168,7 +167,7 @@ def test_checkpoint_function_sample_transfomer():
_assert_checkpoints(features, oracle, model_path, features_dir, True)
# test when both model_path and features_dir are None
-transformer = skp.wrap(
+transformer = mario.wrap(
[FunctionTransformer, "sample", "checkpoint"],
func=_offset_add_func,
kw_args=dict(offset=offset),
@@ -180,19 +179,19 @@ def test_checkpoint_function_sample_transfomer():
def test_checkpoint_fittable_sample_transformer():
X = np.ones(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
oracle = X + 1
with tempfile.TemporaryDirectory() as d:
model_path = os.path.join(d, "model.pkl")
features_dir = os.path.join(d, "features")
-transformer = skp.wrap(
+transformer = mario.wrap(
[DummyWithFit, "sample", "checkpoint"],
model_path=model_path,
features_dir=features_dir,
)
-assert not skp.utils.is_estimator_stateless(transformer)
+assert not mario.utils.is_estimator_stateless(transformer)
features = transformer.fit(samples).transform(samples)
_assert_checkpoints(features, oracle, model_path, features_dir, False)
@@ -212,7 +211,7 @@ def _build_estimator(path, i):
model_path = os.path.join(base_dir, "model.pkl")
features_dir = os.path.join(base_dir, "features")
-transformer = skp.wrap(
+transformer = mario.wrap(
[DummyWithFit, "sample", "checkpoint"],
model_path=model_path,
features_dir=features_dir,
@@ -223,7 +222,7 @@ def _build_estimator(path, i):
def _build_transformer(path, i, picklable=True):
features_dir = os.path.join(path, f"transformer{i}")
-estimator = skp.wrap(
+estimator = mario.wrap(
[DummyTransformer, "sample", "checkpoint"], i=i, features_dir=features_dir
)
return estimator
@@ -232,8 +231,8 @@ def _build_transformer(path, i, picklable=True):
def test_checkpoint_fittable_pipeline():
X = np.ones(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
-samples_transform = [skp.Sample(data, key=str(i + 10)) for i, data in enumerate(X)]
+samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples_transform = [mario.Sample(data, key=str(i + 10)) for i, data in enumerate(X)]
oracle = X + 3
with tempfile.TemporaryDirectory() as d:
@@ -249,7 +248,7 @@ def test_checkpoint_transform_pipeline():
def _run(dask_enabled):
X = np.ones(shape=(10, 2), dtype=int)
-samples_transform = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples_transform = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
offset = 2
oracle = X + offset
@@ -258,7 +257,7 @@ def test_checkpoint_transform_pipeline():
[(f"{i}", _build_transformer(d, i)) for i in range(offset)]
)
if dask_enabled:
-pipeline = skp.wrap(["dask"], pipeline)
+pipeline = mario.wrap(["dask"], pipeline)
transformed_samples = pipeline.transform(samples_transform).compute(
scheduler="single-threaded"
)
@@ -274,9 +273,9 @@ def test_checkpoint_transform_pipeline():
def test_checkpoint_fit_transform_pipeline():
def _run(dask_enabled):
X = np.ones(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
samples_transform = [
-    skp.Sample(data, key=str(i + 10)) for i, data in enumerate(X)
+    mario.Sample(data, key=str(i + 10)) for i, data in enumerate(X)
]
oracle = X + 2
@@ -285,9 +284,9 @@ def test_checkpoint_fit_transform_pipeline():
transformer = ("1", _build_transformer(d, 1))
pipeline = Pipeline([fitter, transformer])
if dask_enabled:
-pipeline = skp.wrap(["dask"], pipeline, fit_tag=[(1, "GPU")])
+pipeline = mario.wrap(["dask"], pipeline, fit_tag=[(1, "GPU")])
pipeline = pipeline.fit(samples)
-tags = skp.dask_tags(pipeline)
+tags = mario.dask_tags(pipeline)
assert len(tags) == 1, tags
transformed_samples = pipeline.transform(samples_transform)
@@ -318,9 +317,9 @@ def _get_local_client():
def test_checkpoint_fit_transform_pipeline_with_dask_non_pickle():
def _run(dask_enabled):
X = np.ones(shape=(10, 2), dtype=int)
-samples = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
samples_transform = [
-    skp.Sample(data, key=str(i + 10)) for i, data in enumerate(X)
+    mario.Sample(data, key=str(i + 10)) for i, data in enumerate(X)
]
oracle = X + 2
@@ -334,7 +333,7 @@ def test_checkpoint_fit_transform_pipeline_with_dask_non_pickle():
pipeline = Pipeline([fitter, transformer])
if dask_enabled:
dask_client = _get_local_client()
-pipeline = skp.wrap(["dask"], pipeline)
+pipeline = mario.wrap(["dask"], pipeline)
pipeline = pipeline.fit(samples)
transformed_samples = pipeline.transform(samples_transform).compute(
scheduler=dask_client
@@ -351,12 +350,12 @@ def test_checkpoint_fit_transform_pipeline_with_dask_non_pickle():
def test_dask_checkpoint_transform_pipeline():
X = np.ones(shape=(10, 2), dtype=int)
-samples_transform = [skp.Sample(data, key=str(i)) for i, data in enumerate(X)]
+samples_transform = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
with tempfile.TemporaryDirectory() as d:
-bag_transformer = skp.ToDaskBag()
-estimator = skp.wrap(["dask"], _build_transformer(d, 0), transform_tag="CPU")
+bag_transformer = mario.ToDaskBag()
+estimator = mario.wrap(["dask"], _build_transformer(d, 0), transform_tag="CPU")
X_tr = estimator.transform(bag_transformer.transform(samples_transform))
-assert len(skp.dask_tags(estimator)) == 1
+assert len(mario.dask_tags(estimator)) == 1
assert len(X_tr.compute(scheduler="single-threaded")) == 10
@@ -364,8 +363,8 @@ def test_checkpoint_transform_pipeline_with_sampleset():
def _run(dask_enabled):
X = np.ones(shape=(10, 2), dtype=int)
-samples_transform = skp.SampleSet(
-    [skp.Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
+samples_transform = mario.SampleSet(
+    [mario.Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
)
offset = 2
oracle = X + offset
@@ -375,7 +374,7 @@ def test_checkpoint_transform_pipeline_with_sampleset():
[(f"{i}", _build_transformer(d, i)) for i in range(offset)]
)
if dask_enabled:
-pipeline = skp.wrap(["dask"], pipeline)
+pipeline = mario.wrap(["dask"], pipeline)
transformed_samples = pipeline.transform([samples_transform]).compute(
scheduler="single-threaded"
)
@@ -110,10 +110,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
if isinstance(samples[0], SampleSet):
return [
SampleSet(
-        self._samples_transform(
-            method, sset.samples, self.transform_extra_arguments
-        ),
-        parent=sset,
+        self._samples_transform(sset.samples, method_name), parent=sset,
)
for sset in samples
]
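The hunk above simplifies how ``SampleWrapper`` recurses into ``SampleSet`` inputs while still attaching the original set as ``parent``. The observable behavior, exercised by ``test_checkpoint_transform_pipeline_with_sampleset`` above, is that a list of sample sets goes in and a list of transformed sample sets comes out. A minimal sketch; the offset function is illustrative:

.. code:: python

    import numpy as np
    from sklearn.preprocessing import FunctionTransformer
    import bob.pipelines as mario

    X = np.ones((10, 2), dtype=int)
    sset = mario.SampleSet(
        [mario.Sample(row, key=str(i)) for i, row in enumerate(X)], key="1"
    )

    # Sample-wrapped transformers accept SampleSets transparently.
    transformer = mario.wrap([FunctionTransformer, "sample"], func=lambda a: a + 2)
    transformed = transformer.transform([sset])

    assert isinstance(transformed[0], mario.SampleSet)
    assert all((s.data == 3).all() for s in transformed[0])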
-.. _checkpoint:
+.. _bob.pipelines.checkpoint:
=======================
Checkpointing Samples
=======================
-Mechanism that allows checkpointing of :py:class:`bob.pipelines.sample.Sample` during the processing of :py:class:`sklearn.pipeline.Pipeline` using `HDF5 <https://www.hdfgroup.org/solutions/hdf5/>`_ files.
+Here, we detail a mechanism that saves :any:`bob.pipelines.Sample` objects to
+disk during the processing of estimators.
-Very often during the processing of :py:class:`sklearn.pipeline.Pipeline` with big chunks of data is useful to have checkpoints of some steps of the pipeline into the disk.
-This is useful for several purposes:
+Very often, when processing a :any:`sklearn.pipeline.Pipeline` with big chunks
+of data, it is useful to checkpoint some steps of the pipeline to disk. This
+serves several purposes:
- Reuse samples that are expensive to re-compute
- Inspection of algorithms
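For concreteness, a minimal sketch of a checkpointed pipeline step using the ``mario.wrap`` API exercised by the tests in this commit; the offset function mirrors ``_offset_add_func`` from the tests and the temporary paths are illustrative:

.. code:: python

    import os
    import tempfile

    import numpy as np
    from sklearn.preprocessing import FunctionTransformer
    import bob.pipelines as mario

    def _offset_add_func(X, offset=1):
        return X + offset

    X = np.arange(20, dtype=int).reshape(10, 2)
    # `key` gives each sample a unique name for its checkpoint on disk.
    samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]

    with tempfile.TemporaryDirectory() as d:
        transformer = mario.wrap(
            [FunctionTransformer, "sample", "checkpoint"],
            func=_offset_add_func,
            kw_args=dict(offset=3),
            features_dir=os.path.join(d, "features"),
        )
        features = transformer.transform(samples)
        assert all((s.data == x + 3).all() for s, x in zip(features, X))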
-Scikit learn has a caching mechanism that allows the caching of `estimators <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline>`_ that can be used for such purpose.
-Althought useful, such structure is not user friendly.
+Scikit-learn has a caching mechanism for `estimators <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline>`_ that can be used for this purpose.
+Although useful, that mechanism is not user friendly.
-As in :ref:`sample`, this can be approached with the :py:class:`bob.pipelines.mixins.CheckpointMixin` mixin, where a new class can be created either dynamically with the :py:func:`bob.pipelines.mixins.mix_me_up` function:
+As in :ref:`bob.pipelines.sample`, this can be approached with the :any:`bob.pipelines.mixins.CheckpointMixin` mixin, where a new class can be created either dynamically with the :py:func:`bob.pipelines.mixins.mix_me_up` function:
.. code:: python
@@ -36,7 +38,7 @@ or explicitly:
Checkpointing a transformer
---------------------------
-The code below is a repetition of the example from :ref:`sample`, but now `MyTransformer` is checkpointable once `MyTransformer.transform` is executed.
+The code below repeats the example from :ref:`bob.pipelines.sample`, but now `MyTransformer` checkpoints its output once `MyTransformer.transform` is executed.
.. literalinclude:: ./python/pipeline_example_boosted_checkpoint.py
:linenos:
@@ -45,7 +47,7 @@ The code below is a repetition of the example from :ref:`sample`, but now `MyTra
.. warning::
-In line 28, samples are created with the keyword argument, `key`. The :py:class:`bob.pipelines.mixins.CheckpointMixin` uses this information for saving.
+In line 28, samples are created with the keyword argument `key`. The :any:`bob.pipelines.mixins.CheckpointMixin` uses this information for saving.
The keyword argument `features_dir`, defined in lines 34 and 38, sets the absolute path where those samples will be saved.
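A sketch of what this buys you at the call site: checkpointed outputs come back as delayed samples whose data is loaded lazily, and one file per sample ``key`` lands under ``features_dir``, ready to be reused instead of recomputed. This mirrors ``_assert_delayed_samples`` in the tests above; the function and paths are illustrative:

.. code:: python

    import os
    import tempfile

    import numpy as np
    from sklearn.preprocessing import FunctionTransformer
    import bob.pipelines as mario

    X = np.ones((4, 2), dtype=int)
    samples = [mario.Sample(row, key=str(i)) for i, row in enumerate(X)]

    with tempfile.TemporaryDirectory() as d:
        features_dir = os.path.join(d, "features")
        transformer = mario.wrap(
            [FunctionTransformer, "sample", "checkpoint"],
            func=np.negative,
            features_dir=features_dir,
        )
        features = transformer.transform(samples)

        # Checkpointed outputs are DelayedSamples; .data loads from disk.
        assert all(isinstance(s, mario.DelayedSample) for s in features)
        # Expect one saved file per sample key (the naming scheme is an
        # implementation detail; the guide above says HDF5 is used).
        print(sorted(os.listdir(features_dir)))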
@@ -6,7 +6,7 @@
Bob Pipelines
===============
-Easilly boost your :doc:`Scikit Learn Pipelines <modules/generated/sklearn.pipeline.Pipeline>` with powerfull features, such as:
+Easily boost your :doc:`Scikit Learn Pipelines <modules/generated/sklearn.pipeline.Pipeline>` with powerful features, such as:
@@ -21,7 +21,7 @@ Easilly boost your :doc:`Scikit Learn Pipelines <modules/generated/sklearn.pipel
:align: center
Wrap datapoints with metadata and pass them to the `estimator.fit` and `estimator.transform` methods
.. figure:: img/checkpoint.png
:width: 40%
:align: center
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m["offset"] for x, m in zip(X, metadata)]
def fit(self, X):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Creating X
X = numpy.zeros((2, 2))
# Building an arbitrary pipeline
pipeline = make_pipeline(MyTransformer(), MyTransformer())
X_transformed = pipeline.transform([X])
from bob.pipelines.sample import Sample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mix MyTransformer with the Sample-handling capabilities
MyBoostedTransformer = mix_me_up((SampleMixin,), MyTransformer)
# Creating X
X = numpy.zeros((2, 2))
# Wrapping X with Samples
X_as_sample = Sample(X, metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
)
X_transformed = pipeline.transform([X_as_sample])
from bob.pipelines.sample import DelayedSample
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
import pickle
import functools
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mix MyTransformer with the Sample-handling capabilities
MyBoostedTransformer = mix_me_up((SampleMixin,), MyTransformer)
# X is stored on disk
X = open("delayed_sample.pkl", "rb")
# Wrapping X with Samples
X_as_sample = DelayedSample(functools.partial(pickle.load, X), metadata=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
)
X_transformed = pipeline.transform([X_as_sample])
\ No newline at end of file
from bob.pipelines.sample import Sample, SampleSet
from bob.pipelines.mixins import SampleMixin, mix_me_up
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
import numpy
class MyTransformer(TransformerMixin, BaseEstimator):
def transform(self, X, metadata=None):
# Transform `X` with metadata
if metadata is None:
return X
return [x + m for x, m in zip(X, metadata)]
def fit(self, X):
pass
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
# Mix MyTransformer with the Sample-handling capabilities
MyBoostedTransformer = mix_me_up((SampleMixin,), MyTransformer)
# Creating X
X1 = numpy.zeros((2, 2))
X2 = numpy.ones((2, 2))
# Wrapping X with Samples
X1_as_sample = Sample(X1, metadata=1)
X2_as_sample = Sample(X2, metadata=1)
X_sample_set = SampleSet([X1_as_sample, X2_as_sample], class_name=1)
# Building an arbitrary pipeline
pipeline = make_pipeline(
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
MyBoostedTransformer(transform_extra_arguments=(("metadata", "metadata"),)),
)
X_transformed = pipeline.transform([X_sample_set])
\ No newline at end of file