Commit aa3ddc0b authored by Pavel KORSHUNOV's avatar Pavel KORSHUNOV

Extension of bob.bio.base for PAD

parents
*~
.idea
.DS_Store
*.swp
*.pyc
*.so
*.dylib
bin
eggs
parts
.installed.cfg
.mr.developer.cfg
*.egg-info
develop-eggs
sphinx
dist
.nfs*
.gdb_history
build
*.egg
src/
*.sql3
temp
results
\ No newline at end of file
This diff is collapsed.
include README.rst bootstrap-buildout.py buildout.cfg develop.cfg COPYING version.txt requirements.txt test-requirements.txt
recursive-include doc *.py *.rst
recursive-include bob/pad/base/voice/data scores-* *.wav
.. vim: set fileencoding=utf-8 :
.. Pavel Korshunov <pavel.korshunov@idiap.ch>
.. Thu 23 Jun 13:43:22 2016
=================================================
Presentation Attack Detection in Voice Biometrics
=================================================
This package is an extension to the ``bob.pad.base`` package, which provides the basic presentation attack detection (PAD) framework.
The ``bob.pad.voice`` package contains additional functionality to run PAD experiments using speech databases.
Installation
------------
To install this package -- alone or together with other `Packages of Bob <https://github.com/idiap/bob/wiki/Packages>`_ -- please read the `Installation Instructions <https://github.com/idiap/bob/wiki/Installation>`_.
For Bob_ to be able to work properly, some dependent packages are required to be installed.
Please make sure that you have read the `Dependencies <https://github.com/idiap/bob/wiki/Dependencies>`_ for your operating system.
Documentation
-------------
For further documentation on this package, please read the `Documentation <http://pythonhosted.org/bob.pad.voice/index.html>`_.
For a list of tutorials on this or the other packages of Bob_, or information on submitting issues, asking questions and starting discussions, please visit its website.
.. _bob: https://www.idiap.ch/software/bob
# Declare this directory as part of a namespace package, so that other
# distributions can contribute modules under the same top-level package name.
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# Declare this directory as part of a namespace package, so that other
# distributions can contribute modules under the same top-level package name.
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Pavel Korshunov <pavel.korshunov@idiap.ch>
# Thu 23 Jun 11:16:22 2016
"""
The methods for the package
"""
def get_config():
    """Return a string describing this package's build/installation configuration.

    Delegates to :py:func:`bob.extension.get_config` for this module's name.
    """
    from bob.extension import get_config as _base_get_config
    return _base_get_config(__name__)
# gets sphinx autodoc done right - don't remove it
# (re-exports every public name defined above so autodoc can discover them)
__all__ = [_ for _ in dir() if not _.startswith('_')]
from .gmm_algorithm import GmmAlgorithm
from .logregr_algorithm import LogRegrAlgorithm
from bob.pad.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.pad.voice")
class DummyAlgorithm(Algorithm):
    """Minimal pass-through PAD algorithm used to exercise the tool chain.

    It registers no projection and no projector training with the base class,
    and its score is simply the value it is handed.
    """

    def __init__(self, **kwargs):
        """Register with the base class that this tool needs no projection or training.

        Extra keyword arguments are accepted and ignored.
        """
        super(DummyAlgorithm, self).__init__(
            performs_projection=False,
            requires_projector_training=False,
        )

    def score(self, toscore):
        """Return ``toscore`` unchanged (identity scoring)."""
        logger.info("score() score %f", toscore)
        return toscore
# Module-level instance; presumably picked up by bob.pad.base's configuration
# loading as the ``algorithm`` entry -- confirm against the experiment configs.
algorithm = DummyAlgorithm()
import bob.io.base
import numpy
import bob.learn.linear
import bob.learn.em
from bob.pad.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.pad.voice")
class GmmAlgorithm(Algorithm):
    """Trains two Gaussian Mixture Models -- one on real/genuine features and
    one on attack features -- and scores a sample by the difference of its
    log-likelihoods under the two models.
    """

    def __init__(self,
                 normalize_features=False,
                 # parameters for the GMM
                 number_of_gaussians=512,
                 # parameters of UBM training
                 kmeans_training_iterations=25,  # Maximum number of iterations for K-Means
                 gmm_training_iterations=10,  # Maximum number of iterations for ML GMM Training
                 training_threshold=5e-4,  # Threshold to end the ML training
                 variance_threshold=5e-4,  # Minimum value that a variance can reach
                 update_weights=True,
                 update_means=True,
                 update_variances=True,
                 # If set, the weight of a particular Gaussian will at least be greater
                 # than this threshold. In the case the real weight is lower, the prior
                 # mean value will be used to estimate the current mean and variance.
                 responsibility_threshold=0,
                 INIT_SEED=2015,
                 **kwargs):
        """Initializes the algorithm and stores all training parameters.

        **Parameters:**

        normalize_features : bool
            Stored as a flag; note that no normalization is applied anywhere
            in this class itself.
        number_of_gaussians : int
            Number of Gaussian components in each of the two GMMs.
        kmeans_training_iterations, gmm_training_iterations : int
            Iteration limits for K-Means initialization and ML GMM training.
        training_threshold, variance_threshold : float
            Convergence threshold for training and minimum allowed variance.
        update_weights, update_means, update_variances : bool
            Which GMM parameters the ML trainer updates.
        INIT_SEED : int
            Seed for the random generator, for reproducible experiments.
        """
        # call base class constructor registering that this tool performs everything.
        Algorithm.__init__(
            self,
            number_of_gaussians=number_of_gaussians,
            kmeans_training_iterations=kmeans_training_iterations,
            gmm_training_iterations=gmm_training_iterations,
            training_threshold=training_threshold,
            variance_threshold=variance_threshold,
            update_weights=update_weights,
            update_means=update_means,
            update_variances=update_variances,
            responsibility_threshold=responsibility_threshold,
            INIT_SEED=INIT_SEED,
            # parameters of detection pipeline
            performs_projection=True,
            requires_projector_training=True,
            use_projected_features_for_enrollment=True,
            multiple_model_scoring=None,
        )

        # copy parameters
        self.gaussians = number_of_gaussians
        self.kmeans_training_iterations = kmeans_training_iterations
        self.gmm_training_iterations = gmm_training_iterations
        self.training_threshold = training_threshold
        self.variance_threshold = variance_threshold
        self.update_weights = update_weights
        self.update_means = update_means
        self.update_variances = update_variances
        self.responsibility_threshold = responsibility_threshold
        self.init_seed = INIT_SEED
        # NOTE(review): ``bob.core`` is never imported in this module; this
        # relies on it being reachable as an attribute of the ``bob``
        # namespace via the other bob imports -- confirm the import chain.
        self.rng = bob.core.random.mt19937(self.init_seed)

        # the two models are filled in by train_projector()/load_projector()
        self.gmm_machine_real = None
        self.gmm_machine_attack = None
        self.normalize_features = normalize_features
        self.kmeans_trainer = bob.learn.em.KMeansTrainer()
        self.gmm_trainer = bob.learn.em.ML_GMMTrainer(
            self.update_means, self.update_variances, self.update_weights,
            self.responsibility_threshold)

    def _check_feature(self, feature, machine=None, projected=False):
        """Checks that ``feature`` is a 2D float64 array whose width matches
        the trained GMMs (when present).

        Raises ValueError on any mismatch; returns True otherwise.
        """
        if not isinstance(feature, numpy.ndarray) or feature.ndim != 2 or feature.dtype != numpy.float64:
            raise ValueError("The given feature is not appropriate", feature)
        if self.gmm_machine_real is not None and feature.shape[1] != self.gmm_machine_real.shape[1]:
            raise ValueError("The given feature is expected to have %d elements, but it has %d"
                             % (self.gmm_machine_real.shape[1], feature.shape[1]))
        if self.gmm_machine_attack is not None and feature.shape[1] != self.gmm_machine_attack.shape[1]:
            raise ValueError("The given feature is expected to have %d elements, but it has %d"
                             % (self.gmm_machine_attack.shape[1], feature.shape[1]))
        return True

    #######################################################
    ################ GMM training #########################

    def train_gmm(self, array):
        """Trains and returns one GMM on ``array`` (one feature vector per
        row), using K-Means initialization followed by ML GMM training."""
        logger.debug(" .... Training with %d feature vectors", array.shape[0])

        # Computes input size
        input_size = array.shape[1]

        # Creates the machines (KMeans and GMM)
        logger.debug(" .... Creating machines")
        kmeans_machine = bob.learn.em.KMeansMachine(self.gaussians, input_size)
        gmm_machine = bob.learn.em.GMMMachine(self.gaussians, input_size)

        # initialize the random generator with our single seed so that experiments are reproducible
        logger.info(" -> Init random generator with seed %d", self.init_seed)
        self.rng = bob.core.random.mt19937(self.init_seed)

        # Trains using the KMeansTrainer
        logger.info(" -> Training K-Means")
        bob.learn.em.train(self.kmeans_trainer, kmeans_machine, array, self.kmeans_training_iterations,
                           self.training_threshold, self.rng)

        variances, weights = kmeans_machine.get_variances_and_weights_for_each_cluster(array)
        means = kmeans_machine.means

        # Initializes the GMM from the K-Means solution
        gmm_machine.means = means
        gmm_machine.variances = variances
        gmm_machine.weights = weights
        gmm_machine.set_variance_thresholds(self.variance_threshold)

        # Trains the GMM
        logger.info(" -> Training GMM")
        bob.learn.em.train(self.gmm_trainer, gmm_machine, array, self.gmm_training_iterations,
                           self.training_threshold, self.rng)
        return gmm_machine

    def save_gmms(self, projector_file):
        """Saves both trained GMMs into ``projector_file`` (a path or an
        already-open :py:class:`bob.io.base.HDF5File`)."""
        logger.debug(" .... Saving GMM models to file '%s'", projector_file)
        opened_here = not isinstance(projector_file, bob.io.base.HDF5File)
        hdf5 = bob.io.base.HDF5File(projector_file, 'w') if opened_here else projector_file
        hdf5.create_group('GMMReal')
        hdf5.cd('GMMReal')
        self.gmm_machine_real.save(hdf5)
        hdf5.cd('/')
        hdf5.create_group('GMMAttack')
        hdf5.cd('GMMAttack')
        self.gmm_machine_attack.save(hdf5)
        # only close handles we opened ourselves; the caller owns the others
        if opened_here:
            hdf5.close()

    def train_projector(self, training_features, projector_file):
        """Trains the real and attack GMMs and saves them to ``projector_file``.

        ``training_features`` must contain two lists of 2D float64 arrays:
        real features at index 0 and attack features at index 1.
        """
        if len(training_features) < 2:
            raise ValueError("Training projector: features should contain two lists: real and attack!")

        logger.info(" - Training: number of real features %d", len(training_features[0]))
        logger.info(" - Training: number of attack features %d", len(training_features[1]))
        # validate every feature before stacking (plain loops, not
        # side-effect list comprehensions)
        for feature in training_features[0]:
            self._check_feature(feature)
        for feature in training_features[1]:
            self._check_feature(feature)

        # Loads the data into arrays
        real_features = numpy.vstack(training_features[0])
        attack_features = numpy.vstack(training_features[1])

        print("GmmAlgorithm:train_projector(), real_features shape:", real_features.shape)
        print("GmmAlgorithm:train_projector(), attack_features shape:", attack_features.shape)
        print("Min real ", numpy.min(real_features))
        print("Max real ", numpy.max(real_features))
        print("Min attack ", numpy.min(attack_features))
        print("Max attack ", numpy.max(attack_features))

        logger.info(" -> Training GMM model with %s real training features", str(real_features.shape))
        self.gmm_machine_real = self.train_gmm(real_features)
        logger.info(" -> Training GMM model with %s attack training features", str(attack_features.shape))
        self.gmm_machine_attack = self.train_gmm(attack_features)
        self.save_gmms(projector_file)

    def load_projector(self, projector_file):
        """Loads the two GMMs from file.

        When the file name contains ``_spoof``, the real-data GMM is read
        from the corresponding ``_licit`` file instead (a hack to reuse
        pre-trained GMMs across the licit and spoof protocols).
        """
        if '_spoof' in projector_file:
            hdf5file_spoof = bob.io.base.HDF5File(projector_file)
            projector_file_licit = projector_file.replace('_spoof', '_licit')
            hdf5file_licit = bob.io.base.HDF5File(projector_file_licit)
            self.gmm_machine_real = bob.learn.em.GMMMachine(hdf5file_licit)
            self.gmm_machine_attack = bob.learn.em.GMMMachine(hdf5file_spoof)
        else:  # this is a 'normal' case
            hdf5file = bob.io.base.HDF5File(projector_file)
            # read GMM for real data
            hdf5file.cd('/GMMReal')
            self.gmm_machine_real = bob.learn.em.GMMMachine(hdf5file)
            # read GMM for attack data
            hdf5file.cd('/GMMAttack')
            self.gmm_machine_attack = bob.learn.em.GMMMachine(hdf5file)
        self.gmm_machine_real.set_variance_thresholds(self.variance_threshold)
        self.gmm_machine_attack.set_variance_thresholds(self.variance_threshold)

    def project_feature(self, feature):
        """Returns a 2-element float64 array with the log-likelihoods of
        ``feature`` under the real and attack GMMs, in that order."""
        feature = numpy.asarray(feature, dtype=numpy.float64)
        logger.debug(" .... Projecting %d features vector" % feature.shape[0])
        # return the resulting statistics
        return numpy.asarray([self.gmm_machine_real(feature), self.gmm_machine_attack(feature)],
                             dtype=numpy.float64)

    def project(self, feature):
        """project(feature) -> projected

        Projects the given feature into GMM space.

        **Parameters:**

        feature : 2D :py:class:`numpy.ndarray`
            The feature to be projected (one vector per row).

        **Returns:**

        projected : 1D :py:class:`numpy.ndarray`
            The ``feature`` projected into GMM space; a single zero when the
            feature is empty.
        """
        if len(feature) > 0:
            self._check_feature(feature)
            return self.project_feature(feature)
        else:
            return numpy.zeros(1, dtype=numpy.float64)

    def read_gmm_stats(self, gmm_stats_file):
        """Reads GMM stats from file and returns ``[real_stats, attack_stats]``."""
        hdf5file = bob.io.base.HDF5File(gmm_stats_file)
        # read GMM stats for real data
        hdf5file.cd('/GMMReal')
        gmm_stats_real = bob.learn.em.GMMStats(hdf5file)
        # read GMM stats for attack data
        hdf5file.cd('/GMMAttack')
        gmm_stats_attack = bob.learn.em.GMMStats(hdf5file)
        return [gmm_stats_real, gmm_stats_attack]

    def score(self, toscore):
        """Returns the difference between log likelihoods of being real or attack"""
        return [toscore[0] - toscore[1]]

    def score_for_multiple_projections(self, toscore):
        """Returns the difference between log likelihoods of being real or attack"""
        # bug fix: the original computed the score but dropped it (missing return)
        return self.score(toscore)
# Module-level instance; presumably picked up by bob.pad.base's configuration
# loading as the ``algorithm`` entry -- confirm against the experiment configs.
algorithm = GmmAlgorithm()
import bob.io.base
import numpy
import math
import bob.learn.linear
from bob.pad.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.pad.voice")
class HistDistanceAlgorithm(Algorithm):
    """PAD algorithm that scores a feature histogram by its distance to the
    average real and average attack histograms computed during training.

    The metric is chosen by the constructor flags: ``chi_square`` (checked
    first), ``hist_intersection`` (default), or ``probab_dist`` (features are
    treated as bin indices and scored by a log-probability ratio).
    """

    def __init__(self, chi_square=False, hist_intersection=True, probab_dist=False, normalize_features=True,
                 **kwargs):
        """Initializes the algorithm; extra keyword arguments are ignored.

        **Parameters:**

        chi_square : bool
            Use the chi-square distance between histograms.
        hist_intersection : bool
            Use the histogram-intersection similarity measure.
        probab_dist : bool
            Treat projected features as bin indices and score by the
            log-probability ratio real/attack.
        normalize_features : bool
            Zero-mean/unit-variance normalize the training features
            column-wise before averaging.
        """
        # call base class constructor registering that this tool performs everything.
        Algorithm.__init__(
            self,
            performs_projection=True,
            requires_projector_training=True,
            use_projected_features_for_enrollment=True,
        )
        # average histograms; filled in by train_projector()/load_projector()
        self.real_mean = None
        self.attack_mean = None
        self.normalize_features = normalize_features
        self.hist_intersection = hist_intersection
        self.chi_square = chi_square
        self.probab_dist = probab_dist

    def _check_feature(self, feature, mean_hist=None):
        """Checks that ``feature`` is a 1D float64 array.

        Returns False (after logging a warning) when its length differs from
        ``mean_hist``; raises ValueError when the array itself is malformed.
        """
        if not isinstance(feature, numpy.ndarray) or feature.ndim != 1 or feature.dtype != numpy.float64:
            raise ValueError("The given feature is not appropriate", feature)
        if mean_hist is not None and feature.shape[0] != mean_hist.shape[0]:
            # logger.warn() is deprecated in favor of logger.warning()
            logger.warning("The given feature is expected to have %d elements, but it has %d" % (
                mean_hist.shape[0], feature.shape[0]))
            return False
        return True

    def train_projector(self, training_features, projector_file):
        """Computes the average real and attack histograms and saves them to
        ``projector_file`` (HDF5).

        ``training_features`` holds two lists -- real features at index 0 and
        attack features at index 1; each entry is either a 1D histogram or a
        set (sequence) of histograms.
        """
        if len(training_features) < 2:
            raise ValueError("Training projector: features should contain two lists: real and attack!")

        # the format is specified in FileSelector.py:training_list() of bob.spoof.base
        if isinstance(training_features[0][0][0], numpy.ndarray):
            print("HistDistanceAlgorithm:train_projector(), features are set of arrays of length: ",
                  len(training_features[0][0][0]))
            # flatten each set of histograms into one row per histogram
            real_features = numpy.array([row for feat in training_features[0] for row in feat], dtype=numpy.float64)
            attack_features = numpy.array([row for feat in training_features[1] for row in feat],
                                          dtype=numpy.float64)
        else:
            real_features = numpy.array(training_features[0], dtype=numpy.float64)
            attack_features = numpy.array(training_features[1], dtype=numpy.float64)

        from antispoofing.utils.ml import norm

        # normalize features column-wise, if requested
        if self.normalize_features:
            mean, std = norm.calc_mean_std(real_features, attack_features, nonStdZero=True)
            real_features = norm.zeromean_unitvar_norm(real_features, mean, std)
            attack_features = norm.zeromean_unitvar_norm(attack_features, mean, std)

        # compute the average histogram for each type of features
        self.real_mean = numpy.mean(real_features, axis=0)
        self.attack_mean = numpy.mean(attack_features, axis=0)

        # save the models to file for future use
        hdf5file = bob.io.base.HDF5File(projector_file, "w")
        hdf5file.set("AvHistReal", self.real_mean)
        # NOTE: the key "AvHistAttackl" (trailing 'l') is kept as-is for
        # compatibility with previously saved projector files
        hdf5file.set("AvHistAttackl", self.attack_mean)

    def load_projector(self, projector_file):
        """Loads the average real and attack histograms from ``projector_file``."""
        hdf5file = bob.io.base.HDF5File(projector_file)
        self.real_mean = hdf5file.read("AvHistReal")
        self.attack_mean = hdf5file.read("AvHistAttackl")

    def hist_bin(self, hist, xi):
        """Returns the value of bin ``xi`` of the 1D histogram ``hist``.

        Raises ValueError when ``xi`` is outside the histogram.
        """
        # bug fix: the original used '>', which let xi == len(hist) through
        # to an IndexError; it also never %-formatted the error message.
        if xi >= hist.shape[0]:
            raise ValueError("The coordinate for bin value of histogram (size: %d) is %d, which is too large"
                             % (hist.shape[0], xi))
        return hist[xi]

    def project_feature(self, feature):
        """Computes the distances of ``feature`` to the average histograms.

        For ``probab_dist``, returns a 1-element array with the
        log-probability ratio real/attack. Otherwise returns a 2-element
        array ``[dist_real, dist_attack]`` under the configured metric, or
        zeros when the feature size does not match the trained models.
        """
        feature = numpy.asarray(feature, dtype=numpy.float64)

        # here, features are bin indices (e.g. lbp images), so they are treated differently
        if self.probab_dist:
            if feature.shape[0]:  # not empty
                pprobab_real = numpy.sum([math.log(self.hist_bin(self.real_mean, xi)) for xi in feature])
                pprobab_attack = numpy.sum([math.log(self.hist_bin(self.attack_mean, xi)) for xi in feature])
                return numpy.array([pprobab_real - pprobab_attack], dtype=numpy.float64)

        if self._check_feature(feature, self.real_mean):
            import bob.math
            # Find the distance from the feature-histogram and the average models
            if self.chi_square:
                dist_real = bob.math.chi_square(self.real_mean, feature)
                dist_attack = bob.math.chi_square(self.attack_mean, feature)
            elif self.hist_intersection:
                dist_real = bob.math.histogram_intersection(self.real_mean, feature)
                dist_attack = bob.math.histogram_intersection(self.attack_mean, feature)
            else:
                raise ValueError("HistDistanceAlgorithm: please specify the metric for histogram distance")
            return numpy.array([dist_real, dist_attack], dtype=numpy.float64)

        # size mismatch with the trained models: return a neutral projection
        return numpy.zeros(2, dtype=numpy.float64)

    def project(self, feature):
        """project(feature) -> projected

        Projects the given feature (or set of features) by computing its
        distance(s) to the average real and attack histograms.

        **Parameters:**

        feature : 1D or 2D :py:class:`numpy.ndarray`
            The feature (or set of features) to be projected.

        **Returns:**

        projected : 1D :py:class:`numpy.ndarray` or list
            The projection(s) of ``feature``; a single zero when it is empty.
        """
        print("HistDistanceAlgorithm:project(), feature shape: ", feature.shape)
        if len(feature) > 0:
            # a set of features: project each one separately
            if isinstance(feature[0], numpy.ndarray) or isinstance(feature[0], list):
                return [self.project_feature(feat) for feat in feature]
            else:
                return self.project_feature(feature)
        else:
            return numpy.zeros(1, dtype=numpy.float64)

    def score(self, toscore):
        """Converts a projection into a single score; real samples tend to
        score positive and attacks negative, for both metrics."""
        print("HistDistanceAlgorithm:score() the score: ", toscore)
        # projection is already the score in this case
        if self.probab_dist:
            return toscore
        dist_real = toscore[0]
        dist_attack = toscore[1]
        if self.chi_square:
            # chi-square distance to attack is smaller if it is nearer the attack mean
            # so, attack features have negative scores and real - positive
            return [dist_attack - dist_real]
        elif self.hist_intersection:
            # the situation with histogram intersection metrics is reversed compared to chi-square:
            # histogram intersection is a similarity measure (higher means closer), so
            # attack features have negative scores and real - positive scores
            return [dist_real - dist_attack]
        else:
            raise ValueError("HistDistanceAlgorithm:scoring() please specify the metric for histogram distance")

    def score_for_multiple_projections(self, toscore):
        """Scores each projection in ``toscore`` and returns them as a float64 array."""
        print("HistDistanceAlgorithm:score_for_multiple_projections() the score: ", len(toscore))
        return numpy.array([self.score(score) for score in toscore], dtype=numpy.float64)
# Module-level instance; presumably picked up by bob.pad.base's configuration
# loading as the ``algorithm`` entry -- confirm against the experiment configs.
algorithm = HistDistanceAlgorithm()
import bob.io.base
import numpy
import bob.learn.linear
from bob.pad.base.algorithm import Algorithm
import logging
logger = logging.getLogger("bob.pad.voice")
class LogRegrAlgorithm(Algorithm):
"""Trains Logistical Regression classifier and projects testing dat on it."""
def __init__(self, use_PCA_training=False, normalize_features=False, **kwargs):
    """Initialize the logistic-regression PAD algorithm.

    **Parameters:**

    use_PCA_training : bool
        Reduce the feature space with PCA before training the classifier.
    normalize_features : bool
        Zero-mean/unit-variance normalize the features before training.

    Extra keyword arguments are accepted and ignored.
    """
    # Register with the base class: this algorithm projects features,
    # needs training data for its projector, and enrolls on projections.
    Algorithm.__init__(
        self,
        performs_projection=True,
        requires_projector_training=True,
        use_projected_features_for_enrollment=True,
    )
    # Trained models are created later by train_projector()/load_projector().
    self.machine = self.pca_machine = None
    # configuration flags
    self.use_PCA_training = use_PCA_training
    self.normalize_features = normalize_features
