Skip to content
Snippets Groups Projects
Commit ce463a0c authored by Guillaume HEUSCH's avatar Guillaume HEUSCH
Browse files

[preprocessor] added all the pulse algorithms as preprocessor, and removed the dummy one

parent fa5b2087
No related branches found
No related tags found
1 merge request!53WIP: rPPG as features for PAD
import numpy
import logging
logger = logging.getLogger("bob.pad.face")
from bob.bio.base.preprocessor import Preprocessor
import bob.ip.facedetect
import bob.ip.skincolorfilter
from bob.rppg.base.utils import crop_face
from bob.rppg.base.utils import build_bandpass_filter
from bob.rppg.chrom.extract_utils import compute_mean_rgb
from bob.rppg.chrom.extract_utils import project_chrominance
from bob.rppg.chrom.extract_utils import compute_gray_diff
from bob.rppg.chrom.extract_utils import select_stable_frames
class Chrom(Preprocessor, object):
"""
This class extract the pulse signal from a video sequence.
The pulse is extracted according to the CHROM algorithm.
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If you want to re-initailize the skin color distribution at each frame
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
window_size: int
The size of the window in the overlap-add procedure.
motion: float
The percentage of frames you want to select where the
signal is "stable". 0 mean all the sequence.
debug: boolean
Plot some stuff
"""
def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, debug=False, **kwargs):
super(Chrom, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.framerate = framerate
self.bp_order = bp_order
self.window_size = window_size
self.motion = motion
self.debug = debug
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames, annotations):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: :pyclass: `bob.bio.video.utils.FrameContainer`
Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
for further details.
annotations: :py:class:`dict`
A dictionary containing annotations of the face bounding box.
Dictionary must be as follows ``{'topleft': (row, col), 'bottomright': (row, col)}``
**Returns:**
pulse: numpy.array of size nb_frames
The pulse signal
"""
video = frames.as_array()
nb_frames = video.shape[0]
chrom = numpy.zeros((nb_frames, 2), dtype='float64')
# build the bandpass filter one and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
counter = 0
previous_bbox = None
for i, frame in enumerate(video):
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
# get the face
try:
topleft = annotations[str(i)]['topleft']
bottomright = annotations[str(i)]['bottomright']
size = (bottomright[0]-topleft[0], bottomright[1]-topleft[1])
bbox = bob.ip.facedetect.BoundingBox(topleft, size)
face = crop_face(frame, bbox, bbox.size[1])
except (KeyError, ZeroDivisionError) as e:
logger.warning("No annotations ... running face detection")
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
face = crop_face(frame, bbox, bbox.size[1])
except:
bbox = previous_bbox
face = crop_face(frame, bbox, bbox.size[1])
logger.warning("No detection, using bounding box from previous frame ...")
# motion difference (if asked for)
if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
current = crop_face(frame, bbox, bbox.size[1])
diff_motion[counter-1] = compute_gray_diff(face, current)
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
if self.debug:
from matplotlib import pyplot
skin_mask_image = numpy.copy(face)
skin_mask_image[:, skin_mask] = 255
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
pyplot.show()
# sometimes skin is not detected !
if numpy.count_nonzero(skin_mask) != 0:
# compute the mean rgb values of the skin pixels
r,g,b = compute_mean_rgb(face, skin_mask)
logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r,g,b))
# project onto the chrominance colorspace
chrom[counter] = project_chrominance(r, g, b)
logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
else:
logger.warn("No skin pixels detected in frame {0}, using previous value".format(i))
# very unlikely, but it could happened and messed up all experiments (averaging of scores ...)
if counter == 0:
chrom[counter] = project_chrominance(128., 128., 128.)
else:
chrom[counter] = chrom[counter-1]
# keep the result of the last detection in case you cannot find a face in the next frame
previous_bbox = bbox
counter +=1
# select the most stable number of consecutive frames, if asked for
if self.motion > 0.0:
n_stable_frames_to_keep = int(self.motion * nb_frames)
logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
chrom = chrom[index:(index + n_stable_frames_to_keep),:]
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
axarr[0].set_title("X value in the chrominance subspace")
axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
axarr[1].set_title("Y value in the chrominance subspace")
pyplot.show()
# now that we have the chrominance signals, apply bandpass
from scipy.signal import filtfilt
x_bandpassed = numpy.zeros(nb_frames, dtype='float64')
y_bandpassed = numpy.zeros(nb_frames, dtype='float64')
x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(2, sharex=True)
axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
axarr[0].set_title("X bandpassed")
axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
axarr[1].set_title("Y bandpassed")
pyplot.show()
# build the final pulse signal
alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
pulse = x_bandpassed - alpha * y_bandpassed
# overlap-add if window_size != 0
if self.window_size > 0:
window_stride = self.window_size / 2
for w in range(0, (len(pulse)-window_size), window_stride):
pulse[w:w+window_size] = 0.0
xw = x_bandpassed[w:w+window_size]
yw = y_bandpassed[w:w+window_size]
alpha = numpy.std(xw) / numpy.std(yw)
sw = xw - alpha * yw
sw *= numpy.hanning(window_size)
pulse[w:w+window_size] += sw
if self.debug:
from matplotlib import pyplot
f, axarr = pyplot.subplots(1)
pyplot.plot(range(pulse.shape[0]), pulse, 'k')
pyplot.title("Pulse signal")
pyplot.show()
return pulse
import bob.io.base
import bob.io.video
from bob.bio.base.preprocessor import Preprocessor
class CopyVideo(Preprocessor):
"""
Dummy class to load a video, and then write it as a bob.bio.video FrameContainer
Used mainly with the Replay Mobile databases, where the low-level db interface
takes care of properly rotating the video
"""
def __init__(self, **kwargs):
super(CopyVideo, self).__init__(**kwargs)
def __call__(self, frames, annotations):
"""
Just returns the video sequence
**Parameters**
``frames`` : FrameContainer
Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
for further details.
``annotations`` : :py:class:`dict`
A dictionary containing the annotations for each frame in the video.
Dictionary structure: ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``.
Where ``frameN_dict = {'topleft': (row, col), 'bottomright': (row, col)}``
is the dictionary defining the coordinates of the face bounding box in frame N.
**Returns:**
``frames`` : FrameContainer
The input frames, stored in the FrameContainer.
"""
return frames
def write_data(self, frames, filename):
"""
Writes the given data (that has been generated using the __call__ function of this class) to file.
This method overwrites the write_data() method of the Preprocessor class.
**Parameters:**
``frames`` :
data returned by the __call__ method of the class.
``filename`` : :py:class:`str`
name of the file.
"""
if frames:
bob.bio.video.preprocessor.Wrapper(Preprocessor()).write_data(frames, filename)
def read_data(self, filename):
"""
Reads the preprocessed data from file.
This method overwrites the read_data() method of the Preprocessor class.
**Parameters:**
``file_name`` : :py:class:`str`
name of the file.
**Returns:**
``frames`` : :py:class:`bob.bio.video.FrameContainer`
Frames stored in the frame container.
"""
frames = bob.bio.video.preprocessor.Wrapper(Preprocessor()).read_data(filename)
return frames
import numpy
import logging
logger = logging.getLogger("bob.pad.face")
from bob.bio.base.preprocessor import Preprocessor
from bob.rppg.base.utils import build_bandpass_filter
import bob.ip.dlib
from bob.rppg.cvpr14.extract_utils import kp66_to_mask
from bob.rppg.cvpr14.extract_utils import compute_average_colors_mask
from bob.rppg.cvpr14.filter_utils import detrend
from bob.rppg.cvpr14.filter_utils import average
class Li(Preprocessor):
"""
This class extract the pulse signal from a video sequence.
The pulse is extracted according to Li's CVPR 14 algorithm.
Note that this is a simplified version of the original
pulse extraction algorithms (mask detection in each
frame instead of tranking, no illumination correction,
no motion pruning)
**Parameters:**
indent: int
Indent (in percent of the face width) to apply to keypoints to get the mask.
lamda_: int
the lamba value of the detrend filter
window: int
The size of the window of the average filter
framerate: int
The framerate of the video sequence.
bp_order: int
The order of the bandpass filter
debug: boolean
Plot some stuff
"""
def __init__(self, indent = 10, lambda_ = 300, window = 3, framerate = 25, bp_order = 32, debug=False, **kwargs):
super(Li, self).__init__()
self.indent = indent
self.lambda_ = lambda_
self.window = window
self.framerate = framerate
self.bp_order = bp_order
self.debug = debug
def __call__(self, frames, annotations):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: :pyclass: `bob.bio.video.utils.FrameContainer`
Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
for further details.
annotations: :py:class:`dict`
A dictionary containing annotations of the face bounding box.
Dictionary must be as follows ``{'topleft': (row, col), 'bottomright': (row, col)}``
**Returns:**
pulse: numpy.array of size (nb_frame, 3)
The pulse signal in each color channel (RGB)
"""
video = frames.as_array()
nb_frames = video.shape[0]
# the meancolor of the face along the sequence
face_color = numpy.zeros((nb_frames, 3), dtype='float64')
# build the bandpass filter one and for all
bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
# landmarks detection
detector = bob.ip.dlib.DlibLandmarkExtraction()
counter = 0
previous_ldms = None
for i, frame in enumerate(video):
logger.debug("Processing frame {}/{}".format(counter, nb_frames))
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
# detect landmarks
try:
ldms = detector(frame)
except TypeError:
# looks like one video from replay mobile is upside down !
rotated_shape = bob.ip.base.rotated_output_shape(frame, 180)
frame_rotated = numpy.ndarray(rotated_shape, dtype=numpy.float64)
from bob.ip.base import rotate
bob.ip.base.rotate(frame, frame_rotated, 180)
frame_rotated = frame_rotated.astype(numpy.uint8)
logger.warning("Rotating again ...")
try:
ldms = detector(frame_rotated)
except TypeError:
ldms = previous_ldms
# so do nothing ...
logger.warning("No mask detected in frame {}".format(i))
face_color[i] = 0
continue
frame = frame_rotated
if self.debug:
from matplotlib import pyplot
display = numpy.copy(frame)
for p in ldms:
bob.ip.draw.plus(display, p, radius=5, color=(255, 0, 0))
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(display, 2),2))
pyplot.show()
ldms = numpy.array(ldms)
mask_points, mask = kp66_to_mask(frame, ldms, self.indent, self.debug)
face_color[i] = compute_average_colors_mask(frame, mask, self.debug)
previous_ldms = ldms
counter += 1
pulse = numpy.zeros((nb_frames, 3), dtype='float64')
for i in range(3):
# detrend
detrended = detrend(face_color[:, i], self.lambda_)
# average
averaged = average(detrended, self.window)
# bandpass
from scipy.signal import filtfilt
pulse[:, i] = filtfilt(bandpass_filter, numpy.array([1]), averaged)
if self.debug:
from matplotlib import pyplot
for i in range(3):
f, ax = pyplot.subplots(2, sharex=True)
ax[0].plot(range(face_color.shape[0]), face_color[:, i], 'g')
ax[0].set_title('Original color signal')
ax[1].plot(range(face_color.shape[0]), pulse[:, i], 'g')
ax[1].set_title('Pulse signal')
pyplot.show()
return pulse
import numpy
import logging
logger = logging.getLogger("bob.pad.face")
from bob.bio.base.preprocessor import Preprocessor
import bob.ip.facedetect
import bob.ip.skincolorfilter
from bob.rppg.base.utils import crop_face
from bob.rppg.ssr.ssr_utils import get_eigen
from bob.rppg.ssr.ssr_utils import plot_eigenvectors
from bob.rppg.ssr.ssr_utils import build_P
class SSR(Preprocessor, object):
"""
This class extract the pulse signal from a video sequence.
The pulse is extracted according to the SSR algorithm.
**Parameters:**
skin_threshold: float
The threshold for skin color probability
skin_init: bool
If you want to re-initailize the skin color distribution at each frame
stride: int
The temporal stride.
debug: boolean
Plot some stuff
"""
def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, debug=False, **kwargs):
super(SSR, self).__init__()
self.skin_threshold = skin_threshold
self.skin_init = skin_init
self.stride = stride
self.debug = debug
self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
def __call__(self, frames, annotations):
"""
Compute the pulse signal for the given frame sequence
**Parameters:**
frames: :pyclass: `bob.bio.video.utils.FrameContainer`
Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
for further details.
annotations: :py:class:`dict`
A dictionary containing annotations of the face bounding box.
Dictionary must be as follows ``{'topleft': (row, col), 'bottomright': (row, col)}``
**Returns:**
pulse: numpy.array of size nb_frames
The pulse signal
"""
video = frames.as_array()
nb_frames = video.shape[0]
# the result -> the pulse signal
output_data = numpy.zeros(nb_frames, dtype='float64')
# store the eigenvalues and the eigenvectors at each frame
eigenvalues = numpy.zeros((3, nb_frames), dtype='float64')
eigenvectors = numpy.zeros((3, 3, nb_frames), dtype='float64')
counter = 0
previous_bbox = None
previous_skin_pixels = None
for i, frame in enumerate(video):
logger.debug("Processing frame %d/%d...", i, nb_frames)
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2),2))
pyplot.show()
# get the face
try:
topleft = annotations[str(i)]['topleft']
bottomright = annotations[str(i)]['bottomright']
size = (bottomright[0]-topleft[0], bottomright[1]-topleft[1])
bbox = bob.ip.facedetect.BoundingBox(topleft, size)
face = crop_face(frame, bbox, bbox.size[1])
except (KeyError, ZeroDivisionError) as e:
logger.warning("No annotations ... running face detection")
try:
bbox, quality = bob.ip.facedetect.detect_single_face(frame)
face = crop_face(frame, bbox, bbox.size[1])
except:
bbox = previous_bbox
face = crop_face(frame, bbox, bbox.size[1])
logger.warning("No detection, using bounding box from previous frame ...")
if self.debug:
from matplotlib import pyplot
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2),2))
pyplot.show()
# skin filter
if counter == 0 or self.skin_init:
self.skin_filter.estimate_gaussian_parameters(face)
logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
skin_pixels = face[:, skin_mask]
skin_pixels = skin_pixels.astype('float64') / 255.0
if self.debug:
from matplotlib import pyplot
skin_mask_image = numpy.copy(face)
skin_mask_image[:, skin_mask] = 255
pyplot.title("skin pixels in frame {0}".format(i))
pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2),2))
pyplot.show()
# nos skin pixels have ben detected ... using the previous ones
if skin_pixels.shape[1] == 0:
skin_pixels = previous_skin_pixels
logger.warn("No skin pixels detected, using the previous ones")
# build c matrix and get eigenvectors and eigenvalues
eigenvalues[:, counter], eigenvectors[:, :, counter] = get_eigen(skin_pixels)
if self.debug:
plot_eigenvectors(skin_pixels, eigenvectors[:, :, counter])
# build P and add it to the pulse signal
if counter >= self.stride:
tau = counter - self.stride
p = build_P(counter, self.stride, eigenvectors, eigenvalues)
output_data[tau:counter] += (p - numpy.mean(p))
previous_bbox = bbox
previous_skin_pixels = skin_pixels
counter += 1
if self.debug:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(range(nb_frames), output_data)
plt.show()
return output_data
......@@ -3,7 +3,9 @@ from .ImageFaceCrop import ImageFaceCrop
from .FrameDifference import FrameDifference
from .VideoSparseCoding import VideoSparseCoding
from .CopyVideo import CopyVideo
from .Li import Li
from .Chrom import Chrom
from .SSR import SSR
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment