Commit 80ceaa89 authored by Tiago Pereira, committed by Amir MOHAMMADI

Added a shallow input test. Issue #21

parent a2ea2eec
1 merge request: !24 Re-write the user guide
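The new `check_inputs` guard makes training fail fast on malformed data. A minimal sketch of the behavior this commit adds, mirroring the new test below:

import numpy
import bob.learn.em
from bob.learn.em import KMeansMachine, KMeansTrainer

machine = KMeansMachine(2, 2)
trainer = KMeansTrainer()
data = numpy.array([[1.0, 2.0], [2.0, 3.0], [numpy.inf, 1.0]])

# With check_inputs=True (the new default), train() now raises
# ValueError: Please, check your inputs; numpy.inf detected in `data`
bob.learn.em.train(trainer, machine, data, convergence_threshold=0.001)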
@@ -15,159 +15,179 @@
from bob.io.base.test_utils import datafile
from bob.learn.em import KMeansMachine, KMeansTrainer


def equals(x, y, epsilon):
    return (abs(x - y) < epsilon).all()

def kmeans_plus_plus(machine, data, seed):
    """Python implementation of K-Means++ (initialization)"""
    n_data = data.shape[0]
    rng = bob.core.random.mt19937(seed)
    u = bob.core.random.uniform('int32', 0, n_data - 1)
    index = u(rng)
    machine.set_mean(0, data[index, :])
    weights = numpy.zeros(shape=(n_data,), dtype=numpy.float64)

    for m in range(1, machine.dim_c):
        for s in range(n_data):
            s_cur = data[s, :]
            w_cur = machine.get_distance_from_mean(s_cur, 0)
            for i in range(m):
                w_cur = min(machine.get_distance_from_mean(s_cur, i), w_cur)
            weights[s] = w_cur
        weights *= weights
        weights /= numpy.sum(weights)
        d = bob.core.random.discrete('int32', weights)
        index = d(rng)
        machine.set_mean(m, data[index, :])
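For intuition: each new mean is drawn with probability proportional to the (squared) distance to its nearest already-chosen mean, the usual K-Means++ D² weighting. A numpy-only sketch of the same seeding (an illustration, not the library code: the RNG streams differ, and whether the exponent matches bob exactly depends on the metric `get_distance_from_mean` returns):

import numpy

def kmeans_plus_plus_numpy(data, n_clusters, seed):
    # Standard D^2 seeding: draw each new mean with probability
    # proportional to the squared distance to the nearest chosen mean.
    rng = numpy.random.RandomState(seed)
    means = [data[rng.randint(data.shape[0])]]
    for _ in range(1, n_clusters):
        d2 = numpy.min([((data - m) ** 2).sum(axis=1) for m in means], axis=0)
        means.append(data[rng.choice(data.shape[0], p=d2 / d2.sum())])
    return numpy.array(means)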

def NormalizeStdArray(path):
    array = bob.io.base.load(path).astype('float64')
    std = array.std(axis=0)
    return (array / std, std)


def multiplyVectorsByFactors(matrix, vector):
    for i in range(0, matrix.shape[0]):
        for j in range(0, matrix.shape[1]):
            matrix[i, j] *= vector[j]


def flipRows(array):
    if len(array.shape) == 2:
        return numpy.array([numpy.array(array[1, :]), numpy.array(array[0, :])], 'float64')
    elif len(array.shape) == 1:
        return numpy.array([array[1], array[0]], 'float64')
    else:
        raise Exception('Input type not supported by flipRows')

if hasattr(KMeansTrainer, 'KMEANS_PLUS_PLUS'):
    def test_kmeans_plus_plus():
        # Tests the K-Means++ initialization
        dim_c = 5
        dim_d = 7
        n_samples = 150
        data = numpy.random.randn(n_samples, dim_d)
        seed = 0

        # C++ implementation
        machine = KMeansMachine(dim_c, dim_d)
        trainer = KMeansTrainer()
        trainer.rng = bob.core.random.mt19937(seed)
        trainer.initialization_method = 'KMEANS_PLUS_PLUS'
        trainer.initialize(machine, data)

        # Python implementation
        py_machine = KMeansMachine(dim_c, dim_d)
        kmeans_plus_plus(py_machine, data, seed)
        assert equals(machine.means, py_machine.means, 1e-8)


def test_kmeans_noduplicate():
    # Data/dimensions
    dim_c = 2
    dim_d = 3
    seed = 0
    data = numpy.array([[1, 2, 3], [1, 2, 3], [1, 2, 3], [4, 5, 6.]])

    # Defines machine and trainer
    machine = KMeansMachine(dim_c, dim_d)
    trainer = KMeansTrainer()
    rng = bob.core.random.mt19937(seed)
    trainer.initialization_method = 'RANDOM_NO_DUPLICATE'
    trainer.initialize(machine, data, rng)

    # Makes sure that the two initial mean vectors selected are different
    assert equals(machine.get_mean(0), machine.get_mean(1), 1e-8) == False
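The data above makes the hazard concrete: three of the four samples are identical, so plain random selection could seed both means at the same point. A numpy-only sketch of duplicate-free initialization (an illustration of the idea, not bob's implementation):

import numpy

def random_no_duplicate_init(data, n_clusters, seed):
    # Sample initial means from the distinct rows only, without replacement
    # (numpy.unique with axis=0 needs numpy >= 1.13).
    rng = numpy.random.RandomState(seed)
    unique_rows = numpy.unique(data, axis=0)
    idx = rng.choice(unique_rows.shape[0], size=n_clusters, replace=False)
    return unique_rows[idx]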

def test_kmeans_a():
    # Trains a KMeansMachine
    # This file contains draws from two 1D Gaussian distributions:
    #   * 100 samples from N(-10, 1)
    #   * 100 samples from N(10, 1)
    data = bob.io.base.load(datafile("samplesFrom2G_f64.hdf5", __name__, path="../data/"))

    machine = KMeansMachine(2, 1)
    trainer = KMeansTrainer()
    # trainer.train(machine, data)
    bob.learn.em.train(trainer, machine, data)

    [variances, weights] = machine.get_variances_and_weights_for_each_cluster(data)
    variances_b = numpy.ndarray(shape=(2, 1), dtype=numpy.float64)
    weights_b = numpy.ndarray(shape=(2,), dtype=numpy.float64)
    machine.__get_variances_and_weights_for_each_cluster_init__(variances_b, weights_b)
    machine.__get_variances_and_weights_for_each_cluster_acc__(data, variances_b, weights_b)
    machine.__get_variances_and_weights_for_each_cluster_fin__(variances_b, weights_b)
    m1 = machine.get_mean(0)
    m2 = machine.get_mean(1)

    # Check means [-10, 10] / variances [1, 1] / weights [0.5, 0.5]
    if m1 < m2:
        means = numpy.array([m1[0], m2[0]], 'float64')
    else:
        means = numpy.array([m2[0], m1[0]], 'float64')
    assert equals(means, numpy.array([-10., 10.]), 2e-1)
    assert equals(variances, numpy.array([1., 1.]), 2e-1)
    assert equals(weights, numpy.array([0.5, 0.5]), 1e-3)

    assert equals(variances, variances_b, 1e-8)
    assert equals(weights, weights_b, 1e-8)
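The three `__get_variances_and_weights_for_each_cluster_*__` calls are the split init/accumulate/finalize form of the one-shot call above, and the last two asserts check that both paths agree. My reading of the quantity being computed, as a numpy sketch (the exact definition is an assumption):

import numpy

def variances_and_weights(data, means):
    # Hard-assign each sample to its nearest mean (squared Euclidean
    # distance), then take the per-cluster variance and sample fraction.
    d2 = ((data[:, None, :] - means[None, :, :]) ** 2).sum(axis=2)
    labels = d2.argmin(axis=1)
    n_clusters = means.shape[0]
    variances = numpy.array([data[labels == c].var(axis=0) for c in range(n_clusters)])
    weights = numpy.array([(labels == c).mean() for c in range(n_clusters)])
    return variances, weights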

def test_kmeans_b():
    # Trains a KMeansMachine
    (arStd, std) = NormalizeStdArray(datafile("faithful.torch3.hdf5", __name__, path="../data/"))

    machine = KMeansMachine(2, 2)
    trainer = KMeansTrainer()
    # trainer.seed = 1337
    bob.learn.em.train(trainer, machine, arStd, convergence_threshold=0.001)

    [variances, weights] = machine.get_variances_and_weights_for_each_cluster(arStd)
    means = numpy.array(machine.means)
    variances = numpy.array(variances)

    multiplyVectorsByFactors(means, std)
    multiplyVectorsByFactors(variances, std ** 2)

    gmmWeights = bob.io.base.load(datafile('gmm.init_weights.hdf5', __name__, path="../data/"))
    gmmMeans = bob.io.base.load(datafile('gmm.init_means.hdf5', __name__, path="../data/"))
    gmmVariances = bob.io.base.load(datafile('gmm.init_variances.hdf5', __name__, path="../data/"))

    if means[0, 0] < means[1, 0]:
        means = flipRows(means)
        variances = flipRows(variances)
        weights = flipRows(weights)

    assert equals(means, gmmMeans, 1e-3)
    assert equals(weights, gmmWeights, 1e-3)
    assert equals(variances, gmmVariances, 1e-3)

    # Check that there are no duplicate means during initialization
    machine = KMeansMachine(2, 1)
    trainer = KMeansTrainer()
    trainer.initialization_method = 'RANDOM_NO_DUPLICATE'
    data = numpy.array([[1.], [1.], [1.], [1.], [1.], [1.], [2.], [3.]])
    bob.learn.em.train(trainer, machine, data)
    assert (numpy.isnan(machine.means).any()) == False


def test_trainer_execption():
    from nose.tools import assert_raises

    # Testing Inf
    machine = KMeansMachine(2, 2)
    data = numpy.array([[1.0, 2.0], [2, 3.], [1, 1.], [2, 5.], [numpy.inf, 1.0]])
    trainer = KMeansTrainer()
    assert_raises(ValueError, bob.learn.em.train, trainer, machine, data, 0.001)

    # Testing NaN
    machine = KMeansMachine(2, 2)
    data = numpy.array([[1.0, 2.0], [2, 3.], [1, numpy.nan], [2, 5.], [2.0, 1.0]])
    trainer = KMeansTrainer()
    assert_raises(ValueError, bob.learn.em.train, trainer, machine, data, 0.001)
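The shallow check these two tests exercise rests on a standard numpy trick: a single NaN or inf poisons a sum, so one O(n) pass with O(1) extra memory detects either. For example:

import numpy

x = numpy.array([1.0, 2.0, numpy.nan])
print(numpy.isnan(numpy.sum(x)))  # True: NaN propagates through the sum

y = numpy.array([1.0, numpy.inf, -1.0])
print(numpy.isinf(numpy.sum(y)))  # True: inf propagates through the sum

Data containing both +inf and -inf sums to NaN rather than inf, so it trips the NaN branch instead; together the two branches cover all cases.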
@@ -7,112 +7,123 @@
import numpy
import bob.learn.em
import logging

logger = logging.getLogger('bob.learn.em')

def train(trainer, machine, data, max_iterations=50, convergence_threshold=None, initialize=True, rng=None,
          check_inputs=True):
    """
    Trains a machine given a trainer and the proper data

    **Parameters**:
      trainer : one of :py:class:`KMeansTrainer`, :py:class:`MAP_GMMTrainer`, :py:class:`ML_GMMTrainer`, :py:class:`ISVTrainer`, :py:class:`IVectorTrainer`, :py:class:`PLDATrainer`, :py:class:`EMPCATrainer`
        A trainer mechanism
      machine : one of :py:class:`KMeansMachine`, :py:class:`GMMMachine`, :py:class:`ISVBase`, :py:class:`IVectorMachine`, :py:class:`PLDAMachine`, :py:class:`bob.learn.linear.Machine`
        A container machine
      data : array_like <float, 2D>
        The data to be trained
      max_iterations : int
        The maximum number of iterations to train a machine
      convergence_threshold : float
        The convergence threshold to train a machine. If None, the training procedure stops on the iteration criterion alone
      initialize : bool
        If True, runs the initialization procedure
      rng : :py:class:`bob.core.random.mt19937`
        The Mersenne Twister mt19937 random generator used for the initialization of subspaces/arrays before the EM loop
      check_inputs : bool
        If True, runs shallow checks on the inputs, raising a :py:class:`ValueError` if `data` contains inf or NaN values
    """

    if check_inputs:
        if numpy.isinf(numpy.sum(data)):
            raise ValueError("Please, check your inputs; numpy.inf detected in `data`")
        if numpy.isnan(numpy.sum(data)):
            raise ValueError("Please, check your inputs; numpy.nan detected in `data`")

    # Initialization
    if initialize:
        if rng is not None:
            trainer.initialize(machine, data, rng)
        else:
            trainer.initialize(machine, data)

    trainer.e_step(machine, data)
    average_output = 0
    average_output_previous = 0

    if hasattr(trainer, "compute_likelihood"):
        average_output = trainer.compute_likelihood(machine)

    for i in range(max_iterations):
        logger.info("Iteration = %d/%d", i, max_iterations)
        average_output_previous = average_output
        trainer.m_step(machine, data)
        trainer.e_step(machine, data)

        if hasattr(trainer, "compute_likelihood"):
            average_output = trainer.compute_likelihood(machine)

        if type(machine) is bob.learn.em.KMeansMachine:
            logger.info("average euclidean distance = %f", average_output)
        else:
            logger.info("log likelihood = %f", average_output)

        convergence_value = abs((average_output_previous - average_output) / average_output_previous)
        logger.info("convergence value = %f", convergence_value)

        # Terminates if converged (and likelihood computation is set)
        if convergence_threshold is not None and convergence_value <= convergence_threshold:
            break

    if hasattr(trainer, "finalize"):
        trainer.finalize(machine, data)
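A usage sketch of the extended signature (random data for illustration):

import numpy
import bob.learn.em
from bob.learn.em import KMeansMachine, KMeansTrainer

machine = KMeansMachine(2, 2)
trainer = KMeansTrainer()
data = numpy.random.randn(100, 2)

# Default: `data` is screened for inf/NaN before any EM work starts
bob.learn.em.train(trainer, machine, data, convergence_threshold=0.001)

# The screen costs one extra pass over `data`; callers that have already
# validated their inputs can opt out
bob.learn.em.train(trainer, machine, data, check_inputs=False)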

def train_jfa(trainer, jfa_base, data, max_iterations=10, initialize=True, rng=None):
    """
    Trains a :py:class:`bob.learn.em.JFABase` given a :py:class:`bob.learn.em.JFATrainer` and the proper data

    **Parameters**:
      trainer : :py:class:`bob.learn.em.JFATrainer`
        A JFA trainer mechanism
      jfa_base : :py:class:`bob.learn.em.JFABase`
        A container machine
      data : [[:py:class:`bob.learn.em.GMMStats`]]
        The data to be trained
      max_iterations : int
        The maximum number of iterations to train a machine
      initialize : bool
        If True, runs the initialization procedure
      rng : :py:class:`bob.core.random.mt19937`
        The Mersenne Twister mt19937 random generator used for the initialization of subspaces/arrays before the EM loops
    """
    if initialize:
        if rng is not None:
            trainer.initialize(jfa_base, data, rng)
        else:
            trainer.initialize(jfa_base, data)

    # V subspace
    logger.info("V subspace estimation...")
    for i in range(max_iterations):
        logger.info("Iteration = %d/%d", i, max_iterations)
        trainer.e_step_v(jfa_base, data)
        trainer.m_step_v(jfa_base, data)
    trainer.finalize_v(jfa_base, data)

    # U subspace
    logger.info("U subspace estimation...")
    for i in range(max_iterations):
        logger.info("Iteration = %d/%d", i, max_iterations)
        trainer.e_step_u(jfa_base, data)
        trainer.m_step_u(jfa_base, data)
    trainer.finalize_u(jfa_base, data)

    # D subspace
    logger.info("D subspace estimation...")
    for i in range(max_iterations):
        logger.info("Iteration = %d/%d", i, max_iterations)
        trainer.e_step_d(jfa_base, data)
        trainer.m_step_d(jfa_base, data)
    trainer.finalize_d(jfa_base, data)
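A minimal usage sketch for `train_jfa` (constructor and accumulator signatures follow my reading of the bob.learn.em API and should be treated as assumptions; real GMMStats would come from a properly trained UBM):

import numpy
import bob.learn.em

ubm = bob.learn.em.GMMMachine(2, 3)          # 2 Gaussians, 3-dim features
jfa_base = bob.learn.em.JFABase(ubm, 2, 2)   # rank-2 U and V subspaces
trainer = bob.learn.em.JFATrainer()

def stats_for(samples):
    # Accumulate zeroth/first/second order GMM statistics against the UBM
    s = bob.learn.em.GMMStats(2, 3)
    ubm.acc_statistics(samples, s)
    return s

# One inner list of GMMStats per client (here: two clients, two sessions each)
data = [[stats_for(numpy.random.randn(50, 3)) for _ in range(2)] for _ in range(2)]

bob.learn.em.train_jfa(trainer, jfa_base, data, max_iterations=5)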