diff --git a/.gitignore b/.gitignore
index f25d410d48fd4cbeb448e9cb88d26881828d107e..722dfac6807f6eb07f51e93d4f486817af4a5e9b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,7 @@ develop-eggs
 sphinx
 dist
 record.txt
+results
+submitted.sql3
+config
+temp*
diff --git a/bob/pad/face/config/preprocessor/filename.py b/bob/pad/face/config/preprocessor/filename.py
index ab7fa9103d2a396ddf4ee7f66a9bbec1450c3648..9f13809c01701ab121a34e59b2ff6c9c7681f2b3 100644
--- a/bob/pad/face/config/preprocessor/filename.py
+++ b/bob/pad/face/config/preprocessor/filename.py
@@ -1,4 +1,7 @@
 from bob.bio.base.preprocessor import Filename
 
 # This preprocessor does nothing, returning just the name of the file to extract the features from:
+
+# WARNING: if you use this preprocessor, pass the original database directory as the preprocessed directory, e.g.:
+# ./bin/spoof.py [config.py] --preprocessed-directory /idiap/group/biometric/databases/pad/replay/protocols/replayattack-database/
 empty_preprocessor = Filename()
diff --git a/bob/pad/face/database/__init__.py b/bob/pad/face/database/__init__.py
index 021b5c5cb612a53316ffaffaa9d0d53745bbc724..c326c7ded5d3d45c290f290a5dede0c012ff5420 100644
--- a/bob/pad/face/database/__init__.py
+++ b/bob/pad/face/database/__init__.py
@@ -7,7 +7,6 @@ from .mifs import MIFSPadDatabase
 from .batl import BatlPadDatabase
 from .celeb_a import CELEBAPadDatabase
 
-
 # gets sphinx autodoc done right - don't remove it
 def __appropriate__(*args):
     """Says object was actually declared here, and not in the import module.
diff --git a/bob/pad/face/database/maskattack.py b/bob/pad/face/database/maskattack.py
new file mode 100644
index 0000000000000000000000000000000000000000..a4d8daf72faa908428ff902a4bc0ef452abf387e
--- /dev/null
+++ b/bob/pad/face/database/maskattack.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import bob.io.video
+from bob.bio.video import FrameSelector, FrameContainer
+from bob.pad.face.database import VideoPadFile  # Base class of MaskAttackPadFile
+from bob.pad.base.database import PadDatabase
+
+
+class MaskAttackPadFile(VideoPadFile):
+    """
+    A high level implementation of the File class for the 3DMAD database.
+    """
+
+    def __init__(self, f):
+        """
+        **Parameters:**
+
+        ``f`` : :py:class:`object`
+            An instance of the File class defined in the low level db interface
+            of the 3DMAD database, in the bob.db.maskattack.models.py file.
+        """
+
+        self.f = f
+        if f.is_real():
+            attack_type = None
+        else:
+            attack_type = 'mask'
+
+        super(MaskAttackPadFile, self).__init__(
+            client_id=f.client_id,
+            path=f.path,
+            attack_type=attack_type,
+            file_id=f.id)
+
+    #==========================================================================
+    def load(self, directory=None, extension='.avi', frame_selector=FrameSelector(selection_style='all')):
+        """
+        Overridden version of the load method defined in ``VideoPadFile``.
+
+        **Parameters:**
+
+        ``directory`` : :py:class:`str`
+            String containing the path to the 3DMAD database.
+            Default: None
+
+        ``extension`` : :py:class:`str`
+            Extension of the video files in the 3DMAD database.
+            Default: '.avi'
+
+        ``frame_selector`` : ``FrameSelector``
+            The frame selector to use.
+
+        **Returns:**
+
+        ``video_data`` : FrameContainer
+            Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
+            for further details.
+        """
+        vfilename = self.make_path(directory, extension)
+        video = bob.io.video.reader(vfilename)
+        video_data_array = video.load()
+
+        return frame_selector(video_data_array)
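For reference, a minimal sketch of how this file wrapper is meant to be used, assuming ``bob.db.maskattack`` is installed; the data directory below is a placeholder:

from bob.db.maskattack import Database as LowLevelDatabase

lowlevel = LowLevelDatabase()
f = MaskAttackPadFile(lowlevel.objects(sets=['world'], purposes=['trainReal'])[0])
print(f.client_id, f.attack_type)            # attack_type is None for real accesses
frames = f.load(directory='/path/to/3dmad')  # all frames of the video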
+ """ + vfilename = self.make_path(directory, extension) + video = bob.io.video.reader(vfilename) + video_data_array = video.load() + + return frame_selector(video_data_array) + + +#============================================================================== +class MaskAttackPadDatabase(PadDatabase): + """ + A high level implementation of the Database class for the 3DMAD database. + """ + + def __init__( + self, + protocol=None, + original_directory=None, + original_extension='.avi', + **kwargs): + """ + **Parameters:** + + ``protocol`` : :py:class:`str` or ``None`` + The name of the protocol that defines the default experimental setup for this database. + + ``original_directory`` : :py:class:`str` + The directory where the original data of the database are stored. + + ``original_extension`` : :py:class:`str` + The file name extension of the original data. + + ``kwargs`` + The arguments of the :py:class:`bob.bio.base.database.BioDatabase` base class constructor. + """ + + from bob.db.maskattack import Database as LowLevelDatabase + + self.db = LowLevelDatabase() + + # Since the high level API expects different group names than what the low + # level API offers, you need to convert them when necessary + self.low_level_group_names = ( + 'world', 'dev', + 'test') # group names in the low-level database interface + self.high_level_group_names = ( + 'train', 'dev', + 'eval') # names are expected to be like that in objects() function + + # Always use super to call parent class methods. + super(MaskAttackPadDatabase, self).__init__( + name='maskattack', + protocol=protocol, + original_directory=original_directory, + original_extension=original_extension, + **kwargs) + + @property + def original_directory(self): + return self.db.original_directory + + @original_directory.setter + def original_directory(self, value): + self.db.original_directory = value + + #========================================================================== + def objects(self, + groups=None, + protocol=None, + purposes=None, + model_ids=None, + **kwargs): + """ + This function returns lists of MaskAttackPadFile objects, which fulfill the given restrictions. + + Keyword parameters: + + ``groups`` : :py:class:`str` + OR a list of strings. + The groups of which the clients should be returned. + Usually, groups are one or more elements of ('train', 'dev', 'eval') + + ``protocol`` : :py:class:`str` + The protocol for which the clients should be retrieved. + Note: this argument is not used in the code, because ``objects`` method of the + low-level BD interface of the MSU MFSD doesn't have ``protocol`` argument. + + ``purposes`` : :py:class:`str` + OR a list of strings. + The purposes for which File objects should be retrieved. + Usually it is either 'real' or 'attack'. + + ``model_ids`` + This parameter is not supported in PAD databases yet. + + **Returns:** + + ``files`` : [MsuMfsdPadFile] + A list of MsuMfsdPadFile objects. + """ + + # Convert group names to low-level group names here. + groups = self.convert_names_to_lowlevel( + groups, self.low_level_group_names, self.high_level_group_names) + # Since this database was designed for PAD experiments, nothing special + # needs to be done here. 
+ + print("Objects method called with groups = {}, protocol = {}, purposes = {}, model_ids = {}".format(groups, protocol, purposes, model_ids)) + #print("Kwargs -> {}".format(**kwargs)) + #print("Translated groups = {}".frima) + + # for training + + # for dev + + # for eval + lowlevel_purposes = [] + if purposes == 'real': + lowlevel_purposes = ['trainReal', 'probeReal', 'classifyReal'] + else: + lowlevel_purposes = ['trainMask', 'probeMask', 'classifyMask'] + + #if groups == ['world']: + # lowlevel_purposes = ['trainMask'] + # if groups == ['world']: + # lowlevel_purposes = ['trainMask'] + #print(lowlevel_purposes) + files = self.db.objects(sets=groups, purposes=lowlevel_purposes, **kwargs) + + files = [MaskAttackPadFile(f) for f in files] + + return files + + #========================================================================== + def annotations(self, file): + """ + Return annotations for a given file object ``f``, which is an instance + of ``MsuMfsdPadFile`` defined in the HLDI of the MSU MFSD DB. + The ``load()`` method of ``MsuMfsdPadFile`` class (see above) + returns a video, therefore this method returns bounding-box annotations + for each video frame. The annotations are returned as dictionary of dictionaries. + + **Parameters:** + + ``f`` : :py:class:`object` + An instance of ``MsuMfsdPadFile`` defined above. + + **Returns:** + + ``annotations`` : :py:class:`dict` + A dictionary containing the annotations for each frame in the video. + Dictionary structure: ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``. + Where ``frameN_dict = {'topleft': (row, col), 'bottomright': (row, col)}`` + is the dictionary defining the coordinates of the face bounding box in frame N. + """ + return None + + #annots = f.f.bbx( + # directory=self.original_directory + #) # numpy array containing the face bounding box data for each video frame, returned data format described in the f.bbx() method of the low level interface + + #annotations = {} # dictionary to return + + #for frame_annots in annots: + + # topleft = (np.int(frame_annots[2]), np.int(frame_annots[1])) + # bottomright = (np.int(frame_annots[2] + frame_annots[4]), + # np.int(frame_annots[1] + frame_annots[3])) + + # annotations[str(np.int(frame_annots[0]))] = { + # 'topleft': topleft, + # 'bottomright': bottomright + # } + + #return annotations + + #def model_with_ids_protocol(groups=None, protocol=None): + # pass diff --git a/bob/pad/face/extractor/LTSS.py b/bob/pad/face/extractor/LTSS.py new file mode 100644 index 0000000000000000000000000000000000000000..c1077a8888af2d87088d2d900b1ee5731f49decf --- /dev/null +++ b/bob/pad/face/extractor/LTSS.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import numpy + +from bob.bio.base.extractor import Extractor + +from bob.core.log import setup +logger = setup("bob.pad.face") + +from scipy.fftpack import rfft + +class LTSS(Extractor, object): + """Compute Long-term spectral statistics of a pulse signal. + + The features are described in the following article: + + H. Muckenhirn, P. Korshunov, M. Magimai-Doss, and S. Marcel + Long-Term Spectral Statistics for Voice Presentation Attack Detection, + IEEE/ACM Trans. Audio, Speech and Language Processing. vol 25, n. 11, 2017 + + Attributes + ---------- + window_size : :obj:`int` + The size of the window where FFT is computed + framerate : :obj:`int` + The sampling frequency of the signal (i.e the framerate ...) 
+ nfft : :obj:`int` + Number of points to compute the FFT + debug : :obj:`bool` + Plot stuff + concat : :obj:`bool` + Flag if you would like to concatenate features from the 3 color channels + time : :obj:`int` + The length of the signal to consider (in seconds) + + """ + def __init__(self, window_size=25, framerate=25, nfft=64, concat=False, debug=False, time=0, **kwargs): + """Init function + + Parameters + ---------- + window_size : :obj:`int` + The size of the window where FFT is computed + framerate : :obj:`int` + The sampling frequency of the signal (i.e the framerate ...) + nfft : :obj:`int` + Number of points to compute the FFT + debug : :obj:`bool` + Plot stuff + concat : :obj:`bool` + Flag if you would like to concatenate features from the 3 color channels + time : :obj:`int` + The length of the signal to consider (in seconds) + + """ + super(LTSS, self).__init__() + self.framerate = framerate + self.nfft = nfft + self.debug = debug + self.window_size = window_size + self.concat = concat + self.time = time + + def _get_ltss(self, signal): + """Compute long term spectral statistics for a signal + + Parameters + ---------- + signal : :py:class:`numpy.ndarray` + The signal + + Returns + ------- + :py:class:`numpy.ndarray` + The spectral statistics of the signal. + + """ + window_stride = int(self.window_size / 2) + + # log-magnitude of DFT coefficients + log_mags = [] + + # go through windows + for w in range(0, (signal.shape[0] - self.window_size), window_stride): + fft = rfft(signal[w:w+self.window_size], n=self.nfft) + mags = numpy.zeros(int(self.nfft/2), dtype=numpy.float64) + + # XXX : bug was here (no clipping) + if abs(fft[0]) < 1: + mags[0] = 1 + else: + mags[0] = abs(fft[0]) + # XXX + + index = 1 + for i in range(1, (fft.shape[0]-1), 2): + mags[index] = numpy.sqrt(fft[i]**2 + fft[i+1]**2) + if mags[index] < 1: + mags[index] = 1 + index += 1 + log_mags.append(numpy.log(mags)) + + log_mags = numpy.array(log_mags) + mean = numpy.mean(log_mags, axis=0) + std = numpy.std(log_mags, axis=0) + ltss = numpy.concatenate([mean, std]) + return ltss + + + def __call__(self, signal): + """Computes the long-term spectral statistics for given pulse signals. 
+ + Parameters + ---------- + signal: numpy.ndarray + The signal + + Returns + ------- + feature: numpy.ndarray + the computed LTSS features + + """ + # sanity check + if signal.ndim == 1: + if numpy.isnan(numpy.sum(signal)): + return + if signal.ndim == 2 and (signal.shape[1] == 3): + for i in range(signal.shape[1]): + if numpy.isnan(numpy.sum(signal[:, i])): + return + + # truncate the signal according to time + if self.time > 0: + number_of_frames = self.time * self.framerate + + # check that the truncated signal is not longer + # than the original one + if number_of_frames < signal.shape[0]: + + if signal.ndim == 1: + signal = signal[:number_of_frames] + if signal.ndim == 2 and (signal.shape[1] == 3): + new_signal = numpy.zeros((number_of_frames, 3)) + for i in range(signal.shape[1]): + new_signal[:, i] = signal[:number_of_frames, i] + signal = new_signal + else: + logger.warning("Sequence should be truncated to {}, but only contains {} => keeping original one".format(number_of_frames, signal.shape[0])) + + # also, be sure that the window_size is not bigger that the signal + if self.window_size > int(signal.shape[0] / 2): + self.window_size = int(signal.shape[0] / 2) + logger.warning("Window size reduced to {}".format(self.window_size)) + + # we have a single pulse + if signal.ndim == 1: + feature = self._get_ltss(signal) + + # pulse for the 3 color channels + if signal.ndim == 2 and (signal.shape[1] == 3): + + if not self.concat: + feature = self._get_ltss(signal[:, 1]) + else: + ltss = [] + for i in range(signal.shape[1]): + ltss.append(self._get_ltss(signal[:, i])) + feature = numpy.concatenate([ltss[0], ltss[1], ltss[2]]) + + if numpy.isnan(numpy.sum(feature)): + logger.warn("Feature not extracted") + return + if numpy.sum(feature) == 0: + logger.warn("Feature not extracted") + return + + return feature diff --git a/bob/pad/face/extractor/LiFeatures.py b/bob/pad/face/extractor/LiFeatures.py new file mode 100644 index 0000000000000000000000000000000000000000..0e25a0b04bf3cc78ba3bb34ab58b25c7a88cec8c --- /dev/null +++ b/bob/pad/face/extractor/LiFeatures.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import numpy + +from bob.bio.base.extractor import Extractor + +from bob.core.log import setup +logger = setup("bob.pad.face") + + +class LiFeatures(Extractor, object): + """Compute features from pulse signals in the three color channels. + + The features are described in the following article: + + X. Li, J. Komulainen, G. Zhao, P-C Yuen and M. Pietikainen, + Generalized Face Anti-spoofing by Detecting Pulse From Face Videos + Intl Conf. on Pattern Recognition (ICPR), 2016. + + Attributes + ---------- + framerate : :obj:`int` + The sampling frequency of the signal (i.e the framerate ...) + nfft : :obj:`int` + Number of points to compute the FFT + debug : :obj:`bool` + Plot stuff + + """ + def __init__(self, framerate=25, nfft=512, debug=False, **kwargs): + """Init function + + Parameters + ---------- + framerate : :obj:`int` + The sampling frequency of the signal (i.e the framerate ...) + nfft : :obj:`int` + Number of points to compute the FFT + debug : :obj:`bool` + Plot stuff + + """ + super(LiFeatures, self).__init__() + self.framerate = framerate + self.nfft = nfft + self.debug = debug + + + def __call__(self, signal): + """Compute the frequency features for the given signal. 
+ + Parameters + ---------- + signal : :py:class:`numpy.ndarray` + The signal + + Returns + ------- + :py:class:`numpy.ndarray` + the computed features + + """ + # sanity check + assert signal.ndim == 2 and signal.shape[1] == 3, "You should provide 3 pulse signals" + for i in range(3): + if numpy.isnan(numpy.sum(signal[:, i])): + return + + feature = numpy.zeros(6) + + # when keypoints have not been detected, the pulse is zero everywhere + # hence, no psd and no features + zero_counter = 0 + for i in range(3): + if numpy.sum(signal[:, i]) == 0: + zero_counter += 1 + if zero_counter == 3: + logger.warn("Feature is all zeros") + return feature + + # get the frequency spectrum + spectrum_dim = int((self.nfft / 2) + 1) + ffts = numpy.zeros((3, spectrum_dim)) + f = numpy.fft.fftfreq(self.nfft) * self.framerate + f = abs(f[:spectrum_dim]) + for i in range(3): + ffts[i] = abs(numpy.fft.rfft(signal[:, i], n=self.nfft)) + + # find the max of the frequency spectrum in the range of interest + first = numpy.where(f > 0.7)[0] + last = numpy.where(f < 4)[0] + first_index = first[0] + last_index = last[-1] + range_of_interest = range(first_index, last_index + 1, 1) + + # build the feature vector + for i in range(3): + total_power = numpy.sum(ffts[i, range_of_interest]) + max_power = numpy.max(ffts[i, range_of_interest]) + feature[i] = max_power + if total_power == 0: + feature[i+3] = 0 + else: + feature[i+3] = max_power / total_power + + # plot stuff, if asked for + if self.debug: + from matplotlib import pyplot + for i in range(3): + max_idx = numpy.argmax(ffts[i, range_of_interest]) + f_max = f[range_of_interest[max_idx]] + logger.debug("Inferred HR = {}".format(f_max*60)) + pyplot.plot(f, ffts[i], 'k') + xmax, xmin, ymax, ymin = pyplot.axis() + pyplot.vlines(f[range_of_interest[max_idx]], ymin, ymax, color='red') + pyplot.vlines(f[first_index], ymin, ymax, color='green') + pyplot.vlines(f[last_index], ymin, ymax, color='green') + pyplot.show() + + if numpy.isnan(numpy.sum(feature)): + logger.warn("Feature not extracted") + return + + return feature diff --git a/bob/pad/face/extractor/NormalizeLength.py b/bob/pad/face/extractor/NormalizeLength.py new file mode 100644 index 0000000000000000000000000000000000000000..21a42ff302b4b0334ff1f0850a2b16f94d490213 --- /dev/null +++ b/bob/pad/face/extractor/NormalizeLength.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import numpy + +from bob.bio.base.extractor import Extractor + +import logging +logger = logging.getLogger("bob.pad.face") + + +class NormalizeLength(Extractor, object): + """ + Normalize the length of feature vectors, such that + they all have the same dimensions + + **Parameters:** + + length: int + The final length of the final feature vector + + requires_training: boolean + This extractor actually may requires "training". 
+      The goal here is to retrieve the length of the shortest sequence
+
+    debug: boolean
+      Plot stuff
+    """
+    def __init__(self, length=-1, debug=False, requires_training=True, **kwargs):
+
+        super(NormalizeLength, self).__init__(requires_training=requires_training, **kwargs)
+
+        self.length = length
+        self.debug = debug
+
+    def __call__(self, signal):
+        """
+        Normalize the length of the signal
+
+        **Parameters:**
+
+        signal: numpy.array
+          The signal
+
+        **Returns:**
+
+        signal: numpy.array
+          the signal with the provided length
+        """
+        # we have a single pulse signal
+        if signal.ndim == 1:
+            signal = signal[:self.length]
+
+        # we have 3 pulse signals (Li's preprocessing)
+        # in this case, return the signal corresponding to the green channel
+        if signal.ndim == 2 and (signal.shape[1] == 3):
+            signal = signal[:self.length, 1]
+
+        if numpy.isnan(numpy.sum(signal)):
+            return
+
+        # if the signal is shorter than the expected length, pad it with zeros
+        if signal.shape[0] < self.length:
+            logger.debug("signal shorter than training shape: {} vs {}".format(signal.shape[0], self.length))
+            tmp = numpy.zeros((self.length,), dtype=signal.dtype)
+            tmp[:signal.shape[0]] = signal
+            signal = tmp
+
+        if self.debug:
+            from matplotlib import pyplot
+            pyplot.plot(signal, 'k')
+            pyplot.title('Signal truncated')
+            pyplot.show()
+
+        return signal
+
+    def train(self, training_data, extractor_file):
+        """
+        This function determines the shortest length across the training set.
+        It will be used to normalize the length of all the sequences.
+
+        **Parameters:**
+
+        training_data : [object] or [[object]]
+          A list of *preprocessed* data that can be used for training the extractor.
+          Data will be provided in a single list, if ``split_training_features_by_client = False`` was specified in the constructor,
+          otherwise the data will be split into lists, each of which contains the data of a single (training-)client.
+
+        extractor_file : str
+          The file to write.
+          This file should be readable with the :py:meth:`load` function.
+        """
+        # NOTE: the computed length is only kept on this instance; nothing is
+        # written to ``extractor_file`` yet.
+        self.length = 100000
+        for i in range(len(training_data)):
+            if training_data[i].shape[0] < self.length:
+                self.length = training_data[i].shape[0]
+        logger.info("Signals will be truncated to {} dimensions".format(self.length))
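A small synthetic sketch of the intended train-then-extract flow (random data; the ``extractor_file`` argument is unused here, as noted above):

import numpy
ext = NormalizeLength()
training = [numpy.random.randn(n, 3) for n in (220, 180, 250)]
ext.train(training, extractor_file=None)  # sets ext.length to 180 here
feat = ext(numpy.random.randn(300, 3))    # green channel, truncated to 180 samples
assert feat.shape == (180,)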
diff --git a/bob/pad/face/extractor/PPGSecure.py b/bob/pad/face/extractor/PPGSecure.py
new file mode 100644
index 0000000000000000000000000000000000000000..1865ec69967f4fed4cff73289d153f3258317d57
--- /dev/null
+++ b/bob/pad/face/extractor/PPGSecure.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import numpy
+
+from bob.bio.base.extractor import Extractor
+
+from bob.core.log import setup
+logger = setup("bob.pad.face")
+
+
+class PPGSecure(Extractor, object):
+    """Extract frequency spectra from pulse signals.
+
+    The features are extracted according to what is described in
+    the following article:
+
+    E.M Nowara, A. Sabharwal and A. Veeraraghavan,
+    "PPGSecure: Biometric Presentation Attack Detection using Photoplethysmograms",
+    IEEE Intl Conf. on Automatic Face and Gesture Recognition, 2017.
+
+    Attributes
+    ----------
+    framerate : :obj:`int`
+      The sampling frequency of the signal (i.e the framerate ...)
+    nfft : :obj:`int`
+      Number of points to compute the FFT
+    debug : :obj:`bool`
+      Plot stuff
+
+    """
+    def __init__(self, framerate=25, nfft=32, debug=False, **kwargs):
+        """Init function
+
+        Parameters
+        ----------
+        framerate : :obj:`int`
+          The sampling frequency of the signal (i.e the framerate ...)
+        nfft : :obj:`int`
+          Number of points to compute the FFT
+        debug : :obj:`bool`
+          Plot stuff
+
+        """
+        super(PPGSecure, self).__init__(**kwargs)
+        self.framerate = framerate
+        self.nfft = nfft
+        self.debug = debug
+
+
+    def __call__(self, signal):
+        """Compute and concatenate frequency spectra for the given signals.
+
+        Parameters
+        ----------
+        signal : :py:class:`numpy.ndarray`
+          The signal
+
+        Returns
+        -------
+        :py:class:`numpy.ndarray`
+          the computed FFT features
+
+        """
+        # sanity check
+        assert signal.shape[1] == 5, "You should provide 5 pulses"
+        if numpy.isnan(numpy.sum(signal)):
+            return
+
+        output_dim = int((self.nfft / 2) + 1)
+
+        # we have 5 pulse signals, one per ROI (green channel only)
+        ffts = numpy.zeros((5, output_dim))
+        for i in range(5):
+            ffts[i] = abs(numpy.fft.rfft(signal[:, i], n=self.nfft))
+
+        fft = numpy.concatenate([ffts[0], ffts[1], ffts[2], ffts[3], ffts[4]])
+
+        if self.debug:
+            from matplotlib import pyplot
+            pyplot.plot(range(output_dim*5), fft, 'k')
+            pyplot.title('Concatenation of spectra')
+            pyplot.show()
+
+        if numpy.isnan(numpy.sum(fft)):
+            logger.warning("Feature not extracted")
+            return
+        if numpy.sum(fft) == 0:
+            logger.warning("Feature not extracted")
+            return
+
+        return fft
diff --git a/bob/pad/face/extractor/__init__.py b/bob/pad/face/extractor/__init__.py
index a7b3d4ca18607b0e82b925402964e8ecf265adf3..452450f4c8e12a7d295c201e378a35fdac9ef4d0 100644
--- a/bob/pad/face/extractor/__init__.py
+++ b/bob/pad/face/extractor/__init__.py
@@ -2,6 +2,9 @@ from .LBPHistogram import LBPHistogram
 from .ImageQualityMeasure import ImageQualityMeasure
 from .FrameDiffFeatures import FrameDiffFeatures
 
+from .LiFeatures import LiFeatures
+from .LTSS import LTSS
+from .PPGSecure import PPGSecure
 
 def __appropriate__(*args):
     """Says object was actually declared here, and not in the import module.
diff --git a/bob/pad/face/preprocessor/Chrom.py b/bob/pad/face/preprocessor/Chrom.py
new file mode 100644
index 0000000000000000000000000000000000000000..987492ea9c1d5423c4403aa7e6bc06f48def87be
--- /dev/null
+++ b/bob/pad/face/preprocessor/Chrom.py
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import numpy
+
+from bob.core.log import setup
+logger = setup("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+import bob.ip.facedetect
+import bob.ip.skincolorfilter
+
+from bob.rppg.base.utils import crop_face
+from bob.rppg.base.utils import build_bandpass_filter
+
+from bob.rppg.chrom.extract_utils import compute_mean_rgb
+from bob.rppg.chrom.extract_utils import project_chrominance
+from bob.rppg.chrom.extract_utils import compute_gray_diff
+from bob.rppg.chrom.extract_utils import select_stable_frames
+
+
+class Chrom(Preprocessor, object):
+    """Extract pulse signal from a video sequence.
+
+    The pulse is extracted according to the CHROM algorithm.
+
+    See the documentation of `bob.rppg.base`
+
+    Attributes
+    ----------
+    skin_threshold : :obj:`float`
+      The threshold for skin color probability
+    skin_init : :obj:`bool`
+      If you want to re-initialize the skin color distribution at each frame
+    framerate : :obj:`int`
+      The framerate of the video sequence.
+    bp_order : :obj:`int`
+      The order of the bandpass filter
+    window_size : :obj:`int`
+      The size of the window in the overlap-add procedure.
+    motion : :obj:`float`
+      The fraction of frames to keep, where the signal is the most "stable".
+      0 means that the whole sequence is kept.
+    debug : :obj:`bool`
+      Plot some stuff
+    skin_filter : :py:class:`bob.ip.skincolorfilter.SkinColorFilter`
+      The skin color filter
+
+    """
+
+    def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, debug=False, **kwargs):
+        """Init function
+
+        Parameters
+        ----------
+        skin_threshold : :obj:`float`
+          The threshold for skin color probability
+        skin_init : :obj:`bool`
+          If you want to re-initialize the skin color distribution at each frame
+        framerate : :obj:`int`
+          The framerate of the video sequence.
+        bp_order : :obj:`int`
+          The order of the bandpass filter
+        window_size : :obj:`int`
+          The size of the window in the overlap-add procedure.
+        motion : :obj:`float`
+          The fraction of frames to keep, where the signal is the most "stable".
+          0 means that the whole sequence is kept.
+        debug : :obj:`bool`
+          Plot some stuff
+
+        """
+        super(Chrom, self).__init__()
+        self.skin_threshold = skin_threshold
+        self.skin_init = skin_init
+        self.framerate = framerate
+        self.bp_order = bp_order
+        self.window_size = window_size
+        self.motion = motion
+        self.debug = debug
+        self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
+
+    def __call__(self, frames, annotations):
+        """Computes the pulse signal for the given frame sequence
+
+        Parameters
+        ----------
+        frames : :py:class:`bob.bio.video.utils.FrameContainer`
+          video data
+        annotations : :py:class:`dict`
+          the face bounding box, as follows: ``{'topleft': (row, col), 'bottomright': (row, col)}``
+
+        Returns
+        -------
+        :obj:`numpy.ndarray`
+          The pulse signal
+
+        """
+        video = frames.as_array()
+        nb_frames = video.shape[0]
+
+        # the pulse
+        chrom = numpy.zeros((nb_frames, 2), dtype='float64')
+
+        # inter-frame gray-level differences, used to find the most "stable"
+        # segment of the sequence (only when motion > 0)
+        diff_motion = numpy.zeros((nb_frames - 1,), dtype='float64')
+
+        # build the bandpass filter
+        bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, plot=False)
+
+        counter = 0
+        previous_bbox = None
+        for i, frame in enumerate(video):
+
+            logger.debug("Processing frame {}/{}".format(counter, nb_frames))
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2), 2))
+                pyplot.show()
+
+            # get the face
+            try:
+                topleft = annotations[str(i)]['topleft']
+                bottomright = annotations[str(i)]['bottomright']
+                size = (bottomright[0] - topleft[0], bottomright[1] - topleft[1])
+                bbox = bob.ip.facedetect.BoundingBox(topleft, size)
+                face = crop_face(frame, bbox, bbox.size[1])
+            except (KeyError, ZeroDivisionError, TypeError):
+                logger.warning("No annotations ... running face detection")
+                try:
+                    bbox, quality = bob.ip.facedetect.detect_single_face(frame)
+                    face = crop_face(frame, bbox, bbox.size[1])
+                except Exception:
+                    bbox = previous_bbox
+                    face = crop_face(frame, bbox, bbox.size[1])
+                    logger.warning("No detection, using bounding box from previous frame ...")
+
+            # motion difference (if asked for)
+            if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
+                current = crop_face(frame, bbox, bbox.size[1])
+                diff_motion[counter - 1] = compute_gray_diff(face, current)
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2), 2))
+                pyplot.show()
+
+            # skin filter
+            if counter == 0 or self.skin_init:
+                self.skin_filter.estimate_gaussian_parameters(face)
+                logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
+            skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
+
+            if self.debug:
+                from matplotlib import pyplot
+                skin_mask_image = numpy.copy(face)
+                skin_mask_image[:, skin_mask] = 255
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2), 2))
+                pyplot.show()
+
+            # sometimes skin is not detected !
+            if numpy.count_nonzero(skin_mask) != 0:
+
+                # compute the mean rgb values of the skin pixels
+                r, g, b = compute_mean_rgb(face, skin_mask)
+                logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r, g, b))
+
+                # project onto the chrominance colorspace
+                chrom[counter] = project_chrominance(r, g, b)
+                logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
+
+            else:
+                logger.warning("No skin pixels detected in frame {0}, using previous value".format(i))
+                # very unlikely, but it could happen and mess up the experiments (averaging of scores ...)
+                if counter == 0:
+                    chrom[counter] = project_chrominance(128., 128., 128.)
+                else:
+                    chrom[counter] = chrom[counter - 1]
+
+            # keep the result of the last detection in case you cannot find a face in the next frame
+            previous_bbox = bbox
+            counter += 1
+
+        # select the most stable number of consecutive frames, if asked for
+        if self.motion > 0.0:
+            n_stable_frames_to_keep = int(self.motion * nb_frames)
+            logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
+            index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
+            logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
+            chrom = chrom[index:(index + n_stable_frames_to_keep), :]
+
+        if self.debug:
+            from matplotlib import pyplot
+            f, axarr = pyplot.subplots(2, sharex=True)
+            axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
+            axarr[0].set_title("X value in the chrominance subspace")
+            axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
+            axarr[1].set_title("Y value in the chrominance subspace")
+            pyplot.show()
+
+        # now that we have the chrominance signals, apply bandpass
+        from scipy.signal import filtfilt
+        x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
+        y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
+
+        if self.debug:
+            from matplotlib import pyplot
+            f, axarr = pyplot.subplots(2, sharex=True)
+            axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
+            axarr[0].set_title("X bandpassed")
+            axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
+            axarr[1].set_title("Y bandpassed")
+            pyplot.show()
+
+        # build the final pulse signal
+        alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
+        pulse = x_bandpassed - alpha * y_bandpassed
+
+        # overlap-add if window_size != 0: rebuild the pulse window by window,
+        # with a per-window alpha and a Hanning weighting
+        if self.window_size > 0:
+            window_stride = int(self.window_size / 2)
+            pulse = numpy.zeros(len(x_bandpassed), dtype='float64')
+            for w in range(0, (len(pulse) - self.window_size), window_stride):
+                xw = x_bandpassed[w:w + self.window_size]
+                yw = y_bandpassed[w:w + self.window_size]
+                alpha = numpy.std(xw) / numpy.std(yw)
+                sw = xw - alpha * yw
+                sw *= numpy.hanning(self.window_size)
+                pulse[w:w + self.window_size] += sw
+
+        if self.debug:
+            from matplotlib import pyplot
+            pyplot.plot(range(pulse.shape[0]), pulse, 'k')
+            pyplot.title("Pulse signal")
+            pyplot.show()
+
+        return pulse
diff --git a/bob/pad/face/preprocessor/Li.py b/bob/pad/face/preprocessor/Li.py
new file mode 100644
index 0000000000000000000000000000000000000000..5f54a1bb5e14b7a690262bb5a4751b29028d6518
--- /dev/null
+++ b/bob/pad/face/preprocessor/Li.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import numpy
+
+from bob.core.log import setup
+logger = setup("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+from bob.rppg.base.utils import build_bandpass_filter
+import bob.ip.dlib
+
+from bob.rppg.cvpr14.extract_utils import kp66_to_mask
+from bob.rppg.cvpr14.extract_utils import compute_average_colors_mask
+from bob.rppg.cvpr14.filter_utils import detrend
+from bob.rppg.cvpr14.filter_utils import average
+
+
+class Li(Preprocessor):
+    """Extract pulse signal from a video sequence.
+
+    The pulse is extracted according to Li's CVPR 14 algorithm.
+ + See the documentation of `bob.rppg.base` + + Note that this is a simplified version of the original + pulse extraction algorithms (mask detection in each + frame instead of tracking, no illumination correction, + no motion pruning) + + Attributes + ---------- + indent : :obj:`int` + Indent (in percent of the face width) to apply to keypoints to get the mask. + lamda_ : :obj:`int` + the lamba value of the detrend filter + window : :obj:`int` + The size of the window of the average filter + framerate : :obj:`int` + The framerate of the video sequence. + bp_order : :obj:`int` + The order of the bandpass filter + debug : :obj:`bool` + Plot some stuff + + """ + + def __init__(self, indent = 10, lambda_ = 300, window = 3, framerate = 25, bp_order = 32, debug=False, **kwargs): + """Init function + + Parameters + ---------- + indent : :obj:`int` + Indent (in percent of the face width) to apply to keypoints to get the mask. + lamda_ : :obj:`int` + the lamba value of the detrend filter + window : :obj:`int` + The size of the window of the average filter + framerate : :obj:`int` + The framerate of the video sequence. + bp_order : :obj:`int` + The order of the bandpass filter + debug : :obj:`bool` + Plot some stuff + + """ + super(Li, self).__init__(**kwargs) + self.indent = indent + self.lambda_ = lambda_ + self.window = window + self.framerate = framerate + self.bp_order = bp_order + self.debug = debug + + def __call__(self, frames, annotations=None): + """Computes the pulse signal for the given frame sequence + + Parameters + ---------- + frames: :py:class:`bob.bio.video.utils.FrameContainer` + video data + annotations: :py:class:`dict` + the face bounding box, as follows: ``{'topleft': (row, col), 'bottomright': (row, col)}`` + + Returns + ------- + :obj:`numpy.ndarray` + The pulse signal, in each color channel (RGB) + + """ + video = frames.as_array() + nb_frames = video.shape[0] + + # the mean color of the face along the sequence + face_color = numpy.zeros((nb_frames, 3), dtype='float64') + + # build the bandpass filter + bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, plot=False) + + # landmarks detection + detector = bob.ip.dlib.DlibLandmarkExtraction() + + counter = 0 + previous_ldms = None + for i, frame in enumerate(video): + + logger.debug("Processing frame {}/{}".format(counter, nb_frames)) + if self.debug: + from matplotlib import pyplot + pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2)) + pyplot.show() + + # detect landmarks + try: + ldms = detector(frame) + except TypeError: + logger.warning("Exception caught -> problems with landmarks") + # looks like one video from replay mobile is upside down ! + rotated_shape = bob.ip.base.rotated_output_shape(frame, 180) + frame_rotated = numpy.ndarray(rotated_shape, dtype=numpy.float64) + from bob.ip.base import rotate + bob.ip.base.rotate(frame, frame_rotated, 180) + frame_rotated = frame_rotated.astype(numpy.uint8) + logger.warning("Rotating again ...") + try: + ldms = detector(frame_rotated) + except TypeError: + ldms = previous_ldms + # so do nothing ... 
+ logger.warning("No mask detected in frame {}".format(i)) + face_color[i] = 0 + continue + frame = frame_rotated + + # landmarks have not been detected: use the one from previous frame + if ldms is None: + ldms = previous_ldms + logger.warning("Frame {}: no landmarks detected, using the ones from previous frame".format(i)) + + if self.debug: + from matplotlib import pyplot + display = numpy.copy(frame) + for p in ldms: + bob.ip.draw.plus(display, p, radius=5, color=(255, 0, 0)) + pyplot.imshow(numpy.rollaxis(numpy.rollaxis(display, 2),2)) + pyplot.show() + + ldms = numpy.array(ldms) + mask_points, mask = kp66_to_mask(frame, ldms, self.indent, self.debug) + face_color[i] = compute_average_colors_mask(frame, mask, self.debug) + + previous_ldms = ldms + counter += 1 + + pulse = numpy.zeros((nb_frames, 3), dtype='float64') + for i in range(3): + # detrend + detrended = detrend(face_color[:, i], self.lambda_) + # average + averaged = average(detrended, self.window) + # bandpass + from scipy.signal import filtfilt + pulse[:, i] = filtfilt(bandpass_filter, numpy.array([1]), averaged) + + if self.debug: + colors = ['r', 'g', 'b'] + from matplotlib import pyplot + for i in range(3): + f, ax = pyplot.subplots(2, sharex=True) + ax[0].plot(range(face_color.shape[0]), face_color[:, i], colors[i]) + ax[0].set_title('Original color signal') + ax[1].plot(range(face_color.shape[0]), pulse[:, i], colors[i]) + ax[1].set_title('Pulse signal') + pyplot.show() + + return pulse diff --git a/bob/pad/face/preprocessor/PPGSecure.py b/bob/pad/face/preprocessor/PPGSecure.py new file mode 100644 index 0000000000000000000000000000000000000000..36b201d1554368f9911a27f96776d88a067015b0 --- /dev/null +++ b/bob/pad/face/preprocessor/PPGSecure.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import numpy + +from bob.core.log import setup +logger = setup("bob.pad.face") + +from bob.bio.base.preprocessor import Preprocessor + +from bob.rppg.base.utils import build_bandpass_filter +import bob.ip.dlib + +from bob.rppg.cvpr14.extract_utils import get_mask +from bob.rppg.cvpr14.extract_utils import compute_average_colors_mask + + +class PPGSecure(Preprocessor): + """ + This class extract the pulse signal from a video sequence. + + The pulse is extracted according to what is described in + the following article: + + E.M Nowara, A. Sabharwal and A. Veeraraghavan, + "PPGSecure: Biometric Presentation Attack Detection using Photoplethysmograms", + IEEE Intl Conf. on Automatic Face and Gesture Recognition, 2017. + + Attributes + ---------- + framerate : :obj:`int` + The framerate of the video sequence. + bp_order : :obj:`int` + The order of the bandpass filter + debug : :obj:`bool` + Plot some stuff + + """ + def __init__(self, framerate=25, bp_order=32, debug=False, **kwargs): + """Init function + + Parameters + ---------- + framerate : :obj:`int` + The framerate of the video sequence. 
+ bp_order : :obj:`int` + The order of the bandpass filter + debug : :obj:`bool` + Plot some stuff + + """ + super(PPGSecure, self).__init__(**kwargs) + self.framerate = framerate + self.bp_order = bp_order + self.debug = debug + + # build the bandpass filter + self.bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, min_freq=0.5, max_freq=5, plot=False) + + # landmarks detector + self.detector = bob.ip.dlib.DlibLandmarkExtraction() + + def __call__(self, frames, annotations): + """Compute the pulse signal for the given frame sequence + + Parameters + ---------- + frames: :py:class:`bob.bio.video.utils.FrameContainer` + video data + annotations: :py:class:`dict` + the face bounding box, as follows: ``{'topleft': (row, col), 'bottomright': (row, col)}`` + + Returns + ------- + :obj:`numpy.ndarray` + The pulse signal, in each color channel (RGB) + + """ + video = frames.as_array() + nb_frames = video.shape[0] + + # the mean of the green color of the different ROIs along the sequence + green_mean = numpy.zeros((nb_frames, 5), dtype='float64') + + previous_ldms = None + for i, frame in enumerate(video): + + logger.debug("Processing frame {}/{}".format(i, nb_frames)) + if self.debug: + from matplotlib import pyplot + pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2)) + pyplot.show() + + # detect landmarks + try: + ldms = self.detector(frame) + except TypeError: + # looks like one video from replay mobile is upside down ! + rotated_shape = bob.ip.base.rotated_output_shape(frame, 180) + frame_rotated = numpy.ndarray(rotated_shape, dtype=numpy.float64) + from bob.ip.base import rotate + bob.ip.base.rotate(frame, frame_rotated, 180) + frame_rotated = frame_rotated.astype(numpy.uint8) + logger.warning("Rotating again ...") + try: + ldms = self.detector(frame_rotated) + except TypeError: + ldms = previous_ldms + # so do nothing ... 
+ logger.warning("No mask detected in frame {}".format(i)) + green_mean[i, :] = 0 + continue + frame = frame_rotated + + # landmarks have not been detected: use the one from previous frame + if ldms is None: + ldms = previous_ldms + logger.warning("Frame {}: no landmarks detected, using the ones from previous frame".format(i)) + + if self.debug: + from matplotlib import pyplot + display = numpy.copy(frame) + for p in ldms: + bob.ip.draw.plus(display, p, radius=5, color=(255, 0, 0)) + pyplot.imshow(numpy.rollaxis(numpy.rollaxis(display, 2),2)) + pyplot.show() + + ldms = numpy.array(ldms) + + # get the mask and the green value in the different ROIs + masks = self._get_masks(frame, ldms) + for k in range(5): + green_mean[i, k] = compute_average_colors_mask(frame, masks[k], self.debug)[1] + + previous_ldms = ldms + + pulses = numpy.zeros((nb_frames, 5), dtype='float64') + for k in range(5): + # substract mean + green_mean[:, k] = green_mean[:, k] - numpy.mean(green_mean[:, k]) + # bandpass + from scipy.signal import filtfilt + pulses[:, k] = filtfilt(self.bandpass_filter, numpy.array([1]), green_mean[:, k]) + + if self.debug: + from matplotlib import pyplot + for k in range(5): + f, ax = pyplot.subplots(2, sharex=True) + ax[0].plot(range(green_mean.shape[0]), green_mean[:, k], 'g') + ax[0].set_title('Original color signal') + ax[1].plot(range(green_mean.shape[0]), pulses[:, k], 'g') + ax[1].set_title('Pulse signal') + pyplot.show() + + return pulses + + + def _get_masks(self, image, ldms): + """Get the 5 masks for rPPG signal extraction + + Parameters + ---------- + ldms: numpy.ndarray + The landmarks, as retrieved by bob.ip.dlib.DlibLandmarkExtraction() + + Returns + ------- + masks: :py:obj:`list` of numpy.ndarray + A list containing the different mask as a boolean array + + """ + masks = [] + + # mask 1: forehead + # defined by 12 points: upper eyebrows (points 18 to 27) + # plus two additional points: + # - above 18, at a distance of (18-27)/2 + # - above 27, at a distance of (18-27)/2 + # + # Note 0 -> y, 1 -> x + mask_points = [] + for k in range(17, 27): + mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])]) + above_20_x = int(ldms[19, 1]) + above_20_y = int(ldms[19, 0]) - int(abs(ldms[17, 1] - ldms[26, 1]) / 3) + above_25_x = int(ldms[24, 1]) + above_25_y = int(ldms[24, 0]) - int(abs(ldms[17, 1] - ldms[26, 1]) / 3) + mask_points.append([above_25_y, above_25_x]) + mask_points.append([above_20_y, above_20_x]) + masks.append(get_mask(image, mask_points)) + + # mask 2: right cheek (left-hand side when looking at the screen) + # defined by points 1-7 + 49 + 32 + 42 + mask_points = [] + for k in range(7): + mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])]) + mask_points.append([int(ldms[48, 0]), int(ldms[48, 1])]) + mask_points.append([int(ldms[31, 0]), int(ldms[31, 1])]) + mask_points.append([int(ldms[41, 0]), int(ldms[41, 1])]) + masks.append(get_mask(image, mask_points)) + + # mask 3: left cheek + # defined by points 17-11 + 55 + 36 + 47 + mask_points = [] + for k in range(16, 10, -1): + mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])]) + mask_points.append([int(ldms[54, 0]), int(ldms[54, 1])]) + mask_points.append([int(ldms[35, 0]), int(ldms[35, 1])]) + mask_points.append([int(ldms[46, 0]), int(ldms[46, 1])]) + masks.append(get_mask(image, mask_points)) + + # mask 4: right above shoulder + mask_points = [] + mask_points.append([int(ldms[2, 0]), int(ldms[2, 1] - 10)]) + mask_points.append([int(ldms[2, 0]), int(ldms[2, 1] - 60)]) + mask_points.append([int(ldms[2, 0] + 50), 
int(ldms[2, 1] - 60)]) + mask_points.append([int(ldms[2, 0] + 50), int(ldms[2, 1] - 10)]) + masks.append(get_mask(image, mask_points)) + + # mask 5: left above shoulder + mask_points = [] + mask_points.append([int(ldms[14, 0]), int(ldms[14, 1] + 10)]) + mask_points.append([int(ldms[14, 0]), int(ldms[14, 1] + 60)]) + mask_points.append([int(ldms[14, 0] + 50), int(ldms[14, 1] + 60)]) + mask_points.append([int(ldms[14, 0] + 50), int(ldms[14, 1] + 10)]) + masks.append(get_mask(image, mask_points)) + + return masks diff --git a/bob/pad/face/preprocessor/SSR.py b/bob/pad/face/preprocessor/SSR.py new file mode 100644 index 0000000000000000000000000000000000000000..4c5f702acb4299d1a01cb73b05d3311aa2d51ac7 --- /dev/null +++ b/bob/pad/face/preprocessor/SSR.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# encoding: utf-8 + +import numpy + +from bob.core.log import setup +logger = setup("bob.pad.face") + +from bob.bio.base.preprocessor import Preprocessor + +import bob.ip.facedetect +import bob.ip.skincolorfilter + +from bob.rppg.base.utils import crop_face + +from bob.rppg.ssr.ssr_utils import get_eigen +from bob.rppg.ssr.ssr_utils import plot_eigenvectors +from bob.rppg.ssr.ssr_utils import build_P + + +class SSR(Preprocessor, object): + """Extract pulse signal from a video sequence. + + The pulse is extracted according to the SSR algorithm. + + See the documentation of :py:module::`bob.rppg.base` + + Attributes + ---------- + skin_threshold: float + The threshold for skin color probability + skin_init: bool + If you want to re-initailize the skin color distribution at each frame + stride: int + The temporal stride. + debug: boolean + Plot some stuff + skin_filter: :py:class::`bob.ip.skincolorfilter.SkinColorFilter` + The skin color filter + + """ + + def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, debug=False, **kwargs): + """Init function + + Parameters + ---------- + skin_threshold: float + The threshold for skin color probability + skin_init: bool + If you want to re-initailize the skin color distribution at each frame + stride: int + The temporal stride. 
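This preprocessor is meant to be chained with the ``PPGSecure`` extractor added above; a rough sketch of the full chain, where ``frames`` stands for an already-loaded ``bob.bio.video.FrameContainer`` (the ``annotations`` argument is not used by this preprocessor, since landmarks are detected with dlib):

from bob.pad.face.preprocessor import PPGSecure as PPGPulse
from bob.pad.face.extractor import PPGSecure as PPGSpectra

pre = PPGPulse(framerate=25, bp_order=32)
ext = PPGSpectra(framerate=25, nfft=32)

pulses = pre(frames, annotations=None)  # shape (nb_frames, 5): one pulse per ROI
feature = ext(pulses)                   # 5 concatenated spectra, 5 * (nfft // 2 + 1) values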
diff --git a/bob/pad/face/preprocessor/SSR.py b/bob/pad/face/preprocessor/SSR.py
new file mode 100644
index 0000000000000000000000000000000000000000..4c5f702acb4299d1a01cb73b05d3311aa2d51ac7
--- /dev/null
+++ b/bob/pad/face/preprocessor/SSR.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import numpy
+
+from bob.core.log import setup
+logger = setup("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+import bob.ip.facedetect
+import bob.ip.skincolorfilter
+
+from bob.rppg.base.utils import crop_face
+
+from bob.rppg.ssr.ssr_utils import get_eigen
+from bob.rppg.ssr.ssr_utils import plot_eigenvectors
+from bob.rppg.ssr.ssr_utils import build_P
+
+
+class SSR(Preprocessor, object):
+    """Extract pulse signal from a video sequence.
+
+    The pulse is extracted according to the SSR algorithm.
+
+    See the documentation of :py:mod:`bob.rppg.base`
+
+    Attributes
+    ----------
+    skin_threshold : :obj:`float`
+      The threshold for skin color probability
+    skin_init : :obj:`bool`
+      If you want to re-initialize the skin color distribution at each frame
+    stride : :obj:`int`
+      The temporal stride.
+    debug : :obj:`bool`
+      Plot some stuff
+    skin_filter : :py:class:`bob.ip.skincolorfilter.SkinColorFilter`
+      The skin color filter
+
+    """
+
+    def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, debug=False, **kwargs):
+        """Init function
+
+        Parameters
+        ----------
+        skin_threshold : :obj:`float`
+          The threshold for skin color probability
+        skin_init : :obj:`bool`
+          If you want to re-initialize the skin color distribution at each frame
+        stride : :obj:`int`
+          The temporal stride.
+        debug : :obj:`bool`
+          Plot some stuff
+
+        """
+        super(SSR, self).__init__()
+        self.skin_threshold = skin_threshold
+        self.skin_init = skin_init
+        self.stride = stride
+        self.debug = debug
+        self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
+
+
+    def __call__(self, frames, annotations):
+        """Computes the pulse signal for the given frame sequence
+
+        Parameters
+        ----------
+        frames : :py:class:`bob.bio.video.utils.FrameContainer`
+          video data
+        annotations : :py:class:`dict`
+          the face bounding box, as follows: ``{'topleft': (row, col), 'bottomright': (row, col)}``
+
+        Returns
+        -------
+        :obj:`numpy.ndarray`
+          The pulse signal
+
+        """
+        video = frames.as_array()
+        nb_frames = video.shape[0]
+
+        # the result -> the pulse signal
+        output_data = numpy.zeros(nb_frames, dtype='float64')
+
+        # store the eigenvalues and the eigenvectors at each frame
+        eigenvalues = numpy.zeros((3, nb_frames), dtype='float64')
+        eigenvectors = numpy.zeros((3, 3, nb_frames), dtype='float64')
+
+        counter = 0
+        previous_bbox = None
+        previous_skin_pixels = None
+
+        for i, frame in enumerate(video):
+
+            logger.debug("Processing frame %d/%d...", i, nb_frames)
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2), 2))
+                pyplot.show()
+
+            # get the face
+            try:
+                topleft = annotations[str(i)]['topleft']
+                bottomright = annotations[str(i)]['bottomright']
+                size = (bottomright[0] - topleft[0], bottomright[1] - topleft[1])
+                bbox = bob.ip.facedetect.BoundingBox(topleft, size)
+                face = crop_face(frame, bbox, bbox.size[1])
+            except (KeyError, ZeroDivisionError, TypeError):
+                logger.warning("No annotations ... running face detection")
+                try:
+                    bbox, quality = bob.ip.facedetect.detect_single_face(frame)
+                    face = crop_face(frame, bbox, bbox.size[1])
+                except Exception:
+                    bbox = previous_bbox
+                    face = crop_face(frame, bbox, bbox.size[1])
+                    logger.warning("No detection, using bounding box from previous frame ...")
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2), 2))
+                pyplot.show()
+
+            # skin filter
+            if counter == 0 or self.skin_init:
+                self.skin_filter.estimate_gaussian_parameters(face)
+                logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
+
+            skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
+            skin_pixels = face[:, skin_mask]
+            skin_pixels = skin_pixels.astype('float64') / 255.0
+
+            if self.debug:
+                from matplotlib import pyplot
+                skin_mask_image = numpy.copy(face)
+                skin_mask_image[:, skin_mask] = 255
+                pyplot.title("skin pixels in frame {0}".format(i))
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2), 2))
+                pyplot.show()
+
+            # no skin pixels have been detected ... use the ones from the previous frame
+            if skin_pixels.shape[1] == 0:
+                skin_pixels = previous_skin_pixels
+                logger.warning("No skin pixels detected, using the previous ones")
+
+            # build the correlation matrix and get eigenvalues and eigenvectors
+            eigenvalues[:, counter], eigenvectors[:, :, counter] = get_eigen(skin_pixels)
+
+            if self.debug:
+                plot_eigenvectors(skin_pixels, eigenvectors[:, :, counter])
+
+            # build P and add it to the pulse signal
+            if counter >= self.stride:
+                tau = counter - self.stride
+                p = build_P(counter, self.stride, eigenvectors, eigenvalues)
+                output_data[tau:counter] += (p - numpy.mean(p))
+
+            previous_bbox = bbox
+            previous_skin_pixels = skin_pixels
+            counter += 1
+
+        if self.debug:
+            import matplotlib.pyplot as plt
+            fig = plt.figure()
+            ax = fig.add_subplot(111)
+            ax.plot(range(nb_frames), output_data)
+            plt.show()
+
+        return output_data
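The annotation layout consumed by Chrom, SSR and the other preprocessors is the one documented in their ``__call__`` methods; an illustration with made-up coordinates (``frames`` is again an assumed, already-loaded frame container):

annotations = {
    '0': {'topleft': (10, 20), 'bottomright': (110, 120)},  # frame 0
    '1': {'topleft': (12, 22), 'bottomright': (112, 122)},  # frame 1
}
ssr = SSR(skin_threshold=0.5, stride=25)
pulse = ssr(frames, annotations)  # 1-D array with one pulse value per frame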
diff --git a/bob/pad/face/preprocessor/__init__.py b/bob/pad/face/preprocessor/__init__.py
index 5f43d3e2fcdacb09d1feed125c727d2c0a9f7234..651b372cf42308e68357b884b809cedb522e8703 100644
--- a/bob/pad/face/preprocessor/__init__.py
+++ b/bob/pad/face/preprocessor/__init__.py
@@ -2,6 +2,10 @@ from .FaceCropAlign import FaceCropAlign
 from .FrameDifference import FrameDifference
 from .VideoSparseCoding import VideoSparseCoding
 
+from .Li import Li
+from .Chrom import Chrom
+from .SSR import SSR
+from .PPGSecure import PPGSecure
 
 def __appropriate__(*args):
     """Says object was actually declared here, and not in the import module.
diff --git a/conda/meta.yaml b/conda/meta.yaml
index 7918a1a946d2574713733593aa86d230dbcca183..39c2553363bbbcb2382c2f504269e8957138b210 100644
--- a/conda/meta.yaml
+++ b/conda/meta.yaml
@@ -20,6 +20,8 @@ requirements:
   host:
     - python {{ python }}
    - setuptools {{ setuptools }}
+    - six {{ six }}
+    - numpy >=1.11
    - bob.extension
    - bob.bio.base
    - bob.io.base
@@ -33,12 +35,14 @@ requirements:
    - bob.learn.libsvm
    - bob.ip.dlib
    - bob.ip.facelandmarks
+    - bob.rppg.base
  run:
    - python
    - setuptools
    - six
-    - numpy
+    - numpy >=1.11
    - scikit-learn
+    - bob.rppg.base
 
 test:
  imports:
@@ -60,6 +64,7 @@ test:
    - bob.db.replaymobile
    - bob.db.msu_mfsd_mod
    - bob.db.mobio
+    - bob.rppg.base
 
 about:
  home: https://www.idiap.ch/software/bob/
diff --git a/doc/extra-intersphinx.txt b/doc/extra-intersphinx.txt
index 676c4b7fd7e64d84246b0eaeabdf3fae5a8a095f..9c34975adc4e620d2baeb39c9414edb1656d81c3 100644
--- a/doc/extra-intersphinx.txt
+++ b/doc/extra-intersphinx.txt
@@ -1 +1,2 @@
 bob.bio.base
+bob.ip.skincolorfilter
diff --git a/requirements.txt b/requirements.txt
index afa566eb706bc577fc6fc313a3fa4397a762677e..ea78408c78857284bdab6cef684c56b659add5e1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,4 +16,4 @@ bob.ip.facelandmarks
 bob.learn.libsvm
 bob.learn.linear
 scikit-learn
-
+bob.rppg.base
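Putting the new pieces together, a configuration in the style consumed by ./bin/spoof.py could look as follows (a sketch only; entry-point wiring and database settings are omitted):

from bob.pad.face.preprocessor import Li
from bob.pad.face.extractor import LiFeatures

preprocessor = Li(indent=10, lambda_=300, window=3, framerate=25, bp_order=32)
extractor = LiFeatures(framerate=25, nfft=512)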