Skip to content
Snippets Groups Projects

WIP: rPPG as features for PAD

Merged Guillaume HEUSCH requested to merge rppg into master
2 unresolved threads
4 files
+ 0
563
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 0
226
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.facedetect
import bob.ip.base
import bob.ip.skincolorfilter
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.base.utils import crop_face
from bob.rppg.base.utils import build_bandpass_filter
from bob.rppg.chrom.extract_utils import compute_mean_rgb
from bob.rppg.chrom.extract_utils import project_chrominance
from bob.rppg.chrom.extract_utils import compute_gray_diff
from bob.rppg.chrom.extract_utils import select_stable_frames
class Chrom(Extractor, object):
"""
Extract pulse signal according to the CHROM algorithm
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If you want to re-initailize the skin color distribution at each frame
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
window_size: int
The size of the window in the overlap-add procedure.
motion: float
The percentage of frames you want to select where the
signal is "stable". 0 mean all the sequence.
debug: boolean
Plot some stuff
"""
def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, debug=False, **kwargs):
super(Chrom, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.framerate = framerate
self.bp_order = bp_order
self.window_size = window_size
self.motion = motion
self.debug = debug
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If string, the name of the file to load the video data from is
defined in it. String is possible only when empty preprocessor is
used. In this case video data is loaded directly from the database
and not using any high or low-level db packages (so beware).
**Returns:**
pulse: numpy.array
The pulse signal
"""
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
chrom = numpy.zeros((nb_frames, 2), dtype='float64')
# build the bandpass filter one and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
counter = 0
previous_bbox = None
for i, frame in enumerate(video):
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
except:
bbox = previous_bbox
logger.warning("Using bounding box from previous frame ...")
# motion difference (if asked for)
if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
current = crop_face(frame, bbox, bbox.size[1])
diff_motion[counter-1] = compute_gray_diff(face, current)
face = crop_face(frame, bbox, bbox.size[1])
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
if self.debug:
from matplotlib import pyplot
skin_mask_image = numpy.copy(face)
skin_mask_image[:, skin_mask] = 255
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
pyplot.show()
# sometimes skin is not detected !
if numpy.count_nonzero(skin_mask) != 0:
# compute the mean rgb values of the skin pixels
r,g,b = compute_mean_rgb(face, skin_mask)
logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r,g,b))
# project onto the chrominance colorspace
chrom[counter] = project_chrominance(r, g, b)
logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
else:
logger.warn("No skin pixels detected in frame {0}, using previous value".format(i))
# very unlikely, but it could happened and messed up all experiments (averaging of scores ...)
if counter == 0:
chrom[counter] = project_chrominance(128., 128., 128.)
else:
chrom[counter] = chrom[counter-1]
# keep the result of the last detection in case you cannot find a face in the next frame
previous_bbox = bbox
counter +=1
# select the most stable number of consecutive frames, if asked for
if self.motion > 0.0:
n_stable_frames_to_keep = int(self.motion * nb_frames)
logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
chrom = chrom[index:(index + n_stable_frames_to_keep),:]
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
axarr[0].set_title("X value in the chrominance subspace")
axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
axarr[1].set_title("Y value in the chrominance subspace")
pyplot.show()
# now that we have the chrominance signals, apply bandpass
from scipy.signal import filtfilt
x_bandpassed = numpy.zeros(nb_frames, dtype='float64')
y_bandpassed = numpy.zeros(nb_frames, dtype='float64')
x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
axarr[0].set_title("X bandpassed")
axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
axarr[1].set_title("Y bandpassed")
pyplot.show()
# build the final pulse signal
alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
pulse = x_bandpassed - alpha * y_bandpassed
# overlap-add if window_size != 0
if self.window_size > 0:
window_stride = self.window_size / 2
for w in range(0, (len(pulse)-window_size), window_stride):
pulse[w:w+window_size] = 0.0
xw = x_bandpassed[w:w+window_size]
yw = y_bandpassed[w:w+window_size]
alpha = numpy.std(xw) / numpy.std(yw)
sw = xw - alpha * yw
sw *= numpy.hanning(window_size)
pulse[w:w+window_size] += sw
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(1)
pyplot.plot(range(pulse.shape[0]), pulse, 'k')
pyplot.title("Pulse signal")
pyplot.show()
return pulse
Loading