Commit 4e89bcb3 authored by Anjith GEORGE's avatar Anjith GEORGE

Merge branch 'correct-apcer-calculation' into 'master'

Correct apcer calculation

Closes #31

See merge request !60
parents 47b3f2d0 8f66c86e
Pipeline #30336 passed with stages
in 12 minutes and 22 seconds
......@@ -7,11 +7,8 @@ from bob.pad.base.algorithm import Algorithm
import bob.learn.mlp
import bob.io.base
from bob.bio.video.utils import FrameContainer
from bob.pad.base.utils import convert_frame_cont_to_array
from bob.core.log import setup
logger = setup("bob.pad.base")
import logging
logger = logging.getLogger(__name__)
class MLP(Algorithm):
......@@ -42,11 +39,11 @@ class MLP(Algorithm):
criterion to stop the training: if the difference
between current and last loss is smaller than
this number, then stop training.
"""
Algorithm.__init__(self,
Algorithm.__init__(self,
performs_projection=True,
requires_projector_training=True,
requires_projector_training=True,
**kwargs)
self.hidden_units = hidden_units
......@@ -54,32 +51,31 @@ class MLP(Algorithm):
self.precision = precision
self.mlp = None
def train_projector(self, training_features, projector_file):
"""Trains the MLP
Parameters
----------
training_features : :any:`list` of :py:class:`numpy.ndarray`
training_features : :any:`list` of :py:class:`numpy.ndarray`
Data used to train the MLP. The real attempts are in training_features[0] and the attacks are in training_features[1]
projector_file : str
Filename where to save the trained model.
"""
# training is done in batch (i.e. using all training data)
batch_size = len(training_features[0]) + len(training_features[1])
# The labels
label_real = numpy.zeros((len(training_features[0]), 2), dtype='float64')
label_real[:, 0] = 1
label_attack = numpy.zeros((len(training_features[1]), 2), dtype='float64')
label_attack[:, 1] = 0
real = numpy.array(training_features[0])
attack = numpy.array(training_features[1])
X = numpy.vstack([real, attack])
Y = numpy.vstack([label_real, label_attack])
# Building MLP architecture
input_dim = real.shape[1]
shape = []
......@@ -89,16 +85,16 @@ class MLP(Algorithm):
# last layer contains two units: one for each class (i.e. real and attack)
shape.append(2)
shape = tuple(shape)
self.mlp = bob.learn.mlp.Machine(shape)
self.mlp.output_activation = bob.learn.activation.Logistic()
self.mlp.randomize()
trainer = bob.learn.mlp.BackProp(batch_size, bob.learn.mlp.CrossEntropyLoss(self.mlp.output_activation), self.mlp, train_biases=True)
n_iter = 0
previous_cost = 0
current_cost = 1
while (n_iter < self.max_iter) and (abs(previous_cost - current_cost) > self.precision):
while (n_iter < self.max_iter) and (abs(previous_cost - current_cost) > self.precision):
previous_cost = current_cost
trainer.train(self.mlp, X, Y)
current_cost = trainer.cost(self.mlp, X, Y)
......@@ -107,14 +103,13 @@ class MLP(Algorithm):
f = bob.io.base.HDF5File(projector_file, 'w')
self.mlp.save(f)
def project(self, feature):
"""Project the given feature
Parameters
----------
feature : :py:class:`numpy.ndarray`
feature : :py:class:`numpy.ndarray`
The feature to classify
Returns
......@@ -126,18 +121,17 @@ class MLP(Algorithm):
# feature = convert_frame_cont_to_array(feature)
return self.mlp(feature)
def score(self, toscore):
"""Returns the probability of the real class.
Parameters
----------
toscore : :py:class:`numpy.ndarray`
Returns
-------
float
probability of the authentication attempt to be real.
probability of the authentication attempt to be real.
"""
if toscore.ndim == 1:
return [toscore[0]]
......
......@@ -5,22 +5,31 @@
import bob.measure
import numpy
from bob.measure import (
far_threshold, eer_threshold, min_hter_threshold)
from bob.measure import far_threshold, eer_threshold, min_hter_threshold, farfrr
from bob.bio.base.score.load import four_column
from collections import defaultdict
import re
def calc_threshold(method, neg, pos):
def calc_threshold(method, pos, negs, all_negs, far_value=None, is_sorted=False):
"""Calculates the threshold based on the given method.
The scores should be sorted!
Parameters
----------
method : str
One of ``bpcer20``, ``eer``, ``min-hter``.
neg : array_like
The negative scores. They should be sorted!
pos : array_like
The positive scores. They should be sorted!
negs : list
A list of array_like negative scores. Each item in the list corresponds to
scores of one PAI.
all_negs : array_like
An array of all negative scores. This can be calculated from negs as well but we
ask for it since you might have it already calculated.
far_value : None, optional
If method is far, far_value and all_negs are used to calculate the threshold.
is_sorted : bool, optional
If True, it means all scores are sorted and no sorting will happen.
Returns
-------
......@@ -33,12 +42,15 @@ def calc_threshold(method, neg, pos):
If method is unknown.
"""
method = method.lower()
if method == 'bpcer20':
threshold = far_threshold(neg, pos, 0.05, True)
elif method == 'eer':
threshold = eer_threshold(neg, pos, True)
elif method == 'min-hter':
threshold = min_hter_threshold(neg, pos, True)
if "bpcer" in method:
desired_apcer = 1 / float(method.replace("bpcer", ""))
threshold = apcer_threshold(desired_apcer, pos, *negs, is_sorted=is_sorted)
elif method == "far":
threshold = far_threshold(all_negs, pos, far_value, is_sorted=is_sorted)
elif method == "eer":
threshold = eer_threshold(all_negs, pos, is_sorted=is_sorted)
elif method == "min-hter":
threshold = min_hter_threshold(all_negs, pos, is_sorted=is_sorted)
else:
raise ValueError("Unknown threshold criteria: {}".format(method))
......@@ -63,11 +75,7 @@ def calc_pass_rate(threshold, attacks):
return (attacks >= threshold).mean()
def weighted_neg_error_rate_criteria(data,
weight,
thres,
beta=0.5,
criteria='eer'):
def weighted_neg_error_rate_criteria(data, weight, thres, beta=0.5, criteria="eer"):
"""Given the single value for the weight parameter balancing between
impostors and spoofing attacks and a threshold, calculates the error rates
and their relationship depending on the criteria (difference in case of
......@@ -100,26 +108,21 @@ def weighted_neg_error_rate_criteria(data,
far_w = (1 - weight) * far_i + weight * far_s
if criteria == 'eer':
if criteria == "eer":
if beta == 0.5:
return abs(far_w - frr)
else:
# return abs(far_w - frr)
return abs((1 - beta) * frr - beta * far_w)
elif criteria == 'min-hter':
elif criteria == "min-hter":
return (far_w + frr) / 2
else:
return (1 - beta) * frr + beta * far_w
def recursive_thr_search(data,
span_min,
span_max,
weight,
beta=0.5,
criteria='eer'):
def recursive_thr_search(data, span_min, span_max, weight, beta=0.5, criteria="eer"):
"""Recursive search for the optimal threshold given a criteria. It
evaluates the full range of thresholds at 100 points, and computes the one
which optimizes the threshold. In the next search iteration, it examines
......@@ -148,29 +151,27 @@ def recursive_thr_search(data,
return span_max # or span_min, it doesn't matter
else:
step_size = (span_max - span_min) / steps
thresholds = numpy.array(
[(i * step_size) + span_min for i in range(steps + 1)])
weighted_error_rates = numpy.array([
weighted_neg_error_rate_criteria(data, weight, thr, beta, criteria)
for thr in thresholds
])
selected_thres = thresholds[numpy.where(
weighted_error_rates == min(weighted_error_rates)
)] # all the thresholds which have minimum weighted error rate
thr = selected_thres[int(
selected_thres.size / 2
)] # choose the centrally positioned threshold
return recursive_thr_search(data, thr - step_size, thr + step_size,
weight, beta, criteria)
def weighted_negatives_threshold(licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
weight,
beta=0.5,
criteria='eer'):
thresholds = numpy.array([(i * step_size) + span_min for i in range(steps + 1)])
weighted_error_rates = numpy.array(
[
weighted_neg_error_rate_criteria(data, weight, thr, beta, criteria)
for thr in thresholds
]
)
selected_thres = thresholds[
numpy.where(weighted_error_rates == min(weighted_error_rates))
] # all the thresholds which have minimum weighted error rate
thr = selected_thres[
int(selected_thres.size / 2)
] # choose the centrally positioned threshold
return recursive_thr_search(
data, thr - step_size, thr + step_size, weight, beta, criteria
)
def weighted_negatives_threshold(
licit_neg, licit_pos, spoof_neg, spoof_pos, weight, beta=0.5, criteria="eer"
):
"""Calculates the threshold for achieving the given criteria between the
FAR_w and the FRR, given the single value for the weight parameter
balancing between impostors and spoofing attacks and a single value for the
......@@ -197,10 +198,13 @@ def weighted_negatives_threshold(licit_neg,
span_max = max(
numpy.append(licit_pos, spoof_pos)
) # the max of the span where we will search for the threshold
data = (licit_neg, licit_pos, spoof_neg,
spoof_pos) # pack the data into a single list
return recursive_thr_search(data, span_min, span_max, weight, beta,
criteria)
data = (
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
) # pack the data into a single list
return recursive_thr_search(data, span_min, span_max, weight, beta, criteria)
def epsc_weights(licit_neg, licit_pos, spoof_neg, spoof_pos, points=100):
......@@ -215,14 +219,16 @@ def epsc_weights(licit_neg, licit_pos, spoof_neg, spoof_pos, points=100):
return weights
def epsc_thresholds(licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
points=100,
criteria='eer',
omega=None,
beta=None):
def epsc_thresholds(
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
points=100,
criteria="eer",
omega=None,
beta=None,
):
"""Calculates the optimal thresholds for EPSC, for a range of the weight
parameter balancing between impostors and spoofing attacks, and for a range
of the beta parameter balancing between real accesses and all the negatives
......@@ -249,32 +255,37 @@ def epsc_thresholds(licit_neg,
if omega is None:
omega = numpy.array([(i * step_size) for i in range(points + 1)])
elif not isinstance(omega, list) and not isinstance(
omega, tuple) and not isinstance(omega, numpy.ndarray):
elif (
not isinstance(omega, list)
and not isinstance(omega, tuple)
and not isinstance(omega, numpy.ndarray)
):
omega = numpy.array([omega])
else:
omega = numpy.array(omega)
if beta is None:
beta = numpy.array([(i * step_size) for i in range(points + 1)])
elif not isinstance(beta, list) and not isinstance(
beta, tuple) and not isinstance(beta, numpy.ndarray):
elif (
not isinstance(beta, list)
and not isinstance(beta, tuple)
and not isinstance(beta, numpy.ndarray)
):
beta = numpy.array([beta])
else:
beta = numpy.array(beta)
thresholds = numpy.ndarray([beta.size, omega.size], 'float64')
thresholds = numpy.ndarray([beta.size, omega.size], "float64")
for bindex, b in enumerate(beta):
thresholds[bindex, :] = numpy.array([
weighted_negatives_threshold(
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
w,
b,
criteria=criteria) for w in omega
], 'float64')
thresholds[bindex, :] = numpy.array(
[
weighted_negatives_threshold(
licit_neg, licit_pos, spoof_neg, spoof_pos, w, b, criteria=criteria
)
for w in omega
],
"float64",
)
return omega, beta, thresholds
......@@ -291,13 +302,9 @@ def weighted_err(error_1, error_2, weight):
return (1 - weight) * error_1 + weight * error_2
def error_rates_at_weight(licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
omega,
threshold,
beta=0.5):
def error_rates_at_weight(
licit_neg, licit_pos, spoof_neg, spoof_pos, omega, threshold, beta=0.5
):
"""Calculates several error rates: FRR, FAR (zero-effort impostors), SFAR,
FAR_w, HTER_w for a given value of w. It returns the calculated threshold
as a last argument
......@@ -317,11 +324,11 @@ negative samples (impostors and spoofing attacks).
"""
farfrr_licit = bob.measure.farfrr(
licit_neg, licit_pos,
threshold) # calculate test frr @ threshold (licit scenario)
licit_neg, licit_pos, threshold
) # calculate test frr @ threshold (licit scenario)
farfrr_spoof = bob.measure.farfrr(
spoof_neg, spoof_pos,
threshold) # calculate test frr @ threshold (spoof scenario)
spoof_neg, spoof_pos, threshold
) # calculate test frr @ threshold (spoof scenario)
# we can take this value from farfrr_spoof as well, it doesn't matter
frr = farfrr_licit[1]
......@@ -335,8 +342,9 @@ negative samples (impostors and spoofing attacks).
return (frr, far, sfar, far_w, wer_wb, hter_w, threshold)
def epsc_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
omega, beta):
def epsc_error_rates(
licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds, omega, beta
):
"""Calculates several error rates: FAR_w and WER_wb for the given weights
(omega and beta) and thresholds (the thresholds need to be computed first
using the method: epsc_thresholds() before passing to this method)
......@@ -368,13 +376,20 @@ def epsc_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
WER_wb
"""
far_w_errors = numpy.ndarray((beta.size, omega.size), 'float64')
wer_wb_errors = numpy.ndarray((beta.size, omega.size), 'float64')
far_w_errors = numpy.ndarray((beta.size, omega.size), "float64")
wer_wb_errors = numpy.ndarray((beta.size, omega.size), "float64")
for bindex, b in enumerate(beta):
errors = [
error_rates_at_weight(licit_neg, licit_pos, spoof_neg, spoof_pos,
w, thresholds[bindex, windex], b)
error_rates_at_weight(
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
w,
thresholds[bindex, windex],
b,
)
for windex, w in enumerate(omega)
]
far_w_errors[bindex, :] = [errors[i][3] for i in range(len(errors))]
......@@ -383,8 +398,9 @@ def epsc_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
return far_w_errors, wer_wb_errors
def all_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
omega, beta):
def all_error_rates(
licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds, omega, beta
):
"""Calculates several error rates: FAR_w and WER_wb for the given weights
(omega and beta) and thresholds (the thresholds need to be computed first
using the method: epsc_thresholds() before passing to this method)
......@@ -416,17 +432,24 @@ def all_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
WER_wb
"""
frr_errors = numpy.ndarray((beta.size, omega.size), 'float64')
far_errors = numpy.ndarray((beta.size, omega.size), 'float64')
sfar_errors = numpy.ndarray((beta.size, omega.size), 'float64')
far_w_errors = numpy.ndarray((beta.size, omega.size), 'float64')
wer_wb_errors = numpy.ndarray((beta.size, omega.size), 'float64')
hter_wb_errors = numpy.ndarray((beta.size, omega.size), 'float64')
frr_errors = numpy.ndarray((beta.size, omega.size), "float64")
far_errors = numpy.ndarray((beta.size, omega.size), "float64")
sfar_errors = numpy.ndarray((beta.size, omega.size), "float64")
far_w_errors = numpy.ndarray((beta.size, omega.size), "float64")
wer_wb_errors = numpy.ndarray((beta.size, omega.size), "float64")
hter_wb_errors = numpy.ndarray((beta.size, omega.size), "float64")
for bindex, b in enumerate(beta):
errors = [
error_rates_at_weight(licit_neg, licit_pos, spoof_neg, spoof_pos,
w, thresholds[bindex, windex], b)
error_rates_at_weight(
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
w,
thresholds[bindex, windex],
b,
)
for windex, w in enumerate(omega)
]
frr_errors[bindex, :] = [errors[i][0] for i in range(len(errors))]
......@@ -436,20 +459,28 @@ def all_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds,
wer_wb_errors[bindex, :] = [errors[i][4] for i in range(len(errors))]
hter_wb_errors[bindex, :] = [errors[i][5] for i in range(len(errors))]
return (frr_errors, far_errors, sfar_errors, far_w_errors, wer_wb_errors,
hter_wb_errors)
def calc_aue(licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
thresholds,
omega,
beta,
l_bound=0,
h_bound=1,
var_param='omega'):
return (
frr_errors,
far_errors,
sfar_errors,
far_w_errors,
wer_wb_errors,
hter_wb_errors,
)
def calc_aue(
licit_neg,
licit_pos,
spoof_neg,
spoof_pos,
thresholds,
omega,
beta,
l_bound=0,
h_bound=1,
var_param="omega",
):
"""Calculates AUE of EPSC for the given thresholds and weights
Keyword arguments:
......@@ -468,13 +499,15 @@ def calc_aue(licit_neg,
from scipy import integrate
if var_param == 'omega':
errors = all_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos,
thresholds, omega, beta)
if var_param == "omega":
errors = all_error_rates(
licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds, omega, beta
)
weights = omega # setting the weights to the varying parameter
else:
errors = all_error_rates(licit_neg, licit_pos, spoof_neg, spoof_pos,
thresholds, omega, beta)
errors = all_error_rates(
licit_neg, licit_pos, spoof_neg, spoof_pos, thresholds, omega, beta
)
weights = beta # setting the weights to the varying parameter
wer_errors = errors[4].reshape(1, errors[4].size)
......@@ -482,8 +515,147 @@ def calc_aue(licit_neg,
l_ind = numpy.where(weights >= l_bound)[0][0]
h_ind = numpy.where(weights <= h_bound)[0][-1]
aue = integrate.cumtrapz(wer_errors, weights)
aue = numpy.append(
[0], aue) # for indexing purposes, aue is cumulative integration
aue = numpy.append([0], aue) # for indexing purposes, aue is cumulative integration
aue = aue[h_ind] - aue[l_ind]
return aue
def apcer_threshold(desired_apcer, pos, *negs, is_sorted=False):
"""Computes the threshold given the desired APCER as the criteria.
APCER is computed as max of all APCER_PAI values.
The threshold will be computed such that the real APCER is **at most** the desired
value.
Parameters
----------
desired_apcer : float
The desired APCER value.
pos : list
An array or list of positive scores in float.
*negs
A list of negative scores. Each item corresponds to the negative scores of one
PAI.
is_sorted : bool, optional
Set to ``True`` if ALL arrays (pos and negs) are sorted.
Returns
-------
float
The computed threshold that satisfies the desired APCER.
"""
threshold = max(
far_threshold(neg, pos, desired_apcer, is_sorted=is_sorted) for neg in negs
)
return threshold
def apcer_bpcer(threshold, pos, *negs):
"""Computes APCER_PAI, APCER, and BPCER given the positive scores and a list of
negative scores and a threshold.
Parameters
----------
threshold : float
The threshold to be used to compute the error rates.
pos : list
An array or list of positive scores in float.
*negs
A list of negative scores. Each item corresponds to the negative scores of one
PAI.
Returns
-------
tuple
A tuple such as (list of APCER_PAI, APCER, BPCER)
"""
apcers = []
assert len(negs) > 0, negs
for neg in negs:
far, frr = farfrr(neg, pos, threshold)
apcers.append(far)
bpcer = frr # bpcer will be the same in all cases
return apcers, max(apcers), bpcer
def negatives_per_pai_and_positives(filename, regexps=None, regexp_column="real_id"):
"""Returns scores for Bona-Fide samples and scores for each PAI.
By default, the real_id column (second column) is used as indication for each
Presentation Attack Instrument (PAI).
For example, if you have scores like:
001 001 bona_fide_sample_1_path 0.9
001 print print_sample_1_path 0.6
001 print print_sample_2_path 0.6
001 replay replay_sample_1_path 0.2
001 replay replay_sample_2_path 0.2
001 mask mask_sample_1_path 0.5
001 mask mask_sample_2_path 0.5
this function will return 3 sets of negative scores (for each print, replay, and
mask PAIs).
Otherwise, you can provide a list regular expressions that match each PAI.
For example, if you have scores like:
001 001 bona_fide_sample_1_path 0.9
001 print/1 print_sample_1_path 0.6
001 print/2 print_sample_2_path 0.6
001 replay/1 replay_sample_1_path 0.2
001 replay/2 replay_sample_2_path 0.2
001 mask/1 mask_sample_1_path 0.5
001 mask/2 mask_sample_2_path 0.5
and give a list of regexps as ('print', 'replay', 'mask') the function will return 3
sets of negative scores (for each print, replay, and mask PAIs).
Parameters
----------
filename : str
Path to the score file.
regexps : None, optional
A list of regular expressions that match each PAI. If not given, the values in
the real_id column are used to find scores for different PAIs.
regexp_column : str, optional
If a list of regular expressions are given, those patterns will be matched
against the values in this column.
Returns
-------
tuple
A tuple containing pos scores and a dict of negative scores mapping PAIs to
their scores.
Raises
------
ValueError
If none of the given regular expressions match the values in regexp_column.
"""
pos = []
negs = defaultdict(list)
if regexps:
regexps = [re.compile(pattern) for pattern in regexps]
assert regexp_column in ("claimed_id", "real_id", "test_label"), regexp_column
for claimed_id, real_id, test_label, score in four_column(filename):
# if it is a Bona-Fide score
if claimed_id == real_id:
pos.append(score)
continue
if not regexps:
negs[real_id].append(score)
continue
# if regexps is not None or empty and is not a Bona-Fide score
string = {
"claimed_id": claimed_id,
"real_id": real_id,
"test_label": test_label,
}[regexp_column]
for pattern in regexps:
if pattern.match(string):
negs[pattern.pattern].append(score)
break
else: # this else is for the for loop: ``for pattern in regexps:``
raise ValueError(
f"No regexps: {regexps} match `{string}' from `{regexp_column}' column"
)
return pos, negs
......@@ -7,93 +7,235 @@ import bob.bio.base.script.gen as bio_gen
import bob.measure.script.figure as measure_figure
from bob.bio.base.score import load
from . import pad_figure as figure
from .error_utils import negatives_per_pai_and_positives
from functools import partial
SCORE_FORMAT = (
"Files must be 4-col format, see "
":py:func:`bob.bio.base.score.load.four_column`.")
CRITERIA = ('eer', 'min-hter', 'bpcer20')
"Files must be 4-col format, see " ":py:func:`bob.bio.base.score.load.four_column`."
)
CRITERIA = (
"eer",
<