Commit 2af856af authored by Guillaume HEUSCH

[extractor] added CHROM and SSR pulse extractors

parent 8959cf45
1 merge request: !53 WIP: rPPG as features for PAD
from bob.bio.base.preprocessor import Filename

# This preprocessor does nothing, returning just the name of the file to extract the features from.
# WARNING: if you use this, you should provide the preprocessed directory as the database directory,
# i.e. ./bin/spoof.py [config.py] --preprocessed-directory /idiap/group/biometric/databases/pad/replay/protocols/replayattack-database/

empty_preprocessor = Filename()
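For reference, a minimal configuration sketch combining this empty preprocessor with one of the new pulse extractors; the preprocessor / extractor variable names follow the usual bob.pad.base configuration convention and are an assumption here, not something defined by this commit:

# Hypothetical configuration sketch (variable names assumed, not taken from this commit)
from bob.bio.base.preprocessor import Filename
from bob.pad.face.extractor import Chrom

# the preprocessor only forwards the file name, so the extractor loads the video itself
preprocessor = Filename()

# parameters shown are the defaults of the Chrom constructor
extractor = Chrom(skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0)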
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.facedetect
import bob.ip.base
import bob.ip.skincolorfilter
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.base.utils import crop_face
from bob.rppg.base.utils import build_bandpass_filter
from bob.rppg.chrom.extract_utils import compute_mean_rgb
from bob.rppg.chrom.extract_utils import project_chrominance
from bob.rppg.chrom.extract_utils import compute_gray_diff
from bob.rppg.chrom.extract_utils import select_stable_frames
class Chrom(Extractor, object):
"""
Extract pulse signal according to the CHROM algorithm
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If True, re-initialize the skin color distribution at each frame
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
window_size: int
The size of the window in the overlap-add procedure.
motion: float
The fraction of frames to keep, selecting the segment where the
signal is the most "stable". 0 means that the whole sequence is used.
"""
def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, **kwargs):
super(Chrom, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.framerate = framerate
self.bp_order = bp_order
self.window_size = window_size
self.motion = motion
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If a string, it is the name of the file to load the video data from.
A string is only possible when the empty (Filename) preprocessor is
used; in that case, the video data is loaded directly from the database.
**Returns:**
pulse: 1D numpy array
The pulse signal extracted from the face region, one value per retained frame.
"""
# load the video data (either from the FrameContainer or from the file name)
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
# the chrominance signal (one X and one Y value per frame)
chrom = numpy.zeros((nb_frames, 2), dtype='float64')
# gray-level difference between consecutive frames, used to select the most stable segment
diff_motion = numpy.zeros((nb_frames - 1, 1), dtype='float64')
# build the bandpass filter once and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
counter = 0
previous_bbox = None
for i, frame in enumerate(video):
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
except Exception:
bbox = previous_bbox
logger.warning("Face detection failed, using bounding box from previous frame ...")
# motion difference (if asked for)
if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
current = crop_face(frame, bbox, bbox.size[1])
diff_motion[counter-1] = compute_gray_diff(face, current)
face = crop_face(frame, bbox, bbox.size[1])
#from matplotlib import pyplot
#pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
#pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
#from matplotlib import pyplot
#skin_mask_image = numpy.copy(face)
#skin_mask_image[:, skin_mask] = 255
#pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
#pyplot.show()
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
# sometimes skin is not detected !
if numpy.count_nonzero(skin_mask) != 0:
# compute the mean rgb values of the skin pixels
r,g,b = compute_mean_rgb(face, skin_mask)
logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r,g,b))
# project onto the chrominance colorspace
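# (in the CHROM paper, X = 3*R - 2*G and Y = 1.5*R + G - 1.5*B on normalized color channels;
#  project_chrominance is assumed to implement this projection)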
chrom[counter] = project_chrominance(r, g, b)
logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
else:
logger.warning("No skin pixels detected in frame {0}, using previous value".format(i))
# very unlikely, but it could happen and mess up all experiments (averaging of scores ...)
if counter == 0:
chrom[counter] = project_chrominance(128., 128., 128.)
else:
chrom[counter] = chrom[counter-1]
# keep the result of the last detection, in case no face is found in the next frame
previous_bbox = bbox
counter += 1
# select the most stable number of consecutive frames, if asked for
if self.motion > 0.0:
n_stable_frames_to_keep = int(self.motion * nb_frames)
logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
chrom = chrom[index:(index + n_stable_frames_to_keep),:]
#from matplotlib import pyplot
#f, axarr = pyplot.subplots(2, sharex=True)
#axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
#axarr[0].set_title("X value in the chrominance subspace")
#axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
#axarr[1].set_title("Y value in the chrominance subspace")
#pyplot.show()
# now that we have the chrominance signals, apply the bandpass filter
from scipy.signal import filtfilt
x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
#from matplotlib import pyplot
#f, axarr = pyplot.subplots(2, sharex=True)
#axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
#axarr[0].set_title("X bandpassed")
#axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
#axarr[1].set_title("Y bandpassed")
#pyplot.show()
# build the final pulse signal
alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
pulse = x_bandpassed - alpha * y_bandpassed
# overlap-add if window_size != 0
if self.window_size > 0:
window_size = self.window_size
window_stride = window_size // 2
for w in range(0, (len(pulse) - window_size), window_stride):
pulse[w:w + window_size] = 0.0
xw = x_bandpassed[w:w + window_size]
yw = y_bandpassed[w:w + window_size]
alpha = numpy.std(xw) / numpy.std(yw)
sw = xw - alpha * yw
sw *= numpy.hanning(window_size)
pulse[w:w + window_size] += sw
#from matplotlib import pyplot
#f, axarr = pyplot.subplots(1)
#pyplot.plot(range(pulse.shape[0]), pulse, 'k')
#pyplot.title("Pulse signal")
#pyplot.show()
return pulse
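A minimal usage sketch for the extractor above, assuming the empty Filename preprocessor is used so that the extractor receives the path of a video file (the path below is purely illustrative):

# Illustrative only: the video path below is hypothetical
from bob.pad.face.extractor import Chrom

chrom_extractor = Chrom(skin_threshold=0.5, framerate=25, bp_order=32, window_size=0, motion=0.0)

# returns a 1D numpy array with one pulse value per retained frame
pulse = chrom_extractor('/path/to/some/video.mov')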
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.facedetect
import bob.ip.base
import bob.ip.skincolorfilter
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.base.utils import crop_face
from bob.rppg.ssr.ssr_utils import get_eigen
from bob.rppg.ssr.ssr_utils import plot_eigenvectors
from bob.rppg.ssr.ssr_utils import build_P
class SSR(Extractor, object):
"""
Extract pulse signal according to the SSR algorithm
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If True, re-initialize the skin color distribution at each frame
stride: int
The temporal stride (in number of frames) used to build the pulse signal.
"""
def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, **kwargs):
super(SSR, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.stride = stride
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If a string, it is the name of the file to load the video data from.
A string is only possible when the empty (Filename) preprocessor is
used; in that case, the video data is loaded directly from the database.
**Returns:**
pulse: 1D numpy array
The pulse signal extracted from the face region, one value per frame.
"""
# load the video data (either from the FrameContainer or from the file name)
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
# the result -> the pulse signal
output_data = numpy.zeros(nb_frames, dtype='float64')
# store the eigenvalues and the eigenvectors at each frame
eigenvalues = numpy.zeros((3, nb_frames), dtype='float64')
eigenvectors = numpy.zeros((3, 3, nb_frames), dtype='float64')
counter = 0
previous_bbox = None
for i, frame in enumerate(video):
logger.debug("Processing frame %d/%d...", i, nb_frames)
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
except Exception:
bbox = previous_bbox
logger.warning("Face detection failed, using bounding box from previous frame ...")
face = crop_face(frame, bbox, bbox.size[1])
#from matplotlib import pyplot
#pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
#pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
skin_pixels = face[:, skin_mask]
#from matplotlib import pyplot
#skin_mask_image = numpy.copy(face)
#skin_mask_image[:, skin_mask] = 255
#pyplot.title("skin pixels in frame {0}".format(i))
#pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
#pyplot.show()
skin_pixels = skin_pixels.astype('float64') / 255.0
# build c matrix and get eigenvectors and eigenvalues
eigenvalues[:, counter], eigenvectors[:, :, counter] = get_eigen(skin_pixels)
#plot_eigenvectors(skin_pixels, eigenvectors[:, :, counter])
# build P and add it to the pulse signal
if counter >= self.stride:
tau = counter - self.stride
p = build_P(counter, self.stride, eigenvectors, eigenvalues)
output_data[tau:counter] += (p - numpy.mean(p))
counter += 1
# plot the pulse signal
#import matplotlib.pyplot as plt
#fig = plt.figure()
#ax = fig.add_subplot(111)
#ax.plot(range(nb_frames), output_data)
#plt.show()
return output_data
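Similarly, a minimal usage sketch for the SSR extractor, again assuming the empty Filename preprocessor so that a video file path is passed in (the path is purely illustrative):

# Illustrative only: the video path below is hypothetical
from bob.pad.face.extractor import SSR

ssr_extractor = SSR(skin_threshold=0.5, skin_init=False, stride=25)

# returns a 1D numpy array with one pulse value per frame
pulse = ssr_extractor('/path/to/some/video.mov')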
@@ -5,6 +5,9 @@
from .VideoDataLoader import VideoDataLoader
from .VideoQualityMeasure import VideoQualityMeasure
from .FrameDiffFeatures import FrameDiffFeatures
from .Chrom import Chrom
from .SSR import SSR
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
...