Commit 65ed7a8b authored by Amir MOHAMMADI's avatar Amir MOHAMMADI
Browse files

Remove stackable preprocessors and extractors

Now that we have standardized on scikit-learn transformers,
scikit-learn alternatives such as FunctionTransformer, Pipeline,
and FeatureUnion should be used
parent 821dd2aa
Pipeline #45318 failed with stage
in 4 minutes and 12 seconds
from .Extractor import Extractor
from .Linearize import Linearize
from .stacks import (SequentialExtractor, ParallelExtractor,
CallableExtractor, MultipleExtractor)
# gets sphinx autodoc done right - don't remove it
......@@ -23,9 +21,5 @@ def __appropriate__(*args):
__appropriate__(
Extractor,
Linearize,
SequentialExtractor,
ParallelExtractor,
CallableExtractor,
MultipleExtractor,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
from bob.extension.processors import SequentialProcessor, ParallelProcessor
from .Extractor import Extractor
from bob.io.base import HDF5File
class MultipleExtractor(Extractor):
"""Base class for SequentialExtractor and ParallelExtractor. This class is
not meant to be used directly."""
@staticmethod
def get_attributes(processors):
requires_training = any(p.requires_training for p in processors)
split_training_data_by_client = any(p.split_training_data_by_client for
p in processors)
min_extractor_file_size = min(p.min_extractor_file_size for p in
processors)
min_feature_file_size = min(p.min_feature_file_size for p in
processors)
return (requires_training, split_training_data_by_client,
min_extractor_file_size, min_feature_file_size)
def get_extractor_groups(self):
groups = ['E_{}'.format(i + 1) for i in range(len(self.processors))]
return groups
def train_one(self, e, training_data, extractor_file, apply=False):
"""Trains one extractor and optionally applies the extractor on the
training data after training.
Parameters
----------
e : :any:`Extractor`
The extractor to train. The extractor should be able to save itself
in an opened hdf5 file.
training_data : [object] or [[object]]
The data to be used for training.
extractor_file : :any:`bob.io.base.HDF5File`
The opened hdf5 file to save the trained extractor inside.
apply : :obj:`bool`, optional
If ``True``, the extractor is applied to the training data after it
is trained and the data is returned.
Returns
-------
None or [object] or [[object]]
Returns ``None`` if ``apply`` is ``False``. Otherwise, returns the
transformed ``training_data``.
"""
if not e.requires_training:
# do nothing since e does not require training!
pass
# if any of the extractors require splitting the data, the
# split_training_data_by_client is True.
elif e.split_training_data_by_client:
e.train(training_data, extractor_file)
# when no extractor needs splitting
elif not self.split_training_data_by_client:
e.train(training_data, extractor_file)
# when e here wants it flat but the data is split
else:
# make training_data flat
flat_training_data = [d for datalist in training_data for d in
datalist]
e.train(flat_training_data, extractor_file)
if not apply:
return
# prepare the training data for the next extractor
if self.split_training_data_by_client:
training_data = [[e(d) for d in datalist]
for datalist in training_data]
else:
training_data = [e(d) for d in training_data]
return training_data
def load(self, extractor_file):
if not self.requires_training:
return
with HDF5File(extractor_file) as f:
groups = self.get_extractor_groups()
for e, group in zip(self.processors, groups):
f.cd(group)
e.load(f)
f.cd('..')
class SequentialExtractor(SequentialProcessor, MultipleExtractor):
"""A helper class which takes several extractors and applies them one by
one sequentially.
Attributes
----------
processors : list
A list of extractors to apply.
Examples
--------
You can use this class to apply a chain of extractors on your data. For
example:
>>> import numpy as np
>>> from functools import partial
>>> from bob.bio.base.extractor import SequentialExtractor, CallableExtractor
>>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
>>> seq_extractor = SequentialExtractor(
... [CallableExtractor(f) for f in
... [np.cast['float64'], lambda x: x / 2, partial(np.mean, axis=1)]])
>>> np.allclose(seq_extractor(raw_data),[ 1., 1.])
True
>>> np.all(seq_extractor(raw_data) ==
... np.mean(np.cast['float64'](raw_data) / 2, axis=1))
True
"""
def __init__(self, processors, **kwargs):
(requires_training, split_training_data_by_client,
min_extractor_file_size, min_feature_file_size) = \
self.get_attributes(processors)
super(SequentialExtractor, self).__init__(
processors=processors,
requires_training=requires_training,
split_training_data_by_client=split_training_data_by_client,
min_extractor_file_size=min_extractor_file_size,
min_feature_file_size=min_feature_file_size,
**kwargs)
def train(self, training_data, extractor_file):
with HDF5File(extractor_file, 'w') as f:
groups = self.get_extractor_groups()
for i, (e, group) in enumerate(zip(self.processors, groups)):
apply = i != len(self.processors) - 1
f.create_group(group)
f.cd(group)
training_data = self.train_one(e, training_data, f,
apply=apply)
f.cd('..')
def read_feature(self, feature_file):
return self.processors[-1].read_feature(feature_file)
def write_feature(self, feature, feature_file):
self.processors[-1].write_feature(feature, feature_file)
class ParallelExtractor(ParallelProcessor, MultipleExtractor):
"""A helper class which takes several extractors and applies them on
each processor separately and yields their outputs one by one.
Attributes
----------
processors : list
A list of extractors to apply.
Examples
--------
You can use this class to apply several extractors on your data and get
all the results back. For example:
>>> import numpy as np
>>> from functools import partial
>>> from bob.bio.base.extractor import ParallelExtractor, CallableExtractor
>>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
>>> parallel_extractor = ParallelExtractor(
... [CallableExtractor(f) for f in
... [np.cast['float64'], lambda x: x / 2.0]])
>>> np.allclose(list(parallel_extractor(raw_data)),[[[ 1., 2., 3.],[ 1., 2., 3.]], [[ 0.5, 1. , 1.5],[ 0.5, 1. , 1.5]]])
True
The data may be further processed using a :any:`SequentialExtractor`:
>>> from bob.bio.base.extractor import SequentialExtractor
>>> total_extractor = SequentialExtractor(
... [parallel_extractor, CallableExtractor(list),
... CallableExtractor(partial(np.concatenate, axis=1))])
>>> np.allclose(total_extractor(raw_data),[[ 1. , 2. , 3. , 0.5, 1. , 1.5],[ 1. , 2. , 3. , 0.5, 1. , 1.5]])
True
"""
def __init__(self, processors, **kwargs):
(requires_training, split_training_data_by_client,
min_extractor_file_size, min_feature_file_size) = self.get_attributes(
processors)
super(ParallelExtractor, self).__init__(
processors=processors,
requires_training=requires_training,
split_training_data_by_client=split_training_data_by_client,
min_extractor_file_size=min_extractor_file_size,
min_feature_file_size=min_feature_file_size,
**kwargs)
def train(self, training_data, extractor_file):
with HDF5File(extractor_file, 'w') as f:
groups = self.get_extractor_groups()
for e, group in zip(self.processors, groups):
f.create_group(group)
f.cd(group)
self.train_one(e, training_data, f, apply=False)
f.cd('..')
class CallableExtractor(Extractor):
"""A simple extractor that takes a callable and applies that callable to
the input.
Attributes
----------
callable : object
Anything that is callable. It will be used as an extractor in
bob.bio.base.
read_feature : object
A callable object with the signature of
``feature = read_feature(feature_file)``. If not provided, the default
implementation handles numpy arrays.
write_feature : object
A callable object with the signature of
``write_feature(feature, feature_file)``. If not provided, the default
implementation handles numpy arrays.
Examples
--------
You can take any function like ``numpy.cast['float32']`` to cast your data
to float32 for example. This is useful when you want to stack several
extractors using the :any:`SequentialExtractor` and
:any:`ParallelExtractor` classes.
"""
def __init__(self, callable, write_feature=None, read_feature=None,
**kwargs):
super(CallableExtractor, self).__init__(**kwargs)
self.callable = callable
if write_feature is not None:
self.write_feature = write_feature
if read_feature is not None:
self.read_feature = read_feature
def __call__(self, data):
return self.callable(data)
from .Preprocessor import Preprocessor
from .Filename import Filename
from .stacks import (SequentialPreprocessor,
ParallelPreprocessor, CallablePreprocessor)
# gets sphinx autodoc done right - don't remove it
......@@ -23,8 +21,5 @@ def __appropriate__(*args):
__appropriate__(
Preprocessor,
Filename,
SequentialPreprocessor,
ParallelPreprocessor,
CallablePreprocessor,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
from bob.extension.processors import SequentialProcessor, ParallelProcessor
from .Preprocessor import Preprocessor
class SequentialPreprocessor(SequentialProcessor, Preprocessor):
"""A helper class which takes several preprocessors and applies them one by
one sequentially.
Attributes
----------
processors : list
A list of preprocessors to apply.
Examples
--------
You can use this class to apply a chain of preprocessors on your data. For
example:
>>> import numpy as np
>>> from functools import partial
>>> from bob.bio.base.preprocessor import SequentialPreprocessor, CallablePreprocessor
>>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
>>> seq_preprocessor = SequentialPreprocessor(
... [CallablePreprocessor(f, accepts_annotations=False) for f in
... [np.cast['float64'], lambda x: x / 2, partial(np.mean, axis=1)]])
>>> np.allclose(seq_preprocessor(raw_data), [ 1., 1.])
True
>>> np.all(seq_preprocessor(raw_data) ==
... np.mean(np.cast['float64'](raw_data) / 2, axis=1))
True
"""
def __init__(self, processors, read_original_data=None, **kwargs):
min_preprocessed_file_size = min(
(p.min_preprocessed_file_size for p in processors))
if read_original_data is None:
read_original_data = processors[0].read_original_data
super(SequentialPreprocessor, self).__init__(
processors=processors,
min_preprocessed_file_size=min_preprocessed_file_size,
read_original_data=read_original_data,
**kwargs)
def __call__(self, data, annotations=None):
return super(SequentialPreprocessor, self).__call__(
data, annotations=annotations)
def read_data(self, data_file):
return self.processors[-1].read_data(data_file)
def write_data(self, data, data_file):
self.processors[-1].write_data(data, data_file)
class ParallelPreprocessor(ParallelProcessor, Preprocessor):
"""A helper class which takes several preprocessors and applies them on
each processor separately and yields their outputs one by one.
Attributes
----------
processors : list
A list of preprocessors to apply.
Examples
--------
You can use this class to apply several preprocessors on your data and get
all the results back. For example:
>>> import numpy as np
>>> from functools import partial
>>> from bob.bio.base.preprocessor import ParallelPreprocessor, CallablePreprocessor
>>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
>>> parallel_preprocessor = ParallelPreprocessor(
... [CallablePreprocessor(f, accepts_annotations=False) for f in
... [np.cast['float64'], lambda x: x / 2.0]])
>>> np.allclose(list(parallel_preprocessor(raw_data)),[[[ 1., 2., 3.],[ 1., 2., 3.]], [[ 0.5, 1. , 1.5],[ 0.5, 1. , 1.5]]])
True
The data may be further processed using a :any:`SequentialPreprocessor`:
>>> from bob.bio.base.preprocessor import SequentialPreprocessor
>>> total_preprocessor = SequentialPreprocessor(
... [parallel_preprocessor, CallablePreprocessor(list, False),
... CallablePreprocessor(partial(np.concatenate, axis=1), False)])
>>> np.allclose(total_preprocessor(raw_data),[[ 1. , 2. , 3. , 0.5, 1. , 1.5],[ 1. , 2. , 3. , 0.5, 1. , 1.5]])
True
"""
def __init__(self, processors, **kwargs):
min_preprocessed_file_size = min(p.min_preprocessed_file_size for p in
processors)
super(ParallelPreprocessor, self).__init__(
processors=processors,
min_preprocessed_file_size=min_preprocessed_file_size,
**kwargs)
def __call__(self, data, annotations=None):
return super(ParallelPreprocessor, self).__call__(
data, annotations=annotations)
class CallablePreprocessor(Preprocessor):
"""A simple preprocessor that takes a callable and applies that callable to
the input.
Attributes
----------
accepts_annotations : bool
If False, annotations are not passed to the callable.
callable : object
Anything that is callable. It will be used as a preprocessor in
bob.bio.base.
read_data : object
A callable object with the signature of
``data = read_data(data_file)``. If not provided, the default
implementation handles numpy arrays.
write_data : object
A callable object with the signature of
``write_data(data, data_file)``. If not provided, the default
implementation handles numpy arrays.
Examples
--------
You can take any function like ``numpy.cast['float32']`` to cast your data
to float32 for example. This is useful when you want to stack several
preprocessors using the :any:`SequentialPreprocessor` and
:any:`ParallelPreprocessor` classes.
"""
def __init__(self, callable, accepts_annotations=True, write_data=None,
read_data=None, **kwargs):
super(CallablePreprocessor, self).__init__(
callable=callable, accepts_annotations=accepts_annotations,
**kwargs)
self.callable = callable
self.accepts_annotations = accepts_annotations
if write_data is not None:
self.write_data = write_data
if read_data is not None:
self.read_data = read_data
def __call__(self, data, annotations=None):
if self.accepts_annotations:
return self.callable(data, annotations)
else:
return self.callable(data)
from functools import partial
import numpy as np
import tempfile
from bob.bio.base.preprocessor import (
SequentialPreprocessor, ParallelPreprocessor, CallablePreprocessor)
from bob.bio.base.extractor import (
SequentialExtractor, ParallelExtractor, CallableExtractor)
from bob.bio.base.test.dummy.extractor import extractor as dummy_extractor
DATA = [0, 1, 2, 3, 4]
PROCESSORS = [partial(np.power, 2), np.mean]
SEQ_DATA = PROCESSORS[1](PROCESSORS[0](DATA))
PAR_DATA = (PROCESSORS[0](DATA), PROCESSORS[1](DATA))
def test_preprocessors():
processors = [CallablePreprocessor(p, False) for p in PROCESSORS]
proc = SequentialPreprocessor(processors)
data = proc(DATA, None)
assert np.allclose(data, SEQ_DATA)
proc = ParallelPreprocessor(processors)
data = proc(DATA, None)
assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))
def test_extractors():
processors = [CallableExtractor(p) for p in PROCESSORS]
proc = SequentialExtractor(processors)
proc.load(None)
data = proc(DATA)
assert np.allclose(data, SEQ_DATA)
proc = ParallelExtractor(processors)
proc.load(None)
data = proc(DATA)
assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))
def test_sequential_trainable_extractors():
processors = [CallableExtractor(p) for p in PROCESSORS] + [dummy_extractor]
proc = SequentialExtractor(processors)
with tempfile.NamedTemporaryFile(suffix='.hdf5') as f:
proc.train(DATA, f.name)
proc.load(f.name)
data = proc(DATA)
assert np.allclose(data, SEQ_DATA)
def test_parallel_trainable_extractors():
processors = [CallableExtractor(p) for p in PROCESSORS] + [dummy_extractor]
proc = ParallelExtractor(processors)
with tempfile.NamedTemporaryFile(suffix='.hdf5') as f:
proc.train(DATA, f.name)
proc.load(f.name)
data = proc(np.array(DATA))
assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment