Commit 4151b396 authored by Amir MOHAMMADI

use 4 spaces. Implement IO for sequential preprocessors and extractors

parent 8b031ce9
Merge request !102: Add sequential and parallel processors, pre-processors, and extractors
@@ -4,133 +4,141 @@ from bob.io.base import HDF5File


class MultipleExtractor(Extractor):
    """Base class for SequentialExtractor and ParallelExtractor. This class is
    not meant to be used directly."""

    def get_attributes(self, processors):
        requires_training = any(p.requires_training for p in processors)
        split_training_data_by_client = any(p.split_training_data_by_client
                                            for p in processors)
        min_extractor_file_size = min(p.min_extractor_file_size
                                      for p in processors)
        min_feature_file_size = min(
            p.min_feature_file_size for p in processors)
        return (requires_training, split_training_data_by_client,
                min_extractor_file_size, min_feature_file_size)

    def get_extractor_groups(self):
        groups = ['E_{}'.format(i + 1) for i in range(len(self.processors))]
        return groups

    def train_one(self, e, training_data, extractor_file, apply=False):
        """Trains a single extractor and, if ``apply`` is True, returns the
        training data transformed by it, to be fed to the next extractor."""
        if not e.requires_training:
            # Nothing to train, but with apply=True the data must still be
            # passed through e so the next extractor sees transformed data.
            if not apply:
                return
            if self.split_training_data_by_client:
                return [[e(d) for d in datalist]
                        for datalist in training_data]
            return [e(d) for d in training_data]
        # If any of the extractors requires the data to be split by client,
        # self.split_training_data_by_client is True (see get_attributes).
        if e.split_training_data_by_client:
            e.train(training_data, extractor_file)
            if not apply:
                return
            training_data = [[e(d) for d in datalist]
                             for datalist in training_data]
        # when no extractor needs the data split by client
        elif not self.split_training_data_by_client:
            e.train(training_data, extractor_file)
            if not apply:
                return
            training_data = [e(d) for d in training_data]
        # when e wants the data flat but the data is split by client
        else:
            # make training_data flat
            training_data_len = [len(datalist) for datalist in training_data]
            training_data = [d for datalist in training_data for d in datalist]
            e.train(training_data, extractor_file)
            if not apply:
                return
            # split the training data by client again
            new_training_data, i = [], 0
            for length in training_data_len:
                class_data = []
                for _ in range(length):
                    class_data.append(e(training_data[i]))
                    i += 1
                new_training_data.append(class_data)
            training_data = new_training_data
        return training_data

    def load(self, extractor_file):
        with HDF5File(extractor_file) as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.cd(group)
                e.load(f)
                f.cd('..')


class SequentialExtractor(SequentialProcessor, MultipleExtractor):
    __doc__ = SequentialProcessor.__doc__

    def __init__(self, processors):
        (requires_training, split_training_data_by_client,
         min_extractor_file_size, min_feature_file_size) = \
            self.get_attributes(processors)

        super(SequentialExtractor, self).__init__(
            processors=processors,
            requires_training=requires_training,
            split_training_data_by_client=split_training_data_by_client,
            min_extractor_file_size=min_extractor_file_size,
            min_feature_file_size=min_feature_file_size)

    def train(self, training_data, extractor_file):
        with HDF5File(extractor_file, 'w') as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.create_group(group)
                f.cd(group)
                # apply=True: each extractor's output becomes the next
                # extractor's training data
                training_data = self.train_one(e, training_data, f,
                                               apply=True)
                f.cd('..')

    def read_feature(self, feature_file):
        return self.processors[-1].read_feature(feature_file)

    def write_feature(self, feature, feature_file):
        return self.processors[-1].write_feature(feature, feature_file)


class ParallelExtractor(ParallelProcessor, MultipleExtractor):
    __doc__ = ParallelProcessor.__doc__

    def __init__(self, processors):
        (requires_training, split_training_data_by_client,
         min_extractor_file_size, min_feature_file_size) = \
            self.get_attributes(processors)

        super(ParallelExtractor, self).__init__(
            processors=processors,
            requires_training=requires_training,
            split_training_data_by_client=split_training_data_by_client,
            min_extractor_file_size=min_extractor_file_size,
            min_feature_file_size=min_feature_file_size)

    def train(self, training_data, extractor_file):
        with HDF5File(extractor_file, 'w') as f:
            groups = self.get_extractor_groups()
            for e, group in zip(self.processors, groups):
                f.create_group(group)
                f.cd(group)
                # apply=False: all extractors train on the original data
                self.train_one(e, training_data, f, apply=False)
                f.cd('..')


class CallableExtractor(Extractor):
    """A simple extractor that takes a callable and applies that callable to
    the input.

    Attributes
    ----------
    callable : object
        Anything that is callable. It will be used as an extractor in
        bob.bio.base.
    """

    def __init__(self, callable, **kwargs):
        super(CallableExtractor, self).__init__(**kwargs)
        self.callable = callable

    def __call__(self, data):
        return self.callable(data)
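
For illustration, a minimal usage sketch (not part of the commit) chaining two untrained extractors. It assumes SequentialExtractor and CallableExtractor are exported from bob.bio.base.extractor, as this merge request intends:

import numpy
from bob.bio.base.extractor import SequentialExtractor, CallableExtractor

# Neither wrapped callable requires training, so the combined extractor's
# requires_training attribute is False.
extractor = SequentialExtractor([
    CallableExtractor(numpy.ravel),            # flatten 2D data to 1D
    CallableExtractor(lambda x: x / x.max()),  # scale into [0, 1]
])

feature = extractor(numpy.arange(4.).reshape(2, 2))  # -> [0., 1/3, 2/3, 1.]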
@@ -3,62 +3,71 @@ from .Preprocessor import Preprocessor


class SequentialPreprocessor(SequentialProcessor, Preprocessor):
    __doc__ = SequentialProcessor.__doc__

    def __init__(self, processors, **kwargs):
        min_preprocessed_file_size = 1000
        try:
            min_preprocessed_file_size = min(
                (p.min_preprocessed_file_size for p in processors))
        except AttributeError:
            pass
        SequentialProcessor.__init__(self, processors)
        Preprocessor.__init__(
            self, min_preprocessed_file_size=min_preprocessed_file_size,
            **kwargs)

    def __call__(self, data, annotations):
        return super(SequentialPreprocessor, self).__call__(
            data, annotations=annotations)

    def read_data(self, data_file):
        return self.processors[-1].read_data(data_file)

    def write_data(self, data, data_file):
        return self.processors[-1].write_data(data, data_file)


class ParallelPreprocessor(ParallelProcessor, Preprocessor):
    __doc__ = ParallelProcessor.__doc__

    def __init__(self, processors, **kwargs):
        min_preprocessed_file_size = min(p.min_preprocessed_file_size
                                         for p in processors)
        ParallelProcessor.__init__(self, processors)
        Preprocessor.__init__(
            self, min_preprocessed_file_size=min_preprocessed_file_size,
            **kwargs)

    def __call__(self, data, annotations):
        return super(ParallelPreprocessor, self).__call__(
            data, annotations=annotations)


class CallablePreprocessor(Preprocessor):
    """A simple preprocessor that takes a callable and applies that callable
    to the input.

    Attributes
    ----------
    accepts_annotations : bool
        If False, annotations are not passed to the callable.
    callable : object
        Anything that is callable. It will be used as a preprocessor in
        bob.bio.base.
    """

    def __init__(self, callable, accepts_annotations=True, **kwargs):
        super(CallablePreprocessor, self).__init__(
            callable=callable, accepts_annotations=accepts_annotations,
            **kwargs)
        self.callable = callable
        self.accepts_annotations = accepts_annotations

    def __call__(self, data, annotations):
        if self.accepts_annotations:
            return self.callable(data, annotations)
        else:
            return self.callable(data)
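
A similar sketch for the preprocessor side (again an illustration, assuming the classes are exported from bob.bio.base.preprocessor). SequentialPreprocessor always receives annotations; each CallablePreprocessor decides via accepts_annotations whether to forward them:

import numpy
from bob.bio.base.preprocessor import (
    SequentialPreprocessor, CallablePreprocessor)

preprocessor = SequentialPreprocessor([
    # These callables ignore annotations, hence accepts_annotations=False.
    CallablePreprocessor(lambda d: d.astype('float64'),
                         accepts_annotations=False),
    CallablePreprocessor(lambda d: d - d.mean(),
                         accepts_annotations=False),
])

processed = preprocessor(numpy.array([[1, 2], [3, 4]]), annotations=None)
# -> [[-1.5, -0.5], [0.5, 1.5]]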
@@ -2,77 +2,77 @@ import numpy


class SequentialProcessor(object):
    """A helper class which takes several processors and applies them one by
    one, sequentially.

    Attributes
    ----------
    processors : list
        A list of processors to apply.
    """

    def __init__(self, processors, **kwargs):
        super(SequentialProcessor, self).__init__()
        self.processors = processors

    def __call__(self, data, **kwargs):
        """Applies the processors on the data sequentially. The output of the
        first one goes as input to the next one.

        Parameters
        ----------
        data : object
            The data that needs to be processed.
        **kwargs
            Any kwargs are passed to the processors.

        Returns
        -------
        object
            The processed data.
        """
        for processor in self.processors:
            data = processor(data, **kwargs)
        return data


class ParallelProcessor(object):
    """A helper class which takes several processors and applies each of them
    on the data separately, returning a list of their outputs in the end.

    Attributes
    ----------
    processors : list
        A list of processors to apply.
    stack : bool
        If True (default), :any:`numpy.hstack` is called on the list of
        outputs.
    """

    def __init__(self, processors, stack=True, **kwargs):
        super(ParallelProcessor, self).__init__()
        self.processors = processors
        self.stack = stack

    def __call__(self, data, **kwargs):
        """Applies the processors on the data independently and outputs a
        list of their outputs.

        Parameters
        ----------
        data : object
            The data that needs to be processed.
        **kwargs
            Any kwargs are passed to the processors.

        Returns
        -------
        object
            The list of processed data, stacked into a single array with
            :any:`numpy.hstack` if ``stack`` is True.
        """
        output = []
        for processor in self.processors:
            out = processor(data, **kwargs)
            output.append(out)
        if self.stack:
            output = numpy.hstack(output)
        return output
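
Since these two helpers depend only on numpy, a fully runnable sketch needs nothing from the rest of bob.bio.base; it assumes the two classes above are in scope (e.g. pasted into the same module):

import numpy

def add_one(data):
    return data + 1

def times_two(data):
    return data * 2

seq = SequentialProcessor([add_one, times_two])
par = ParallelProcessor([add_one, times_two])

data = numpy.array([1., 2., 3.])
print(seq(data))  # times_two(add_one(data))           -> [4. 6. 8.]
print(par(data))  # hstack([add_one(d), times_two(d)]) -> [2. 3. 4. 2. 4. 6.]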