From b8efca2376b36a45aace8676242852f71f97d1e0 Mon Sep 17 00:00:00 2001
From: Amir MOHAMMADI <amir.mohammadi@idiap.ch>
Date: Wed, 11 Oct 2017 16:48:39 +0200
Subject: [PATCH] Add sequential and parallel processors, pre-processors, and
 extractors

---
 bob/bio/base/extractor/__init__.py    |  17 +++-
 bob/bio/base/extractor/stacks.py      | 125 ++++++++++++++++++++++++++
 bob/bio/base/preprocessor/__init__.py |  16 +++-
 bob/bio/base/preprocessor/stacks.py   |  61 +++++++++++++
 bob/bio/base/test/test_stacks.py      |  45 ++++++++++
 bob/bio/base/utils/__init__.py        |   1 +
 bob/bio/base/utils/processors.py      |  84 +++++++++++++++++
 doc/implemented.rst                   |   6 ++
 doc/py_api.rst                        |  11 +++
 9 files changed, 358 insertions(+), 8 deletions(-)
 create mode 100644 bob/bio/base/extractor/stacks.py
 create mode 100644 bob/bio/base/preprocessor/stacks.py
 create mode 100644 bob/bio/base/test/test_stacks.py
 create mode 100644 bob/bio/base/utils/processors.py

diff --git a/bob/bio/base/extractor/__init__.py b/bob/bio/base/extractor/__init__.py
index 7b55d421..03851247 100644
--- a/bob/bio/base/extractor/__init__.py
+++ b/bob/bio/base/extractor/__init__.py
@@ -1,11 +1,14 @@
 from .Extractor import Extractor
 from .Linearize import Linearize
+from .stacks import (SequentialExtractor, ParallelExtractor,
+                     CallableExtractor, MultipleExtractor)
+
 
 # gets sphinx autodoc done right - don't remove it
 def __appropriate__(*args):
   """Says object was actually declared here, and not in the import module.
-  Fixing sphinx warnings of not being able to find classes, when path is shortened.
-  Parameters:
+  Fixing sphinx warnings of not being able to find classes, when path is
+  shortened. Parameters:
 
     *args: An iterable of objects to modify
 
@@ -13,10 +16,16 @@ def __appropriate__(*args):
   <https://github.com/sphinx-doc/sphinx/issues/3048>`
   """
 
-  for obj in args: obj.__module__ = __name__
+  for obj in args:
+    obj.__module__ = __name__
+
 
 __appropriate__(
     Extractor,
     Linearize,
-    )
+    SequentialExtractor,
+    ParallelExtractor,
+    CallableExtractor,
+    MultipleExtractor,
+)
 __all__ = [_ for _ in dir() if not _.startswith('_')]
diff --git a/bob/bio/base/extractor/stacks.py b/bob/bio/base/extractor/stacks.py
new file mode 100644
index 00000000..511b7d04
--- /dev/null
+++ b/bob/bio/base/extractor/stacks.py
@@ -0,0 +1,125 @@
+from ..utils.processors import SequentialProcessor, ParallelProcessor
+from .Extractor import Extractor
+
+
class MultipleExtractor(Extractor):
  """Base class for SequentialExtractor and ParallelExtractor. This class is
  not meant to be used directly.

  It provides the shared tooling of both stacks: aggregating the
  training-related attributes of the wrapped extractors, deriving one
  extractor file path per wrapped extractor, and training/loading each of
  them.
  """

  def get_attributes(self, processors):
    """Aggregates the training-related attributes of several extractors.

    Parameters
    ----------
    processors : list
        A list of extractor instances.

    Returns
    -------
    tuple
        ``(requires_training, split_training_data_by_client,
        min_extractor_file_size, min_feature_file_size)``; the boolean flags
        are True if any extractor sets them, the sizes are the minimum over
        all extractors.
    """
    requires_training = any(p.requires_training for p in processors)
    split_training_data_by_client = any(
        p.split_training_data_by_client for p in processors)
    min_extractor_file_size = min(
        p.min_extractor_file_size for p in processors)
    min_feature_file_size = min(
        p.min_feature_file_size for p in processors)
    return (requires_training, split_training_data_by_client,
            min_extractor_file_size, min_feature_file_size)

  def get_extractor_files(self, extractor_file):
    """Returns one extractor file path per wrapped extractor.

    The first extractor uses ``extractor_file`` unchanged; extractor ``i``
    (``i >= 1``) uses ``extractor_file + '_i.hdf5'``.
    """
    paths = [extractor_file]
    paths += [extractor_file +
              '_{}.hdf5'.format(i) for i in range(1, len(self.processors))]
    return paths

  def train_one(self, e, training_data, extractor_file, apply=False):
    """Trains one extractor and optionally applies it to the training data.

    Parameters
    ----------
    e : object
        The extractor to train.
    training_data : list
        The training data; a list of lists (one list per client) when
        ``self.split_training_data_by_client`` is True, a flat list
        otherwise.
    extractor_file : str
        Path to save the trained extractor to.
    apply : bool
        If True, the extractor is also applied to ``training_data`` and the
        transformed data is returned. :any:`SequentialExtractor` needs this
        to feed the output of one extractor to the next.

    Returns
    -------
    list or None
        The transformed training data when ``apply`` is True, ``None``
        otherwise.
    """
    if not e.requires_training:
      # Nothing to train, but when ``apply`` is requested the data must
      # still be passed through this extractor; returning None here would
      # hand None to the next extractor of a sequential stack.
      if not apply:
        return
      if self.split_training_data_by_client:
        return [[e(d) for d in datalist] for datalist in training_data]
      return [e(d) for d in training_data]
    if e.split_training_data_by_client:
      e.train(training_data, extractor_file)
      if not apply:
        return
      training_data = [[e(d) for d in datalist] for datalist in training_data]
    elif not self.split_training_data_by_client:
      # Neither this extractor nor the stack splits by client: flat list.
      e.train(training_data, extractor_file)
      if not apply:
        return
      training_data = [e(d) for d in training_data]
    else:
      # The stack splits by client but this extractor expects a flat list:
      # flatten, train/apply, then restore the per-client grouping.
      training_data_len = [len(datalist) for datalist in training_data]
      training_data = [d for datalist in training_data for d in datalist]
      e.train(training_data, extractor_file)
      if not apply:
        return
      # split training data
      new_training_data, i = [], 0
      for length in training_data_len:
        class_data = []
        for _ in range(length):
          class_data.append(e(training_data[i]))
          i += 1
        new_training_data.append(class_data)
      training_data = new_training_data
    return training_data

  def load(self, extractor_file):
    """Loads every wrapped extractor from its own file."""
    paths = self.get_extractor_files(extractor_file)
    for e, path in zip(self.processors, paths):
      e.load(path)
+
+
class SequentialExtractor(SequentialProcessor, MultipleExtractor):
  __doc__ = SequentialProcessor.__doc__

  def __init__(self, processors):
    # Aggregate the training attributes of all wrapped extractors before
    # initializing the two base classes.
    (requires_training, split_training_data_by_client,
     min_extractor_file_size, min_feature_file_size) = self.get_attributes(
        processors)

    SequentialProcessor.__init__(self, processors)
    MultipleExtractor.__init__(
        self,
        requires_training=requires_training,
        split_training_data_by_client=split_training_data_by_client,
        min_extractor_file_size=min_extractor_file_size,
        min_feature_file_size=min_feature_file_size)

  def train(self, training_data, extractor_file):
    """Trains the extractors in order, feeding the output of each trained
    extractor as training data to the next one."""
    paths = self.get_extractor_files(extractor_file)
    for e, path in zip(self.processors, paths):
      new_data = self.train_one(e, training_data, path, apply=True)
      if new_data is not None:
        training_data = new_data
      elif self.split_training_data_by_client:
        # train_one returned None (the extractor required no training); the
        # data must still be passed through it for the next extractor.
        training_data = [[e(d) for d in datalist]
                         for datalist in training_data]
      else:
        training_data = [e(d) for d in training_data]
