Commit 8d7a645c authored by Amir MOHAMMADI

Merge branch 'processors' into 'master'

Add sequential and parallel processors, pre-processors, and extractors

Closes #95

See merge request !102
parents e777d420 afd5dcc3
Pipeline #13461 passed with stages in 30 minutes and 52 seconds
from .Extractor import Extractor
from .Linearize import Linearize
from .stacks import (SequentialExtractor, ParallelExtractor,
                     CallableExtractor, MultipleExtractor)


# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
    """Says object was actually declared here, and not in the import module.
    Fixing sphinx warnings of not being able to find classes, when path is
    shortened.

    Parameters:

        *args: An iterable of objects to modify

    Resolves `Sphinx referencing issues
    <https://github.com/sphinx-doc/sphinx/issues/3048>`
    """
    for obj in args:
        obj.__module__ = __name__


__appropriate__(
    Extractor,
    Linearize,
    SequentialExtractor,
    ParallelExtractor,
    CallableExtractor,
    MultipleExtractor,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
from ..utils.processors import SequentialProcessor, ParallelProcessor
from .Extractor import Extractor
from bob.io.base import HDF5File


class MultipleExtractor(Extractor):
    """Base class for SequentialExtractor and ParallelExtractor. This class
    is not meant to be used directly."""

    def get_attributes(self, processors):
        requires_training = any(p.requires_training for p in processors)
        split_training_data_by_client = any(
            p.split_training_data_by_client for p in processors)
        min_extractor_file_size = min(
            p.min_extractor_file_size for p in processors)
        min_feature_file_size = min(
            p.min_feature_file_size for p in processors)
        return (requires_training, split_training_data_by_client,
                min_extractor_file_size, min_feature_file_size)

    def get_extractor_groups(self):
        groups = ['E_{}'.format(i + 1) for i in range(len(self.processors))]
        return groups

    def train_one(self, e, training_data, extractor_file, apply=False):
        if not e.requires_training:
            return
        # if any of the extractors require splitting the data, the
        # split_training_data_by_client is True.
        if e.split_training_data_by_client:
            e.train(training_data, extractor_file)
            if not apply:
                return
            training_data = [[e(d) for d in datalist]
                             for datalist in training_data]
        # when no extractor needs splitting
        elif not self.split_training_data_by_client:
            e.train(training_data, extractor_file)
            if not apply:
                return
            training_data = [e(d) for d in training_data]
        # when e here wants it flat but the data is split
        else:
            # make training_data flat
            aligned_training_data = [d for datalist in training_data
                                     for d in datalist]
            e.train(aligned_training_data, extractor_file)
            if not apply:
                return
            training_data = [[e(d) for d in datalist]
                             for datalist in training_data]
        return training_data

    def load(self, extractor_file):
        with HDF5File(extractor_file) as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.cd(group)
                e.load(f)
                f.cd('..')
class SequentialExtractor(SequentialProcessor, MultipleExtractor):
    """A helper class which takes several extractors and applies them one by
    one sequentially.

    Attributes
    ----------
    processors : list
        A list of extractors to apply.

    Examples
    --------
    You can use this class to apply a chain of extractors on your data. For
    example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.extractor import SequentialExtractor, CallableExtractor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> seq_extractor = SequentialExtractor(
    ...     [CallableExtractor(f) for f in
    ...      [np.cast['float64'], lambda x: x / 2, partial(np.mean, axis=1)]])
    >>> seq_extractor(raw_data)
    array([ 1.,  1.])
    >>> np.all(seq_extractor(raw_data) ==
    ...        np.mean(np.cast['float64'](raw_data) / 2, axis=1))
    True
    """

    def __init__(self, processors):
        (requires_training, split_training_data_by_client,
         min_extractor_file_size, min_feature_file_size) = \
            self.get_attributes(processors)

        super(SequentialExtractor, self).__init__(
            processors=processors,
            requires_training=requires_training,
            split_training_data_by_client=split_training_data_by_client,
            min_extractor_file_size=min_extractor_file_size,
            min_feature_file_size=min_feature_file_size)

    def train(self, training_data, extractor_file):
        with HDF5File(extractor_file, 'w') as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.create_group(group)
                f.cd(group)
                training_data = self.train_one(e, training_data, f,
                                               apply=True)
                f.cd('..')

    def read_feature(self, feature_file):
        return self.processors[-1].read_feature(feature_file)

    def write_feature(self, feature, feature_file):
        self.processors[-1].write_feature(feature, feature_file)
class ParallelExtractor(ParallelProcessor, MultipleExtractor):
    """A helper class which takes several extractors, applies each of them
    separately on the input data, and yields their outputs one by one.

    Attributes
    ----------
    processors : list
        A list of extractors to apply.

    Examples
    --------
    You can use this class to apply several extractors on your data and get
    all the results back. For example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.extractor import ParallelExtractor, CallableExtractor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> parallel_extractor = ParallelExtractor(
    ...     [CallableExtractor(f) for f in
    ...      [np.cast['float64'], lambda x: x / 2.0]])
    >>> list(parallel_extractor(raw_data))
    [array([[ 1.,  2.,  3.],
           [ 1.,  2.,  3.]]), array([[ 0.5,  1. ,  1.5],
           [ 0.5,  1. ,  1.5]])]

    The data may be further processed using a :any:`SequentialExtractor`:

    >>> from bob.bio.base.extractor import SequentialExtractor
    >>> total_extractor = SequentialExtractor(
    ...     [parallel_extractor, CallableExtractor(list),
    ...      CallableExtractor(partial(np.concatenate, axis=1))])
    >>> total_extractor(raw_data)
    array([[ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5],
           [ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5]])
    """

    def __init__(self, processors):
        (requires_training, split_training_data_by_client,
         min_extractor_file_size, min_feature_file_size) = self.get_attributes(
             processors)

        super(ParallelExtractor, self).__init__(
            processors=processors,
            requires_training=requires_training,
            split_training_data_by_client=split_training_data_by_client,
            min_extractor_file_size=min_extractor_file_size,
            min_feature_file_size=min_feature_file_size)

    def train(self, training_data, extractor_file):
        with HDF5File(extractor_file, 'w') as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.create_group(group)
                f.cd(group)
                self.train_one(e, training_data, f, apply=False)
                f.cd('..')
class CallableExtractor(Extractor):
    """A simple extractor that takes a callable and applies that callable to
    the input.

    Attributes
    ----------
    callable : object
        Anything that is callable. It will be used as an extractor in
        bob.bio.base.
    read_feature : object
        A callable object with the signature of
        ``feature = read_feature(feature_file)``. If not provided, the
        default implementation handles numpy arrays.
    write_feature : object
        A callable object with the signature of
        ``write_feature(feature, feature_file)``. If not provided, the
        default implementation handles numpy arrays.

    Examples
    --------
    You can take any function like ``numpy.cast['float32']`` to cast your
    data to float32, for example. This is useful when you want to stack
    several extractors using the :any:`SequentialExtractor` and
    :any:`ParallelExtractor` classes.
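
    A minimal usage sketch of wrapping a plain ``numpy`` cast (nothing beyond
    ``numpy`` and the behaviour documented above is assumed here):

    >>> import numpy as np
    >>> from bob.bio.base.extractor import CallableExtractor
    >>> extractor = CallableExtractor(np.cast['float32'])
    >>> extractor(np.array([1, 2, 3])).dtype
    dtype('float32')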
"""
def __init__(self, callable, write_feature=None, read_feature=None,
**kwargs):
super(CallableExtractor, self).__init__(**kwargs)
self.callable = callable
if write_feature is not None:
self.write_feature = write_feature
if read_feature is not None:
self.read_feature = read_feature
def __call__(self, data):
return self.callable(data)
from .Preprocessor import Preprocessor
from .Filename import Filename
from .stacks import (SequentialPreprocessor,
                     ParallelPreprocessor, CallablePreprocessor)


# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
    """Says object was actually declared here, and not in the import module.
    Fixing sphinx warnings of not being able to find classes, when path is
    shortened.

    Parameters:

        *args: An iterable of objects to modify

    Resolves `Sphinx referencing issues
    <https://github.com/sphinx-doc/sphinx/issues/3048>`
    """
    for obj in args:
        obj.__module__ = __name__


__appropriate__(
    Preprocessor,
    Filename,
    SequentialPreprocessor,
    ParallelPreprocessor,
    CallablePreprocessor,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
from ..utils.processors import SequentialProcessor, ParallelProcessor
from .Preprocessor import Preprocessor


class SequentialPreprocessor(SequentialProcessor, Preprocessor):
    """A helper class which takes several preprocessors and applies them one
    by one sequentially.

    Attributes
    ----------
    processors : list
        A list of preprocessors to apply.

    Examples
    --------
    You can use this class to apply a chain of preprocessors on your data.
    For example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.preprocessor import SequentialPreprocessor, CallablePreprocessor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> seq_preprocessor = SequentialPreprocessor(
    ...     [CallablePreprocessor(f, accepts_annotations=False) for f in
    ...      [np.cast['float64'], lambda x: x / 2, partial(np.mean, axis=1)]])
    >>> seq_preprocessor(raw_data)
    array([ 1.,  1.])
    >>> np.all(seq_preprocessor(raw_data) ==
    ...        np.mean(np.cast['float64'](raw_data) / 2, axis=1))
    True
    """

    def __init__(self, processors, read_original_data=None, **kwargs):
        min_preprocessed_file_size = min(
            p.min_preprocessed_file_size for p in processors)
        if read_original_data is None:
            read_original_data = processors[0].read_original_data

        super(SequentialPreprocessor, self).__init__(
            processors=processors,
            min_preprocessed_file_size=min_preprocessed_file_size,
            read_original_data=read_original_data,
            **kwargs)

    def __call__(self, data, annotations=None):
        return super(SequentialPreprocessor, self).__call__(
            data, annotations=annotations)

    def read_data(self, data_file):
        return self.processors[-1].read_data(data_file)

    def write_data(self, data, data_file):
        self.processors[-1].write_data(data, data_file)
class ParallelPreprocessor(ParallelProcessor, Preprocessor):
    """A helper class which takes several preprocessors, applies each of them
    separately on the input data, and yields their outputs one by one.

    Attributes
    ----------
    processors : list
        A list of preprocessors to apply.

    Examples
    --------
    You can use this class to apply several preprocessors on your data and
    get all the results back. For example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.preprocessor import ParallelPreprocessor, CallablePreprocessor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> parallel_preprocessor = ParallelPreprocessor(
    ...     [CallablePreprocessor(f, accepts_annotations=False) for f in
    ...      [np.cast['float64'], lambda x: x / 2.0]])
    >>> list(parallel_preprocessor(raw_data))
    [array([[ 1.,  2.,  3.],
           [ 1.,  2.,  3.]]), array([[ 0.5,  1. ,  1.5],
           [ 0.5,  1. ,  1.5]])]

    The data may be further processed using a :any:`SequentialPreprocessor`:

    >>> from bob.bio.base.preprocessor import SequentialPreprocessor
    >>> total_preprocessor = SequentialPreprocessor(
    ...     [parallel_preprocessor, CallablePreprocessor(list, False),
    ...      CallablePreprocessor(partial(np.concatenate, axis=1), False)])
    >>> total_preprocessor(raw_data)
    array([[ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5],
           [ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5]])
    """

    def __init__(self, processors, **kwargs):
        min_preprocessed_file_size = min(
            p.min_preprocessed_file_size for p in processors)

        super(ParallelPreprocessor, self).__init__(
            processors=processors,
            min_preprocessed_file_size=min_preprocessed_file_size,
            **kwargs)

    def __call__(self, data, annotations=None):
        return super(ParallelPreprocessor, self).__call__(
            data, annotations=annotations)
class CallablePreprocessor(Preprocessor):
    """A simple preprocessor that takes a callable and applies that callable
    to the input.

    Attributes
    ----------
    accepts_annotations : bool
        If False, annotations are not passed to the callable.
    callable : object
        Anything that is callable. It will be used as a preprocessor in
        bob.bio.base.
    read_data : object
        A callable object with the signature of
        ``data = read_data(data_file)``. If not provided, the default
        implementation handles numpy arrays.
    write_data : object
        A callable object with the signature of
        ``write_data(data, data_file)``. If not provided, the default
        implementation handles numpy arrays.

    Examples
    --------
    You can take any function like ``numpy.cast['float32']`` to cast your
    data to float32, for example. This is useful when you want to stack
    several preprocessors using the :any:`SequentialPreprocessor` and
    :any:`ParallelPreprocessor` classes.
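
    A minimal usage sketch of wrapping a plain ``numpy`` cast (nothing beyond
    ``numpy`` and the behaviour documented above is assumed here):

    >>> import numpy as np
    >>> from bob.bio.base.preprocessor import CallablePreprocessor
    >>> preprocessor = CallablePreprocessor(np.cast['float32'],
    ...                                     accepts_annotations=False)
    >>> preprocessor(np.array([1, 2, 3])).dtype
    dtype('float32')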
"""
def __init__(self, callable, accepts_annotations=True, write_data=None,
read_data=None, **kwargs):
super(CallablePreprocessor, self).__init__(
callable=callable, accepts_annotations=accepts_annotations,
**kwargs)
self.callable = callable
self.accepts_annotations = accepts_annotations
if write_data is not None:
self.write_data = write_data
if read_data is not None:
self.read_data = read_data
def __call__(self, data, annotations=None):
if self.accepts_annotations:
return self.callable(data, annotations)
else:
return self.callable(data)
from functools import partial
import numpy as np
from bob.bio.base.utils.processors import (
    SequentialProcessor, ParallelProcessor)
from bob.bio.base.preprocessor import (
    SequentialPreprocessor, ParallelPreprocessor, CallablePreprocessor)
from bob.bio.base.extractor import (
    SequentialExtractor, ParallelExtractor, CallableExtractor)

DATA = [0, 1, 2, 3, 4]
PROCESSORS = [partial(np.power, 2), np.mean]
SEQ_DATA = PROCESSORS[1](PROCESSORS[0](DATA))
PAR_DATA = (PROCESSORS[0](DATA), PROCESSORS[1](DATA))


def test_processors():
    proc = SequentialProcessor(PROCESSORS)
    data = proc(DATA)
    assert np.allclose(data, SEQ_DATA)

    proc = ParallelProcessor(PROCESSORS)
    data = proc(DATA)
    assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))


def test_preprocessors():
    processors = [CallablePreprocessor(p, False) for p in PROCESSORS]
    proc = SequentialPreprocessor(processors)
    data = proc(DATA, None)
    assert np.allclose(data, SEQ_DATA)

    proc = ParallelPreprocessor(processors)
    data = proc(DATA, None)
    assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))


def test_extractors():
    processors = [CallableExtractor(p) for p in PROCESSORS]
    proc = SequentialExtractor(processors)
    data = proc(DATA)
    assert np.allclose(data, SEQ_DATA)

    proc = ParallelExtractor(processors)
    data = proc(DATA)
    assert all(np.allclose(x1, x2) for x1, x2 in zip(data, PAR_DATA))
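

# A hedged extra sketch building only on the CallablePreprocessor behaviour
# shown above; the test name and the lambda are illustrative placeholders.
# With accepts_annotations left at its default of True, the wrapped callable
# receives both the data and the annotations.
def test_callable_preprocessor_annotations():
    proc = CallablePreprocessor(lambda data, annotations: (data, annotations))
    data, annotations = proc([1, 2], {'topleft': (0, 0)})
    assert data == [1, 2]
    assert annotations == {'topleft': (0, 0)}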
from .resources import *
from .io import *
from .singleton import *
from . import processors
import numpy
class SequentialProcessor(object):
    """A helper class which takes several processors and applies them one by
    one sequentially.

    Attributes
    ----------
    processors : list
        A list of processors to apply.

    Examples
    --------
    You can use this class to apply a chain of processes on your data. For
    example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.utils.processors import SequentialProcessor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> seq_processor = SequentialProcessor(
    ...     [np.cast['float64'], lambda x: x / 2, partial(np.mean, axis=1)])
    >>> seq_processor(raw_data)
    array([ 1.,  1.])
    >>> np.all(seq_processor(raw_data) ==
    ...        np.mean(np.cast['float64'](raw_data) / 2, axis=1))
    True
    """

    def __init__(self, processors, **kwargs):
        # forward extra keyword arguments so that cooperating base classes
        # (e.g. the Extractor and Preprocessor mix-ins built on this class)
        # receive their parameters as well
        super(SequentialProcessor, self).__init__(**kwargs)
        self.processors = processors

    def __call__(self, data, **kwargs):
        """Applies the processors on the data sequentially. The output of the
        first one goes as input to the next one.

        Parameters
        ----------
        data : object
            The data that needs to be processed.
        **kwargs
            Any kwargs are passed to the processors.

        Returns
        -------
        object
            The processed data.
        """
        for processor in self.processors:
            data = processor(data, **kwargs)
        return data

class ParallelProcessor(object):
    """A helper class which takes several processors, applies each of them
    separately on the input data, and yields their outputs one by one.

    Attributes
    ----------
    processors : list
        A list of processors to apply.

    Examples
    --------
    You can use this class to apply several processes on your data and get
    all the results back. For example:

    >>> import numpy as np
    >>> from functools import partial
    >>> from bob.bio.base.utils.processors import ParallelProcessor
    >>> raw_data = np.array([[1, 2, 3], [1, 2, 3]])
    >>> parallel_processor = ParallelProcessor(
    ...     [np.cast['float64'], lambda x: x / 2.0])
    >>> list(parallel_processor(raw_data))
    [array([[ 1.,  2.,  3.],
           [ 1.,  2.,  3.]]), array([[ 0.5,  1. ,  1.5],
           [ 0.5,  1. ,  1.5]])]

    The data may be further processed using a :any:`SequentialProcessor`:

    >>> from bob.bio.base.utils.processors import SequentialProcessor
    >>> total_processor = SequentialProcessor(
    ...     [parallel_processor, list, partial(np.concatenate, axis=1)])
    >>> total_processor(raw_data)
    array([[ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5],
           [ 1. ,  2. ,  3. ,  0.5,  1. ,  1.5]])
    """

    def __init__(self, processors, **kwargs):
        # forward extra keyword arguments so that cooperating base classes
        # receive their parameters as well
        super(ParallelProcessor, self).__init__(**kwargs)
        self.processors = processors

    def __call__(self, data, **kwargs):
        """Applies the processors on the data independently and outputs a
        generator of their outputs.

        Parameters
        ----------
        data : object
            The data that needs to be processed.
        **kwargs
            Any kwargs are passed to the processors.

        Yields
        ------
        object
            The processed data from the processors one by one.
        """
        for processor in self.processors:
            yield processor(data, **kwargs)
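

# A brief sketch (kept in comments) of the keyword-forwarding behaviour
# described in the docstrings above; the two lambdas are purely illustrative:
#
#   >>> proc = SequentialProcessor(
#   ...     [lambda x, scale: x * scale, lambda x, scale: x + scale])
#   >>> proc(2, scale=3)
#   9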
python
numpy
bob.bio.face
bob.bio.speaker
bob.bio.gmm
bob.bio.video
bob.bio.csu
bob.bio.spear
bob.learn.linear
gridtk
bob.db.youtube