Skip to content
Snippets Groups Projects
Commit fa5b2087 authored by Guillaume HEUSCH's avatar Guillaume HEUSCH
Browse files

[extractor] removed all the pulse extractor

parent 99126097
No related branches found
No related tags found
1 merge request!53WIP: rPPG as features for PAD
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.facedetect
import bob.ip.base
import bob.ip.skincolorfilter
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.base.utils import crop_face
from bob.rppg.base.utils import build_bandpass_filter
from bob.rppg.chrom.extract_utils import compute_mean_rgb
from bob.rppg.chrom.extract_utils import project_chrominance
from bob.rppg.chrom.extract_utils import compute_gray_diff
from bob.rppg.chrom.extract_utils import select_stable_frames
class Chrom(Extractor, object):
"""
Extract pulse signal according to the CHROM algorithm
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If you want to re-initailize the skin color distribution at each frame
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
window_size: int
The size of the window in the overlap-add procedure.
motion: float
The percentage of frames you want to select where the
signal is "stable". 0 mean all the sequence.
debug: boolean
Plot some stuff
"""
def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, debug=False, **kwargs):
super(Chrom, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.framerate = framerate
self.bp_order = bp_order
self.window_size = window_size
self.motion = motion
self.debug = debug
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If string, the name of the file to load the video data from is
defined in it. String is possible only when empty preprocessor is
used. In this case video data is loaded directly from the database
and not using any high or low-level db packages (so beware).
**Returns:**
pulse: numpy.array
The pulse signal
"""
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
chrom = numpy.zeros((nb_frames, 2), dtype='float64')
# build the bandpass filter one and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
counter = 0
previous_bbox = None
for i, frame in enumerate(video):
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
except:
bbox = previous_bbox
logger.warning("Using bounding box from previous frame ...")
# motion difference (if asked for)
if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
current = crop_face(frame, bbox, bbox.size[1])
diff_motion[counter-1] = compute_gray_diff(face, current)
face = crop_face(frame, bbox, bbox.size[1])
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
if self.debug:
from matplotlib import pyplot
skin_mask_image = numpy.copy(face)
skin_mask_image[:, skin_mask] = 255
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
pyplot.show()
# sometimes skin is not detected !
if numpy.count_nonzero(skin_mask) != 0:
# compute the mean rgb values of the skin pixels
r,g,b = compute_mean_rgb(face, skin_mask)
logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r,g,b))
# project onto the chrominance colorspace
chrom[counter] = project_chrominance(r, g, b)
logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
else:
logger.warn("No skin pixels detected in frame {0}, using previous value".format(i))
# very unlikely, but it could happened and messed up all experiments (averaging of scores ...)
if counter == 0:
chrom[counter] = project_chrominance(128., 128., 128.)
else:
chrom[counter] = chrom[counter-1]
# keep the result of the last detection in case you cannot find a face in the next frame
previous_bbox = bbox
counter +=1
# select the most stable number of consecutive frames, if asked for
if self.motion > 0.0:
n_stable_frames_to_keep = int(self.motion * nb_frames)
logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
chrom = chrom[index:(index + n_stable_frames_to_keep),:]
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
axarr[0].set_title("X value in the chrominance subspace")
axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
axarr[1].set_title("Y value in the chrominance subspace")
pyplot.show()
# now that we have the chrominance signals, apply bandpass
from scipy.signal import filtfilt
x_bandpassed = numpy.zeros(nb_frames, dtype='float64')
y_bandpassed = numpy.zeros(nb_frames, dtype='float64')
x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
axarr[0].set_title("X bandpassed")
axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
axarr[1].set_title("Y bandpassed")
pyplot.show()
# build the final pulse signal
alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
pulse = x_bandpassed - alpha * y_bandpassed
# overlap-add if window_size != 0
if self.window_size > 0:
window_stride = self.window_size / 2
for w in range(0, (len(pulse)-window_size), window_stride):
pulse[w:w+window_size] = 0.0
xw = x_bandpassed[w:w+window_size]
yw = y_bandpassed[w:w+window_size]
alpha = numpy.std(xw) / numpy.std(yw)
sw = xw - alpha * yw
sw *= numpy.hanning(window_size)
pulse[w:w+window_size] += sw
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(1)
pyplot.plot(range(pulse.shape[0]), pulse, 'k')
pyplot.title("Pulse signal")
pyplot.show()
return pulse
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.base
import bob.ip.dlib
import bob.ip.draw
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.cvpr14.extract_utils import kp66_to_mask
from bob.rppg.cvpr14.extract_utils import compute_average_colors_mask
from bob.rppg.cvpr14.filter_utils import detrend
from bob.rppg.cvpr14.filter_utils import average
from bob.rppg.base.utils import build_bandpass_filter
class Li(Extractor, object):
"""
Extract pulse signal according to Li's CVPR 14 algorithm.
Note that this is a simplified version of the original
pulse extraction algorithms (mask detection in each
frame instead of tranking, no illumination correction,
no motion pruning)
**Parameters:**
indent: int
Indent (in percent of the face width) to apply to keypoints to get the mask.
lamda_: int
the lamba value of the detrend filter
window: int
The size of the window of the average filter
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
"""
def __init__(self, indent = 10, lambda_ = 300, window = 3, framerate = 25, bp_order = 32, debug=False, **kwargs):
super(Li, self).__init__()
self.indent = indent
self.lambda_ = lambda_
self.window = window
self.framerate = framerate
self.bp_order = bp_order
self.debug = debug
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If string, the name of the file to load the video data from is
defined in it. String is possible only when empty preprocessor is
used. In this case video data is loaded directly from the database.
and not using any high or low-level db packages (so beware).
**Returns:**
pulse: numpy.array
The pulse signal
"""
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
# the mean green color of the face along the sequence
face_color = numpy.zeros(nb_frames, dtype='float64')
# build the bandpass filter one and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
# landmarks detection
detector = bob.ip.dlib.DlibLandmarkExtraction()
counter = 0
previous_ldms = None
for i, frame in enumerate(video):
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
# detect landmarks
try:
ldms = detector(frame)
except TypeError:
####################
# looks like some images from replay mobile are upside down !
# looks like bob.ip.rotate has a color issue
####################
#rotated_shape = bob.ip.base.rotated_output_shape(frame, 180)
#frame_rotated = numpy.ndarray(rotated_shape, dtype=numpy.float64)
#from bob.ip.base import rotate
#bob.ip.base.rotate(frame, frame_rotated, 180)
#logger.warning("Rotating again ...")
#try:
# ldms = detector(frame_rotated)
#except TypeError:
# ldms = previous_ldms
#frame = frame_rotated
# so do nothing ...
logger.warning("No mask detected in frame {}".format(i))
face_color[i] = 0
continue
if self.debug:
from matplotlib import pyplot
display = numpy.copy(frame)
for p in ldms:
bob.ip.draw.plus(display, p, radius=5, color=(255, 0, 0))
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(display, 2),2))
pyplot.show()
ldms = numpy.array(ldms)
mask_points, mask = kp66_to_mask(frame, ldms, self.indent, False)
face_color[i] = compute_average_colors_mask(frame, mask, False)
previous_ldms = ldms
counter += 1
# detrend
detrended = detrend(face_color, self.lambda_)
# average
averaged = average(detrended, self.window)
# bandpass
from scipy.signal import filtfilt
bandpassed = filtfilt(bandpass_filter, numpy.array([1]), averaged)
if self.debug:
from matplotlib import pyplot
f, ax = pyplot.subplots(4, sharex=True)
ax[0].plot(range(face_color.shape[0]), face_color, 'g')
ax[0].set_title('Original signal')
ax[1].plot(range(face_color.shape[0]), detrended, 'g')
ax[1].set_title('After detrending')
ax[2].plot(range(face_color.shape[0]), averaged, 'g')
ax[2].set_title('After averaging')
ax[3].plot(range(face_color.shape[0]), bandpassed, 'g')
ax[3].set_title('Bandpassed signal')
pyplot.show()
return bandpassed
#!/usr/bin/env python
# encoding: utf-8
import six
import numpy
import bob.bio.video
from bob.bio.base.extractor import Extractor
from bob.pad.face.extractor import VideoDataLoader
import bob.ip.facedetect
import bob.ip.base
import bob.ip.skincolorfilter
import logging
logger = logging.getLogger("bob.pad.face")
from bob.rppg.base.utils import crop_face
from bob.rppg.ssr.ssr_utils import get_eigen
from bob.rppg.ssr.ssr_utils import plot_eigenvectors
from bob.rppg.ssr.ssr_utils import build_P
class SSR(Extractor, object):
"""
Extract pulse signal according to the SSR algorithm
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If you want to re-initailize the skin color distribution at each frame
stride: int
The temporal stride.
debug: boolean
Plot some stuff
"""
def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, debug=False, **kwargs):
super(SSR, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.stride = stride
self.debug = debug
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: FrameContainer or string.
Video data stored in the FrameContainer,
see ``bob.bio.video.utils.FrameContainer`` for further details.
If string, the name of the file to load the video data from is
defined in it. String is possible only when empty preprocessor is
used. In this case video data is loaded directly from the database
and not using any high or low-level db packages (so beware).
**Returns:**
pulse: numpy.array
The pulse signal
"""
if isinstance(frames, six.string_types):
video_loader = VideoDataLoader()
video = video_loader(frames)
else:
video = frames
video = video.as_array()
nb_frames = video.shape[0]
# the result -> the pulse signal
output_data = numpy.zeros(nb_frames, dtype='float64')
# store the eigenvalues and the eigenvectors at each frame
eigenvalues = numpy.zeros((3, nb_frames), dtype='float64')
eigenvectors = numpy.zeros((3, 3, nb_frames), dtype='float64')
counter = 0
previous_bbox = None
previous_skin_pixels = None
for i, frame in enumerate(video):
logger.debug("Processing frame %d/%d...", i, nb_frames)
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
except:
bbox = previous_bbox
logger.warning("Using bounding box from previous frame ...")
face = crop_face(frame, bbox, bbox.size[1])
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
skin_pixels = face[:, skin_mask]
skin_pixels = skin_pixels.astype('float64') / 255.0
if self.debug:
from matplotlib import pyplot
skin_mask_image = numpy.copy(face)
skin_mask_image[:, skin_mask] = 255
pyplot.title("skin pixels in frame {0}".format(i))
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
pyplot.show()
# nos skin pixels have ben detected ... using the previous ones
if skin_pixels.shape[1] == 0:
skin_pixels = previous_skin_pixels
logger.warn("No skin pixels detected, using the previous ones")
# build c matrix and get eigenvectors and eigenvalues
eigenvalues[:, counter], eigenvectors[:, :, counter] = get_eigen(skin_pixels)
if self.debug:
plot_eigenvectors(skin_pixels, eigenvectors[:, :, counter])
# build P and add it to the pulse signal
if counter >= self.stride:
tau = counter - self.stride
p = build_P(counter, self.stride, eigenvectors, eigenvalues)
output_data[tau:counter] += (p - numpy.mean(p))
previous_bbox = bbox
previous_skin_pixels = skin_pixels
counter += 1
if self.debug:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(range(nb_frames), output_data)
plt.show()
return output_data
......@@ -5,11 +5,6 @@ from .VideoDataLoader import VideoDataLoader
from .VideoQualityMeasure import VideoQualityMeasure
from .FrameDiffFeatures import FrameDiffFeatures
from .Chrom import Chrom
from .SSR import SSR
from .Li import Li
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment