New processing API

parent 0b2a0fc8
@@ -8,4 +8,7 @@ preprocessor = "face-detect"
extractor = 'linearize'
algorithm = 'pca'
from bob.bio.base.algorithm import PCA
import functools
algorithm = AlgorithmAdaptor(functools.partial(PCA,0.99))
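For readers new to the idiom: `functools.partial(PCA, 0.99)` produces a picklable factory rather than a live algorithm object, presumably so the adaptor can defer construction (e.g. to a dask worker). A minimal, self-contained illustration of the factory behaviour (the stand-in `PCA` class below is only for demonstration; the `AlgorithmAdaptor` semantics are assumed from this diff):

```python
import functools

class PCA:  # stand-in for bob.bio.base.algorithm.PCA, just to show partial()
    def __init__(self, subspace_dimension):
        self.subspace_dimension = subspace_dimension

make_pca = functools.partial(PCA, 0.99)  # nothing is constructed yet
pca = make_pca()                         # equivalent to PCA(0.99), built on demand
print(pca.subspace_dimension)            # 0.99
```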
@@ -13,9 +13,22 @@ from bob.pipelines.utils import is_picklable
import logging
logger = logging.getLogger("bob.bio.base")
def save_scores_four_columns(path, probe):
"""
Write scores in the four columns format
"""
with open(path, "w") as f:
for biometric_reference in probe.samples:
line = "{0} {1} {2} {3}\n".format(biometric_reference.subject, probe.subject, probe.path, biometric_reference.data)
f.write(line)
return DelayedSample(functools.partial(open, path))
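For illustration only (made-up values), the file written by `save_scores_four_columns` has one line per comparison: reference subject id, probe subject id, probe path, and the score stored in the reference sample's `data` attribute:

```
001 007 probes/007_sample1 -23.174
002 007 probes/007_sample1 -41.062
003 007 probes/007_sample1 -17.559
```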
class SampleLoader:
"""Adaptor for loading, preprocessing and feature extracting samples
from bob.pipelines.processor import ProcessorPipeline
class ProcessorPipelineAnnotated(ProcessorPipeline):
"""Adaptor for loading, preprocessing and feature extracting samples that uses annotations
This adaptor class wraps around sample:
@@ -51,152 +64,16 @@ class SampleLoader:
"""
def __init__(self, pipeline):
self.pipeline = copy.deepcopy(pipeline)
def _handle_step(self, sset, func, checkpoint):
"""Handles a single step in the pipeline, with optional checkpointing
Parameters
----------
super(ProcessorPipelineAnnotated, self).__init__(pipeline)
sset : SampleSet
The original sample set to be processed (delayed or pre-loaded)
func : callable
The processing function to call for processing **each** sample in
the set, if needs be
def _transform(self, func, sample):
checkpoint : str, None
An optional string that may point to a directory that will be used
for checkpointing the processing phase in question
Returns
-------
r : SampleSet
The processed sample set. If no checkpointing is required, its
samples will be of type :py:class:`Sample`; otherwise, they will be
:py:class:`DelayedSample`s
"""
if checkpoint is not None:
samples = [] # processed samples
for s in sset.samples:
# there can be a checkpoint for the data to be processed
candidate = os.path.join(checkpoint, s.path + ".hdf5")
if not os.path.exists(candidate):
# processing and checkpointing are required, do it now
data = func(s.data)
# notice this can be called in parallel w/o failing
bob.io.base.create_directories_safe(os.path.dirname(candidate))
# bob.bio.base standard interface for preprocessor
# has a read/write_data methods
writer = (
getattr(func, "write_data")
if hasattr(func, "write_data")
else getattr(func, "write_feature")
)
writer(data, candidate)
# because we are checkpointing, we return a DelayedSample
# instead of a normal (preloaded) sample. This allows the next
# phase to avoid loading it should that be unnecessary (e.g. the
# next phase is already checkpointed)
reader = (
getattr(func, "read_data")
if hasattr(func, "read_data")
else getattr(func, "read_feature")
)
if is_picklable(reader):
samples.append(
DelayedSample(
functools.partial(reader, candidate), parent=s
)
)
else:
logger.warning(f"The method {func} is not picklable. Shiping its unbounded method to `DelayedSample`.")
reader = reader.__func__ # The reader object might not be picklable
samples.append(
DelayedSample(
functools.partial(reader, None, candidate), parent=s
)
)
else:
# if checkpointing is not required, load the data and preprocess it
# as we would normally do
samples = [Sample(func(s.data), parent=s) for s in sset.samples]
r = SampleSet(samples, parent=sset)
return r
def _handle_sample(self, sset, pipeline):
"""Handles a single sampleset through a pipelien
Parameters
----------
sset : SampleSet
The original sample set to be processed (delayed or pre-loaded)
pipeline : :py:class:`list` of :py:class:`tuple`
A list of tuples, each comprising of one processing function and
one checkpoint directory (:py:class:`str` or ``None``, to avoid
checkpointing that phase), respectively
Returns
-------
r : Sample
The processed sample
"""
r = sset
for func, checkpoint in pipeline:
r = r if func is None else self._handle_step(r, func, checkpoint)
return r
def __call__(self, samples, checkpoints):
"""Applies the pipeline chaining with optional checkpointing
Our implementation is optimized to minimize disk I/O as much as possible.
It yields :py:class:`DelayedSample`'s instead of :py:class:`Sample`'s if
checkpointing is enabled.
Parameters
----------
samples : list
List of :py:class:`SampleSet` to be treated by this pipeline
checkpoints : dict
A dictionary (with any number of entries) that may contain as many
keys as those defined when you constructed this class with the
pipeline tuple list. Upon execution, if an entry defines
checkpointing, that phase of the pipeline will be checkpointed.
Notice that you are in control of checkpointing: if you skip an
intermediary step, this loader will load the relevant sample, even
if the next phase is supposed to be checkpointed. This strategy
keeps the implementation as simple as possible.
Returns
-------
samplesets : list
Loaded samplesets, after optional preprocessing and extraction
"""
pipe = [(v(), checkpoints.get(k)) for k, v in self.pipeline]
return [self._handle_sample(k, pipe) for k in samples]
# TODO: Fix this on bob.bio.base
try:
return func.transform(sample.data, annotations=sample.annotations)
except TypeError:  # the transformer does not accept annotations
return func.transform(sample.data)
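The annotations fallback in `_transform` is a common Python pattern: try the richer call first and fall back when the callee does not accept the extra keyword. A self-contained illustration with stand-in transformers (not the real bob API):

```python
class WithAnnotations:
    def transform(self, data, annotations=None):
        return ("with annotations", data, annotations)

class WithoutAnnotations:
    def transform(self, data):
        return ("without annotations", data)

def apply(func, sample_data, annotations):
    # mirrors ProcessorPipelineAnnotated._transform: try passing annotations first
    try:
        return func.transform(sample_data, annotations=annotations)
    except TypeError:
        # the transformer does not take an `annotations` keyword
        return func.transform(sample_data)

print(apply(WithAnnotations(), [1, 2, 3], {"leye": (10, 20)}))
print(apply(WithoutAnnotations(), [1, 2, 3], {"leye": (10, 20)}))
```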
class VanillaBiometricsAlgoritm(object):
@@ -226,8 +103,7 @@ class VanillaBiometricsAlgoritm(object):
stack_per_sampleset: bool
If true, will return a list of :py:class:`numpy.ndarray`, one for each sample set
"""
"""
if stack_per_sampleset:
# TODO: Make it more efficient
all_data = []
......
@@ -102,14 +102,14 @@ def biometric_pipeline(
"""
## Training background model (fit will return even if samples is ``None``,
## in which case we suppose the algorithm is not trainable in any way)
background_model = train_background_model(background_model_samples, loader, algorithm, npartitions, checkpoints)
background_model = train_background_model(background_model_samples, loader.transform, algorithm, npartitions, checkpoints)
## Create biometric samples
biometric_references = create_biometric_reference(background_model,references,loader,algorithm,npartitions,checkpoints)
biometric_references = create_biometric_reference(background_model,references,loader.transform,algorithm,npartitions,checkpoints)
## Scores all probes
return compute_scores(background_model, biometric_references, probes, loader, algorithm, npartitions, checkpoints)
return compute_scores(background_model, biometric_references, probes, loader.transform, algorithm, npartitions, checkpoints)
def train_background_model(
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
from bob.bio.base.pipelines.vanilla_biometrics.blocks import VanillaBiometricsAlgoritm
import sklearn.decomposition
from scipy.spatial.distance import euclidean
import numpy
import logging
logger = logging.getLogger("bob.bio.base")
class PCA(VanillaBiometricsAlgoritm):
"""Performs a principal component analysis (PCA) on the given data.
This algorithm computes a PCA projection (:py:class:`sklearn.decomposition.PCA`) on the given training features, projects the features to eigenspace and computes the distance between two projected features in eigenspace.
For example, the eigenface algorithm as proposed by [TP91]_ can be run with this class.
**Parameters:**
subspace_dimension : int or float
If specified as ``int``, defines the number of eigenvectors used in the PCA projection matrix.
If specified as ``float`` (between 0 and 1), the number of eigenvectors is calculated such that the given percentage of variance is kept.
distance_function : function
A function taking two parameters and returns a float.
If ``uses_variances`` is set to ``True``, the function is provided with a third parameter, which is the vector of variances (aka. eigenvalues).
svd_solver: str
The solver used for the underlying singular value decomposition (passed to :py:class:`sklearn.decomposition.PCA`)
factor: float
Multiplication factor used for the scoring stage
kwargs : ``key=value`` pairs
A list of keyword arguments directly passed to the :py:class:`VanillaBiometricsAlgoritm` base class constructor.
"""
def __init__(
self,
subspace_dimension, # if int, number of subspace dimensions; if float, percentage of variance to keep
distance_function=euclidean,
svd_solver="auto",
factor=-1,
**kwargs, # parameters directly sent to the base class
):
# call base class constructor and register that the algorithm performs a projection
super(PCA, self).__init__(performs_projection=True)
self.subspace_dim = subspace_dimension
self.distance_function = distance_function
self.svd_solver = svd_solver
self.factor = factor
def fit(self, samplesets, checkpoints):
"""
This method should implement the sub-pipeline 0 of the Vanilla Biometrics Pipeline :ref:`vanilla-pipeline-0`.
It represents the training of background models that an algorithm may need.
Parameters
----------
samplesets: :py:class:`bob.pipelines.sample.sample.SampleSet`
Set of samples used to train a background model
checkpoints: str
If provided, must be the path leading to a location where this
model should be saved at (complete path without extension) -
currently, it needs to be provided because of existing
serialization requirements (see bob/bob.io.base#106), but
checkpointing will still work as expected.
"""
pca = sklearn.decomposition.PCA(self.subspace_dim, svd_solver=self.svd_solver)
samples_array = self._stack_samples_2_ndarray(samplesets)
logger.info(
"Training PCA with samples of shape {0}".format(samples_array.shape)
)
pca.fit(samples_array)
# TODO: checkpoint the trained projector
return pca
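The `subspace_dimension` semantics described in the class docstring map directly onto scikit-learn's `n_components`: an integer fixes the number of eigenvectors, while a float between 0 and 1 keeps as many components as needed to explain that fraction of the variance. For example:

```python
import numpy
import sklearn.decomposition

X = numpy.random.RandomState(0).randn(200, 30)

pca_fixed = sklearn.decomposition.PCA(n_components=5).fit(X)
pca_var = sklearn.decomposition.PCA(n_components=0.99, svd_solver="full").fit(X)

print(pca_fixed.n_components_)  # 5, exactly as requested
print(pca_var.n_components_)    # as many as needed to keep 99% of the variance
```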
def project_one_sample(self, background_model, data):
if data.ndim == 1:
return background_model.transform(data.reshape(1, -1))
return background_model.transform(data)
def enroll_one_sample(self, data):
return numpy.mean(data, axis=0)
def score_one_sample(self, biometric_reference, data):
"""It handles the score computation for one sample
Parameters
----------
biometric_reference : list
Biometric reference to be compared
data : list
Data to be compared
Returns
-------
score : float
``factor`` times the distance between the biometric reference and
the probe data (with the default ``factor = -1``, a smaller distance
yields a higher, i.e. better, score).
"""
return self.factor * self.distance_function(biometric_reference, data)
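A worked example of the enroll/score convention above: the model is the mean of the projected enrollment features, and with the default `factor = -1` a smaller Euclidean distance yields a higher (better) score. Numbers are illustrative only:

```python
import numpy
from scipy.spatial.distance import euclidean

projected_enroll = numpy.array([[0.9, 1.1], [1.1, 0.9]])
reference = numpy.mean(projected_enroll, axis=0)  # enroll_one_sample -> [1.0, 1.0]

close_probe = numpy.array([1.0, 1.05])
far_probe = numpy.array([3.0, -2.0])

print(-1 * euclidean(reference, close_probe))  # ~ -0.05 -> good match
print(-1 * euclidean(reference, far_probe))    # ~ -3.61 -> poor match
```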
def get_config():
"""Returns a string containing the configuration information.
"""
import bob.extension
return bob.extension.get_config(__name__)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
@@ -203,14 +203,17 @@ def vanilla_biometrics(
# Defines the processing pipeline for loading samples
# Can add any number of steps!
pipeline = [("preprocessor",preprocessor),
("extractor", extractor)]
# Can add any number of steps!
from bob.bio.base.pipelines.vanilla_biometrics.legacy import ProcessorBlockAdaptor
pipeline = [("preprocessor", ProcessorBlockAdaptor(cls=preprocessor)),
("extractor", ProcessorBlockAdaptor(cls=extractor))]
# Mechanism that loads samples
# from ..bob_bio.blocks import SampleLoader
from bob.bio.base.pipelines.vanilla_biometrics.annotated_legacy import SampleLoaderAnnotated as SampleLoader
loader = SampleLoader(pipeline)
#from bob.bio.base.pipelines.vanilla_biometrics.annotated_legacy import SampleLoaderAnnotated as SampleLoader
from bob.pipelines.processor import ProcessorPipeline
processor_pipeline = ProcessorPipeline(pipeline)
for g in group:
@@ -221,7 +224,7 @@ def vanilla_biometrics(
database.background_model_samples(),
biometric_references,
database.probes(group=g),
loader,
processor_pipeline,
algorithm,
npartitions=len(dask_client.cluster.workers),
checkpoints=checkpoints,
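Condensing the scattered hunks above, the new wiring of `vanilla_biometrics` is roughly the following sketch (checkpoint handling and dask scheduling details omitted; all names come from the diff):

```python
from bob.pipelines.processor import ProcessorPipeline
from bob.bio.base.pipelines.vanilla_biometrics.legacy import ProcessorBlockAdaptor

# wrap the legacy preprocessor/extractor classes so the generic pipeline can drive them
pipeline = [
    ("preprocessor", ProcessorBlockAdaptor(cls=preprocessor)),
    ("extractor", ProcessorBlockAdaptor(cls=extractor)),
]
processor_pipeline = ProcessorPipeline(pipeline)

# biometric_pipeline forwards processor_pipeline.transform to the training,
# enrollment and scoring stages (see the biometric_pipeline hunk above)
result = biometric_pipeline(
    database.background_model_samples(),
    biometric_references,
    database.probes(group=g),
    processor_pipeline,
    algorithm,
    npartitions=len(dask_client.cluster.workers),
    checkpoints=checkpoints,
)
scores = result.compute(scheduler=dask_client)
```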
@@ -229,8 +232,7 @@
# result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR")
result = result.compute(scheduler=dask_client)
#result = result.compute(scheduler="single-threaded")
#import ipdb; ipdb.set_trace()
for probe in result:
for sample in probe.samples:
......