Commit 705306ae authored by Amir MOHAMMADI

convert mixins to wrappers, move bob.io.base.vstack_features to this package

parent 0750c4df
Pipeline #39199 failed with stage in 7 minutes and 51 seconds
from . import utils
from .sample import Sample, DelayedSample, SampleSet
from .wrappers import BaseWrapper, DelayedSamplesCall, SampleWrapper, CheckpointWrapper, DaskWrapper, ToDaskBag, wrap, dask_tags
from . import distributed
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
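As a quick orientation for this commit: the former mixin classes are replaced by the `wrap` helper exported above. A minimal sketch of the new call style follows; that `wrap` forwards extra keyword arguments such as `n_components` to the wrapped estimator is an assumption based on the factory functions further down in this diff.

from sklearn.decomposition import PCA
from bob.pipelines import wrap

# "sample" makes the estimator consume/produce Sample objects;
# "checkpoint" additionally caches outputs on disk.
pca = wrap([PCA, "sample", "checkpoint"], n_components=2)  # kwarg forwarding assumed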
"""Base definition of sample"""

from collections.abc import MutableSequence
def _copy_attributes(s, d):
    """Copies attributes from a dictionary to self
@@ -15,7 +15,43 @@ def _copy_attributes(s, d):
        )
class _ReprMixin:
    def __repr__(self):
        return (
            f"{self.__class__.__name__}("
            + ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
            + ")"
        )
class Sample(_ReprMixin):
    """Representation of a sample that is sufficient for the blocks in this module

    Each sample must have the following attributes:

        * attribute ``data``: Contains the data for this sample

    Parameters
    ----------
    data : object
        Object representing the data to initialize this sample with.
    parent : object
        A parent object from which to inherit all other attributes (except
        ``data``).
    """

    def __init__(self, data, parent=None, **kwargs):
        self.data = data
        if parent is not None:
            _copy_attributes(self, parent.__dict__)
        _copy_attributes(self, kwargs)
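A small usage sketch of `Sample` and attribute inheritance via `parent`; the attribute names `key` and `subject` are made up for illustration.

parent = Sample(data=None, key="sample-1", subject="A")
sample = Sample(data=[1, 2, 3], parent=parent)
# all parent attributes except ``data`` are copied over
assert sample.key == "sample-1" and sample.subject == "A"
assert sample.data == [1, 2, 3]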
class DelayedSample(_ReprMixin):
    """Representation of a sample that can be loaded via a callable

    The optional ``**kwargs`` argument allows you to attach more attributes to
@@ -54,34 +90,7 @@ class DelayedSample:
        return self._data
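A hedged sketch of the lazy-loading pattern `DelayedSample` enables; the constructor body is collapsed in this diff, so the `load` positional argument is an assumption.

import functools
import numpy as np

def _load(path):  # hypothetical reader
    return np.load(path)

# the callable is only invoked the first time ``sample.data`` is accessed
sample = DelayedSample(functools.partial(_load, "features.npy"), key="s1")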
class SampleSet(MutableSequence, _ReprMixin):
    """A set of samples with extra attributes

    https://docs.python.org/3/library/collections.abc.html#collections-abstract-base-classes
    """
......
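Since `SampleSet` is a `MutableSequence`, it should behave like a list of samples with extra attributes. A sketch, assuming the constructor takes the list of samples as its first argument:

sset = SampleSet([Sample(1), Sample(2)], key="set-1")
assert len(sset) == 2 and sset[0].data == 1  # list-like access
sset.append(Sample(3))                       # MutableSequence API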
import nose
import numpy as np
import os
import bob.pipelines as skblocks
from tempfile import NamedTemporaryFile
def test_io_vstack():
    paths = [1, 2, 3, 4, 5]

    def oracle(reader, paths):
        return np.vstack([reader(p) for p in paths])

    def reader_same_size_C(path):
        return np.arange(10).reshape(5, 2)

    def reader_different_size_C(path):
        return np.arange(2 * path).reshape(path, 2)

    def reader_same_size_F(path):
        return np.asfortranarray(np.arange(10).reshape(5, 2))

    def reader_different_size_F(path):
        return np.asfortranarray(np.arange(2 * path).reshape(path, 2))

    def reader_same_size_C2(path):
        return np.arange(30).reshape(5, 2, 3)

    def reader_different_size_C2(path):
        return np.arange(6 * path).reshape(path, 2, 3)

    def reader_same_size_F2(path):
        return np.asfortranarray(np.arange(30).reshape(5, 2, 3))

    def reader_different_size_F2(path):
        return np.asfortranarray(np.arange(6 * path).reshape(path, 2, 3))

    def reader_wrong_size(path):
        return np.arange(2 * path).reshape(2, path)

    # when same_size is False
    for reader in [
        reader_different_size_C,
        reader_different_size_F,
        reader_same_size_C,
        reader_same_size_F,
        reader_different_size_C2,
        reader_different_size_F2,
        reader_same_size_C2,
        reader_same_size_F2,
    ]:
        assert np.all(
            skblocks.utils.vstack_features(reader, paths) == oracle(reader, paths)
        )

    # when same_size is True
    for reader in [
        reader_same_size_C,
        reader_same_size_F,
        reader_same_size_C2,
        reader_same_size_F2,
    ]:
        assert np.all(
            skblocks.utils.vstack_features(reader, paths, True)
            == oracle(reader, paths)
        )

    with nose.tools.assert_raises(AssertionError):
        skblocks.utils.vstack_features(reader_wrong_size, paths)

    # test actual files
    suffix = ".npy"
    with NamedTemporaryFile(suffix=suffix) as f1, NamedTemporaryFile(
        suffix=suffix
    ) as f2, NamedTemporaryFile(suffix=suffix) as f3:
        paths = [f1.name, f2.name, f3.name]
        # try different readers:
        for reader in [
            reader_different_size_C,
            reader_different_size_F,
            reader_same_size_C,
            reader_same_size_F,
            reader_different_size_C2,
            reader_different_size_F2,
            reader_same_size_C2,
            reader_same_size_F2,
        ]:
            # save some data in files
            for i, path in enumerate(paths):
                np.save(path, reader(i + 1), allow_pickle=False)
            # test when all data is present
            reference = oracle(np.load, paths)
            assert np.all(
                skblocks.utils.vstack_features(np.load, paths) == reference
            )
            try:
                os.remove(paths[0])
                # check that FileNotFoundError is raised when a file is missing
                with nose.tools.assert_raises(FileNotFoundError):
                    skblocks.utils.vstack_features(np.load, paths)
            finally:
                # create the file back so NamedTemporaryFile does not complain
                np.save(paths[0], reader(i + 1))
from .linearize import Linearize, SampleLinearize, CheckpointSampleLinearize
from .pca import CheckpointSamplePCA, SamplePCA
from .function import SampleFunctionTransformer, CheckpointSampleFunctionTransformer, StatelessPipeline
from .dask import ToDaskBag
import dask.bag
from sklearn.base import BaseEstimator, TransformerMixin


class ToDaskBag(TransformerMixin, BaseEstimator):
    """Transform an arbitrary iterator into a :any:`dask.bag.Bag`

    Parameters
    ----------
    npartitions : int
        Number of partitions used in :any:`dask.bag.from_sequence`

    Example
    -------
    >>> transformer = ToDaskBag()
    >>> dask_bag = transformer.transform([1,2,3])
    >>> dask_bag.map_partitions(...)
    """

    def __init__(self, npartitions=None, **kwargs):
        super().__init__(**kwargs)
        self.npartitions = npartitions

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return dask.bag.from_sequence(X, npartitions=self.npartitions)

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

from ..wrappers import wrap


def SampleFunctionTransformer(**kwargs):
    """Makes the scikit-learn FunctionTransformer
    (https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
    work with :any:`Sample`-based pipelines.
    """
    return wrap([FunctionTransformer, "sample"], **kwargs)


def CheckpointSampleFunctionTransformer(**kwargs):
    """Makes the scikit-learn FunctionTransformer
    (https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
    work with :any:`Sample`-based pipelines.

    Furthermore, it makes it checkpointable.
    """
    return wrap([FunctionTransformer, "sample", "checkpoint"], **kwargs)
class StatelessPipeline(Pipeline):
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>

import numpy as np
from sklearn.preprocessing import FunctionTransformer

from ..wrappers import wrap


def linearize(X):
@@ -16,14 +11,13 @@ def linearize(X):

class Linearize(FunctionTransformer):
    """Extracts features by simply concatenating all elements of the data into
    one long vector."""

    def __init__(self, **kwargs):
        super().__init__(func=linearize, **kwargs)


def SampleLinearize(**kwargs):
    return wrap([Linearize, "sample"], **kwargs)


def CheckpointSampleLinearize(**kwargs):
    return wrap([Linearize, "sample", "checkpoint"], **kwargs)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>

from sklearn.decomposition import PCA

from ..wrappers import wrap


def SamplePCA(**kwargs):
    """Enables SAMPLE handling for :any:`sklearn.decomposition.PCA`"""
    return wrap([PCA, "sample"], **kwargs)


def CheckpointSamplePCA(**kwargs):
    """Enables SAMPLE and CHECKPOINT handling for
    :any:`sklearn.decomposition.PCA`"""
    return wrap([PCA, "sample", "checkpoint"], **kwargs)
@@ -2,6 +2,54 @@ import pickle
import nose
import numpy as np
class NonPicklableMixin:
    """Class that wraps objects that are not picklable

    Example
    -------
    >>> from bob.pipelines.processor import NonPicklableMixin
    >>> wrapper = NonPicklableMixin(my_non_picklable_class_callable)

    Example
    -------
    >>> from bob.pipelines.processor import NonPicklableMixin
    >>> import functools
    >>> wrapper = NonPicklableMixin(functools.partial(MyNonPicklableClass, arg1, arg2))

    Parameters
    ----------
    callable : callable
        A callable that instantiates the non-picklable object.
    """

    def __init__(self, callable, **kwargs):
        super().__init__(**kwargs)
        self.callable = callable
        self._instance = None

    @property
    def instance(self):
        if self._instance is None:
            self._instance = self.callable()
        return self._instance

    def __getstate__(self):
        d = dict(self.__dict__)
        d.pop("_instance")
        d["_NonPicklableMixin_instance_was_None"] = self._instance is None
        return d

    def __setstate__(self, d):
        instance_was_None = d.pop("_NonPicklableMixin_instance_was_None")
        self.__dict__ = d
        self._instance = None
        if not instance_was_None:
            # access self.instance to re-create the instance
            self.instance
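The `__getstate__`/`__setstate__` pair above drops the live instance and re-creates it on unpickling when it existed before pickling. A round-trip check (using a trivially picklable callable just to exercise the mechanics):

import pickle
import functools

wrapper = NonPicklableMixin(functools.partial(dict, a=1))
_ = wrapper.instance  # instantiate lazily
restored = pickle.loads(pickle.dumps(wrapper))
assert restored.instance == {"a": 1}  # rebuilt after unpickling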
def is_picklable(obj):
    """
    Test if an object is picklable or not
@@ -32,3 +80,153 @@ def assert_picklable(obj):
            np.testing.assert_equal(v, new_obj[k])
        else:
            nose.tools.assert_equal(v, new_obj[k])
def is_estimator_stateless(estimator):
    if not hasattr(estimator, "_get_tags"):
        raise ValueError(
            f"Passed estimator: {estimator} does not have the _get_tags method."
        )
    # See: https://scikit-learn.org/stable/developers/develop.html
    # if the estimator does not require fit or is stateless, don't call fit
    tags = estimator._get_tags()
    if tags["stateless"] or not tags["requires_fit"]:
        return True
    return False
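For example, with a recent scikit-learn (whose estimator tags mark `FunctionTransformer` as stateless while `PCA` requires fitting):

from sklearn.decomposition import PCA
from sklearn.preprocessing import FunctionTransformer

assert is_estimator_stateless(FunctionTransformer())
assert not is_estimator_stateless(PCA())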
def _generate_features(reader, paths, same_size=False):
    """Load and stack features in a memory efficient way. This function is
    meant to be used inside :py:func:`vstack_features`.

    Parameters
    ----------
    reader : ``collections.Callable``
        See the documentation of :py:func:`vstack_features`.
    paths : ``collections.Iterable``
        See the documentation of :py:func:`vstack_features`.
    same_size : :obj:`bool`, optional
        See the documentation of :py:func:`vstack_features`.

    Yields
    ------
    object
        The first object yielded is a tuple of the :py:class:`numpy.dtype` of
        the features and the shape of the first feature. The remaining objects
        are the actual values in the features. The features are returned in C
        order.
    """
    shape_determined = False
    for i, path in enumerate(paths):
        feature = np.atleast_2d(reader(path))
        feature = np.ascontiguousarray(feature)
        if not shape_determined:
            shape_determined = True
            dtype = feature.dtype
            shape = list(feature.shape)
            yield (dtype, shape)
        else:
            # make sure all features have the same shape and dtype
            if same_size:
                assert shape == list(feature.shape)
            else:
                assert shape[1:] == list(feature.shape[1:])
            assert dtype == feature.dtype
        for value in feature.flat:
            yield value
def vstack_features(reader, paths, same_size=False):
    """Stacks all features in a memory efficient way.

    Parameters
    ----------
    reader : ``collections.Callable``
        The function to load the features. The function should only take one
        argument ``path`` and return the loaded features. Use
        :any:`functools.partial` to accommodate your reader to this format.
        The features returned by ``reader`` are expected to have the same
        :py:class:`numpy.dtype` and the same shape except for their first
        dimension. The first dimension should correspond to the number of
        samples.
    paths : ``collections.Iterable``
        An iterable of paths to iterate on. Whatever is inside a path is given
        to ``reader``, so the paths do not necessarily need to point to actual
        files. If ``same_size`` is ``True``, ``len(paths)`` must be valid.
    same_size : :obj:`bool`, optional
        If ``True``, it assumes that arrays inside all the paths are the same
        shape. If you know the features are the same size in all paths, set
        this to ``True`` to improve the performance.

    Returns
    -------
    numpy.ndarray
        The read features with the shape ``(n_samples, *features_shape[1:])``.

    Examples
    --------
    In its simplest form, this function is equivalent to calling
    ``numpy.vstack([reader(p) for p in paths])``.

    >>> import numpy
    >>> from bob.pipelines.utils import vstack_features
    >>> def reader(path):
    ...     # in each file, there are 5 samples and features are 2 dimensional.
    ...     return numpy.arange(10).reshape(5, 2)
    >>> paths = ['path1', 'path2']
    >>> all_features = vstack_features(reader, paths)
    >>> numpy.allclose(all_features, numpy.array(
    ...     [[0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9],
    ...      [0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9]]))
    True
    >>> all_features_with_more_memory = numpy.vstack([reader(p) for p in paths])
    >>> numpy.allclose(all_features, all_features_with_more_memory)
    True

    You can allocate the array at once to improve the performance if you know
    that all features in paths have the same shape and you know the total
    number of the paths:

    >>> all_features = vstack_features(reader, paths, same_size=True)
    >>> numpy.allclose(all_features, numpy.array(
    ...     [[0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9],
    ...      [0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9]]))
    True

    .. note::

      This function runs very slowly. Only use it when RAM is precious.
    """
    iterable = _generate_features(reader, paths, same_size)
    dtype, shape = next(iterable)
    if same_size:
        total_size = int(len(paths) * np.prod(shape))
        all_features = np.fromiter(iterable, dtype, total_size)
    else:
        all_features = np.fromiter(iterable, dtype)

    # the shape is assumed to be (n_samples, ...); it can be (5, 2) or (5, 3, 4).
    shape = list(shape)
    shape[0] = -1
    return np.reshape(all_features, shape, order="C")


def samples_to_np_array(samples, same_size=True):
    return vstack_features(lambda s: s.data, samples, same_size=same_size)
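A small sketch tying the two helpers together, stacking the ``data`` of several same-sized samples into one array (a stand-in class is used here so the snippet is self-contained):

import numpy as np

class _S:  # stand-in for bob.pipelines.Sample
    def __init__(self, data):
        self.data = data

samples = [_S(np.arange(10).reshape(5, 2)) for _ in range(3)]
stacked = samples_to_np_array(samples)
assert stacked.shape == (15, 2)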