def _check_feature(self, feature, machine=None, projected=False):
    """Checks that ``feature`` is a 1D float64 array of the size ``machine`` expects.

    **Parameters:**

    feature : 1D :py:class:`numpy.ndarray`
        The feature to validate.
    machine : linear machine or ``None``
        When given, the feature length is compared against the machine's
        input size (or output size when ``projected`` is True).
    projected : bool
        Whether ``feature`` is already a projection.

    **Returns:** True when the feature is usable; False (after logging a
    warning) on a size mismatch. Raises ValueError when the array itself is
    malformed.
    """
    if not isinstance(feature, numpy.ndarray) or feature.ndim != 1 or feature.dtype != numpy.float64:
        raise ValueError("The given feature is not appropriate", feature)
    # projected features are checked against the machine's output dimension
    index = 1 if projected else 0
    if machine is not None and feature.shape[0] != machine.shape[index]:
        # logger.warn() is deprecated in favor of logger.warning()
        logger.warning("The given feature is expected to have %d elements, but it has %d" % (
            machine.shape[index], feature.shape[0]))
        return False
    return True
def train_projector(self, training_features, projector_file):
if len(training_features) < 2:
raise ValueError("Training projector: features should contain two lists: real and attack!")
# the format is specified in FileSelector.py:training_list() of bob.spoof.base
logger.info(" - Training: number of real features %d", len(training_features[0]))
# print (training_features[0])
if isinstance(training_features[0][0][0], numpy.ndarray):
logger.info(" - Training: each feature is a set of arrays")
real_features = numpy.array(
[row if self._check_feature(row) else numpy.nan for feat in training_features[0] for row in feat],
dtype=numpy.float64)
attack_features = numpy.array(
[row if self._check_feature(row) else numpy.nan for feat in training_features[1] for row in feat],
dtype=numpy.float64)
else:
logger.info(" - Training: each feature is a single array")
real_features = numpy.array(
[feat if self._check_feature(feat) else numpy.nan for feat in training_features[0]],
dtype=numpy.float64)
attack_features = numpy.array(
[feat if self._check_feature(feat) else numpy.nan for feat in training_features[1]],
dtype=numpy.float64)
# print ("LogRegrAlgorithm:train_projector(), real_features shape:", real_features.shape)
# print ("LogRegrAlgorithm:train_projector(), attack_features shape:", attack_features.shape)
# print ("Min real ", numpy.min(real_features))
# print ("Max real ", numpy.max(real_features))
# print ("Min attack ", numpy.min(attack_features))
# print ("Max attack ", numpy.max(attack_features))
# save the trained model to file for future use
hdf5file = bob.io.base.HDF5File(projector_file, "w")
from antispoofing.utils.ml import norm
mean = None
std = None
# reduce the feature space using PCA
if self.use_PCA_training or self.normalize_features:
mean, std = norm.calc_mean_std(real_features, attack_features, nonStdZero=True)
real_features = norm.zeromean_unitvar_norm(real_features, mean, std)
attack_features = norm.zeromean_unitvar_norm(attack_features, mean, std)
if self.use_PCA_training:
pca_trainer = bob.learn.linear.PCATrainer()
self.pca_machine, eigenvalues = pca_trainer.train(numpy.vstack((real_features, attack_features)))
# select only meaningful weights
cummulated = numpy.cumsum(eigenvalues) / numpy.sum(eigenvalues)
for index in range(len(cummulated)):
if cummulated[index] > 0.99: # variance
subspace_dimension = index
break
subspace_dimension = index
# save the PCA matrix
self.pca_machine.resize(self.pca_machine.shape[0], subspace_dimension)
if mean is not None and std is not None:
self.pca_machine.input_subtract = mean