+
+
class ParallelExtractor(ParallelProcessor, MultipleExtractor):
  __doc__ = ParallelProcessor.__doc__

  def __init__(self, processors):
    # Derive the combined training attributes from the wrapped extractors
    # before initializing the two base classes.
    attributes = self.get_attributes(processors)

    ParallelProcessor.__init__(self, processors)
    MultipleExtractor.__init__(
        self,
        requires_training=attributes[0],
        split_training_data_by_client=attributes[1],
        min_extractor_file_size=attributes[2],
        min_feature_file_size=attributes[3])

  def train(self, training_data, extractor_file):
    """Trains each extractor independently on the same training data."""
    for extractor, path in zip(
            self.processors, self.get_extractor_files(extractor_file)):
      self.train_one(extractor, training_data, path)
+
+
class CallableExtractor(Extractor):
  """A trivial extractor that wraps an arbitrary callable.

  Attributes
  ----------
  callable : object
      Anything that is callable. It will be used as an extractor in
      bob.bio.base.
  """

  def __init__(self, callable, **kwargs):
    super(CallableExtractor, self).__init__(**kwargs)
    self.callable = callable

  def __call__(self, data):
    """Applies the wrapped callable to ``data`` and returns its result."""
    extract = self.callable
    return extract(data)
diff --git a/bob/bio/base/preprocessor/__init__.py b/bob/bio/base/preprocessor/__init__.py
index 0f981ce2..8c0a0567 100644
--- a/bob/bio/base/preprocessor/__init__.py
+++ b/bob/bio/base/preprocessor/__init__.py
@@ -1,11 +1,14 @@
 from .Preprocessor import Preprocessor
 from .Filename import Filename
+from .stacks import (SequentialPreprocessor,
+                     ParallelPreprocessor, CallablePreprocessor)
+
 
 # gets sphinx autodoc done right - don't remove it
 def __appropriate__(*args):
   """Says object was actually declared here, and not in the import module.
-  Fixing sphinx warnings of not being able to find classes, when path is shortened.
-  Parameters:
+  Fixing sphinx warnings of not being able to find classes, when path is
+  shortened. Parameters:
 
     *args: An iterable of objects to modify
 
@@ -13,10 +16,15 @@ def __appropriate__(*args):
   <https://github.com/sphinx-doc/sphinx/issues/3048>`
   """
 
-  for obj in args: obj.__module__ = __name__
+  for obj in args:
+    obj.__module__ = __name__
+
 
 __appropriate__(
     Preprocessor,
     Filename,
-    )
+    SequentialPreprocessor,
+    ParallelPreprocessor,
+    CallablePreprocessor,
+)
 __all__ = [_ for _ in dir() if not _.startswith('_')]
diff --git a/bob/bio/base/preprocessor/stacks.py b/bob/bio/base/preprocessor/stacks.py
new file mode 100644
index 00000000..1c8be13d
--- /dev/null
+++ b/bob/bio/base/preprocessor/stacks.py
@@ -0,0 +1,61 @@
+from ..utils.processors import SequentialProcessor, ParallelProcessor
+from .Preprocessor import Preprocessor
+
+
class SequentialPreprocessor(SequentialProcessor, Preprocessor):
  __doc__ = SequentialProcessor.__doc__

  def __init__(self, processors, **kwargs):
    # Use the smallest declared file size among the wrapped preprocessors;
    # fall back to 1000 when none of them declares one (e.g. when they are
    # plain callables without that attribute).
    try:
      min_preprocessed_file_size = min(
          p.min_preprocessed_file_size for p in processors)
    except AttributeError:
      min_preprocessed_file_size = 1000

    SequentialProcessor.__init__(self, processors)
    Preprocessor.__init__(
        self, min_preprocessed_file_size=min_preprocessed_file_size, **kwargs)

  def __call__(self, data, annotations):
    """Runs the preprocessors one after another, forwarding ``annotations``
    to each of them."""
    return super(SequentialPreprocessor, self).__call__(
        data, annotations=annotations)
+
+
class ParallelPreprocessor(ParallelProcessor, Preprocessor):
  __doc__ = ParallelProcessor.__doc__

  def __init__(self, processors, **kwargs):
    # Use the smallest declared file size among the wrapped preprocessors;
    # fall back to 1000 when none of them declares one.
    try:
      min_preprocessed_file_size = min(
          p.min_preprocessed_file_size for p in processors)
    except AttributeError:
      min_preprocessed_file_size = 1000

    ParallelProcessor.__init__(self, processors)
    Preprocessor.__init__(
        self, min_preprocessed_file_size=min_preprocessed_file_size, **kwargs)

  def __call__(self, data, annotations):
    """Runs every preprocessor on the same data, forwarding ``annotations``
    to each of them."""
    return super(ParallelPreprocessor, self).__call__(
        data, annotations=annotations)
+
+
class CallablePreprocessor(Preprocessor):
  """A trivial preprocessor that wraps an arbitrary callable.

  Attributes
  ----------
  callable : object
      Anything that is callable. It will be used as a preprocessor in
      bob.bio.base.
  """

  def __init__(self, callable, **kwargs):
    super(CallablePreprocessor, self).__init__(**kwargs)
    self.callable = callable

  def __call__(self, data, annotations):
    # The annotations are deliberately ignored; the callable only receives
    # the data.
    preprocess = self.callable
    return preprocess(data)
diff --git a/bob/bio/base/test/test_stacks.py b/bob/bio/base/test/test_stacks.py
new file mode 100644
index 00000000..3db57f56
--- /dev/null
+++ b/bob/bio/base/test/test_stacks.py
@@ -0,0 +1,45 @@
+from functools import partial
+import numpy as np
+from bob.bio.base.utils.processors import (
+    SequentialProcessor, ParallelProcessor)
+from bob.bio.base.preprocessor import (
+    SequentialPreprocessor, ParallelPreprocessor, CallablePreprocessor)
+from bob.bio.base.extractor import (
+    SequentialExtractor, ParallelExtractor, CallableExtractor)
+
# Shared fixtures for the stack tests below.
DATA = [0, 1, 2, 3, 4]
# NOTE(review): partial(np.power, 2) binds the *base*, so the first
# processor computes 2 ** x for each x, not x ** 2 — TODO confirm intended.
PROCESSORS = [partial(np.power, 2), np.mean]
# Expected result of applying the processors sequentially (output of the
# first is the input of the second).
SEQ_DATA = PROCESSORS[1](PROCESSORS[0](DATA))
# Expected result of applying both processors to DATA and stacking.
PAR_DATA = np.hstack([PROCESSORS[0](DATA), PROCESSORS[1](DATA)])
+
+
def test_processors():
  """Checks the raw helper processors against the precomputed fixtures."""
  seq = SequentialProcessor(PROCESSORS)
  assert np.allclose(seq(DATA), SEQ_DATA)

  par = ParallelProcessor(PROCESSORS)
  assert np.allclose(par(DATA), PAR_DATA)
+
+
def test_preprocessors():
  """Checks the preprocessor stacks built from wrapped callables."""
  wrapped = [CallablePreprocessor(p) for p in PROCESSORS]

  seq = SequentialPreprocessor(wrapped)
  assert np.allclose(seq(DATA, None), SEQ_DATA)

  par = ParallelPreprocessor(wrapped)
  assert np.allclose(par(DATA, None), PAR_DATA)
+
+
def test_extractors():
  """Checks the extractor stacks built from wrapped callables."""
  wrapped = [CallableExtractor(p) for p in PROCESSORS]

  seq = SequentialExtractor(wrapped)
  assert np.allclose(seq(DATA), SEQ_DATA)

  par = ParallelExtractor(wrapped)
  assert np.allclose(par(DATA), PAR_DATA)
diff --git a/bob/bio/base/utils/__init__.py b/bob/bio/base/utils/__init__.py
index 4f1b2967..b65569cb 100644
--- a/bob/bio/base/utils/__init__.py
+++ b/bob/bio/base/utils/__init__.py
@@ -6,6 +6,7 @@
 from .resources import *
 from .io import *
 from .singleton import *
+from . import processors
 
 import numpy
 
