Commit a772a3d5 authored by Amir MOHAMMADI

Compute APCER correctly

parent 9f28f82a
Pipeline #29681 passed with stage in 14 minutes and 32 seconds
@@ -7,20 +7,115 @@ import bob.bio.base.script.gen as bio_gen
import bob.measure.script.figure as measure_figure
from bob.bio.base.score import load
from . import pad_figure as figure
from .error_utils import negatives_per_pai_and_positives
from functools import partial

SCORE_FORMAT = (
    "Files must be 4-col format, see " ":py:func:`bob.bio.base.score.load.four_column`."
)
CRITERIA = (
    "eer",
    "min-hter",
    "far",
    "bpcer5000",
    "bpcer2000",
    "bpcer1000",
    "bpcer500",
    "bpcer200",
    "bpcer100",
    "bpcer50",
    "bpcer20",
    "bpcer10",
    "bpcer5",
    "bpcer2",
    "bpcer1",
)
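
# Illustration only: the criteria above are threshold-selection strategies that end
# up in ``calc_threshold`` from ``.error_utils`` (via ``pad_figure.Metrics.get_thres``).
# For the ``bpcerN`` entries the trailing number encodes a 1/N error-rate operating
# point, e.g.:
#
#     rate = 1.0 / float("bpcer20".replace("bpcer", ""))  # -> 0.05, i.e. 5%
#
# How ``calc_threshold`` turns such an operating point into a decision threshold is
# not shown in this commit.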


def metrics_option(
    sname="-m",
    lname="--metrics",
    name="metrics",
    help="List of metrics to print. Provide a string with comma separated metric "
    "names. For possible values see the default value.",
    default="apcer_pais,apcer,bpcer,acer,fta,fpr,fnr,hter,far,frr,precision,recall,f1_score",
    **kwargs
):
    """The metrics option"""

    def custom_metrics_option(func):
        def callback(ctx, param, value):
            if value is not None:
                value = value.split(",")
            ctx.meta[name] = value
            return value

        return click.option(
            sname,
            lname,
            default=default,
            help=help,
            show_default=True,
            callback=callback,
            **kwargs
        )(func)

    return custom_metrics_option


def regexps_option(
    help="A list of regular expressions (by repeating this option) to be used to "
    "categorize PAIs. Each regexp must match one type of PAI.",
    **kwargs
):
    def custom_regexps_option(func):
        def callback(ctx, param, value):
            ctx.meta["regexps"] = value
            return value

        return click.option(
            "-r",
            "--regexps",
            default=None,
            multiple=True,
            help=help,
            callback=callback,
            **kwargs
        )(func)

    return custom_regexps_option


def regexp_column_option(
    help="The column in the score files to match the regular expressions against.",
    **kwargs
):
    def custom_regexp_column_option(func):
        def callback(ctx, param, value):
            ctx.meta["regexp_column"] = value
            return value

        return click.option(
            "-rc",
            "--regexp-column",
            default="real_id",
            type=click.Choice(("claimed_id", "real_id", "test_label")),
            help=help,
            show_default=True,
            callback=callback,
            **kwargs
        )(func)

    return custom_regexp_column_option
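
# The three factories above follow the usual click pattern: each returns a decorator
# that attaches the option to a command and, through its callback, mirrors the parsed
# value into ``ctx.meta`` (``metrics``, ``regexps`` and ``regexp_column``) so it can
# be read later from the click context.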


@click.command()
@click.argument("outdir")
@click.option("-mm", "--mean-match", default=10, type=click.FLOAT, show_default=True)
@click.option(
    "-mnm", "--mean-non-match", default=-10, type=click.FLOAT, show_default=True
)
@click.option("-n", "--n-sys", default=1, type=click.INT, show_default=True)
@verbosity_option()
@click.pass_context
def gen(ctx, outdir, mean_match, mean_non_match, n_sys, **kwargs):

@@ -30,48 +125,73 @@ def gen(ctx, outdir, mean_match, mean_non_match, n_sys, **kwargs):

    parameter. The generated scores can be used as hypothetical datasets.

    Invokes :py:func:`bob.bio.base.script.commands.gen`.
    """
    ctx.meta["five_col"] = False
    ctx.forward(bio_gen.gen)
@common_options.metrics_command(common_options.METRICS_HELP.format( @common_options.metrics_command(
names='FtA, APCER, BPCER, FAR, FRR, ACER', common_options.METRICS_HELP.format(
criteria=CRITERIA, score_format=SCORE_FORMAT, names="FtA, APCER, BPCER, FPR, FNR, FAR, FRR, ACER, HTER, precision, recall, f1_score",
hter_note='Note that FAR = APCER * (1 - FtA), ' criteria=CRITERIA,
'FRR = FtA + BPCER * (1 - FtA) and ACER = (APCER + BPCER) / 2.', score_format=SCORE_FORMAT,
command='bob pad metrics'), criteria=CRITERIA) hter_note="Note that APCER = max(APCER_pais), BPCER=FNR, "
def metrics(ctx, scores, evaluation, **kwargs): "FAR = FPR * (1 - FtA), "
process = figure.Metrics(ctx, scores, evaluation, load.split) "FRR = FtA + FNR * (1 - FtA), "
"ACER = (APCER + BPCER) / 2, "
"and HTER = (FPR + FNR) / 2. "
"You can control which metrics are printed using the --metrics option. "
"You can use --regexps and --regexp_column options to change the behavior "
"of finding Presentation Attack Instrument (PAI) types",
command="bob pad metrics",
),
criteria=CRITERIA,
epilog="""\b
More Examples:
\b
bob pad metrics -vvv -e -lg IQM,LBP -r print -r video -m fta,apcer_pais,apcer,bpcer,acer,hter \
/scores/oulunpu/{qm-svm,lbp-svm}/Protocol_1/scores/scores-{dev,eval}
See also ``bob pad multi-metrics``.
""",
)
@regexps_option()
@regexp_column_option()
@metrics_option()
def metrics(ctx, scores, evaluation, regexps, regexp_column, metrics, **kwargs):
load_fn = partial(
negatives_per_pai_and_positives, regexps=regexps, regexp_column=regexp_column
)
process = figure.Metrics(ctx, scores, evaluation, load_fn, metrics)
process.run() process.run()
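
# A minimal, illustrative sketch of how the quantities in the help note above relate
# to each other. The function below is hypothetical (it is not used by any command;
# the real computation lives in ``pad_figure.Metrics._numbers``):
def _metric_relations_example(apcer_pais, fpr, fnr, fta):
    """E.g. apcer_pais={"print": 0.10, "video": 0.30}, fpr=0.25, fnr=0.20, fta=0.10
    gives APCER=0.30, BPCER=0.20, ACER=0.25, HTER=0.225, FAR=0.225 and FRR=0.28."""
    apcer = max(apcer_pais.values())  # APCER = max(APCER_pais)
    bpcer = fnr                       # BPCER = FNR
    acer = (apcer + bpcer) / 2        # ACER = (APCER + BPCER) / 2
    hter = (fpr + fnr) / 2            # HTER = (FPR + FNR) / 2
    far = fpr * (1 - fta)             # FAR = FPR * (1 - FtA)
    frr = fta + fnr * (1 - fta)       # FRR = FtA + FNR * (1 - FtA)
    return dict(apcer=apcer, bpcer=bpcer, acer=acer, hter=hter, far=far, frr=frr)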


@common_options.roc_command(
    common_options.ROC_HELP.format(score_format=SCORE_FORMAT, command="bob pad roc")
)
def roc(ctx, scores, evaluation, **kwargs):
    process = figure.Roc(ctx, scores, evaluation, load.split)
    process.run()


@common_options.det_command(
    common_options.DET_HELP.format(score_format=SCORE_FORMAT, command="bob pad det")
)
def det(ctx, scores, evaluation, **kwargs):
    process = figure.Det(ctx, scores, evaluation, load.split)
    process.run()


@common_options.epc_command(
    common_options.EPC_HELP.format(score_format=SCORE_FORMAT, command="bob pad epc")
)
def epc(ctx, scores, **kwargs):
    process = measure_figure.Epc(ctx, scores, True, load.split, hter="ACER")
    process.run()


@common_options.hist_command(
    common_options.HIST_HELP.format(score_format=SCORE_FORMAT, command="bob pad hist")
)
def hist(ctx, scores, evaluation, **kwargs):
    process = figure.Hist(ctx, scores, evaluation, load.split)
    process.run()

@@ -79,21 +199,43 @@ def hist(ctx, scores, evaluation, **kwargs):

@common_options.evaluate_command(
    common_options.EVALUATE_HELP.format(
        score_format=SCORE_FORMAT, command="bob pad evaluate"
    ),
    criteria=CRITERIA,
)
def evaluate(ctx, scores, evaluation, **kwargs):
    common_options.evaluate_flow(
        ctx, scores, evaluation, metrics, roc, det, epc, hist, **kwargs
    )


@common_options.multi_metrics_command(
    common_options.MULTI_METRICS_HELP.format(
        names="FtA, APCER, BPCER, FAR, FRR, ACER, HTER, precision, recall, f1_score",
        criteria=CRITERIA,
        score_format=SCORE_FORMAT,
        command="bob pad multi-metrics",
    ),
    criteria=CRITERIA,
    epilog="""\b
More examples:
\b
bob pad multi-metrics -vvv -e -pn 6 -lg IQM,LBP -r print -r video \
/scores/oulunpu/{qm-svm,lbp-svm}/Protocol_3_{1,2,3,4,5,6}/scores/scores-{dev,eval}
See also ``bob pad metrics``.
""",
)
@regexps_option()
@regexp_column_option()
@metrics_option(default="fta,apcer_pais,apcer,bpcer,acer,hter")
def multi_metrics(
    ctx, scores, evaluation, protocols_number, regexps, regexp_column, metrics, **kwargs
):
    ctx.meta["min_arg"] = protocols_number * (2 if evaluation else 1)
    load_fn = partial(
        negatives_per_pai_and_positives, regexps=regexps, regexp_column=regexp_column
    )
    process = figure.MultiMetrics(ctx, scores, evaluation, load_fn, metrics)
    process.run()
"""Runs error analysis on score sets, outputs metrics and plots"""
'''Runs error analysis on score sets, outputs metrics and plots'''
import bob.measure.script.figure as measure_figure import bob.measure.script.figure as measure_figure
from bob.measure.utils import get_fta_list
from bob.measure import farfrr, precision_recall, f_score
import bob.bio.base.script.figure as bio_figure import bob.bio.base.script.figure as bio_figure
from .error_utils import calc_threshold from .error_utils import calc_threshold, apcer_bpcer
import click
from tabulate import tabulate
import numpy as np
ALL_CRITERIA = ('bpcer20', 'eer', 'min-hter') def _normalize_input_scores(input_score, input_name):
pos, negs = input_score
# convert scores to sorted numpy arrays and keep a copy of all negatives
pos = np.ascontiguousarray(sorted(pos))
all_negs = np.ascontiguousarray(sorted(s for neg in negs.values() for s in neg))
# FTA is calculated on pos and all_negs so we remove nans from negs
for k, v in negs.items():
v = np.ascontiguousarray(sorted(v))
negs[k] = v[~np.isnan(v)]
neg_list, pos_list, fta_list = get_fta_list([(all_negs, pos)])
all_negs, pos, fta = neg_list[0], pos_list[0], fta_list[0]
return input_name, pos, negs, all_negs, fta
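
# Example of the expected input (names and numbers are hypothetical): the load
# function passed in by the ``bob pad metrics`` command (a partial of
# ``negatives_per_pai_and_positives``) is expected to yield the bona fide scores and
# a dict of attack scores keyed by PAI type, e.g.:
#
#     pos = [2.1, 1.7, 0.9]
#     negs = {"print": [-1.0, -0.3], "video": [-0.5, 0.1]}
#     name, pos, negs, all_negs, fta = _normalize_input_scores((pos, negs), "dev")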


class Metrics(bio_figure.Metrics):
    """Compute metrics from score files"""

    def __init__(self, ctx, scores, evaluation, func_load, names):
        if isinstance(names, str):
            names = names.split(",")
        super(Metrics, self).__init__(ctx, scores, evaluation, func_load, names)

    def get_thres(self, criterion, pos, negs, all_negs, far_value):
        return calc_threshold(
            criterion, pos, negs.values(), all_negs, far_value, is_sorted=True
        )

    def _numbers(self, threshold, pos, negs, all_negs, fta):
        pais = list(negs.keys())
        apcer_pais, apcer, bpcer = apcer_bpcer(threshold, pos, *[negs[k] for k in pais])
        apcer_pais = {k: apcer_pais[i] for i, k in enumerate(pais)}
        acer = (apcer + bpcer) / 2.0
        fpr, fnr = farfrr(all_negs, pos, threshold)
        hter = (fpr + fnr) / 2.0
        far = fpr * (1 - fta)
        frr = fta + fnr * (1 - fta)
        nn = all_negs.shape[0]  # number of attacks
        fp = int(round(fpr * nn))  # number of false positives
        np = pos.shape[0]  # number of bona fide samples
        fn = int(round(fnr * np))  # number of false negatives

        # precision and recall
        precision, recall = precision_recall(all_negs, pos, threshold)

        # f_score
        f1_score = f_score(all_negs, pos, threshold, 1)

        metrics = dict(
            apcer_pais=apcer_pais,
            apcer=apcer,
            bpcer=bpcer,
            acer=acer,
            fta=fta,
            fpr=fpr,
            fnr=fnr,
            hter=hter,
            far=far,
            frr=frr,
            fp=fp,
            nn=nn,
            fn=fn,
            np=np,
            precision=precision,
            recall=recall,
            f1_score=f1_score,
        )
        return metrics
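
    # The keys above match the metric names accepted by the ``--metrics`` option of
    # ``bob pad metrics`` (e.g. ``-m fta,apcer_pais,apcer,bpcer,acer,hter``); ``fp``,
    # ``fn``, ``nn`` and ``np`` are raw counts that ``_strings`` below only uses to
    # render FPR and FNR in the "rate% (count/total)" form.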

    def _strings(self, metrics):
        n_dec = ".%df" % self._decimal
        for k, v in metrics.items():
            if k in ("precision", "recall", "f1_score"):
                metrics[k] = "%s" % format(v, n_dec)
            elif k in ("np", "nn", "fp", "fn"):
                continue
            elif k in ("fpr", "fnr"):
                metrics[k] = "%s%% (%d/%d)" % (
                    format(100 * v, n_dec),
                    metrics["fp" if k == "fpr" else "fn"],
                    metrics["nn" if k == "fpr" else "np"],
                )
            elif k == "apcer_pais":
                metrics[k] = {
                    k1: "%s%%" % format(100 * v1, n_dec) for k1, v1 in v.items()
                }
            else:
                metrics[k] = "%s%%" % format(100 * v, n_dec)

        return metrics

    def _get_all_metrics(self, idx, input_scores, input_names):
        """Compute all metrics for dev and eval scores"""
        for i, (score, name) in enumerate(zip(input_scores, input_names)):
            input_scores[i] = _normalize_input_scores(score, name)

        dev_file, dev_pos, dev_negs, dev_all_negs, dev_fta = input_scores[0]
        if self._eval:
            eval_file, eval_pos, eval_negs, eval_all_negs, eval_fta = input_scores[1]

        threshold = (
            self.get_thres(self._criterion, dev_pos, dev_negs, dev_all_negs, self._far)
            if self._thres is None
            else self._thres[idx]
        )

        title = self._legends[idx] if self._legends is not None else None
        if self._thres is None:
            far_str = ""
            if self._criterion == "far" and self._far is not None:
                far_str = str(self._far)
            click.echo(
                "[Min. criterion: %s %s] Threshold on Development set `%s`: %e"
                % (self._criterion.upper(), far_str, title or dev_file, threshold),
                file=self.log_file,
            )
        else:
            click.echo(
                "[Min. criterion: user provided] Threshold on "
                "Development set `%s`: %e" % (dev_file or title, threshold),
                file=self.log_file,
            )

        res = []
        res.append(
            self._strings(
                self._numbers(threshold, dev_pos, dev_negs, dev_all_negs, dev_fta)
            )
        )

        if self._eval:
            # compute statistics for the eval set based on the threshold selected
            # a priori on the development set
            res.append(
                self._strings(
                    self._numbers(
                        threshold, eval_pos, eval_negs, eval_all_negs, eval_fta
                    )
                )
            )
        else:
            res.append(None)

        return res

    def compute(self, idx, input_scores, input_names):
        """Compute metrics for the given criteria"""
        title = self._legends[idx] if self._legends is not None else None
        all_metrics = self._get_all_metrics(idx, input_scores, input_names)
        headers = [title or " ", "Development"]
        if self._eval:
            headers.append("Evaluation")
        rows = []
        for name in self.names:
            if name == "apcer_pais":
                for k, v in all_metrics[0][name].items():
                    print_name = f"APCER ({k})"
                    rows += [[print_name, v]]
                    if self._eval:
                        rows[-1].append(all_metrics[1][name][k])
                continue
            print_name = name.upper()
            rows += [[print_name, all_metrics[0][name]]]
            if self._eval:
                rows[-1].append(all_metrics[1][name])

        click.echo(tabulate(rows, headers, self._tablefmt), file=self.log_file)
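
    # For example, with ``-r print -r video`` (as in the epilog of ``bob pad
    # metrics``), the ``apcer_pais`` entry expands into one table row per PAI, so the
    # printed rows read ``APCER (print)``, ``APCER (video)``, ``APCER``, ``BPCER``,
    # ... with one column for Development and, if requested, one for Evaluation.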


class MultiMetrics(Metrics):
    """Compute metrics from score files"""

    def __init__(self, ctx, scores, evaluation, func_load, names):
        super(MultiMetrics, self).__init__(
            ctx, scores, evaluation, func_load, names=names
        )

        self.rows = []
        self.headers = None
        self.pais = None

    def _compute_headers(self, pais):
        names = list(self.names)
        idx = names.index("apcer_pais")
        if idx > -1:
            names = (
                [n.upper() for n in names[:idx]]
                + self.pais
                + [n.upper() for n in names[idx + 1 :]]
            )
        self.headers = ["Methods"] + names
        if self._eval:
            self.headers.insert(1, "HTER (dev)")

    def _strings(self, metrics):
        formatted_metrics = dict()
        for name in self.names:
            if name == "apcer_pais":
                for pai in self.pais:
                    mean = metrics[pai].mean()
                    std = metrics[pai].std()
                    mean = super()._strings({pai: mean})[pai]
                    std = super()._strings({pai: std})[pai]
                    formatted_metrics[pai] = f"{mean} ({std})"
            else:
                mean = metrics[name].mean()
                std = metrics[name].std()
                mean = super()._strings({name: mean})[name]
                std = super()._strings({name: std})[name]
                formatted_metrics[name] = f"{mean} ({std})"

        return formatted_metrics

    def _structured_array(self, metrics):
        names = list(metrics[0].keys())
        idx = names.index("apcer_pais")
        if idx > -1:
            pais = list(f"APCER ({pai})" for pai in metrics[0]["apcer_pais"].keys())
            names = names[:idx] + pais + names[idx + 1 :]
            self.pais = self.pais or pais
        formats = [float] * len(names)
        dtype = dict(names=names, formats=formats)
        array = []
        for each in metrics:
            array.append([])
            for k, v in each.items():
                if k == "apcer_pais":
                    array[-1].extend(list(v.values()))
                else:
                    array[-1].append(v)
        array = [tuple(a) for a in array]
        return np.array(array, dtype=dtype)
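
    # ``compute`` below (truncated in this excerpt) collects these metrics once per
    # protocol; ``_strings`` above then reports every metric as "mean (std)" across
    # protocols, which is what the ``bob pad multi-metrics`` table shows.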

    def compute(self, idx, input_scores, input_names):
        """Computes the average of metrics over several protocols."""
        for i, (score, name) in enumerate(zip(input_scores, input_names)):
            input_scores[i] = _normalize_input_scores(score, name)

        step = 2 if self._eval else 1
        self._dev_metrics = []
        self._thresholds = []
        for scores in input_scores[::step]:
            name, pos, negs, all_negs, fta = scores
            threshold = (
                self.get_thres(self._criterion, pos, negs, all_negs, self._far)
                if self._thres is