Commit 793516c0 authored by Amir Mohammadi

Revert "MLP train helper"

This reverts commit 7723199b.
parent 6a3b1ca9
@@ -11,7 +11,6 @@ from ._library import *
from . import version
from .version import module as __version__
from .version import api as __api_version__
-from .train_helper import MLPTrainer
def get_config():
"""Returns a string containing the configuration information.
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Tue 16 Aug 14:39:22 2011
"""Trains an MLP using RProp
"""
import sys
import bob.measure
import bob.learn.mlp
import bob.learn.activation
import numpy
import numpy.linalg as la
from . import utils
class Analyzer(object):
"""Can analyze results in the end of a run. It can also save itself"""
def gentargets(self, data, target):
"""Replicates `target` once per sample in `data` and returns both the
stacked targets and an equally-shaped output buffer."""
t = numpy.vstack(data.shape[0] * (target,))
return t, numpy.empty_like(t)
def __init__(self, train, devel, target):
super(Analyzer, self).__init__()
self.train = train
self.devel = devel
self.target = target
real_train = self.gentargets(train[0], target[0])
attack_train = self.gentargets(train[1], target[1])
real_devel = self.gentargets(devel[0], target[0])
attack_devel = self.gentargets(devel[1], target[1])
self.train_target = (real_train[0], attack_train[0])
self.train_output = (real_train[1], attack_train[1])
self.devel_target = (real_devel[0], attack_devel[0])
self.devel_output = (real_devel[1], attack_devel[1])
self.data = {} # where to store variables that will be saved
self.data['epoch'] = []
self.data['real-train-rmse'] = []
self.data['attack-train-rmse'] = []
self.data['real-devel-rmse'] = []
self.data['attack-devel-rmse'] = []
self.data['train-far'] = []
self.data['train-frr'] = []
self.data['devel-far'] = []
self.data['devel-frr'] = []
def __call__(self, machine, iteration):
"""Computes current outputs and evaluate performance"""
def evalperf(outputs, targets):
return la.norm(bob.measure.rmse(outputs, targets))
for k in range(len(self.train)):
machine(self.train[k], self.train_output[k])
machine(self.devel[k], self.devel_output[k])
self.data['real-train-rmse'].append(evalperf(self.train_output[0],
self.train_target[0]))
self.data['attack-train-rmse'].append(evalperf(self.train_output[1],
self.train_target[1]))
self.data['real-devel-rmse'].append(evalperf(self.devel_output[0],
self.devel_target[0]))
self.data['attack-devel-rmse'].append(evalperf(self.devel_output[1],
self.devel_target[1]))
thres = bob.measure.eer_threshold(self.train_output[1][:, 0],
self.train_output[0][:, 0])
train_far, train_frr = bob.measure.farfrr(
self.train_output[1][:, 0], self.train_output[0][:, 0], thres)
devel_far, devel_frr = bob.measure.farfrr(
self.devel_output[1][:, 0], self.devel_output[0][:, 0], thres)
self.data['train-far'].append(train_far)
self.data['train-frr'].append(train_frr)
self.data['devel-far'].append(devel_far)
self.data['devel-frr'].append(devel_frr)
self.data['epoch'].append(iteration)
def str_header(self):
"""Returns the string header of what I can print"""
return "iteration: RMSE:real/RMSE:attack (EER:%) ( train | devel )"
def __str__(self):
"""Returns a string representation of myself"""
retval = "%d: %.4e/%.4e (%.2f%%) | %.4e/%.4e (%.2f%%)" % \
(self.data['epoch'][-1],
self.data['real-train-rmse'][-1],
self.data['attack-train-rmse'][-1],
50 *
(self.data['train-far'][-1] + self.data['train-frr'][-1]),
self.data['real-devel-rmse'][-1],
self.data['attack-devel-rmse'][-1],
50 *
(self.data['devel-far'][-1] + self.data['devel-frr'][-1]),
)
return retval
def save(self, f):
"""Saves my contents on the bob.io.base.HDF5File you give me."""
for k, v in self.data.items():
f.set(k, numpy.array(v))
def load(self, f):
"""Loads my contents from the bob.io.base.HDF5File you give me."""
for k in f.paths():
self.data[k.strip('/')] = f.read(k)
def report(self, machine, test, pdffile, cfgfile):
"""Complete analysis of the contained data, with plots and all..."""
import matplotlib
matplotlib.use('pdf') # avoids TkInter threaded start
import matplotlib.pyplot as mpl
from matplotlib.backends.backend_pdf import PdfPages
real_test = self.gentargets(test[0], self.target[0])
attack_test = self.gentargets(test[1], self.target[1])
# test_target = (real_test[0], attack_test[0])
test_output = (real_test[1], attack_test[1])
for k in range(len(self.train)):
machine(self.train[k], self.train_output[k])
machine(self.devel[k], self.devel_output[k])
machine(test[k], test_output[k])
# Here we start with the plotting and writing of tables in files
# --------------------------------------------------------------
if isinstance(cfgfile, str):
try:
from ConfigParser import SafeConfigParser
except ImportError:
from configparser import SafeConfigParser
tmp = SafeConfigParser()
eer_thres, mhter_thres = utils.performance_table(tmp, test_output,
self.devel_output)
tmp.write(open(cfgfile, 'w'))  # configparser needs a text-mode file
else:
eer_thres, mhter_thres = utils.performance_table(cfgfile, test_output,
self.devel_output)
# returns FAR/FRR for the development and test sets
devel_res, test_res = utils.perf_hter_thorough(
test_output, self.devel_output, bob.measure.eer_threshold)
pp = PdfPages(pdffile)
fig = mpl.figure()
utils.score_distribution_plot(
test_output, self.devel_output, self.train_output,
self.data['epoch'][-1], 50, eer_thres, mhter_thres)
pp.savefig(fig)
fig = mpl.figure()
utils.roc(test_output, self.devel_output, self.train_output, 100,
eer_thres, mhter_thres)
pp.savefig(fig)
fig = mpl.figure()
utils.det(test_output, self.devel_output, self.train_output, 100,
eer_thres, mhter_thres)
pp.savefig(fig)
fig = mpl.figure()
utils.epc(test_output, self.devel_output, 100)
pp.savefig(fig)
fig = mpl.figure()
utils.plot_rmse_evolution(self.data)
pp.savefig(fig)
fig = mpl.figure()
utils.plot_eer_evolution(self.data)
pp.savefig(fig)
fig = mpl.figure()
utils.evaluate_relevance(test, self.devel, self.train, machine)
pp.savefig(fig)
pp.close()
return devel_res, test_res
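# Usage sketch (illustrative addition, not part of the original module):
# drives the Analyzer with synthetic 2-D scores and a randomly initialized
# machine; all bob calls mirror the ones used above.
def _demo_analyzer():
    numpy.random.seed(0)
    real = numpy.random.normal(+1., 1., (50, 2))    # stand-in real accesses
    attack = numpy.random.normal(-1., 1., (50, 2))  # stand-in attacks
    train = (real, attack)
    devel = (real + 0.1, attack + 0.1)
    target = [numpy.array([+1], 'float64'), numpy.array([-1], 'float64')]
    machine = bob.learn.mlp.Machine((2, 3, 1))
    machine.randomize()
    analyze = Analyzer(train, devel, target)
    analyze(machine, 0)  # evaluate performance at iteration 0
    print(analyze.str_header())
    print(analyze)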
class MLPTrainer(object):
"""Creates a randomly initialized MLP and train it using the input data.
This method will create an MLP with the shape (`mlp_shape`) that is
provided. Then it will initialize the MLP with random weights and
biases and train it for as long as the development shows improvement
and will stop as soon as it does not anymore or we reach the maximum
number of iterations.
Performance is evaluated both on the trainining and development set
during the training, every 'epoch' training steps. Each training step
is composed of `batch_size` elements drawn randomly from all classes
available in train set.
Keyword Parameters:
train
An iterable (tuple or list) containing two arraysets: the first
contains the real accesses (target = +1) and the second contains
the attacks (target = -1).
devel
An iterable (tuple or list) containing two arraysets: the first
contains the real accesses (target = +1) and the second contains
the attacks (target = -1).
batch_size
An integer defining the number of samples per training iteration.
Good values are greater than 100.
mlp_shape
Shape of the MLP machine.
epoch
The number of training steps to wait until we measure the error.
max_iter
If given (and different from zero), the maximum number of training
steps to train the network for. If set to 0, just train until the
development set reaches the valley (in RMSE terms).
no_improvements
If given (and different from zero), the maximum number of iterations
to keep training for once the development set average RMSE stops
improving. If set, this value should not be too small, as that may
cause a too-early stop. Values on the order of 10% of max_iter
should be fine.
verbose
Makes the training more verbose
"""
def __init__(self,
train,
devel,
mlp_shape,
batch_size=100,
epoch=1,
max_iter=1000,
no_improvements=0,
verbose=False,
valley_condition=0.9,
machine=None,
trainer=None,
*args, **kwargs
):
super(MLPTrainer, self).__init__()
self.train = train
self.devel = devel
self.mlp_shape = mlp_shape
self.batch_size = batch_size
self.epoch = epoch
self.max_iter = max_iter
self.no_improvements = no_improvements
self.verbose = verbose
self.valley_condition = valley_condition
self.machine = machine if machine else \
bob.learn.mlp.Machine(self.mlp_shape)
self.machine.randomize()
self.trainer = trainer if trainer else \
bob.learn.mlp.RProp(batch_size, bob.learn.mlp.SquareError(
self.machine.output_activation), machine=self.machine,
train_biases=False)
def __call__(self):
return self.make_mlp()
def make_mlp(self):
# controls the valley stop: training stops once the current devel. set
# RMSE exceeds the minimum detected so far divided by this factor
VALLEY_CONDITION = self.valley_condition
last_devel_rmse = 0
def stop_condition(min_devel_rmse, devel_rmse, last_devel_rmse):
"""This method will detect a valley in the devel set RMSE"""
stop = (VALLEY_CONDITION * devel_rmse) > (min_devel_rmse) or \
abs(devel_rmse - last_devel_rmse)/(devel_rmse+last_devel_rmse) < 0.00001
return stop
target = [
numpy.array([+1], 'float64'),
numpy.array([-1], 'float64'),
]
if self.verbose:
print("Preparing analysis framework...")
analyze = Analyzer(self.train, self.devel, target)
if self.verbose:
print("Setting up training infrastructure...")
shuffler = bob.learn.mlp.DataShuffler(self.train, target)
shuffler.auto_stdnorm = True
# shape = (shuffler.data_width, nhidden, 1)
# machine = bob.learn.mlp.Machine(self.shape)
# machine.activation = bob.learn.activation.HyperbolicTangent() #the
# defaults are anyway Hyperbolic Tangent for hidden and output layer
# machine.randomize()
self.machine.input_subtract, self.machine.input_divide = \
shuffler.stdnorm()
# trainer = bob.learn.mlp.RProp(
# self.batch_size,
# bob.learn.mlp.SquareError(machine.output_activation), machine)
self.trainer.train_biases = True
continue_training = True
iteration = 0
min_devel_rmse = sys.float_info.max
self.best_machine = bob.learn.mlp.Machine(self.machine) # deep copy
best_machine_iteration = 0
# temporary training data selected by the shuffler
shuffled_input = numpy.ndarray(
(self.batch_size, shuffler.data_width), 'float64')
shuffled_target = numpy.ndarray(
(self.batch_size, shuffler.target_width), 'float64')
if self.verbose:
print(analyze.str_header())
try:
while continue_training:
analyze(self.machine, iteration)
if self.verbose:
print(analyze)
avg_devel_rmse = (analyze.data['real-devel-rmse'][-1] +
analyze.data['attack-devel-rmse'][-1]) / 2
# save best network, record minima
if avg_devel_rmse < min_devel_rmse:
best_machine_iteration = iteration
self.best_machine = bob.learn.mlp.Machine(
self.machine) # deep copy
if self.verbose:
print("%d: Saving best network so far with average "
"devel. RMSE = %.4e" % (iteration, avg_devel_rmse))
min_devel_rmse = avg_devel_rmse
if self.verbose:
print("%d: New valley stop threshold set to %.4e" %
(iteration, avg_devel_rmse / VALLEY_CONDITION))
if stop_condition(min_devel_rmse, avg_devel_rmse, last_devel_rmse):
if self.verbose:
print("%d: Stopping on devel valley condition" % iteration)
print("%d: Best machine happened on iteration %d with average "
"devel. RMSE of %.4e" % (iteration, best_machine_iteration,
min_devel_rmse))
break
last_devel_rmse = avg_devel_rmse
# train for 'epoch' times w/o stopping for tests
for i in range(self.epoch):
shuffler(data=shuffled_input, target=shuffled_target)
self.trainer.batch_size = len(shuffled_input)
self.trainer.train(
self.machine, shuffled_input, shuffled_target)
iteration += 1
if self.max_iter > 0 and iteration > self.max_iter:
if self.verbose:
print("%d: Stopping on max. iterations condition" % iteration)
print("%d: Best machine happened on iteration %d with average "
"devel. RMSE of %.4e" % (iteration, best_machine_iteration,
min_devel_rmse))
break
if self.no_improvements > 0 and \
(iteration - best_machine_iteration) > self.no_improvements:
if self.verbose:
print("%d: Stopping because did not observe MLP performance "
"improvements for %d iterations" %
(iteration, iteration - best_machine_iteration))
print("%d: Best machine happened on iteration %d with average "
"devel. RMSE of %.4e" %
(iteration, best_machine_iteration, min_devel_rmse))
break
except KeyboardInterrupt:
if self.verbose:
print("%d: User interruption captured - exiting in a clean way" %
iteration)
print("%d: Best machine happened on iteration %d with average devel. "
"RMSE of %.4e" %
(iteration, best_machine_iteration, min_devel_rmse))
analyze(self.machine, iteration)
return self.best_machine, analyze
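# Usage sketch (illustrative addition, not part of the original module):
# trains a tiny MLP on synthetic data and keeps the machine that performed
# best on the development set.
def _demo_mlp_trainer():
    numpy.random.seed(0)
    real = numpy.random.normal(+1., 1., (200, 2))
    attack = numpy.random.normal(-1., 1., (200, 2))
    train = (real, attack)
    devel = (real + 0.1, attack + 0.1)
    trainer = MLPTrainer(train, devel, mlp_shape=(2, 5, 1), batch_size=50,
                         epoch=10, max_iter=200, verbose=True)
    best_machine, analyzer = trainer()
    return best_machine, analyzer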
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 17 Aug 11:42:09 2011
"""A few utilities to plot and dump results.
"""
import os
import bob.io.base
import bob.learn.mlp
import bob.measure
import numpy
import re
def pyplot_axis_fontsize(ax, size):
"""Sets the font size on axis labels"""
for label in ax.xaxis.get_ticklabels():
label.set_fontsize(size)
for label in ax.yaxis.get_ticklabels():
label.set_fontsize(size)
def score_distribution_plot(test, devel, train, epochs, bins, eer_thres,
mhter_thres):
"""Plots the score distributions in 3 different subplots"""
import matplotlib.pyplot as mpl
histoargs = {'bins': bins, 'alpha': 0.8, 'histtype': 'step', 'range': (-1,1)}
lineargs = {'alpha': 0.5}
axis_fontsize = 8
# 3 plots (same page) with the three sets
mpl.subplot(3,1,1)
mpl.hist(test[0][:,0], label='Real Accesses', color='g', **histoargs)
mpl.hist(test[1][:,0], label='Attacks', color='b', **histoargs)
xmin, xmax, ymin, ymax = mpl.axis()  # axis() returns (xmin, xmax, ymin, ymax)
mpl.vlines(eer_thres, ymin, ymax, color='red', label='EER',
linestyles='solid', **lineargs)
mpl.vlines(mhter_thres, ymin, ymax, color='magenta',
linestyles='dashed', label='Min.HTER', **lineargs)
mpl.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc=3,
ncol=4, mode="expand", borderaxespad=0.)
mpl.grid(True, alpha=0.5)
mpl.ylabel("Test set")
axis = mpl.gca()
axis.yaxis.set_label_position('right')
pyplot_axis_fontsize(axis, axis_fontsize)
mpl.subplot(3,1,2)
mpl.hist(devel[0][:,0], color='g', **histoargs)
mpl.hist(devel[1][:,0], color='b', **histoargs)
xmin, xmax, ymin, ymax = mpl.axis()
mpl.vlines(eer_thres, ymin, ymax, color='red', linestyles='solid',
label='EER', **lineargs)
mpl.vlines(mhter_thres, ymin, ymax, color='magenta', linestyles='dashed',
label='Min.HTER', **lineargs)
mpl.grid(True, alpha=0.5)
mpl.ylabel("Development set")
axis = mpl.gca()
axis.yaxis.set_label_position('right')
pyplot_axis_fontsize(axis, axis_fontsize)
mpl.subplot(3,1,3)
mpl.hist(train[0][:,0], color='g', **histoargs)
mpl.hist(train[1][:,0], color='b', **histoargs)
xmin, xmax, ymin, ymax = mpl.axis()
mpl.vlines(eer_thres, ymin, ymax, color='red', linestyles='solid',
label='EER', **lineargs)
mpl.vlines(mhter_thres, ymin, ymax, color='magenta', linestyles='dashed',
label='Min.HTER', **lineargs)
mpl.grid(True, alpha=0.5)
mpl.ylabel("Training set")
mpl.xlabel("Score distribution after training (%d steps)" % epochs)
axis = mpl.gca()
axis.yaxis.set_label_position('right')
pyplot_axis_fontsize(axis, axis_fontsize)
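# Usage sketch (illustrative addition; the output file name is a
# placeholder): renders the three histogram panels for random scores to a
# PDF, the same way Analyzer.report drives this function.
def _demo_score_distribution_plot():
    import matplotlib
    matplotlib.use('pdf')  # avoids TkInter threaded start
    import matplotlib.pyplot as mpl
    numpy.random.seed(0)
    def scores(mu):
        return numpy.random.normal(mu, 0.3, (100, 1))
    test = (scores(+0.5), scores(-0.5))   # (real accesses, attacks)
    devel = (scores(+0.5), scores(-0.5))
    train = (scores(+0.5), scores(-0.5))
    mpl.figure()
    score_distribution_plot(test, devel, train, epochs=100, bins=20,
                            eer_thres=0.0, mhter_thres=0.0)
    mpl.savefig('score-distribution.pdf')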
def perf_hter(test_scores, devel_scores, threshold_func):
"""Computes a performance table and returns the HTER for the test and development set, as well as a formatted text with the results and the value of the threshold obtained for the given threshold function
Keyword parameters:
test_scores - the scores of the samples in the test set
devel_scores - the scores of the samples in the development set
threshold function - the type of threshold
"""
from bob.measure import farfrr
devel_attack_scores = devel_scores[1][:,0]
devel_real_scores = devel_scores[0][:,0]
test_attack_scores = test_scores[1][:,0]
test_real_scores = test_scores[0][:,0]
thres = threshold_func(devel_attack_scores, devel_real_scores)
devel_far, devel_frr = farfrr(devel_attack_scores, devel_real_scores, thres)
test_far, test_frr = farfrr(test_attack_scores, test_real_scores, thres)
devel_hter = 50 * (devel_far + devel_frr)
test_hter = 50 * (test_far + test_frr)
devel_text = " d: FAR %.2f%% / FRR %.2f%% / HTER %.2f%% " % (100*devel_far, 100*devel_frr, devel_hter)
test_text = " t: FAR %.2f%% / FRR %.2f%% / HTER %.2f%% " % (100*test_far, 100*test_frr, test_hter)
return (test_hter, devel_hter), (test_text, devel_text), thres
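# Usage sketch (illustrative addition): computes the HTERs for random
# scores using the EER criterion on the development set.
def _demo_perf_hter():
    numpy.random.seed(0)
    devel = (numpy.random.normal(+.5, .3, (100, 1)),   # real accesses
             numpy.random.normal(-.5, .3, (100, 1)))   # attacks
    test = (numpy.random.normal(+.5, .3, (100, 1)),
            numpy.random.normal(-.5, .3, (100, 1)))
    hters, texts, thres = perf_hter(test, devel, bob.measure.eer_threshold)
    print(texts[1])  # development set line
    print(texts[0])  # test set line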
def perf_hter_thorough(test_scores, devel_scores, threshold_func):
"""Computes a performance table and returns the HTER for the test and development set, as well as a formatted text with the results and the value of the threshold obtained for the given threshold function
Keyword parameters:
test_scores - the scores of the samples in the test set (tuple)
devel_scores - the scores of the samples in the development set (tuple)
threshold function - the type of threshold
"""
from bob.measure import farfrr
devel_attack_scores = devel_scores[1]
devel_real_scores = devel_scores[0]
test_attack_scores = test_scores[1]
test_real_scores = test_scores[0]
devel_attack_scores = devel_attack_scores.reshape([len(devel_attack_scores)]) # all the scores should be arrays with shape (n,)
devel_real_scores = devel_real_scores.reshape([len(devel_real_scores)])
test_attack_scores = test_attack_scores.reshape([len(test_attack_scores)])
test_real_scores = test_real_scores.reshape([len(test_real_scores)])
thres = threshold_func(devel_attack_scores, devel_real_scores)
devel_far, devel_frr = farfrr(devel_attack_scores, devel_real_scores, thres)
test_far, test_frr = farfrr(test_attack_scores, test_real_scores, thres)
return (devel_far, devel_frr), (test_far, test_frr)
def performance_table(config, test, devel):
"""Returns a string containing the performance table"""
def make_dict(prefix, far, attack_count, frr, real_count):
retval = {}
retval[prefix + 'far-percent'] = '%.2f' % (100*far,)
retval[prefix + 'frr-percent'] = '%.2f' % (100*frr,)
retval[prefix + 'hter-percent'] = '%.2f' % (50 * (far + frr),)
retval[prefix + 'misclassified-attacks'] = str(int(round(far*attack_count)))
retval[prefix + 'misclassified-real-accesses'] = str(int(round(frr*real_count)))
retval[prefix + 'total-attacks'] = str(attack_count)
retval[prefix + 'total-real-accesses'] = str(real_count)
return retval
def perf(devel_scores, test_scores, threshold_func):
from bob.measure import farfrr
devel_attack_scores = devel_scores[1][:,0]
devel_real_scores = devel_scores[0][:,0]
test_attack_scores = test_scores[1][:,0]
test_real_scores = test_scores[0][:,0]
devel_real = devel_real_scores.shape[0]
devel_attack = devel_attack_scores.shape[0]
test_real = test_real_scores.shape[0]
test_attack = test_attack_scores.shape[0]
thres = threshold_func(devel_attack_scores, devel_real_scores)
devel_far, devel_frr = farfrr(devel_attack_scores, devel_real_scores, thres)
test_far, test_frr = farfrr(test_attack_scores, test_real_scores, thres)
retval = {'threshold': '%.4f' % thres}
d = make_dict('devel-', devel_far, devel_attack, devel_frr, devel_real)
retval.update(d)
d = make_dict('test-', test_far, test_attack, test_frr, test_real)
retval.update(d)
return retval, thres
config.add_section('error-eer')
table, eer_thres = perf(devel, test, bob.measure.eer_threshold)
for key in sorted(table.keys()): config.set('error-eer', key, table[key])
config.add_section('error-mhter')
table, mhter_thres = perf(devel, test, bob.measure.min_hter_threshold)
for key in sorted(table.keys()): config.set('error-mhter', key, table[key])
return eer_thres, mhter_thres
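# Usage sketch (illustrative addition): fills a configuration object with
# the 'error-eer' and 'error-mhter' sections for random scores and dumps
# it to stdout.
def _demo_performance_table():
    import sys
    try:
        from ConfigParser import SafeConfigParser
    except ImportError:
        from configparser import SafeConfigParser
    numpy.random.seed(0)
    devel = (numpy.random.normal(+.5, .3, (100, 1)),
             numpy.random.normal(-.5, .3, (100, 1)))
    test = (numpy.random.normal(+.5, .3, (100, 1)),
            numpy.random.normal(-.5, .3, (100, 1)))
    config = SafeConfigParser()
    eer_thres, mhter_thres = performance_table(config, test, devel)
    config.write(sys.stdout)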
def roc(test, devel, train, npoints, eer_thres, mhter_thres):
"""Plots the ROC curve using Matplotlib"""
import matplotlib.pyplot as mpl
import matplotlib.patches as mpp
dev_neg = devel[1][:,0]
dev_pos = devel[0][:,0]
test_neg = test[1][:,0]
test_pos = test[0][:,0]
train_neg = train[1][:,0]
train_pos = train[0][:,0]
bob.measure.plot.roc(train_neg, train_pos, npoints, color=(0.3,0.3,0.3),
linestyle='--', dashes=(6,2), alpha=0.5, label='training')
bob.measure.plot.roc(dev_neg, dev_pos, npoints, color=(0.3,0.3,0.3),
linestyle='--', dashes=(6,2), label='development')
bob.measure.plot.roc(test_neg, test_pos, npoints, color=(0,0,0),
linestyle='-', label='test')
eer_far, eer_frr = bob.measure.farfrr(test_neg, test_pos, eer_thres)
mhter_far, mhter_frr = bob.measure.farfrr(test_neg, test_pos, mhter_thres)
xmax = min(100,2*100*eer_frr)
if xmax < 5: xmax = 5
ymax = min(100,2*100*eer_far)
if ymax < 5: ymax = 5
mpl.axis([0,xmax,0,ymax])
# roundness impression for the ellipse
xratio = float(xmax)/ymax
radius = 0.7
# for the test set line
ax = mpl.gca()
exy = (100*eer_frr, 100*eer_far)
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='r', alpha=0.7,
label='EER'))
exy = (100*mhter_frr, 100*mhter_far)
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='m', alpha=0.7,
label='Min.HTER'))
# for the development set line
eer_far, eer_frr = bob.measure.farfrr(dev_neg, dev_pos, eer_thres)
mhter_far, mhter_frr = bob.measure.farfrr(dev_neg, dev_pos, mhter_thres)
exy = (100*eer_frr, 100*eer_far)
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='r', alpha=0.2,
hatch='/'))
exy = (100*mhter_frr, 100*mhter_far)
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='m', alpha=0.2,
hatch='/'))
mpl.title("ROC Curve")
mpl.xlabel('FRR (%)')
mpl.ylabel('FAR (%)')
mpl.grid(True, alpha=0.3)
mpl.legend()
def det(test, devel, train, npoints, eer_thres, mhter_thres):
"""Plots the DET curve using Matplotlib"""
import matplotlib.pyplot as mpl
import matplotlib.patches as mpp
dev_neg = devel[1][:,0]
dev_pos = devel[0][:,0]
test_neg = test[1][:,0]
test_pos = test[0][:,0]
train_neg = train[1][:,0]
train_pos = train[0][:,0]
bob.measure.plot.det(train_neg, train_pos, npoints, color=(0.3,0.3,0.3),
linestyle='--', dashes=(6,2), alpha=0.5, label='training')
bob.measure.plot.det(dev_neg, dev_pos, npoints, color=(0.3,0.3,0.3),
linestyle='--', dashes=(6,2), label='development')
bob.measure.plot.det(test_neg, test_pos, npoints, color=(0,0,0),
linestyle='-', label='test')
eer_far, eer_frr = bob.measure.farfrr(test_neg, test_pos, eer_thres)
mhter_far, mhter_frr = bob.measure.farfrr(test_neg, test_pos, mhter_thres)
xmax = min(99.99, 4*100*eer_frr)
if xmax < 5.: xmax = 5.
ymax = min(99.99, 4*100*eer_far)
if ymax < 5.: ymax = 5.
bob.measure.plot.det_axis([0.01, xmax, 0.01, ymax])
# roundness impression for the ellipse
xratio = xmax/ymax
radius = 0.07
# for the test set line
ax = mpl.gca()
exy = [bob.measure.ppndf(k) for k in (eer_frr, eer_far)]
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='r', alpha=0.7,
label='EER'))
exy = [bob.measure.ppndf(k) for k in (mhter_frr, mhter_far)]
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='m', alpha=0.7,
label='Min.HTER'))
# for the development set line
eer_far, eer_frr = bob.measure.farfrr(dev_neg, dev_pos, eer_thres)
mhter_far, mhter_frr = bob.measure.farfrr(dev_neg, dev_pos, mhter_thres)
exy = [bob.measure.ppndf(k) for k in (eer_frr, eer_far)]
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='r', alpha=0.2,
hatch='/'))
exy = [bob.measure.ppndf(k) for k in (mhter_frr, mhter_far)]
ax.add_patch(mpp.Ellipse(exy, radius*xratio, radius, color='m', alpha=0.2,
hatch='/'))
mpl.title("DET Curve")
mpl.xlabel('FRR (%)')
mpl.ylabel('FAR (%)')
mpl.grid(True, alpha=0.3)
mpl.legend()
def epc(test, devel, npoints):
"""Plots the EPC curve using Matplotlib"""
import matplotlib.pyplot as mpl
dev_neg = devel[1][:,0]
dev_pos = devel[0][:,0]
test_neg = test[1][:,0]
test_pos = test[0][:,0]
bob.measure.plot.epc(dev_neg, dev_pos, test_neg, test_pos, npoints,
color=(0,0,0), linestyle='-')
mpl.title('EPC Curve')
mpl.xlabel('Cost')
mpl.ylabel('Min. HTER (%)')
mpl.grid(True, alpha=0.3)
def plot_rmse_evolution(data):
"""Performance evolution during training"""
import matplotlib.pyplot as mpl
mpl.plot(data['epoch'], data['real-train-rmse'], color='green',
linestyle='--', dashes=(6,2), alpha=0.5, label='Real Access (train)')
mpl.plot(data['epoch'], data['attack-train-rmse'], color='blue',
linestyle='--', dashes=(6,2), alpha=0.5, label='Attack (train)')
train = [0.5*sum(k) for k in zip(data['real-train-rmse'],
data['attack-train-rmse'])]
mpl.plot(data['epoch'], train, color='black',
linestyle='--', dashes=(6,2), label='Total (train)')
mpl.plot(data['epoch'], data['real-devel-rmse'], color='green',
alpha=0.5, label='Real Access (devel)')
mpl.plot(data['epoch'], data['attack-devel-rmse'], color='blue',
alpha=0.5, label='Attack (devel)')
devel = [0.5*sum(k) for k in zip(data['real-devel-rmse'],
data['attack-devel-rmse'])]
mpl.plot(data['epoch'], devel, color='black', label='Total (devel)')
mpl.title('RMSE Evolution')
mpl.xlabel('Training steps')
mpl.ylabel('RMSE')
mpl.grid(True, alpha=0.3)
mpl.legend()
# Reduce the size of the legend text
leg = mpl.gca().get_legend()
ltext = leg.get_texts()
mpl.setp(ltext, fontsize='small')
def plot_eer_evolution(data):
"""Performance evolution during training"""
import matplotlib.pyplot as mpl
train = [50*sum(k) for k in zip(data['train-frr'], data['train-far'])]
mpl.plot(data['epoch'], train, color='black', alpha=0.6,
linestyle='--', dashes=(6,2), label='EER (train)')
devel = [50*sum(k) for k in zip(data['devel-frr'], data['devel-far'])]
mpl.plot(data['epoch'], devel, color='black', label='EER (devel)')
mpl.title('EER Evolution (threshold from training set)')
mpl.xlabel('Training steps')
mpl.ylabel('Equal Error Rate')
mpl.grid(True, alpha=0.3)
mpl.legend()
# Reduce the size of the legend text
leg = mpl.gca().get_legend()
ltext = leg.get_texts()
mpl.setp(ltext, fontsize='small')
def evaluate_relevance(test, devel, train, machine):
"""Evaluates the relevance of each component"""
import matplotlib.pyplot as mpl
test_relevance = bob.measure.relevance(numpy.vstack(test),
machine)
test_relevance = [test_relevance[k] for k in range(test_relevance.shape[0])]
devel_relevance = bob.measure.relevance(numpy.vstack(devel),
machine)
devel_relevance = [devel_relevance[k] for k in range(devel_relevance.shape[0])]
train_relevance = bob.measure.relevance(numpy.vstack(train),
machine)
train_relevance = [train_relevance[k] for k in range(train_relevance.shape[0])]
data_width = len(test_relevance)
spacing = 0.1
width = (1.0-(2*spacing))/3.0
train_bottom = [k+spacing for k in range(data_width)]
devel_bottom = [k+width for k in train_bottom]
test_bottom = [k+width for k in devel_bottom]
mpl.barh(test_bottom, test_relevance, width, label='Test', color='black')
mpl.barh(devel_bottom, devel_relevance, width, label='Development',
color=(0.4, 0.4, 0.4))
mpl.barh(train_bottom, train_relevance, width, label='Training',
color=(0.9, 0.9, 0.9))
# labels and other details
ax = mpl.gca()
ax.set_yticks([k+spacing+1.5*width for k in range(data_width)])
ax.set_yticklabels(range(1,data_width+1))
mpl.title('Feature Relevance')
mpl.ylabel('Components')
mpl.xlabel('Relevance')
mpl.legend()
mpl.grid(True, alpha=0.3)
def get_hter(machine, datadir, mhter=False, verbose=False):
"""Returns the HTER on the test and development sets given the machine and
data directories. If the flag 'mhter' is set to True, then calculate the
test set HTER using the threshold found by minimizing the HTER on the
development set, otherwise, use the threshold at the EER on the development
set."""
def loader(group, cls, inputdir, verbose):
filename = os.path.join(inputdir, '%s-%s.hdf5' % (group, cls))
retval = bob.io.base.load(filename)
if verbose: print("[%-5s] %-6s: %8d" % (group, cls, retval.shape[0]))
return retval
devel_real = loader('devel', 'real', datadir, verbose)
devel_attack = loader('devel', 'attack', datadir, verbose)
test_real = loader('test', 'real', datadir, verbose)
test_attack = loader('test', 'attack', datadir, verbose)
mfile = bob.io.base.HDF5File(machine, 'r')
mlp = bob.learn.mlp.Machine(mfile)
# runs the data through the MLP machine
dev_pos = mlp(devel_real)[:,0]
dev_neg = mlp(devel_attack)[:,0]
# calculates the threshold
if mhter:
thres = bob.measure.min_hter_threshold(dev_neg, dev_pos)
else:
thres = bob.measure.eer_threshold(dev_neg, dev_pos)
# calculates the HTER on the test set using the previously calculated thres.
tst_pos = mlp(test_real)[:,0]
tst_neg = mlp(test_attack)[:,0]
dev_far, dev_frr = bob.measure.farfrr(dev_neg, dev_pos, thres)
far, frr = bob.measure.farfrr(tst_neg, tst_pos, thres)
return ((dev_far + dev_frr) / 2., (far + frr) / 2.)
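# Usage sketch (illustrative addition; the machine file and data directory
# are hypothetical placeholders): reports development and test HTERs at
# the devel-set EER threshold.
def _demo_get_hter():
    dev_hter, test_hter = get_hter('mlp.hdf5', '/path/to/scores',
                                   mhter=False, verbose=True)
    print("devel HTER: %.2f%% / test HTER: %.2f%%" %
          (100 * dev_hter, 100 * test_hter))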
def parse_error_table(table, mhter):
"""Parses result tables and extracts the HTER for both development and test
sets, returning them as a tuple.
The value of "mhter" is a boolean indicating if we should take the EER
threshold or the Min.HTER threshold performance values.
"""
perf_line = re.compile(r'^.*HTER\s*(?P<val>\d+\.\d+)%\s*$')
values = []
for line in open(table, 'rt'):
m = perf_line.match(line)
if m: values.append(float(m.groupdict()['val']))
if mhter: return values[3:]
return values[:2]
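# Usage sketch (illustrative addition; 'results.txt' is a placeholder):
# writes two lines in the format produced by perf_hter above and parses
# the HTER values back.
def _demo_parse_error_table():
    with open('results.txt', 'wt') as f:
        f.write(" d: FAR 1.00% / FRR 2.00% / HTER 1.50% \n")
        f.write(" t: FAR 3.00% / FRR 4.00% / HTER 3.50% \n")
    print(parse_error_table('results.txt', mhter=False))  # [1.5, 3.5]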