diff --git a/bob/pad/face/extractor/PPGSecure.py b/bob/pad/face/extractor/PPGSecure.py
new file mode 100644
index 0000000000000000000000000000000000000000..09db66e7dc810f1988f87b874ec2d5d488d5f24c
--- /dev/null
+++ b/bob/pad/face/extractor/PPGSecure.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import numpy
+
+from bob.bio.base.extractor import Extractor
+
+import logging
+logger = logging.getLogger("bob.pad.face")
+
+
+class PPGSecure(Extractor):
+    """
+    This class extracts the frequency features from pulse signals.
+
+    The features are extracted according to what is described in
+    the following article:
+
+    @InProceedings{nowara-afgr-2017,
+      Author = {E. M. Nowara and A. Sabharwal and A. Veeraraghavan},
+      Title = {P{PGS}ecure: {B}iometric {P}resentation {A}ttack
+               {D}etection {U}sing {P}hotopletysmograms},
+      BookTitle = {I{EEE} {I}ntl {C}onf on {A}utomatic {F}ace and
+                   {G}esture {R}ecognition ({AFGR})},
+      Volume = {},
+      Number = {},
+      Pages = {56-62},
+      issn = {},
+      seq-number = {69},
+      year = 2017
+    }
+
+    **Parameters:**
+
+    framerate: int
+      The sampling frequency of the signal (i.e. the framerate ...)
+
+    nfft: int
+      Number of points to compute the FFT
+
+    debug: boolean
+      Plot stuff
+    """
+    def __init__(self, framerate=25, nfft=32, debug=False, **kwargs):
+
+        super(PPGSecure, self).__init__(**kwargs)
+
+        self.framerate = framerate
+        self.nfft = nfft
+        self.debug = debug
+
+
+    def __call__(self, signal):
+        """
+        Compute the frequency spectrum for the given signal.
+
+        **Parameters:**
+
+        signal: numpy.array
+          The signal
+
+        **Returns:**
+
+        freq: numpy.array
+          the frequency spectrum
+        """
+        # sanity check
+        assert signal.shape[1] == 5, "You should provide 5 pulses"
+        if numpy.isnan(numpy.sum(signal)):
+            return
+
+        output_dim = int((self.nfft / 2) + 1)
+
+        # get the frequencies
+        f = numpy.fft.fftfreq(self.nfft) * self.framerate  # NOTE(review): currently unused
+
+        # we have 5 pulse signals (Li's preprocessing)
+        ffts = numpy.zeros((5, output_dim))
+        for i in range(5):
+            ffts[i] = abs(numpy.fft.rfft(signal[:, i], n=self.nfft))
+
+        fft = numpy.concatenate([ffts[0], ffts[1], ffts[2], ffts[3], ffts[4]])
+
+        if self.debug:
+            from matplotlib import pyplot
+            pyplot.plot(range(output_dim*5), fft, 'k')
+            pyplot.title('Concatenation of spectra')
+            pyplot.show()
+
+        # discard degenerate spectra (NaN or all-zero) instead of returning them
+        if numpy.isnan(numpy.sum(fft)):
+            logger.warning("Feature not extracted")
+            return
+        if numpy.sum(fft) == 0:
+            logger.warning("Feature not extracted")
+            return
+
+        return fft
diff --git a/bob/pad/face/extractor/__init__.py b/bob/pad/face/extractor/__init__.py
index 2b046e0061ee7fe7a437a162ddcc25db71fcff6c..fabde901e00d34fe73435f707873eb336d010c18 100644
--- a/bob/pad/face/extractor/__init__.py
+++ b/bob/pad/face/extractor/__init__.py
@@ -10,6 +10,7 @@ from .FreqFeatures import FreqFeatures
 from .NormalizeLength import NormalizeLength
 from .FFTFeatures import FFTFeatures
 from .LTSS import LTSS
