Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
bob
bob.pad.base
Commits
fef91a20
Commit
fef91a20
authored
Jun 07, 2019
by
Amir MOHAMMADI
Browse files
Merge branch 'one-class-gmm' into 'master'
Add a new one-class GMM based on bob's GMMs. See merge request !62
parents
a4f96c14
54a16bc3
Pipeline
#30789
failed with stages
in 12 minutes and 29 seconds
Changes
9
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
bob/pad/base/algorithm/OneClassGMM.py
View file @
fef91a20
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 28 16:47:47 2017
@author: Olegs Nikisins
"""
# ==============================================================================
# Import what is needed here:
from
bob.pad.base.algorithm
import
Algorithm
from
bob.bio.video.utils
import
FrameContainer
import
numpy
as
np
import
bob.io.base
from
bob.pad.base.algorithm
import
Algorithm
from
bob.pad.base.utils
import
convert_frame_cont_to_array
,
mean_std_normalize
,
convert_and_prepare_features
from
sklearn
import
mixture
import
bob.io.base
import
logging
import
numpy
as
np
from
bob.pad.base.utils
import
convert_frame_cont_to_array
,
mean_std_normalize
,
convert_and_prepare_features
logger
=
logging
.
getLogger
(
__name__
)
# ==============================================================================
# Main body :
...
...
@@ -44,7 +41,7 @@ class OneClassGMM(Algorithm):
``random_state`` : :py:class:`int`
A seed for the random number generator used in the initialization of
the OneClassGMM. Default:
7
.
the OneClassGMM. Default:
3
.
``frame_level_scores_flag`` : :py:class:`bool`
Return scores for each frame individually if True. Otherwise, return a
...
...
@@ -54,7 +51,10 @@ class OneClassGMM(Algorithm):
def
__init__
(
self
,
n_components
=
1
,
random_state
=
3
,
frame_level_scores_flag
=
False
):
frame_level_scores_flag
=
False
,
covariance_type
=
'full'
,
reg_covar
=
1e-06
,
):
Algorithm
.
__init__
(
self
,
...
...
@@ -65,15 +65,13 @@ class OneClassGMM(Algorithm):
requires_projector_training
=
True
)
self
.
n_components
=
n_components
self
.
random_state
=
random_state
self
.
frame_level_scores_flag
=
frame_level_scores_flag
self
.
covariance_type
=
covariance_type
self
.
reg_covar
=
reg_covar
self
.
machine
=
None
# this argument will be updated with pretrained OneClassGMM machine
self
.
features_mean
=
None
# this argument will be updated with features mean
self
.
features_std
=
None
# this argument will be updated with features std
# names of the arguments of the pretrained OneClassGMM machine to be saved/loaded to/from HDF5 file:
...
...
@@ -84,7 +82,7 @@ class OneClassGMM(Algorithm):
]
# ==========================================================================
def
train_gmm
(
self
,
real
,
n_components
,
random_state
):
def
train_gmm
(
self
,
real
):
"""
Train OneClassGMM classifier given real class. Prior to the training the data is
mean-std normalized.
...
...
@@ -94,13 +92,6 @@ class OneClassGMM(Algorithm):
``real`` : 2D :py:class:`numpy.ndarray`
Training features for the real class.
``n_components`` : :py:class:`int`
Number of Gaussians in the OneClassGMM. Default: 1 .
``random_state`` : :py:class:`int`
A seed for the random number generator used in the initialization of
the OneClassGMM. Default: 7 .
**Returns:**
``machine`` : object
...
...
@@ -113,16 +104,41 @@ class OneClassGMM(Algorithm):
Standart deviation of the features.
"""
features_norm
,
features_mean
,
features_std
=
mean_std_normalize
(
real
)
# real is now mean-std normalized
features_norm
,
features_mean
,
features_std
=
mean_std_normalize
(
real
,
copy
=
False
)
if
isinstance
(
self
.
n_components
,
(
tuple
,
list
))
or
isinstance
(
self
.
covariance_type
,
(
tuple
,
list
)):
# perform grid search on covariance_type and n_components
n_components
=
self
.
n_components
if
isinstance
(
self
.
n_components
,
(
tuple
,
list
))
else
[
self
.
n_components
]
covariance_type
=
self
.
covariance_type
if
isinstance
(
self
.
covariance_type
,
(
tuple
,
list
))
else
[
self
.
covariance_type
]
logger
.
info
(
"Performing grid search for GMM on covariance_type: %s and n_components: %s"
,
self
.
covariance_type
,
self
.
n_components
)
bic
=
[]
lowest_bic
=
np
.
infty
for
cv_type
in
covariance_type
:
for
nc
in
n_components
:
logger
.
info
(
"Testing for n_components: %s, covariance_type: %s"
,
nc
,
cv_type
)
gmm
=
mixture
.
GaussianMixture
(
n_components
=
nc
,
covariance_type
=
cv_type
,
reg_covar
=
self
.
reg_covar
)
try
:
gmm
.
fit
(
features_norm
)
except
Exception
:
logger
.
warn
(
"Failed to train current GMM"
,
exc_info
=
True
)
continue
bic
.
append
(
gmm
.
bic
(
features_norm
))
if
bic
[
-
1
]
<
lowest_bic
:
lowest_bic
=
bic
[
-
1
]
logger
.
info
(
"Best parameters so far: nc %s, cv_type: %s"
,
nc
,
cv_type
)
machine
=
gmm
machine
=
mixture
.
GaussianMixture
(
n_components
=
n_components
,
random_state
=
random_state
,
covariance_type
=
'full'
)
else
:
machine
=
mixture
.
GaussianMixture
(
n_components
=
self
.
n_components
,
random_state
=
self
.
random_state
,
covariance_type
=
self
.
covariance_type
,
reg_covar
=
self
.
reg_covar
)
machine
.
fit
(
features_norm
)
machine
.
fit
(
features_norm
)
return
machine
,
features_mean
,
features_std
...
...
@@ -150,19 +166,17 @@ class OneClassGMM(Algorithm):
Standart deviation of the features.
"""
f
=
bob
.
io
.
base
.
HDF5File
(
projector_file
,
'w'
)
# open hdf5 file to save to
# open hdf5 file to save to
with
bob
.
io
.
base
.
HDF5File
(
projector_file
,
'w'
)
as
f
:
for
key
in
self
.
gmm_param_keys
:
data
=
getattr
(
machine
,
key
)
for
key
in
self
.
gmm_param_keys
:
data
=
getattr
(
machine
,
key
)
f
.
set
(
key
,
data
)
f
.
set
(
key
,
data
)
f
.
set
(
"features_mean"
,
features_mean
)
f
.
set
(
"features_mean"
,
features_mean
)
f
.
set
(
"features_std"
,
features_std
)
del
f
f
.
set
(
"features_std"
,
features_std
)
# ==========================================================================
def
train_projector
(
self
,
training_features
,
projector_file
):
...
...
@@ -183,18 +197,16 @@ class OneClassGMM(Algorithm):
``bob.pad.base`` framework.
"""
del
training_features
[
1
]
# training_features[0] - training features for the REAL class.
real
=
convert_and_prepare_features
(
training_features
[
0
]
)
# output is array
real
=
convert_and_prepare_features
(
training_features
[
0
],
dtype
=
None
)
del
training_features
[
0
]
# training_features[1] - training features for the ATTACK class.
# attack = self.convert_and_prepare_features(training_features[1]) # output is array
# Train the OneClassGMM machine and get normalizers:
machine
,
features_mean
,
features_std
=
self
.
train_gmm
(
real
=
real
,
n_components
=
self
.
n_components
,
random_state
=
self
.
random_state
)
machine
,
features_mean
,
features_std
=
self
.
train_gmm
(
real
=
real
)
# Save the GNN machine and normalizers:
self
.
save_gmm_machine_and_mean_std
(
projector_file
,
machine
,
...
...
@@ -224,23 +236,19 @@ class OneClassGMM(Algorithm):
Standart deviation of the features.
"""
f
=
bob
.
io
.
base
.
HDF5File
(
projector_file
,
'r'
)
# file to read the machine from
# initialize the machine:
machine
=
mixture
.
GaussianMixture
()
# set the params of the machine:
for
key
in
self
.
gmm_param_keys
:
data
=
f
.
read
(
key
)
# file to read the machine from
with
bob
.
io
.
base
.
HDF5File
(
projector_file
,
'r'
)
as
f
:
setattr
(
machine
,
key
,
data
)
# initialize the machine:
machine
=
mixture
.
GaussianMixture
()
features_mean
=
f
.
read
(
"features_mean"
)
# set the params of the machine:
for
key
in
self
.
gmm_param_keys
:
data
=
f
.
read
(
key
)
setattr
(
machine
,
key
,
data
)
features_std
=
f
.
read
(
"features_std"
)
del
f
features_mean
=
f
.
read
(
"features_mean"
)
features_std
=
f
.
read
(
"features_std"
)
return
machine
,
features_mean
,
features_std
...
...
@@ -272,9 +280,7 @@ class OneClassGMM(Algorithm):
projector_file
)
self
.
machine
=
machine
self
.
features_mean
=
features_mean
self
.
features_std
=
features_std
# ==========================================================================
...
...
@@ -320,7 +326,7 @@ class OneClassGMM(Algorithm):
features_array
=
feature
features_array_norm
,
_
,
_
=
mean_std_normalize
(
features_array
,
self
.
features_mean
,
self
.
features_std
)
features_array
,
self
.
features_mean
,
self
.
features_std
,
copy
=
False
)
scores
=
self
.
machine
.
score_samples
(
features_array_norm
)
...
...
bob/pad/base/algorithm/OneClassGMM2.py
0 → 100644
View file @
fef91a20
# -*- coding: utf-8 -*-
# @author: Amir Mohammadi
from
bob.pad.base.algorithm
import
Algorithm
from
bob.pad.base.utils
import
convert_and_prepare_features
from
bob.bio.gmm.algorithm
import
GMM
import
logging
import
numpy
as
np
from
collections.abc
import
Iterable
from
multiprocessing
import
cpu_count
logger
=
logging
.
getLogger
(
__name__
)
def bic(trainer, machine, X):
    """Bayesian information criterion for the current model on the input X.

    Parameters
    ----------
    trainer : object
        A GMM trainer exposing ``compute_likelihood(machine)``.
    machine : object
        A GMM machine exposing ``means``, ``variances``, and ``weights``.
    X : array of shape (n_samples, n_dimensions)
        The data the model was trained on.

    Returns
    -------
    bic : float
        The lower the better.
    """
    n_samples = X.shape[0]
    # NOTE(review): compute_likelihood presumably returns the *average*
    # per-sample log-likelihood (hence the multiplication by n_samples
    # below) -- confirm against bob.learn.em.
    avg_log_likelihood = trainer.compute_likelihood(machine)
    # Free parameters: every mean, every variance, and the mixture weights
    # minus one (the weights are constrained to sum to 1).
    n_parameters = (
        machine.means.size + machine.variances.size + len(machine.weights) - 1
    )
    return -2 * avg_log_likelihood * n_samples + n_parameters * np.log(n_samples)
class OneClassGMM2(Algorithm):
    """A one class GMM implementation based on Bob's GMM implementation which is more
    stable than scikit-learn's one.

    Parameters
    ----------
    number_of_gaussians : int or iterable of int
        Number of Gaussians in the GMM. If an iterable is given, a grid
        search over the listed values is performed and the model with the
        lowest BIC is kept.
    kmeans_training_iterations : int
        Maximum number of iterations for K-Means.
    gmm_training_iterations : int
        Maximum number of iterations for ML GMM training.
    training_threshold : float
        Threshold to end the ML training.
    variance_threshold : float
        Minimum value that a variance can reach.
    update_weights, update_means, update_variances : bool
        Which GMM parameters are updated during training.
    n_threads : int or None
        Number of training threads. ``None`` (the default) means
        ``multiprocessing.cpu_count()``, resolved at call time.
    """

    def __init__(
        self,
        # parameters for the GMM
        number_of_gaussians,
        # parameters of UBM training
        kmeans_training_iterations=25,
        gmm_training_iterations=25,
        training_threshold=5e-4,
        variance_threshold=5e-4,
        update_weights=True,
        update_means=True,
        update_variances=True,
        # FIX: the default used to be ``cpu_count()``, evaluated once at
        # import time; ``None`` defers the lookup to call time and is
        # backward-compatible for explicit callers.
        n_threads=None,
        **kwargs
    ):
        kwargs.setdefault("performs_projection", True)
        kwargs.setdefault("requires_projector_training", True)
        super().__init__(**kwargs)
        self.gmm_alg = GMM(
            number_of_gaussians=number_of_gaussians,
            kmeans_training_iterations=kmeans_training_iterations,
            gmm_training_iterations=gmm_training_iterations,
            training_threshold=training_threshold,
            variance_threshold=variance_threshold,
            update_weights=update_weights,
            update_means=update_means,
            update_variances=update_variances,
            n_threads=cpu_count() if n_threads is None else n_threads,
        )
        self.number_of_gaussians = number_of_gaussians

    def train_projector(self, training_features, projector_file):
        """Train the one-class GMM on the real-class features and save it.

        ``training_features[0]`` holds the real-class features; the
        attack-class features (``training_features[1]``) are discarded.
        """
        # Drop the attack-class features early to reduce peak memory.
        del training_features[1]
        real = convert_and_prepare_features(training_features[0], dtype="float64")
        del training_features[0]

        if isinstance(self.number_of_gaussians, Iterable):
            # Grid search over the number of Gaussians, keeping the UBM
            # with the lowest BIC.
            logger.info(
                "Performing grid search for GMM on number_of_gaussians: %s",
                self.number_of_gaussians,
            )
            # FIX: ``np.infty`` was removed in NumPy 2.0; use ``np.inf``.
            lowest_bic = np.inf
            best_n_gaussians = None
            for nc in self.number_of_gaussians:
                logger.info("Testing for number_of_gaussians: %s", nc)
                self.gmm_alg.gaussians = nc
                self.gmm_alg.train_ubm(real)
                bic_ = bic(self.gmm_alg.ubm_trainer, self.gmm_alg.ubm, real)
                logger.info("BIC for number_of_gaussians: %s is %s", nc, bic_)
                if bic_ < lowest_bic:
                    gmm = self.gmm_alg.ubm
                    lowest_bic = bic_
                    best_n_gaussians = nc
                    logger.info(
                        "Best parameters so far: number_of_gaussians %s", nc
                    )
            assert best_n_gaussians is not None
            self.gmm_alg.gaussians = best_n_gaussians
        else:
            self.gmm_alg.train_ubm(real)
            gmm = self.gmm_alg.ubm

        # Keep the best (or only) trained UBM and persist it to disk.
        self.gmm_alg.ubm = gmm
        self.gmm_alg.save_ubm(projector_file)

    def load_projector(self, projector_file):
        """Load the previously trained UBM from ``projector_file``."""
        self.gmm_alg.load_ubm(projector_file)

    def project(self, feature):
        """Evaluate ``feature`` under the trained UBM.

        NOTE(review): ``self.gmm_alg.ubm(feature)`` presumably returns the
        log-likelihood of the feature -- confirm against bob.learn.em.
        """
        feature = convert_and_prepare_features([feature], dtype="float64")[0]
        return self.gmm_alg.ubm(feature)

    def score(self, toscore):
        """The projected value itself is the score."""
        return [toscore]
bob/pad/base/algorithm/Predictions.py
View file @
fef91a20
from
bob.pad.base.algorithm
import
Algorithm
import
numpy
class
Predictions
(
Algorithm
):
...
...
@@ -6,9 +7,31 @@ class Predictions(Algorithm):
scoring."""
def
__init__
(
self
,
**
kwargs
):
super
(
Predictions
,
self
).
__init__
(
**
kwargs
)
super
(
Predictions
,
self
).
__init__
(
**
kwargs
)
def
score
(
self
,
predictions
):
predictions
=
numpy
.
asarray
(
predictions
)
if
predictions
.
size
==
1
:
# output of a sigmoid binary layer
return
predictions
# Assuming the predictions are the output of a softmax layer
return
[
predictions
[
1
]]
class VideoPredictions(Algorithm):
    """An algorithm that takes the precomputed predictions and uses them for
    scoring."""

    def __init__(self, axis=1, frame_level_scoring=False, **kwargs):
        super(VideoPredictions, self).__init__(**kwargs)
        # Return one score per frame when True; a single mean score otherwise.
        self.frame_level_scoring = frame_level_scoring
        # Column of the per-frame prediction array used as the score.
        self.axis = axis

    def score(self, predictions):
        # Assuming the predictions are the output of a softmax layer,
        # pick the configured column of every frame's prediction vector.
        frame_scores = predictions.as_array()[:, self.axis]
        if self.frame_level_scoring:
            return frame_scores
        return [numpy.mean(frame_scores)]
bob/pad/base/algorithm/__init__.py
View file @
fef91a20
from
.Algorithm
import
Algorithm
from
.SVM
import
SVM
from
.OneClassGMM
import
OneClassGMM
from
.OneClassGMM2
import
OneClassGMM2
from
.LogRegr
import
LogRegr
from
.SVMCascadePCA
import
SVMCascadePCA
from
.Predictions
import
Predictions
from
.Predictions
import
Predictions
,
VideoPredictions
from
.MLP
import
MLP
from
.PadLDA
import
PadLDA
...
...
@@ -31,9 +32,11 @@ __appropriate__(
Algorithm
,
SVM
,
OneClassGMM
,
OneClassGMM2
,
LogRegr
,
SVMCascadePCA
,
Predictions
,
VideoPredictions
,
MLP
,
PadLDA
)
...
...
bob/pad/base/script/cross.py
View file @
fef91a20
...
...
@@ -7,9 +7,12 @@ import logging
import
math
import
os
import
yaml
from
bob.bio.base.score.load
import
split
from
bob.bio.base.score.load
import
load_score
,
get_negatives_positives
from
bob.extension.scripts.click_helper
import
(
verbosity_option
,
bool_option
,
log_parameters
)
verbosity_option
,
bool_option
,
log_parameters
,
)
from
bob.measure
import
eer_threshold
,
farfrr
from
bob.measure.script
import
common_options
from
bob.measure.utils
import
get_fta
...
...
@@ -19,40 +22,96 @@ from tabulate import tabulate
logger
=
logging
.
getLogger
(
__name__
)
@
click
.
command
(
epilog
=
'''
\b
@
click
.
command
(
epilog
=
"""
\b
Examples:
$ bin/bob pad cross 'results/{{ evaluation.database }}/{{ algorithm }}/{{ evaluation.protocol }}/scores/scores-{{ group }}'
\
-td replaymobile -d replaymobile -p grandtest -d oulunpu -p Protocol_1
\
-a replaymobile_frame-diff-svm
\
-a replaymobile_qm-svm-64
\
-a replaymobile_lbp-svm-64
\
-td replaymobile
\
-d replaymobile -p grandtest
\
-d oulunpu -p Protocol_1
\
-a replaymobile_grandtest_frame-diff-svm
\
-a replaymobile_grandtest_qm-svm-64
\
-a replaymobile_grandtest_lbp-svm-64
\
> replaymobile.rst &
'''
)
@
click
.
argument
(
'score_jinja_template'
)
@
click
.
option
(
'-d'
,
'--database'
,
'databases'
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
'Names of the evaluation databases'
)
@
click
.
option
(
'-p'
,
'--protocol'
,
'protocols'
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
'Names of the protocols of the evaluation databases'
)
@
click
.
option
(
'-a'
,
'--algorithm'
,
'algorithms'
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
'Names of the algorithms'
)
@
click
.
option
(
'-n'
,
'--names'
,
type
=
click
.
File
(
'r'
),
help
=
'Name of algorithms to show in the table. Provide a path '
'to a json file maps algorithm names to names that you want to '
'see in the table.'
)
@
click
.
option
(
'-td'
,
'--train-database'
,
required
=
True
,
help
=
'The database that was used to train the algorithms.'
)
@
click
.
option
(
'-g'
,
'--group'
,
'groups'
,
multiple
=
True
,
show_default
=
True
,
default
=
[
'train'
,
'dev'
,
'eval'
])
@
bool_option
(
'sort'
,
's'
,
'whether the table should be sorted.'
,
True
)
"""
)
@
click
.
argument
(
"score_jinja_template"
)
@
click
.
option
(
"-d"
,
"--database"
,
"databases"
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
"Names of the evaluation databases"
,
)
@
click
.
option
(
"-p"
,
"--protocol"
,
"protocols"
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
"Names of the protocols of the evaluation databases"
,
)
@
click
.
option
(
"-a"
,
"--algorithm"
,
"algorithms"
,
multiple
=
True
,
required
=
True
,
show_default
=
True
,
help
=
"Names of the algorithms"
,
)
@
click
.
option
(
"-n"
,
"--names"
,
type
=
click
.
File
(
"r"
),
help
=
"Name of algorithms to show in the table. Provide a path "
"to a json file maps algorithm names to names that you want to "
"see in the table."
,
)
@
click
.
option
(
"-td"
,
"--train-database"
,
required
=
True
,
help
=
"The database that was used to train the algorithms."
,
)
@
click
.
option
(
"-pn"
,
"--pai-names"
,
type
=
click
.
File
(
"r"
),
help
=
"Name of PAIs to compute the errors per PAI. Provide a path "
"to a json file maps attack_type in scores to PAIs that you want to "
"see in the table."
,
)
@
click
.
option
(
"-g"
,
"--group"
,
"groups"
,
multiple
=
True
,
show_default
=
True
,
default
=
[
"train"
,
"dev"
,
"eval"
],
)
@
bool_option
(
"sort"
,
"s"
,
"whether the table should be sorted."
,
True
)
@
common_options
.
table_option
()
@
common_options
.
output_log_metric_option
()
@
verbosity_option
()
@
click
.
pass_context
def
cross
(
ctx
,
score_jinja_template
,
databases
,
protocols
,
algorithms
,
names
,
train_database
,
groups
,
sort
,
**
kwargs
):
def
cross
(
ctx
,
score_jinja_template
,
databases
,
protocols
,
algorithms
,
names
,
train_database
,
pai_names
,
groups
,
sort
,
verbose
,
**
kwargs
):
"""Cross-db analysis metrics
"""
log_parameters
(
logger
)
...
...
@@ -62,10 +121,12 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
env
=
jinja2
.
Environment
(
undefined
=
jinja2
.
StrictUndefined
)
data
=
{
'evaluation'
:
[{
'database'
:
db
,
'protocol'
:
proto
}
for
db
,
proto
in
zip
(
databases
,
protocols
)],
'algorithm'
:
algorithms
,
'group'
:
groups
,
"evaluation"
:
[
{
"database"
:
db
,
"protocol"
:
proto
}
for
db
,
proto
in
zip
(
databases
,
protocols
)
],
"algorithm"
:
algorithms
,
"group"
:
groups
,
}
metrics
=
{}
...
...
@@ -74,27 +135,30 @@ def cross(ctx, score_jinja_template, databases, protocols, algorithms,
logger
.
debug
(
variables
)
score_path
=
env
.
from_string
(
score_jinja_template
).
render
(
variables
)
logger
.
debug
(
score_path
)
logger
.
info
(
score_path
)
database
,
protocol
,
algorithm
,
group
=
\
variables
[
'evaluation'
][
'database'
],
\
variables
[
'evaluation'
][
'protocol'
],
\
variables
[
'algorithm'
],
variables
[
'group'
]
database
,
protocol
,
algorithm
,
group
=
(
variables
[
"evaluation"
][
"database"
],
variables
[
"evaluation"
][
"protocol"
],
variables
[
"algorithm"
],
variables
[
"group"
],
)
# if algorithm name does not have train_database name in it.
if
train_database
not
in
algorithm
and
database
!=
train_database
:
score_path
=
score_path
.
replace
(
algorithm
,
database
+
'_'
+
algori
th
m
)
score_path
=
score_path
.
replace
(
algorithm
,
database
+
"_"
+
algorithm
)
logger
.
info
(
"Score path changed to: %s"
,
score_pa
th
)
if
not
os
.
path
.
exists
(
score_path
):
metrics
[(
database
,
protocol
,
algorithm
,
group
)]
=
\
(
float
(
'nan'
),
)
*
5
metrics
[(
database
,
protocol
,
algorithm
,
group
)]
=
(
float
(
"nan"
),)
*
5
continue
(
neg
,
pos
),
fta
=
get_fta
(
split
(
score_path
))
scores
=
load_score
(
score_path
)
neg
,
pos
=
get_negatives_positives
(
scores
)
(
neg
,
pos
),
fta
=
get_fta
((
neg
,
pos
))
if
group
==
'
eval
'
:
threshold
=
metrics
[(
database
,
protocol
,
algorithm
,
'
dev
'
)][
1
]
if
group