diff --git a/bob/bio/base/utils/processors.py b/bob/bio/base/utils/processors.py
new file mode 100644
index 00000000..64549d76
--- /dev/null
+++ b/bob/bio/base/utils/processors.py
@@ -0,0 +1,84 @@
+import numpy
+
+
class SequentialProcessor(object):
  """A helper class which takes several processors and applies them one by
  one sequentially.

  Attributes
  ----------
  processors : list
      A list of processors to apply.
  """

  def __init__(self, processors):
    super(SequentialProcessor, self).__init__()
    self.processors = processors

  def __call__(self, data, **kwargs):
    """Applies the processors on the data sequentially. The output of the
    first one goes as input to the next one.

    Parameters
    ----------
    data : object
        The data that needs to be processed.
    **kwargs
        Any kwargs are passed to the processors; processors whose signature
        does not accept them are called with the data only.

    Returns
    -------
    object
        The processed data.
    """
    for processor in self.processors:
      if not kwargs:
        # No kwargs to forward: call directly and never mask errors raised
        # by the processor itself with a pointless identical retry.
        data = processor(data)
        continue
      try:
        data = processor(data, **kwargs)
      except (TypeError, ValueError):
        # A signature mismatch for unexpected keyword arguments raises
        # TypeError (ValueError is kept for backward compatibility);
        # retry with the data only.
        data = processor(data)
    return data
+
+
class ParallelProcessor(object):
  """A helper class which takes several processors, applies each of them to
  the same input data separately, and returns the list of their outputs.

  Attributes
  ----------
  processors : list
      A list of processors to apply.
  stack : bool
      If True (default), :any:`numpy.hstack` is called on the list of
      outputs.
  """

  def __init__(self, processors, stack=True):
    super(ParallelProcessor, self).__init__()
    self.processors = processors
    self.stack = stack

  def __call__(self, data, **kwargs):
    """Applies the processors on the data independently and outputs a list
    of their outputs.

    Parameters
    ----------
    data : object
        The data that needs to be processed.
    **kwargs
        Any kwargs are passed to the processors; processors whose signature
        does not accept them are called with the data only.

    Returns
    -------
    object
        The list of processed data, stacked with :any:`numpy.hstack` when
        ``self.stack`` is True.
    """
    output = []
    for processor in self.processors:
      if not kwargs:
        # No kwargs to forward: call directly and never mask errors raised
        # by the processor itself with a pointless identical retry.
        out = processor(data)
      else:
        try:
          out = processor(data, **kwargs)
        except (TypeError, ValueError):
          # A signature mismatch for unexpected keyword arguments raises
          # TypeError (ValueError is kept for backward compatibility);
          # retry with the data only.
          out = processor(data)
      output.append(out)
    if self.stack:
      output = numpy.hstack(output)
    return output
diff --git a/doc/implemented.rst b/doc/implemented.rst
index 8ceb9fe9..84b358bf 100644
--- a/doc/implemented.rst
+++ b/doc/implemented.rst
@@ -22,6 +22,12 @@ Implementations
 
 .. autosummary::
    bob.bio.base.preprocessor.Filename
+   bob.bio.base.preprocessor.SequentialPreprocessor
+   bob.bio.base.preprocessor.ParallelPreprocessor
+   bob.bio.base.preprocessor.CallablePreprocessor
+   bob.bio.base.extractor.SequentialExtractor
+   bob.bio.base.extractor.ParallelExtractor
+   bob.bio.base.extractor.CallableExtractor
    bob.bio.base.extractor.Linearize
    bob.bio.base.algorithm.Distance
    bob.bio.base.algorithm.PCA
diff --git a/doc/py_api.rst b/doc/py_api.rst
index c9841b2b..13daaa8f 100644
--- a/doc/py_api.rst
+++ b/doc/py_api.rst
@@ -42,6 +42,14 @@ Miscellaneous functions
    bob.bio.base.selected_indices
 
 
+Generic classes
+---------------
+
+.. autosummary::
+   bob.bio.base.utils.processors.SequentialProcessor
+   bob.bio.base.utils.processors.ParallelProcessor
+
+
 Tools to run recognition experiments
 ------------------------------------
 
@@ -109,4 +117,7 @@ Details
    .. autoclass:: FileSelector
 
 
+.. automodule:: bob.bio.base.utils.processors
+
+
 .. include:: links.rst
-- 
GitLab