+from .PPGSecure import PPGSecure
 
 def __appropriate__(*args):
     """Says object was actually declared here, and not in the import module.
diff --git a/bob/pad/face/preprocessor/PPGSecure.py b/bob/pad/face/preprocessor/PPGSecure.py
index cd62465a91cb657567bc43fd17cfcd135190ca82..868b909ec7f963746204f7ddca21ce35da1baf03 100644
--- a/bob/pad/face/preprocessor/PPGSecure.py
+++ b/bob/pad/face/preprocessor/PPGSecure.py
@@ -35,9 +32,6 @@ class PPGSecure(Preprocessor):
 
     **Parameters:**
 
-    indent: int
-      Indent (in pixels) to apply to keypoints to get the masks.
-
     framerate: int
       The framerate of the video sequence.
 
@@ -47,14 +44,19 @@ class PPGSecure(Preprocessor):
     debug: boolean
       Plot some stuff
     """
-    def __init__(self, indent = 10, framerate = 25, bp_order = 32, debug=False, **kwargs):
+    def __init__(self, framerate=25, bp_order=32, debug=False, **kwargs):
 
         super(PPGSecure, self).__init__(**kwargs)
 
-        self.indent = indent
         self.framerate = framerate
         self.bp_order = bp_order
         self.debug = debug
+
+        # build the bandpass filter once and for all
+        self.bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, min_freq=0.5, max_freq=5, plot=False)
+
+        # landmarks detection
+        self.detector = bob.ip.dlib.DlibLandmarkExtraction()
 
     def __call__(self, frames, annotations):
         """
@@ -81,17 +83,10 @@
         # the mean of the green color of the different ROIs along the sequence
         green_mean = numpy.zeros((nb_frames, 5), dtype='float64')
 
-        # build the bandpass filter one and for all
-        bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, min_freq=0.5, max_freq=5, plot=False)
-
-        # landmarks detection
-        detector = bob.ip.dlib.DlibLandmarkExtraction()
-
-        counter = 0
         previous_ldms = None
         for i, frame in enumerate(video):
-            logger.debug("Processing frame {}/{}".format(counter, nb_frames))
+            logger.debug("Processing frame {}/{}".format(i, nb_frames))
             if self.debug:
                 from matplotlib import pyplot
                 pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
                 pyplot.show()
@@ -99,7 +94,7 @@
 
             # detect landmarks
             try:
-                ldms = detector(frame)
+                ldms = self.detector(frame)
             except TypeError:
                 # looks like one video from replay mobile is upside down !
                 rotated_shape = bob.ip.base.rotated_output_shape(frame, 180)
@@ -109,12 +104,12 @@
                 frame_rotated = frame_rotated.astype(numpy.uint8)
                 logger.warning("Rotating again ...")
                 try:
-                    ldms = detector(frame_rotated)
+                    ldms = self.detector(frame_rotated)
                 except TypeError:
                     ldms = previous_ldms
                     # so do nothing ...
                     logger.warning("No mask detected in frame {}".format(i))
-                    face_color[i] = 0
+                    green_mean[i, :] = 0
                     continue
                 frame = frame_rotated
 
@@ -128,38 +123,35 @@
 
             ldms = numpy.array(ldms)
 
-            masks = self._get_masks(ldms)
-            import sys
-            sys.exit()
-            #for i in range(5):
-            #    face_color[i] = compute_average_colors_mask(frame, mask, self.debug)
-
-            #previous_ldms = ldms
-            #counter += 1
-
-            #pulse = numpy.zeros((nb_frames, 3), dtype='float64')
-            #for i in range(3):
-            #    # detrend
-            #    detrended = detrend(face_color[:, i], self.lambda_)
-            #    # average
-            #    averaged = average(detrended, self.window)
-            #    # bandpass
-            #    from scipy.signal import filtfilt
-            #    pulse[:, i] = filtfilt(bandpass_filter, numpy.array([1]), averaged)
-
-            #if self.debug:
-            #    from matplotlib import pyplot
-            #    for i in range(3):
-            #        f, ax = pyplot.subplots(2, sharex=True)
-            #        ax[0].plot(range(face_color.shape[0]), face_color[:, i], 'g')
-            #        ax[0].set_title('Original color signal')
-            #        ax[1].plot(range(face_color.shape[0]), pulse[:, i], 'g')
-            #        ax[1].set_title('Pulse signal')
-            #        pyplot.show()
-
-            #return pulse
-
-    def _get_masks(ldms):
+            # get the mask and the green value in the different ROIs
+            masks = self._get_masks(frame, ldms)
+            for k in range(5):
+                green_mean[i, k] = compute_average_colors_mask(frame, masks[k], self.debug)[1]
+
+            previous_ldms = ldms
+
+        pulses = numpy.zeros((nb_frames, 5), dtype='float64')
+        for k in range(5):
+            # subtract mean
+            green_mean[:, k] = green_mean[:, k] - numpy.mean(green_mean[:, k])
+            # bandpass
+            from scipy.signal import filtfilt
+            pulses[:, k] = filtfilt(self.bandpass_filter, numpy.array([1]), green_mean[:, k])
+
+        if self.debug:
+            from matplotlib import pyplot
+            for k in range(5):
+                f, ax = pyplot.subplots(2, sharex=True)
+                ax[0].plot(range(green_mean.shape[0]), green_mean[:, k], 'g')
+                ax[0].set_title('Original color signal')
+                ax[1].plot(range(green_mean.shape[0]), pulses[:, k], 'g')
+                ax[1].set_title('Pulse signal')
+                pyplot.show()
+
+        return pulses
+
+
+    def _get_masks(self, image, ldms):
         """
         get the 5 masks for rPPG signal extraction
 
         **Parameters**
@@ -171,16 +163,62 @@
 
         masks: boolean
         """
+        masks = []
 
         # mask 1: forehead
         # defined by 12 points: upper eyebrows (points 18 to 27)
         # plus two additional points:
        # - above 18, at a distance of (18-27)/2
        # - above 27, at a distance of (18-27)/2
-        print(ldms)
-        print(ldms.shape)
+        #
+        # Note 0 -> y, 1 -> x
+        mask_points = []
+        for k in range(17, 27):
+            mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])])
+        above_20_x = int(ldms[19, 1])
+        above_20_y = int(ldms[19, 0]) - int(abs(ldms[17, 1] - ldms[26, 1]) / 3)
+        above_25_x = int(ldms[24, 1])
+        above_25_y = int(ldms[24, 0]) - int(abs(ldms[17, 1] - ldms[26, 1]) / 3)
+        mask_points.append([above_25_y, above_25_x])
+        mask_points.append([above_20_y, above_20_x])
+        masks.append(get_mask(image, mask_points))
+
+        # mask 2: right cheek (left-hand side when looking at the screen)
+        # defined by points 1-7 + 49 + 32 + 42
 
         mask_points = []
-        for i in range(17, 28):
-            mask_points.append([int(keypoints[k, 0]), int(keypoints[k, 1])]))
+        for k in range(7):
+            mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])])
+        mask_points.append([int(ldms[48, 0]), int(ldms[48, 1])])
+        mask_points.append([int(ldms[31, 0]), int(ldms[31, 1])])
+        mask_points.append([int(ldms[41, 0]), int(ldms[41, 1])])
+        masks.append(get_mask(image, mask_points))
+
+        # mask 3: left cheek
+        # defined by points 17-11 + 55 + 36 + 47
+        mask_points = []
+        for k in range(16, 10, -1):
+            mask_points.append([int(ldms[k, 0]), int(ldms[k, 1])])
+        mask_points.append([int(ldms[54, 0]), int(ldms[54, 1])])
+        mask_points.append([int(ldms[35, 0]), int(ldms[35, 1])])
+        mask_points.append([int(ldms[46, 0]), int(ldms[46, 1])])
+        masks.append(get_mask(image, mask_points))
+
+        # mask 4: right above shoulder
+        mask_points = []
+        mask_points.append([int(ldms[2, 0]), int(ldms[2, 1] - 10)])
+        mask_points.append([int(ldms[2, 0]), int(ldms[2, 1] - 60)])
+        mask_points.append([int(ldms[2, 0] + 50), int(ldms[2, 1] - 60)])
+        mask_points.append([int(ldms[2, 0] + 50), int(ldms[2, 1] - 10)])
+        masks.append(get_mask(image, mask_points))
+
+        # mask 5: left above shoulder
+        mask_points = []
+        mask_points.append([int(ldms[14, 0]), int(ldms[14, 1] + 10)])
+        mask_points.append([int(ldms[14, 0]), int(ldms[14, 1] + 60)])
+        mask_points.append([int(ldms[14, 0] + 50), int(ldms[14, 1] + 60)])
+        mask_points.append([int(ldms[14, 0] + 50), int(ldms[14, 1] + 10)])
+        masks.append(get_mask(image, mask_points))
+
+        return masks
 