Commit 54a16bc3 authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

Add a new one class GMM based on bob's GMMs

parent a4f96c14
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 28 16:47:47 2017
@author: Olegs Nikisins
"""
# ==============================================================================
# Import what is needed here:
from bob.pad.base.algorithm import Algorithm
from bob.bio.video.utils import FrameContainer
import numpy as np
import bob.io.base
from bob.pad.base.algorithm import Algorithm
from bob.pad.base.utils import convert_frame_cont_to_array, mean_std_normalize, convert_and_prepare_features
from sklearn import mixture
import bob.io.base
import logging
import numpy as np
from bob.pad.base.utils import convert_frame_cont_to_array, mean_std_normalize, convert_and_prepare_features
logger = logging.getLogger(__name__)
# ==============================================================================
# Main body :
......@@ -44,7 +41,7 @@ class OneClassGMM(Algorithm):
``random_state`` : :py:class:`int`
A seed for the random number generator used in the initialization of
the OneClassGMM. Default: 7 .
the OneClassGMM. Default: 3 .
``frame_level_scores_flag`` : :py:class:`bool`
Return scores for each frame individually if True. Otherwise, return a
......@@ -54,7 +51,10 @@ class OneClassGMM(Algorithm):
def __init__(self,
n_components=1,
random_state=3,
frame_level_scores_flag=False):
frame_level_scores_flag=False,
covariance_type='full',
reg_covar=1e-06,
):
Algorithm.__init__(
self,
......@@ -65,15 +65,13 @@ class OneClassGMM(Algorithm):
requires_projector_training=True)
self.n_components = n_components
self.random_state = random_state
self.frame_level_scores_flag = frame_level_scores_flag
self.covariance_type = covariance_type
self.reg_covar = reg_covar
self.machine = None # this argument will be updated with pretrained OneClassGMM machine
self.features_mean = None # this argument will be updated with features mean
self.features_std = None # this argument will be updated with features std
# names of the arguments of the pretrained OneClassGMM machine to be saved/loaded to/from HDF5 file:
......@@ -84,7 +82,7 @@ class OneClassGMM(Algorithm):
]
# ==========================================================================
def train_gmm(self, real, n_components, random_state):
def train_gmm(self, real):
"""
Train OneClassGMM classifier given real class. Prior to the training the data is
mean-std normalized.
......@@ -94,13 +92,6 @@ class OneClassGMM(Algorithm):
``real`` : 2D :py:class:`numpy.ndarray`
Training features for the real class.
``n_components`` : :py:class:`int`
Number of Gaussians in the OneClassGMM. Default: 1 .
``random_state`` : :py:class:`int`
A seed for the random number generator used in the initialization of
the OneClassGMM. Default: 7 .
**Returns:**
``machine`` : object
......@@ -113,16 +104,41 @@ class OneClassGMM(Algorithm):
Standard deviation of the features.
"""
features_norm, features_mean, features_std = mean_std_normalize(
real)
# real is now mean-std normalized
features_norm, features_mean, features_std = mean_std_normalize(real, copy=False)
if isinstance(self.n_components, (tuple, list)) or isinstance(self.covariance_type, (tuple, list)):
# perform grid search on covariance_type and n_components
n_components = self.n_components if isinstance(self.n_components, (tuple, list)) else [self.n_components]
covariance_type = self.covariance_type if isinstance(self.covariance_type, (tuple, list)) else [self.covariance_type]
logger.info("Performing grid search for GMM on covariance_type: %s and n_components: %s", self.covariance_type, self.n_components)
bic = []
lowest_bic = np.infty
for cv_type in covariance_type:
for nc in n_components:
logger.info("Testing for n_components: %s, covariance_type: %s", nc, cv_type)
gmm = mixture.GaussianMixture(
n_components=nc, covariance_type=cv_type,
reg_covar=self.reg_covar)
try:
gmm.fit(features_norm)
except Exception:
logger.warn("Failed to train current GMM", exc_info=True)
continue
bic.append(gmm.bic(features_norm))
if bic[-1] < lowest_bic:
lowest_bic = bic[-1]
logger.info("Best parameters so far: nc %s, cv_type: %s", nc, cv_type)
machine = gmm
machine = mixture.GaussianMixture(
n_components=n_components,
random_state=random_state,
covariance_type='full')
else:
machine = mixture.GaussianMixture(
n_components=self.n_components,
random_state=self.random_state,
covariance_type=self.covariance_type,
reg_covar=self.reg_covar)
machine.fit(features_norm)
machine.fit(features_norm)
return machine, features_mean, features_std
......@@ -150,19 +166,17 @@ class OneClassGMM(Algorithm):
Standard deviation of the features.
"""
f = bob.io.base.HDF5File(projector_file,
'w') # open hdf5 file to save to
# open hdf5 file to save to
with bob.io.base.HDF5File(projector_file, 'w') as f:
for key in self.gmm_param_keys:
data = getattr(machine, key)
for key in self.gmm_param_keys:
data = getattr(machine, key)
f.set(key, data)
f.set(key, data)
f.set("features_mean", features_mean)
f.set("features_mean", features_mean)
f.set("features_std", features_std)
del f
f.set("features_std", features_std)
# ==========================================================================
def train_projector(self, training_features, projector_file):
......@@ -183,18 +197,16 @@ class OneClassGMM(Algorithm):
``bob.pad.base`` framework.
"""
del training_features[1]
# training_features[0] - training features for the REAL class.
real = convert_and_prepare_features(
training_features[0]) # output is array
real = convert_and_prepare_features(training_features[0], dtype=None)
del training_features[0]
# training_features[1] - training features for the ATTACK class.
# attack = self.convert_and_prepare_features(training_features[1]) # output is array
# Train the OneClassGMM machine and get normalizers:
machine, features_mean, features_std = self.train_gmm(
real=real,
n_components=self.n_components,
random_state=self.random_state)
machine, features_mean, features_std = self.train_gmm(real=real)
# Save the GMM machine and normalizers:
self.save_gmm_machine_and_mean_std(projector_file, machine,
......@@ -224,23 +236,19 @@ class OneClassGMM(Algorithm):
Standard deviation of the features.
"""
f = bob.io.base.HDF5File(projector_file,
'r') # file to read the machine from
# initialize the machine:
machine = mixture.GaussianMixture()
# set the params of the machine:
for key in self.gmm_param_keys:
data = f.read(key)
# file to read the machine from
with bob.io.base.HDF5File(projector_file, 'r') as f:
setattr(machine, key, data)
# initialize the machine:
machine = mixture.GaussianMixture()
features_mean = f.read("features_mean")
# set the params of the machine:
for key in self.gmm_param_keys:
data = f.read(key)
setattr(machine, key, data)
features_std = f.read("features_std")
del f
features_mean = f.read("features_mean")
features_std = f.read("features_std")
return machine, features_mean, features_std
......@@ -272,9 +280,7 @@ class OneClassGMM(Algorithm):
projector_file)
self.machine = machine
self.features_mean = features_mean
self.features_std = features_std
# ==========================================================================
......@@ -320,7 +326,7 @@ class OneClassGMM(Algorithm):
features_array = feature
features_array_norm, _, _ = mean_std_normalize(
features_array, self.features_mean, self.features_std)
features_array, self.features_mean, self.features_std, copy=False)
scores = self.machine.score_samples(features_array_norm)
......
# -*- coding: utf-8 -*-
# @author: Amir Mohammadi
from bob.pad.base.algorithm import Algorithm
from bob.pad.base.utils import convert_and_prepare_features
from bob.bio.gmm.algorithm import GMM
import logging
import numpy as np
from collections.abc import Iterable
from multiprocessing import cpu_count
logger = logging.getLogger(__name__)
def bic(trainer, machine, X):
    """Compute the Bayesian information criterion of a GMM on the input X.

    Parameters
    ----------
    trainer : object
        Trainer that was used to fit ``machine``. Its
        ``compute_likelihood(machine)`` value is multiplied by the number of
        samples below, so it is presumably the average log-likelihood per
        sample (to be confirmed against the trainer implementation).
    machine : object
        The GMM; its ``means``, ``variances`` and ``weights`` are used to
        count the model parameters.
    X : array of shape (n_samples, n_dimensions)
        Only the number of samples, ``X.shape[0]``, is read.

    Returns
    -------
    bic : float
        The lower the better.
    """
    avg_log_likelihood = trainer.compute_likelihood(machine)
    n_samples = X.shape[0]
    # One parameter per mean and per variance entry, plus the weights minus
    # one (presumably because the weights are constrained to sum to one).
    free_parameters = (
        len(machine.weights) - 1 + machine.variances.size + machine.means.size
    )
    penalty = free_parameters * np.log(n_samples)
    return -2 * avg_log_likelihood * n_samples + penalty
class OneClassGMM2(Algorithm):
    """A one class GMM implementation based on Bob's GMM implementation which is more
    stable than scikit-learn's one.

    Parameters
    ----------
    number_of_gaussians : int or iterable of int
        Number of Gaussians of the GMM. When an iterable is given, one GMM is
        trained per value and the one with the lowest BIC is kept.
    kmeans_training_iterations, gmm_training_iterations, training_threshold,
    variance_threshold, update_weights, update_means, update_variances,
    n_threads
        Forwarded unchanged to ``bob.bio.gmm.algorithm.GMM``.
    **kwargs
        Forwarded to the ``Algorithm`` base class. ``performs_projection`` and
        ``requires_projector_training`` default to ``True`` when not given.
    """

    def __init__(
        self,
        # parameters for the GMM
        number_of_gaussians,
        # parameters of UBM training
        kmeans_training_iterations=25,  # Maximum number of iterations for K-Means
        gmm_training_iterations=25,  # Maximum number of iterations for ML GMM Training
        training_threshold=5e-4,  # Threshold to end the ML training
        variance_threshold=5e-4,  # Minimum value that a variance can reach
        update_weights=True,
        update_means=True,
        update_variances=True,
        n_threads=cpu_count(),
        **kwargs
    ):
        kwargs.setdefault("performs_projection", True)
        kwargs.setdefault("requires_projector_training", True)
        super().__init__(**kwargs)
        # All the actual GMM training, saving and loading is delegated to
        # this wrapped algorithm.
        self.gmm_alg = GMM(
            number_of_gaussians=number_of_gaussians,
            kmeans_training_iterations=kmeans_training_iterations,
            gmm_training_iterations=gmm_training_iterations,
            training_threshold=training_threshold,
            variance_threshold=variance_threshold,
            update_weights=update_weights,
            update_means=update_means,
            update_variances=update_variances,
            n_threads=n_threads,
        )
        self.number_of_gaussians = number_of_gaussians

    def train_projector(self, training_features, projector_file):
        """Train the GMM on the first class of features and save it.

        Parameters
        ----------
        training_features : list
            Two-element list of training features; only element 0 is used.
            The list is emptied in place by this method.
        projector_file : str
            Path handed to ``self.gmm_alg.save_ubm``.

        Raises
        ------
        ValueError
            If ``number_of_gaussians`` is an iterable and none of its values
            produced a model with a BIC below infinity.
        """
        # Element 1 is never used here; both entries are deleted as soon as
        # possible, presumably to release memory early.
        del training_features[1]
        real = convert_and_prepare_features(training_features[0], dtype="float64")
        del training_features[0]

        if isinstance(self.number_of_gaussians, Iterable):
            logger.info(
                "Performing grid search for GMM on number_of_gaussians: %s",
                self.number_of_gaussians,
            )
            # np.inf rather than np.infty: the latter alias was removed in
            # NumPy 2.0.
            lowest_bic = np.inf
            best_n_gaussians = None
            gmm = None
            for nc in self.number_of_gaussians:
                logger.info("Testing for number_of_gaussians: %s", nc)
                self.gmm_alg.gaussians = nc
                self.gmm_alg.train_ubm(real)
                bic_ = bic(self.gmm_alg.ubm_trainer, self.gmm_alg.ubm, real)
                logger.info("BIC for number_of_gaussians: %s is %s", nc, bic_)
                if bic_ < lowest_bic:
                    gmm = self.gmm_alg.ubm
                    lowest_bic = bic_
                    best_n_gaussians = nc
                    logger.info("Best parameters so far: number_of_gaussians %s", nc)
            # Explicit check instead of an assert, which would be stripped
            # under ``python -O`` and leave ``gmm`` unset.
            if best_n_gaussians is None:
                raise ValueError(
                    "Could not select a GMM: no value in number_of_gaussians={!r} "
                    "produced a BIC below infinity.".format(self.number_of_gaussians)
                )
            self.gmm_alg.gaussians = best_n_gaussians
        else:
            self.gmm_alg.train_ubm(real)
            gmm = self.gmm_alg.ubm

        # Put the selected model back before saving: after a grid search
        # ``self.gmm_alg.ubm`` holds the last trained model, not the best one.
        self.gmm_alg.ubm = gmm
        self.gmm_alg.save_ubm(projector_file)

    def load_projector(self, projector_file):
        """Load the trained GMM from ``projector_file`` into ``self.gmm_alg``."""
        self.gmm_alg.load_ubm(projector_file)

    def project(self, feature):
        """Return the value of the trained GMM called on ``feature``.

        The feature is first converted with ``convert_and_prepare_features``
        using dtype ``"float64"``. The result is presumably a log-likelihood
        (to be confirmed against the Bob GMM machine).
        """
        feature = convert_and_prepare_features([feature], dtype="float64")[0]
        return self.gmm_alg.ubm(feature)

    def score(self, toscore):
        """Return the projected value wrapped in a one-element list."""
        return [toscore]
from bob.pad.base.algorithm import Algorithm
import numpy
class Predictions(Algorithm):
......@@ -6,9 +7,31 @@ class Predictions(Algorithm):
scoring."""
def __init__(self, **kwargs):
super(Predictions, self).__init__(
**kwargs)
super(Predictions, self).__init__(**kwargs)
def score(self, predictions):
    """Turn precomputed predictions into scores.

    A single-valued input is returned as a numpy array (treated as the
    output of a sigmoid binary layer). Otherwise the input is treated as
    the output of a softmax layer and its element at index 1 is returned in
    a one-element list.
    """
    scores = numpy.asarray(predictions)
    if scores.size != 1:
        # Assuming the predictions are the output of a softmax layer
        return [scores[1]]
    # output of a sigmoid binary layer
    return scores
class VideoPredictions(Algorithm):
    """Scores a video from its precomputed per-frame predictions.

    ``axis`` selects which column of the prediction array is used as the
    score. With ``frame_level_scoring`` enabled every selected value is
    returned; otherwise their mean is returned in a one-element list.
    """

    def __init__(self, axis=1, frame_level_scoring=False, **kwargs):
        super(VideoPredictions, self).__init__(**kwargs)
        self.axis = axis
        self.frame_level_scoring = frame_level_scoring

    def score(self, predictions):
        """Return the selected prediction column, or its mean in a list."""
        # Assuming the predictions are the output of a softmax layer
        frame_scores = predictions.as_array()[:, self.axis]
        if not self.frame_level_scoring:
            return [numpy.mean(frame_scores)]
        return frame_scores
from .Algorithm import Algorithm
from .SVM import SVM
from .OneClassGMM import OneClassGMM
from .OneClassGMM2 import OneClassGMM2
from .LogRegr import LogRegr
from .SVMCascadePCA import SVMCascadePCA
from .Predictions import Predictions
from .Predictions import Predictions, VideoPredictions
from .MLP import MLP
from .PadLDA import PadLDA
......@@ -31,9 +32,11 @@ __appropriate__(
Algorithm,
SVM,
OneClassGMM,
OneClassGMM2,
LogRegr,
SVMCascadePCA,
Predictions,
VideoPredictions,
MLP,
PadLDA
)
......
......@@ -7,9 +7,12 @@ import logging
import math
import os
import yaml
from bob.bio.base.score.load import split
from bob.bio.base.score.load import load_score, get_negatives_positives
from bob.extension.scripts.click_helper import (
verbosity_option, bool_option, log_parameters)
verbosity_option,
bool_option,
log_parameters,
)
from bob.measure import eer_threshold, farfrr
from bob.measure.script import common_options
from bob.measure.utils import get_fta
......@@ -19,40 +22,96 @@ from tabulate import tabulate
logger = logging.getLogger(__name__)
@click.command(epilog='''\b
@click.command(
epilog="""\b
Examples:
$ bin/bob pad cross 'results/{{ evaluation.database }}/{{ algorithm }}/{{ evaluation.protocol }}/scores/scores-{{ group }}' \
-td replaymobile -d replaymobile -p grandtest -d oulunpu -p Protocol_1 \
-a replaymobile_frame-diff-svm \
-a replaymobile_qm-svm-64 \
-a replaymobile_lbp-svm-64 \
-td replaymobile \
-d replaymobile -p grandtest \
-d oulunpu -p Protocol_1 \
-a replaymobile_grandtest_frame-diff-svm \
-a replaymobile_grandtest_qm-svm-64 \
-a replaymobile_grandtest_lbp-svm-64 \
> replaymobile.rst &
''')
@click.argument('score_jinja_template')
@click.option('-d', '--database', 'databases', multiple=True, required=True,
show_default=True,
help='Names of the evaluation databases')
@click.option('-p', '--protocol', 'protocols', multiple=True, required=True,
show_default=True,
help='Names of the protocols of the evaluation databases')
@click.option('-a', '--algorithm', 'algorithms', multiple=True, required=True,
show_default=True,
help='Names of the algorithms')
@click.option('-n', '--names', type=click.File('r'),
help='Name of algorithms to show in the table. Provide a path '
'to a json file maps algorithm names to names that you want to '
'see in the table.')
@click.option('-td', '--train-database', required=True,
help='The database that was used to train the algorithms.')
@click.option('-g', '--group', 'groups', multiple=True, show_default=True,
default=['train', 'dev', 'eval'])
@bool_option('sort', 's', 'whether the table should be sorted.', True)
"""
)
@click.argument("score_jinja_template")
@click.option(
"-d",
"--database",
"databases",
multiple=True,
required=True,
show_default=True,
help="Names of the evaluation databases",
)
@click.option(
"-p",
"--protocol",
"protocols",
multiple=True,
required=True,
show_default=True,
help="Names of the protocols of the evaluation databases",
)
@click.option(
"-a",
"--algorithm",
"algorithms",
multiple=True,
required=True,
show_default=True,
help="Names of the algorithms",
)
@click.option(
"-n",
"--names",
type=click.File("r"),
help="Name of algorithms to show in the table. Provide a path "
"to a json file maps algorithm names to names that you want to "
"see in the table.",
)
@click.option(
"-td",
"--train-database",
required=True,
help="The database that was used to train the algorithms.",
)
@click.option(
"-pn",
"--pai-names",
type=click.File("r"),
help="Name of PAIs to compute the errors per PAI. Provide a path "
"to a json file maps attack_type in scores to PAIs that you want to "
"see in the table.",
)
@click.option(
"-g",
"--group",
"groups",
multiple=True,
show_default=True,
default=["train", "dev", "eval"],
)
@bool_option("sort", "s", "whether the table should be sorted.", True)
@common_options.table_option()
@common_options.output_log_metric_option()
@verbosity_option()
@click.pass_context
def cross(ctx, score_jinja_template, databases, protocols, algorithms,
names, train_database, groups, sort, **kwargs):
def cross(
ctx,
score_jinja_template,
databases,
protocols,
algorithms,
names,
train_database,
pai_names,
groups,
sort,
verbose,
**kwargs
):
"""Cross-db analysis metrics
"""
log_parameters(logger)
......@@ -62,10 +121,12 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
env = jinja2.Environment(undefined=jinja2.StrictUndefined)
data = {
'evaluation': [{'database': db, 'protocol': proto}
for db, proto in zip(databases, protocols)],
'algorithm': algorithms,
'group': groups,
"evaluation": [
{"database": db, "protocol": proto}
for db, proto in zip(databases, protocols)
],
"algorithm": algorithms,
"group": groups,
}
metrics = {}
......@@ -74,27 +135,30 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
logger.debug(variables)
score_path = env.from_string(score_jinja_template).render(variables)
logger.debug(score_path)
logger.info(score_path)
database, protocol, algorithm, group = \
variables['evaluation']['database'], \
variables['evaluation']['protocol'], \
variables['algorithm'], variables['group']
database, protocol, algorithm, group = (
variables["evaluation"]["database"],
variables["evaluation"]["protocol"],
variables["algorithm"],
variables["group"],
)
# if algorithm name does not have train_database name in it.
if train_database not in algorithm and database != train_database:
score_path = score_path.replace(
algorithm, database + '_' + algorithm)
score_path = score_path.replace(algorithm, database + "_" + algorithm)
logger.info("Score path changed to: %s", score_path)
if not os.path.exists(score_path):
metrics[(database, protocol, algorithm, group)] = \
(float('nan'), ) * 5
metrics[(database, protocol, algorithm, group)] = (float("nan"),) * 5
continue
(neg, pos), fta = get_fta(split(score_path))
scores = load_score(score_path)
neg, pos = get_negatives_positives(scores)
(neg, pos), fta = get_fta((neg, pos))
if group == 'eval':
threshold = metrics[(database, protocol, algorithm, 'dev')][1]
if group == "eval":
threshold = metrics[(database, protocol, algorithm, "dev")][1]
else:
try:
threshold = eer_threshold(neg, pos)
......@@ -105,10 +169,15 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
far, frr = farfrr(neg, pos, threshold)
hter = (far + frr) / 2
metrics[(database, protocol, algorithm, group)] = \
(hter, threshold, fta, far, frr)
metrics[(database, protocol, algorithm, group)] = (
hter,
threshold,
fta,
far,
frr,
)
logger.debug('metrics: %s', metrics)
logger.debug("metrics: %s", metrics)
headers = ["Algorithms"]
for db in databases:
......@@ -121,31 +190,39 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
def sort_key(alg):
r = []
for grp in ('eval', 'dev', 'train'):
for grp in ("eval", "dev", "train"):
hter = metrics[(train_database, train_protocol, alg, group)][0]
r.append(1 if math.isnan(hter) else hter)
return tuple(r)
algorithms = sorted(algorithms, key=sort_key)
for algorithm in algorithms:
name = algorithm.replace(train_database + '_', '')
name = name.replace(train_protocol + '_', '')
name = algorithm.replace(train_database + "_", "")
name = name.replace(train_protocol + "_", "")
name = names.get(name, name)
rows.append([name])
for database, protocol in zip(databases, protocols):
cell = []
for group in groups:
hter, threshold, fta, far, frr = metrics[(
database, protocol, algorithm, group)]
if group == 'eval':
hter, threshold, fta, far, frr = metrics[
(database, protocol, algorithm, group)
]
if group == "eval":
cell += [far, frr, hter]
else:
cell += [hter]
cell = [round(c * 100, 1) for c in cell]
rows[-1].extend(cell)
title = ' Trained on {} '.format(train_database)
title_line = '\n' + '=' * len(title) + '\n'
click.echo(title_line + title + title_line, file=ctx.meta['log'])
click.echo(tabulate(rows, headers, ctx.meta['tablefmt'], floatfmt=".1f"),
file=ctx.meta['log'])
title = " Trained on {} ".format(train_database)
title_line = "\n" + "=" * len(title) + "\n"
# open log file for writing if any
ctx.meta["log"] = (
ctx.meta["log"] if ctx.meta["log"] is None else open(ctx.meta["log"], "w")
)
click.echo(title_line + title + title_line, file=ctx.meta["log"])
click.echo(
tabulate(rows, headers, ctx.meta["tablefmt"], floatfmt=".1f"),
file=ctx.meta["log"],
)
......@@ -42,9 +42,6 @@ def test_padfile_to_label():
def test_video_svm_pad_algorithm():
"""
Test the SVM PAD algorithm.
"""
random.seed(7)
......@@ -107,9 +104,6 @@ def test_video_svm_pad_algorithm():
def test_video_gmm_pad_algorithm():
"""
Test the OneClassGMM PAD algorithm.
"""
random.seed(7)
......@@ -144,9 +138,7 @@ def test_video_gmm_pad_algorithm():
# Train the OneClassGMM machine and get normalizers:
machine, features_mean, features_std = algorithm.train_gmm(
real=real_array_converted,
n_components=algorithm.n_components,
random_state=algorithm.random_state)
real=real_array_converted)
algorithm.machine = machine
......@@ -178,9 +170,6 @@ def test_convert_list_of_frame_cont_to_array():
def test_MLP():
"""
Test the MLP PAD algorithm.
"""
random.seed(7)
......@@ -209,9 +198,6 @@ def test_MLP():