Skip to content
Snippets Groups Projects
Commit 8665d114 authored by Manuel Günther's avatar Manuel Günther
Browse files

Improved PCA testing

parent 16466aa0
No related branches found
No related tags found
No related merge requests found
...@@ -39,84 +39,85 @@ class PCA (Algorithm): ...@@ -39,84 +39,85 @@ class PCA (Algorithm):
**kwargs **kwargs
) )
self.m_subspace_dim = subspace_dimension self.subspace_dim = subspace_dimension
self.m_machine = None self.machine = None
self.m_distance_function = distance_function self.distance_function = distance_function
self.m_factor = -1. if is_distance_function else 1. self.factor = -1. if is_distance_function else 1.
self.m_uses_variances = uses_variances self.uses_variances = uses_variances
def _check_feature(self, feature):
    """Check that the given feature is appropriate.

    A feature is accepted only if it is a one-dimensional
    ``numpy.ndarray``.

    Raises:
        ValueError: If ``feature`` is not a ``numpy.ndarray`` or does not
            have exactly one dimension.
    """
    if not isinstance(feature, numpy.ndarray) or feature.ndim != 1:
        raise ValueError("The given feature is not appropriate")
def train_projector(self, training_features, projector_file):
    """Generates the PCA covariance matrix and writes it to file.

    Every training feature is validated with ``self._check_feature``, the
    features are stacked row-wise, and a ``bob.learn.linear.PCATrainer`` is
    run on the result.  The trained machine and its variances are truncated
    to ``self.subspace_dim`` dimensions and saved to ``projector_file``.

    Args:
        training_features: Iterable of 1D ``numpy.ndarray`` features.
        projector_file: Name of the HDF5 file to write.

    Raises:
        ValueError: If a feature is rejected by ``self._check_feature``.
    """
    # Assure that all data are 1D
    for feature in training_features:
        self._check_feature(feature)

    # Initializes the data
    data = numpy.vstack(training_features)

    logger.info(" -> Training LinearMachine using PCA")
    trainer = bob.learn.linear.PCATrainer()
    self.machine, self.variances = trainer.train(data)
    # For re-shaping, we need to copy...
    self.variances = self.variances.copy()

    # compute variance percentage, if desired
    if isinstance(self.subspace_dim, float):
        fraction = self.subspace_dim
        cumulated = numpy.cumsum(self.variances) / numpy.sum(self.variances)
        # Stop at the first position whose cumulated variance ratio exceeds
        # the requested fraction; if none does, ``index`` stays at the last
        # position.
        # NOTE(review): this keeps ``index`` dimensions, not ``index + 1`` --
        # confirm that this is the intended count.
        for index, ratio in enumerate(cumulated):
            if ratio > fraction:
                break
        self.subspace_dim = index
    logger.info(" ... Keeping %d PCA dimensions", self.subspace_dim)

    # re-shape machine
    self.machine.resize(self.machine.shape[0], self.subspace_dim)
    self.variances.resize(self.subspace_dim)

    # The eigenvalues go to the file root, the machine into its own group.
    f = bob.io.base.HDF5File(projector_file, "w")
    f.set("Eigenvalues", self.variances)
    f.create_group("Machine")
    f.cd("/Machine")
    self.machine.save(f)
def load_projector(self, projector_file):
    """Reads the PCA projection matrix from file.

    Restores ``self.variances`` from the ``Eigenvalues`` entry and
    ``self.machine`` from the ``/Machine`` group of the HDF5 file.

    Args:
        projector_file: Name of the HDF5 file to read.
    """
    # read PCA projector
    f = bob.io.base.HDF5File(projector_file)
    self.variances = f.read("Eigenvalues")
    f.cd("/Machine")
    self.machine = bob.learn.linear.Machine(f)
def project(self, feature):
    """Projects the data using the stored covariance matrix.

    Args:
        feature: The feature to project; it is validated with
            ``self._check_feature`` first.

    Returns:
        Whatever ``self.machine`` returns when called with ``feature``.
    """
    self._check_feature(feature)
    # Projects the data
    return self.machine(feature)
def enroll(self, enroll_features):
    """Enrolls the model by storing all given input vectors.

    Args:
        enroll_features: Non-empty sequence of 1D ``numpy.ndarray``
            features.

    Returns:
        A 2D array holding one enrollment feature per row.

    Raises:
        ValueError: If a feature is rejected by ``self._check_feature``.
        AssertionError: If ``enroll_features`` is empty.
    """
    for feature in enroll_features:
        self._check_feature(feature)
    assert len(enroll_features)
    # just store all the features
    return numpy.vstack(enroll_features)
def score(self, model, probe):
    """Computes the distance of the model to the probe using the distance function.

    The distance is multiplied by ``self.factor`` before it is returned, so
    that the result can serve as a similarity measure.

    Args:
        model: Either a single (1D) model or a 2D array of several models.
        probe: The probe feature to compare against.

    Returns:
        The result of ``self.score_for_multiple_models`` for a 2D model,
        otherwise ``self.factor`` times the distance between ``model`` and
        ``probe``.
    """
    if len(model.shape) == 2:
        # we have multiple models, so we use the multiple model scoring
        return self.score_for_multiple_models(model, probe)

    # single model, single probe (multiple probes have already been handled)
    if self.uses_variances:
        distance = self.distance_function(model, probe, self.variances)
    else:
        distance = self.distance_function(model, probe)
    return self.factor * distance
File added
File added
File added
This diff is collapsed.
...@@ -26,20 +26,27 @@ from nose.plugins.skip import SkipTest ...@@ -26,20 +26,27 @@ from nose.plugins.skip import SkipTest
import logging import logging
logger = logging.getLogger("bob.bio.base") logger = logging.getLogger("bob.bio.base")
def random_training_set(shape, count, minimum = 0, maximum = 1):
def random_array(shape, minimum = 0, maximum = 1, seed = 42):
    """Returns a random array of the given shape with values in [minimum, maximum).

    The global ``numpy.random`` generator is re-seeded with ``seed`` on
    every call, so equal arguments always produce equal arrays.
    """
    # generate a random sequence of features
    numpy.random.seed(seed)
    return numpy.random.random(shape) * (maximum - minimum) + minimum
def random_training_set(shape, count, minimum = 0, maximum = 1, seed = 42):
    """Returns a random training set with the given shape and the given number of elements.

    The global ``numpy.random`` generator is seeded once with ``seed``
    before ``count`` arrays with values in [minimum, maximum) are drawn.
    """
    # generate a random sequence of features
    numpy.random.seed(seed)
    return [numpy.random.random(shape) * (maximum - minimum) + minimum for _ in range(count)]
def random_training_set_by_id(shape, count = 50, minimum = 0, maximum = 1, seed = 42):
    """Returns ``count`` lists, each holding ``count`` random arrays of the given shape.

    The global ``numpy.random`` generator is seeded once with ``seed``
    before any array is drawn; values lie in [minimum, maximum).
    """
    # generate a random sequence of features
    numpy.random.seed(seed)
    return [
        [numpy.random.random(shape) * (maximum - minimum) + minimum for _ in range(count)]
        for _ in range(count)
    ]
def grid_available(test): def grid_available(test):
'''Decorator to check if the gridtk is present, before running the test''' '''Decorator to check if the gridtk is present, before running the test'''
@functools.wraps(test) @functools.wraps(test)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment