Commit 676fac8a authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

Remove deprecated code

parent a682de99
Pipeline #45011 failed with stage
in 4 minutes and 19 seconds
from . import extractor, preprocessor, database, test
from . import extractor, preprocessor, database
def get_config():
......
......@@ -8,7 +8,7 @@ from bob.pad.face.extractor import ImageQualityMeasure
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from bob.pad.base.pipelines.vanilla_pad import FrameContainersToFrames
from bob.pad.face.transformer import VideoToFrames
import bob.pipelines as mario
database = globals().get("database")
......@@ -42,7 +42,7 @@ extractor = mario.wrap(
)
# new stuff #
frame_cont_to_array = FrameContainersToFrames()
frame_cont_to_array = VideoToFrames()
param_grid = [
{"C": [1, 10, 100, 1000], "kernel": ["linear"]},
......
from .database import VideoPadFile
from .replay import ReplayPadDatabase
from .replay_mobile import ReplayMobilePadDatabase
from .msu_mfsd import MsuMfsdPadDatabase
from .aggregated_db import AggregatedDbPadDatabase
from .mifs import MIFSPadDatabase
from .batl import BatlPadDatabase
from .celeb_a import CELEBAPadDatabase
......@@ -32,8 +30,6 @@ __appropriate__(
VideoPadFile,
ReplayPadDatabase,
ReplayMobilePadDatabase,
MsuMfsdPadDatabase,
AggregatedDbPadDatabase,
MIFSPadDatabase,
BatlPadDatabase,
CELEBAPadDatabase,
......
This diff is collapsed.
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from bob.bio.video import FrameSelector, FrameContainer
from bob.pad.face.database import VideoPadFile # Used in MsuMfsdPadFile class
from bob.pad.base.database import PadDatabase
from bob.extension import rc
import os
import numpy as np
class MsuMfsdPadFile(VideoPadFile):
    """
    A high level implementation of the File class for the MSU MFSD database.
    """

    def __init__(self, f):
        """
        **Parameters:**

        ``f`` : :py:class:`object`
            An instance of the File class defined in the low level db interface
            of the MSU MFSD database, in the bob.db.msu_mfsd_mod.models.py file.
        """
        # Keep a handle on the low-level file object.  PadFile itself only
        # needs client_id, path, attack_type and file_id, all of which are
        # derived from ``f`` below.
        self.f = f
        # PadFile interprets attack_type=None as a genuine (real) sample; any
        # other string marks the sample as a presentation attack.  For this
        # database a single generic "attack" label is used.
        attack_type = None if f.is_real() else "attack"
        super(MsuMfsdPadFile, self).__init__(
            client_id=f.client_id, path=f.path, attack_type=attack_type, file_id=f.id
        )

    def load(
        self,
        directory=None,
        extension=None,
        frame_selector=FrameSelector(selection_style="all"),
    ):
        """
        Overridden version of the load method defined in the ``VideoPadFile``.

        **Parameters:**

        ``directory`` : :py:class:`str`
            String containing the path to the MSU MFSD database.
            Default: None

        ``extension`` : :py:class:`str`
            Extension of the video files in the MSU MFSD database.
            Note: ``extension`` value is not used in the code of this method;
            the real extension is derived from the low-level file name.
            Default: None

        ``frame_selector`` : ``FrameSelector``
            The frame selector to use.

        **Returns:**

        ``video_data`` : FrameContainer
            Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
            for further details.
        """
        # Deliberately ignore the caller-supplied ``extension`` and recover
        # the actual one from the video file name of the low-level object.
        _, real_extension = os.path.splitext(self.f.videofile())
        frames = self.f.load(directory=directory, extension=real_extension)
        return frame_selector(frames)
class MsuMfsdPadDatabase(PadDatabase):
    """
    A high level implementation of the Database class for the MSU MFSD database.
    """

    def __init__(
        self,
        protocol="grandtest",  # grandtest is the default protocol for this database
        original_directory=None,
        original_extension=None,
        annotation_directory=None,
        annotation_extension='.json',
        annotation_type='json',
        **kwargs
    ):
        """
        **Parameters:**

        ``protocol`` : :py:class:`str` or ``None``
            The name of the protocol that defines the default experimental setup for this database.

        ``original_directory`` : :py:class:`str`
            The directory where the original data of the database are stored.

        ``original_extension`` : :py:class:`str`
            The file name extension of the original data.

        ``kwargs``
            The arguments of the :py:class:`bob.bio.base.database.BioDatabase` base class constructor.
        """
        from bob.db.msu_mfsd_mod import Database as LowLevelDatabase

        self.db = LowLevelDatabase()
        # Since the high level API expects different group names than what the low
        # level API offers, you need to convert them when necessary
        self.low_level_group_names = (
            "train",
            "devel",
            "test",
        )  # group names in the low-level database interface
        self.high_level_group_names = (
            "train",
            "dev",
            "eval",
        )  # names are expected to be like that in objects() function
        # Always use super to call parent class methods.
        super(MsuMfsdPadDatabase, self).__init__(
            name="msu-mfsd",
            protocol=protocol,
            original_directory=original_directory,
            original_extension=original_extension,
            **kwargs
        )

    @property
    def original_directory(self):
        # The original directory lives on the low-level database object so
        # both interfaces stay in sync.
        return self.db.original_directory

    @original_directory.setter
    def original_directory(self, value):
        self.db.original_directory = value

    def objects(
        self, groups=None, protocol=None, purposes=None, model_ids=None, **kwargs
    ):
        """
        This function returns lists of MsuMfsdPadFile objects, which fulfill the given restrictions.

        Keyword parameters:

        ``groups`` : :py:class:`str`
            OR a list of strings.
            The groups of which the clients should be returned.
            Usually, groups are one or more elements of ('train', 'dev', 'eval')

        ``protocol`` : :py:class:`str`
            The protocol for which the clients should be retrieved.
            Note: this argument is not used in the code, because ``objects`` method of the
            low-level BD interface of the MSU MFSD doesn't have ``protocol`` argument.

        ``purposes`` : :py:class:`str`
            OR a list of strings.
            The purposes for which File objects should be retrieved.
            Usually it is either 'real' or 'attack'.

        ``model_ids``
            This parameter is not supported in PAD databases yet.

        **Returns:**

        ``files`` : [MsuMfsdPadFile]
            A list of MsuMfsdPadFile objects.
        """
        # Convert group names to low-level group names here.
        groups = self.convert_names_to_lowlevel(
            groups, self.low_level_group_names, self.high_level_group_names
        )
        # Since this database was designed for PAD experiments, nothing special
        # needs to be done here.
        files = self.db.objects(group=groups, cls=purposes, **kwargs)
        files = [MsuMfsdPadFile(f) for f in files]
        # Propagate directory/annotation settings to every high-level file.
        for f in files:
            f.original_directory = self.original_directory
            f.annotation_directory = self.annotation_directory
            f.annotation_extension = self.annotation_extension
            f.annotation_type = self.annotation_type
        return files

    def annotations(self, f):
        """
        Return annotations for a given file object ``f``, which is an instance
        of ``MsuMfsdPadFile`` defined in the HLDI of the MSU MFSD DB.
        The ``load()`` method of ``MsuMfsdPadFile`` class (see above)
        returns a video, therefore this method returns bounding-box annotations
        for each video frame. The annotations are returned as dictionary of dictionaries.

        **Parameters:**

        ``f`` : :py:class:`object`
            An instance of ``MsuMfsdPadFile`` defined above.

        **Returns:**

        ``annotations`` : :py:class:`dict`
            A dictionary containing the annotations for each frame in the video.
            Dictionary structure: ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``.
            Where ``frameN_dict = {'topleft': (row, col), 'bottomright': (row, col)}``
            is the dictionary defining the coordinates of the face bounding box in frame N.
        """
        annots = f.f.bbx(
            directory=self.original_directory
        )  # numpy array containing the face bounding box data for each video frame, returned data format described in the f.bbx() method of the low level interface
        annotations = {}  # dictionary to return
        for frame_annots in annots:
            # NOTE: ``np.int`` was removed in NumPy 1.24; use the builtin
            # ``int`` instead (it is what ``np.int`` aliased anyway).
            # Rows in ``annots`` are (frame_index, x, y, width, height);
            # topleft/bottomright are expressed as (row, col).
            topleft = (int(frame_annots[2]), int(frame_annots[1]))
            bottomright = (
                int(frame_annots[2] + frame_annots[4]),
                int(frame_annots[1] + frame_annots[3]),
            )
            annotations[str(int(frame_annots[0]))] = {
                "topleft": topleft,
                "bottomright": bottomright,
            }
        return annotations
#!/usr/bin/env python
# encoding: utf-8
import numpy
from bob.bio.base.extractor import Extractor
from bob.core.log import setup
logger = setup("bob.pad.face")
from scipy.fftpack import rfft
class LTSS(Extractor, object):
    """Compute Long-term spectral statistics of a pulse signal.

    The features are described in the following article:

    H. Muckenhirn, P. Korshunov, M. Magimai-Doss, and S. Marcel
    Long-Term Spectral Statistics for Voice Presentation Attack Detection,
    IEEE/ACM Trans. Audio, Speech and Language Processing. vol 25, n. 11, 2017

    Attributes
    ----------
    window_size : :obj:`int`
      The size of the window where FFT is computed
    framerate : :obj:`int`
      The sampling frequency of the signal (i.e the framerate ...)
    nfft : :obj:`int`
      Number of points to compute the FFT
    debug : :obj:`bool`
      Plot stuff
    concat : :obj:`bool`
      Flag if you would like to concatenate features from the 3 color channels
    time : :obj:`int`
      The length of the signal to consider (in seconds)
    """

    def __init__(self, window_size=25, framerate=25, nfft=64, concat=False, debug=False, time=0, **kwargs):
        """Init function

        Parameters
        ----------
        window_size : :obj:`int`
          The size of the window where FFT is computed
        framerate : :obj:`int`
          The sampling frequency of the signal (i.e the framerate ...)
        nfft : :obj:`int`
          Number of points to compute the FFT
        debug : :obj:`bool`
          Plot stuff
        concat : :obj:`bool`
          Flag if you would like to concatenate features from the 3 color channels
        time : :obj:`int`
          The length of the signal to consider (in seconds)
        """
        super(LTSS, self).__init__(**kwargs)
        self.framerate = framerate
        # TODO: try to use window size as NFFT - Guillaume HEUSCH, 04-07-2018
        self.nfft = nfft
        self.debug = debug
        self.window_size = window_size
        self.concat = concat
        self.time = time

    def _get_ltss(self, signal):
        """Compute long term spectral statistics for a signal

        Parameters
        ----------
        signal : :py:class:`numpy.ndarray`
          The signal

        Returns
        -------
        :py:class:`numpy.ndarray`
          The spectral statistics of the signal.
        """
        # windows overlap by half their size
        window_stride = int(self.window_size / 2)

        # log-magnitude of DFT coefficients
        log_mags = []

        # go through windows
        for w in range(0, (signal.shape[0] - self.window_size), window_stride):
            # n is even, as a consequence the fft is as follows [y(0), Re(y(1)), Im(y(1)), ..., Re(y(n/2))]
            # i.e. each coefficient, except first and last, is represented by two numbers (real + imaginary)
            fft = rfft(signal[w:w+self.window_size], n=self.nfft)

            # the magnitude is the norm of the complex numbers, so its size is n/2 + 1
            mags = numpy.zeros((int(self.nfft/2) + 1), dtype=numpy.float64)

            # first coeff is real; clamp magnitudes below 1 to 1 so that the
            # log below is always >= 0
            if abs(fft[0]) < 1:
                mags[0] = 1
            else:
                mags[0] = abs(fft[0])

            # go through coeffs 2 to n/2
            index = 1
            for i in range(1, (fft.shape[0]-1), 2):
                mags[index] = numpy.sqrt(fft[i]**2 + fft[i+1]**2)
                if mags[index] < 1:
                    mags[index] = 1
                index += 1

            # last coeff is real too
            if abs(fft[-1]) < 1:
                mags[index] = 1
            else:
                mags[index] = abs(fft[-1])

            log_mags.append(numpy.log(mags))

        # build final feature: per-frequency mean and std over all windows
        log_mags = numpy.array(log_mags)
        mean = numpy.mean(log_mags, axis=0)
        std = numpy.std(log_mags, axis=0)
        ltss = numpy.concatenate([mean, std])
        return ltss

    def __call__(self, signal):
        """Computes the long-term spectral statistics for given pulse signals.

        Parameters
        ----------
        signal: numpy.ndarray
          The signal

        Returns
        -------
        feature: numpy.ndarray
          the computed LTSS features
        """
        # sanity check: bail out (return None) on NaNs in any channel
        if signal.ndim == 1:
            if numpy.isnan(numpy.sum(signal)):
                return
        if signal.ndim == 2 and (signal.shape[1] == 3):
            for i in range(signal.shape[1]):
                if numpy.isnan(numpy.sum(signal[:, i])):
                    return

        # truncate the signal according to time
        if self.time > 0:
            number_of_frames = self.time * self.framerate
            # check that the truncated signal is not longer
            # than the original one
            if number_of_frames < signal.shape[0]:
                if signal.ndim == 1:
                    signal = signal[:number_of_frames]
                if signal.ndim == 2 and (signal.shape[1] == 3):
                    new_signal = numpy.zeros((number_of_frames, 3))
                    for i in range(signal.shape[1]):
                        new_signal[:, i] = signal[:number_of_frames, i]
                    signal = new_signal
            else:
                logger.warning("Sequence should be truncated to {}, but only contains {} => keeping original one".format(number_of_frames, signal.shape[0]))

            # also, be sure that the window_size is not bigger that the signal
            # NOTE: this mutates the extractor state, so a short signal
            # permanently shrinks the window for subsequent calls.
            if self.window_size > int(signal.shape[0] / 2):
                self.window_size = int(signal.shape[0] / 2)
                logger.warning("Window size reduced to {}".format(self.window_size))

        # we have a single pulse
        if signal.ndim == 1:
            feature = self._get_ltss(signal)

        # pulse for the 3 color channels
        if signal.ndim == 2 and (signal.shape[1] == 3):
            if not self.concat:
                # use the green channel only
                feature = self._get_ltss(signal[:, 1])
            else:
                ltss = []
                for i in range(signal.shape[1]):
                    ltss.append(self._get_ltss(signal[:, i]))
                feature = numpy.concatenate([ltss[0], ltss[1], ltss[2]])

        # logger.warn is deprecated since Python 3.3; use logger.warning
        if numpy.isnan(numpy.sum(feature)):
            logger.warning("Feature not extracted")
            return
        if numpy.sum(feature) == 0:
            logger.warning("Feature not extracted")
            return

        return feature
#!/usr/bin/env python
# encoding: utf-8
import numpy
from bob.bio.base.extractor import Extractor
from bob.core.log import setup
logger = setup("bob.pad.face")
class LiSpectralFeatures(Extractor, object):
    """Compute features from pulse signals in the three color channels.

    The features are described in the following article:

    X. Li, J. Komulainen, G. Zhao, P-C Yuen and M. Pietikainen,
    Generalized Face Anti-spoofing by Detecting Pulse From Face Videos
    Intl Conf. on Pattern Recognition (ICPR), 2016.

    Attributes
    ----------
    framerate : :obj:`int`
      The sampling frequency of the signal (i.e the framerate ...)
    nfft : :obj:`int`
      Number of points to compute the FFT
    debug : :obj:`bool`
      Plot stuff
    """

    def __init__(self, framerate=25, nfft=512, debug=False, **kwargs):
        """Init function

        Parameters
        ----------
        framerate : :obj:`int`
          The sampling frequency of the signal (i.e the framerate ...)
        nfft : :obj:`int`
          Number of points to compute the FFT
        debug : :obj:`bool`
          Plot stuff
        """
        super(LiSpectralFeatures, self).__init__()
        self.framerate = framerate
        self.nfft = nfft
        self.debug = debug

    def __call__(self, signal):
        """Compute the frequency features for the given signal.

        Parameters
        ----------
        signal : :py:class:`numpy.ndarray`
          The signal

        Returns
        -------
        :py:class:`numpy.ndarray`
          the computed features
        """
        # sanity check
        assert signal.ndim == 2 and signal.shape[1] == 3, "You should provide 3 pulse signals"
        for i in range(3):
            if numpy.isnan(numpy.sum(signal[:, i])):
                return

        feature = numpy.zeros(6)

        # when keypoints have not been detected, the pulse is zero everywhere
        # hence, no psd and no features
        zero_counter = 0
        for i in range(3):
            if numpy.sum(signal[:, i]) == 0:
                zero_counter += 1
        if zero_counter == 3:
            # logger.warn is deprecated; use logger.warning
            logger.warning("Feature is all zeros")
            return feature

        # get the frequency spectrum
        spectrum_dim = int((self.nfft / 2) + 1)
        ffts = numpy.zeros((3, spectrum_dim))
        f = numpy.fft.fftfreq(self.nfft) * self.framerate
        f = abs(f[:spectrum_dim])
        for i in range(3):
            ffts[i] = abs(numpy.fft.rfft(signal[:, i], n=self.nfft))

        # find the max of the frequency spectrum in the range of interest
        # (0.7 Hz - 4 Hz, i.e. plausible heart-rate frequencies)
        first = numpy.where(f > 0.7)[0]
        last = numpy.where(f < 4)[0]
        first_index = first[0]
        last_index = last[-1]
        range_of_interest = range(first_index, last_index + 1, 1)

        # build the feature vector: per channel, the maximum power and the
        # ratio of maximum power to total power in the range of interest
        for i in range(3):
            total_power = numpy.sum(ffts[i, range_of_interest])
            max_power = numpy.max(ffts[i, range_of_interest])
            feature[i] = max_power
            if total_power == 0:
                feature[i+3] = 0
            else:
                feature[i+3] = max_power / total_power

        # plot stuff, if asked for
        if self.debug:
            from matplotlib import pyplot
            for i in range(3):
                max_idx = numpy.argmax(ffts[i, range_of_interest])
                f_max = f[range_of_interest[max_idx]]
                logger.debug("Inferred HR = {}".format(f_max*60))
                pyplot.plot(f, ffts[i], 'k')
                # FIX: pyplot.axis() returns (xmin, xmax, ymin, ymax); the
                # original unpacked it as (xmax, xmin, ymax, ymin), swapping
                # the bounds passed to vlines below.
                xmin, xmax, ymin, ymax = pyplot.axis()
                pyplot.vlines(f[range_of_interest[max_idx]], ymin, ymax, color='red')
                pyplot.vlines(f[first_index], ymin, ymax, color='green')
                pyplot.vlines(f[last_index], ymin, ymax, color='green')
                pyplot.show()

        if numpy.isnan(numpy.sum(feature)):
            logger.warning("Feature not extracted")
            return
        return feature
#!/usr/bin/env python
# encoding: utf-8
import numpy
from bob.bio.base.extractor import Extractor
import logging
logger = logging.getLogger("bob.pad.face")
class NormalizeLength(Extractor, object):
    """
    Normalize the length of feature vectors, such that
    they all have the same dimensions

    **Parameters:**

    length: int
      The final length of the final feature vector

    requires_training: boolean
      This extractor actually may requires "training".
      The goal here is to retrieve the length of the shortest sequence

    debug: boolean
      Plot stuff
    """

    def __init__(self, length=-1, debug=False, requires_training=True, **kwargs):
        super(NormalizeLength, self).__init__(requires_training=requires_training, **kwargs)
        self.length = length
        self.debug = debug

    def __call__(self, signal):
        """
        Normalize the length of the signal

        **Parameters:**

        signal: numpy.array
          The signal

        **Returns:**

        signal: numpy.array
          the signal with the provided length
        """
        # we have a single pulse signal
        if signal.ndim == 1:
            signal = signal[:self.length]

        # we have 3 pulse signal (Li's preprocessing)
        # in this case, return the signal corresponding to the green channel
        if signal.ndim == 2 and (signal.shape[1] == 3):
            signal = signal[:self.length, 1]

        if numpy.isnan(numpy.sum(signal)):
            return

        if signal.shape[0] < self.length:
            logger.debug("signal shorter than training shape: {} vs {}".format(signal.shape[0], self.length))
            # NOTE(review): aborting the whole process on a short signal is
            # drastic, but kept to preserve the original behavior.
            import sys
            sys.exit()
            # FIX: the original wrote ``tmp[:, signal.shape[0]]`` which is an
            # invalid 2-D index into the 1-D ``tmp`` (IndexError) and, even if
            # it had not raised, never copied the data — ``signal = tmp`` then
            # replaced the signal with zeros.  Copy the signal into the
            # zero-padded buffer instead.
            tmp = numpy.zeros((self.length), dtype=signal.dtype)
            tmp[:signal.shape[0]] = signal
            signal = tmp

        if self.debug:
            from matplotlib import pyplot
            pyplot.plot(signal, 'k')
            pyplot.title('Signal truncated')
            pyplot.show()

        return signal

    def train(self, training_data, extractor_file):
        """
        This function determines the shortest length across the training set.
        It will be used to normalize the length of all the sequences.

        **Parameters:**

        training_data : [object] or [[object]]
          A list of *preprocessed* data that can be used for training the extractor.
          Data will be provided in a single list, if ``split_training_features_by_client = False`` was specified in the constructor,
          otherwise the data will be split into lists, each of which contains the data of a single (training-)client.

        extractor_file : str
          The file to write.
          This file should be readable with the :py:meth:`load` function.
        """
        # start from a sentinel larger than any realistic sequence length
        self.length = 100000
        for i in range(len(training_data)):
            if training_data[i].shape[0] < self.length:
                self.length = training_data[i].shape[0]
        logger.info("Signals will be truncated to {} dimensions".format(self.length))
from bob.bio.video import FrameContainer
from bob.io.base import HDF5File