diff --git a/bob/pad/face/preprocessor/Chrom.py b/bob/pad/face/preprocessor/Chrom.py
new file mode 100644
index 0000000000000000000000000000000000000000..3bd4b1ceda7ceb98217d74037891ec39dcf12f4f
--- /dev/null
+++ b/bob/pad/face/preprocessor/Chrom.py
@@ -0,0 +1,222 @@
+import numpy
+
+import logging
+logger = logging.getLogger("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+import bob.ip.facedetect
+import bob.ip.skincolorfilter
+
+from bob.rppg.base.utils import crop_face
+from bob.rppg.base.utils import build_bandpass_filter
+
+from bob.rppg.chrom.extract_utils import compute_mean_rgb
+from bob.rppg.chrom.extract_utils import project_chrominance
+from bob.rppg.chrom.extract_utils import compute_gray_diff
+from bob.rppg.chrom.extract_utils import select_stable_frames
+
+
+class Chrom(Preprocessor):
+    """
+    This class extracts the pulse signal from a video sequence,
+    according to the CHROM algorithm.
+
+    **Parameters:**
+
+    skin_threshold: float
+        The threshold for skin color probability.
+
+    skin_init: bool
+        If True, re-initialize the skin color distribution at each frame.
+
+    framerate: int
+        The framerate of the video sequence.
+
+    bp_order: int
+        The order of the bandpass filter.
+
+    window_size: int
+        The size of the window in the overlap-add procedure.
+
+    motion: float
+        The percentage of frames to select where the signal is "stable".
+        0 means that the whole sequence is kept.
+
+    debug: boolean
+        If True, show intermediate results.
+    """
+    def __init__(self, skin_threshold=0.5, skin_init=False, framerate=25, bp_order=32, window_size=0, motion=0.0, debug=False, **kwargs):
+
+        super(Chrom, self).__init__()
+
+        self.skin_threshold = skin_threshold
+        self.skin_init = skin_init
+        self.framerate = framerate
+        self.bp_order = bp_order
+        self.window_size = window_size
+        self.motion = motion
+        self.debug = debug
+
+        self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
+
+    def __call__(self, frames, annotations):
+        """
+        Computes the pulse signal for the given frame sequence.
+
+        **Parameters:**
+
+        frames: :py:class:`bob.bio.video.utils.FrameContainer`
+            Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
+            for further details.
+
+        annotations: :py:class:`dict`
+            A dictionary containing the face bounding box for each frame,
+            keyed by frame index:
+            ``{'N': {'topleft': (row, col), 'bottomright': (row, col)}}``.
+
+        **Returns:**
+
+        pulse: numpy.array of size nb_frames
+            The pulse signal (shorter if ``motion`` > 0, since only the most
+            stable segment is kept).
+        """
+        video = frames.as_array()
+        nb_frames = video.shape[0]
+
+        chrom = numpy.zeros((nb_frames, 2), dtype='float64')
+
+        # motion difference between consecutive frames (used when motion > 0);
+        # this array was previously used without being initialized
+        diff_motion = numpy.zeros((nb_frames - 1, 1), dtype='float64')
+
+        # build the bandpass filter once and for all
+        bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
+
+        counter = 0
+        previous_bbox = None
+        for i, frame in enumerate(video):
+
+            logger.debug("Processing frame {}/{}".format(counter, nb_frames))
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2), 2))
+                pyplot.show()
+
+            # get the face
+            try:
+                topleft = annotations[str(i)]['topleft']
+                bottomright = annotations[str(i)]['bottomright']
+                size = (bottomright[0] - topleft[0], bottomright[1] - topleft[1])
+                bbox = bob.ip.facedetect.BoundingBox(topleft, size)
+                face = crop_face(frame, bbox, bbox.size[1])
+            except (KeyError, ZeroDivisionError):
+                logger.warning("No annotations ... running face detection")
+                try:
+                    bbox, quality = bob.ip.facedetect.detect_single_face(frame)
+                    face = crop_face(frame, bbox, bbox.size[1])
+                except Exception:
+                    # fall back on the bounding box of the previous frame
+                    bbox = previous_bbox
+                    face = crop_face(frame, bbox, bbox.size[1])
+                    logger.warning("No detection, using bounding box from previous frame ...")
+
+            # motion difference (if asked for)
+            if self.motion > 0.0 and (i < (nb_frames - 1)) and (counter > 0):
+                current = crop_face(frame, bbox, bbox.size[1])
+                diff_motion[counter - 1] = compute_gray_diff(face, current)
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2), 2))
+                pyplot.show()
+
+            # skin filter
+            if counter == 0 or self.skin_init:
+                self.skin_filter.estimate_gaussian_parameters(face)
+                logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
+            skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
+
+            if self.debug:
+                from matplotlib import pyplot
+                skin_mask_image = numpy.copy(face)
+                skin_mask_image[:, skin_mask] = 255
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2), 2))
+                pyplot.show()
+
+            # sometimes skin is not detected !
+            if numpy.count_nonzero(skin_mask) != 0:
+
+                # compute the mean rgb values of the skin pixels
+                r, g, b = compute_mean_rgb(face, skin_mask)
+                logger.debug("Mean color -> R = {0}, G = {1}, B = {2}".format(r, g, b))
+
+                # project onto the chrominance colorspace
+                chrom[counter] = project_chrominance(r, g, b)
+                logger.debug("Chrominance -> X = {0}, Y = {1}".format(chrom[counter][0], chrom[counter][1]))
+
+            else:
+                logger.warning("No skin pixels detected in frame {0}, using previous value".format(i))
+                # very unlikely, but it could happen and mess up the whole
+                # experiment (averaging of scores ...)
+                if counter == 0:
+                    chrom[counter] = project_chrominance(128., 128., 128.)
+                else:
+                    chrom[counter] = chrom[counter - 1]
+
+            # keep the result of the last detection, in case no face is found
+            # in the next frame
+            previous_bbox = bbox
+            counter += 1
+
+        # select the most stable number of consecutive frames, if asked for
+        if self.motion > 0.0:
+            n_stable_frames_to_keep = int(self.motion * nb_frames)
+            logger.info("Number of stable frames kept for motion -> {0}".format(n_stable_frames_to_keep))
+            index = select_stable_frames(diff_motion, n_stable_frames_to_keep)
+            logger.info("Stable segment -> {0} - {1}".format(index, index + n_stable_frames_to_keep))
+            chrom = chrom[index:(index + n_stable_frames_to_keep), :]
+
+        if self.debug:
+            from matplotlib import pyplot
+            f, axarr = pyplot.subplots(2, sharex=True)
+            axarr[0].plot(range(chrom.shape[0]), chrom[:, 0], 'k')
+            axarr[0].set_title("X value in the chrominance subspace")
+            axarr[1].plot(range(chrom.shape[0]), chrom[:, 1], 'k')
+            axarr[1].set_title("Y value in the chrominance subspace")
+            pyplot.show()
+
+        # now that we have the chrominance signals, apply the bandpass filter
+        from scipy.signal import filtfilt
+        x_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 0])
+        y_bandpassed = filtfilt(bandpass_filter, numpy.array([1]), chrom[:, 1])
+
+        if self.debug:
+            from matplotlib import pyplot
+            f, axarr = pyplot.subplots(2, sharex=True)
+            axarr[0].plot(range(x_bandpassed.shape[0]), x_bandpassed, 'k')
+            axarr[0].set_title("X bandpassed")
+            axarr[1].plot(range(y_bandpassed.shape[0]), y_bandpassed, 'k')
+            axarr[1].set_title("Y bandpassed")
+            pyplot.show()
+
+        # build the final pulse signal
+        alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
+        pulse = x_bandpassed - alpha * y_bandpassed
+
+        # overlap-add if window_size != 0
+        if self.window_size > 0:
+            # integer stride of half a window
+            window_stride = self.window_size // 2
+            for w in range(0, len(pulse) - self.window_size, window_stride):
+                pulse[w:w + self.window_size] = 0.0
+                xw = x_bandpassed[w:w + self.window_size]
+                yw = y_bandpassed[w:w + self.window_size]
+                alpha = numpy.std(xw) / numpy.std(yw)
+                sw = xw - alpha * yw
+                sw *= numpy.hanning(self.window_size)
+                pulse[w:w + self.window_size] += sw
+
+        if self.debug:
+            from matplotlib import pyplot
+            f, axarr = pyplot.subplots(1)
+            pyplot.plot(range(pulse.shape[0]), pulse, 'k')
+            pyplot.title("Pulse signal")
+            pyplot.show()
+
+        return pulse
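For readers without ``bob.rppg`` at hand, the two helpers that do the actual CHROM work above are a one-line colorspace projection and a rescaled difference. Below is a minimal sketch of both, assuming the standard coefficients from de Haan and Jeanne's CHROM paper; the function names are illustrative stand-ins, not the ``bob.rppg`` API:

```python
import numpy

def project_chrominance_sketch(r, g, b):
    # CHROM projection: combine the mean skin R, G, B values into two
    # chrominance signals that are insensitive to intensity changes
    x = 3.0 * r - 2.0 * g
    y = 1.5 * r + g - 1.5 * b
    return x, y

def pulse_from_chrominance(x_bandpassed, y_bandpassed):
    # alpha rescales Y so that residual motion/illumination components
    # cancel out in the difference
    alpha = numpy.std(x_bandpassed) / numpy.std(y_bandpassed)
    return x_bandpassed - alpha * y_bandpassed
```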
diff --git a/bob/pad/face/preprocessor/CopyVideo.py b/bob/pad/face/preprocessor/CopyVideo.py
deleted file mode 100644
index 7c6a4538236f9e9db090e574751c96ac2146b99f..0000000000000000000000000000000000000000
--- a/bob/pad/face/preprocessor/CopyVideo.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import bob.io.base
-import bob.io.video
-from bob.bio.base.preprocessor import Preprocessor
-
-class CopyVideo(Preprocessor):
-  """
-  Dummy class to load a video, and then write it as a bob.bio.video FrameContainer
-
-  Used mainly with the Replay Mobile databases, where the low-level db interface
-  takes care of properly rotating the video
-  """
-  def __init__(self, **kwargs):
-    super(CopyVideo, self).__init__(**kwargs)
-
-  def __call__(self, frames, annotations):
-    """
-    Just returns the video sequence
-
-    **Parameters**
-
-    ``frames`` : FrameContainer
-      Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
-      for further details.
-
-    ``annotations`` : :py:class:`dict`
-      A dictionary containing the annotations for each frame in the video.
-      Dictionary structure: ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``.
-      Where ``frameN_dict = {'topleft': (row, col), 'bottomright': (row, col)}``
-      is the dictionary defining the coordinates of the face bounding box in frame N.
-
-    **Returns:**
-
-    ``frames`` : FrameContainer
-      The input frames, stored in the FrameContainer.
-    """
-    return frames
-
-  def write_data(self, frames, filename):
-    """
-    Writes the given data (that has been generated using the __call__ function of this class) to file.
-    This method overwrites the write_data() method of the Preprocessor class.
-
-    **Parameters:**
-
-    ``frames`` :
-      data returned by the __call__ method of the class.
-
-    ``filename`` : :py:class:`str`
-      name of the file.
-    """
-    if frames:
-      bob.bio.video.preprocessor.Wrapper(Preprocessor()).write_data(frames, filename)
-
-  def read_data(self, filename):
-    """
-    Reads the preprocessed data from file.
-    This method overwrites the read_data() method of the Preprocessor class.
-
-    **Parameters:**
-
-    ``file_name`` : :py:class:`str`
-      name of the file.
-
-    **Returns:**
-
-    ``frames`` : :py:class:`bob.bio.video.FrameContainer`
-      Frames stored in the frame container.
-    """
-    frames = bob.bio.video.preprocessor.Wrapper(Preprocessor()).read_data(filename)
-    return frames
diff --git a/bob/pad/face/preprocessor/Li.py b/bob/pad/face/preprocessor/Li.py
new file mode 100644
index 0000000000000000000000000000000000000000..4f6b3c2f4f2fe75b47ab472d012517b26114b807
--- /dev/null
+++ b/bob/pad/face/preprocessor/Li.py
@@ -0,0 +1,155 @@
+import numpy
+
+import logging
+logger = logging.getLogger("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+from bob.rppg.base.utils import build_bandpass_filter
+import bob.ip.base
+import bob.ip.dlib
+
+from bob.rppg.cvpr14.extract_utils import kp66_to_mask
+from bob.rppg.cvpr14.extract_utils import compute_average_colors_mask
+from bob.rppg.cvpr14.filter_utils import detrend
+from bob.rppg.cvpr14.filter_utils import average
+
+
+class Li(Preprocessor):
+    """
+    This class extracts the pulse signal from a video sequence.
+
+    The pulse is extracted according to Li's CVPR 14 algorithm.
+    Note that this is a simplified version of the original
+    pulse extraction algorithm (mask detection in each
+    frame instead of tracking, no illumination correction,
+    no motion pruning).
+
+    **Parameters:**
+
+    indent: int
+        Indent (in percent of the face width) to apply to keypoints to get the mask.
+
+    lambda_: int
+        The lambda value of the detrend filter.
+
+    window: int
+        The size of the window of the average filter.
+
+    framerate: int
+        The framerate of the video sequence.
+
+    bp_order: int
+        The order of the bandpass filter.
+
+    debug: boolean
+        If True, show intermediate results.
+    """
+    def __init__(self, indent=10, lambda_=300, window=3, framerate=25, bp_order=32, debug=False, **kwargs):
+
+        super(Li, self).__init__()
+
+        self.indent = indent
+        self.lambda_ = lambda_
+        self.window = window
+        self.framerate = framerate
+        self.bp_order = bp_order
+        self.debug = debug
+
+    def __call__(self, frames, annotations):
+        """
+        Computes the pulse signal for the given frame sequence.
+
+        **Parameters:**
+
+        frames: :py:class:`bob.bio.video.utils.FrameContainer`
+            Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
+            for further details.
+
+        annotations: :py:class:`dict`
+            A dictionary containing annotations of the face bounding box
+            (not used here: this preprocessor relies on landmark detection).
+
+        **Returns:**
+
+        pulse: numpy.array of size (nb_frames, 3)
+            The pulse signal in each color channel (RGB).
+        """
+        video = frames.as_array()
+        nb_frames = video.shape[0]
+
+        # the mean color of the face along the sequence
+        face_color = numpy.zeros((nb_frames, 3), dtype='float64')
+
+        # build the bandpass filter once and for all
+        bandpass_filter = build_bandpass_filter(self.framerate, self.bp_order, False)
+
+        # landmarks detection
+        detector = bob.ip.dlib.DlibLandmarkExtraction()
+
+        counter = 0
+        previous_ldms = None
+        for i, frame in enumerate(video):
+
+            logger.debug("Processing frame {}/{}".format(counter, nb_frames))
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2), 2))
+                pyplot.show()
+
+            # detect landmarks
+            try:
+                ldms = detector(frame)
+            except TypeError:
+                # looks like one video from Replay-Mobile is upside down !
+                rotated_shape = bob.ip.base.rotated_output_shape(frame, 180)
+                frame_rotated = numpy.ndarray(rotated_shape, dtype=numpy.float64)
+                bob.ip.base.rotate(frame, frame_rotated, 180)
+                frame_rotated = frame_rotated.astype(numpy.uint8)
+                logger.warning("Rotating frame and retrying ...")
+                try:
+                    ldms = detector(frame_rotated)
+                except TypeError:
+                    # no landmarks, even after rotation: skip this frame
+                    ldms = previous_ldms
+                    logger.warning("No mask detected in frame {}".format(i))
+                    face_color[i] = 0
+                    continue
+                frame = frame_rotated
+
+            if self.debug:
+                from matplotlib import pyplot
+                import bob.ip.draw
+                display = numpy.copy(frame)
+                for p in ldms:
+                    bob.ip.draw.plus(display, p, radius=5, color=(255, 0, 0))
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(display, 2), 2))
+                pyplot.show()
+
+            ldms = numpy.array(ldms)
+            mask_points, mask = kp66_to_mask(frame, ldms, self.indent, self.debug)
+            face_color[i] = compute_average_colors_mask(frame, mask, self.debug)
+
+            previous_ldms = ldms
+            counter += 1
+
+        # build the pulse signal, channel per channel
+        from scipy.signal import filtfilt
+        pulse = numpy.zeros((nb_frames, 3), dtype='float64')
+        for i in range(3):
+            # detrend the mean color signal
+            detrended = detrend(face_color[:, i], self.lambda_)
+            # moving average
+            averaged = average(detrended, self.window)
+            # bandpass filtering
+            pulse[:, i] = filtfilt(bandpass_filter, numpy.array([1]), averaged)
+
+        if self.debug:
+            from matplotlib import pyplot
+            for i in range(3):
+                f, ax = pyplot.subplots(2, sharex=True)
+                ax[0].plot(range(face_color.shape[0]), face_color[:, i], 'g')
+                ax[0].set_title('Original color signal')
+                ax[1].plot(range(face_color.shape[0]), pulse[:, i], 'g')
+                ax[1].set_title('Pulse signal')
+                pyplot.show()
+
+        return pulse
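The filtering stage above delegates to ``bob.rppg.cvpr14.filter_utils``. As a reference for what those two steps compute, here is a minimal sketch, under the assumption that ``detrend`` implements the smoothness-priors detrending of Tarvainen et al. (2002) used in Li's paper; the function names are illustrative, and a real implementation would use a banded solver instead of a dense one:

```python
import numpy

def detrend_sketch(signal, lambda_):
    # smoothness-priors detrending: the trend is (I + lambda^2 D2'D2)^-1 z,
    # with D2 the second-order difference operator; return the residual
    t = signal.shape[0]
    d2 = numpy.zeros((t - 2, t))
    for k in range(t - 2):
        d2[k, k:k + 3] = [1.0, -2.0, 1.0]
    trend = numpy.linalg.solve(numpy.eye(t) + (lambda_ ** 2) * d2.T.dot(d2), signal)
    return signal - trend

def average_sketch(signal, window):
    # simple moving average with a flat kernel of the given size
    kernel = numpy.ones(window) / window
    return numpy.convolve(signal, kernel, mode='same')
```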
diff --git a/bob/pad/face/preprocessor/SSR.py b/bob/pad/face/preprocessor/SSR.py
new file mode 100644
index 0000000000000000000000000000000000000000..bcd06e2a9f706a94737834ae765a469eae368ab9
--- /dev/null
+++ b/bob/pad/face/preprocessor/SSR.py
@@ -0,0 +1,160 @@
+import numpy
+
+import logging
+logger = logging.getLogger("bob.pad.face")
+
+from bob.bio.base.preprocessor import Preprocessor
+
+import bob.ip.facedetect
+import bob.ip.skincolorfilter
+
+from bob.rppg.base.utils import crop_face
+
+from bob.rppg.ssr.ssr_utils import get_eigen
+from bob.rppg.ssr.ssr_utils import plot_eigenvectors
+from bob.rppg.ssr.ssr_utils import build_P
+
+
+class SSR(Preprocessor):
+    """
+    This class extracts the pulse signal from a video sequence,
+    according to the SSR (Spatial Subspace Rotation) algorithm.
+
+    **Parameters:**
+
+    skin_threshold: float
+        The threshold for skin color probability.
+
+    skin_init: bool
+        If True, re-initialize the skin color distribution at each frame.
+
+    stride: int
+        The temporal stride.
+
+    debug: boolean
+        If True, show intermediate results.
+    """
+    def __init__(self, skin_threshold=0.5, skin_init=False, stride=25, debug=False, **kwargs):
+
+        super(SSR, self).__init__()
+
+        self.skin_threshold = skin_threshold
+        self.skin_init = skin_init
+        self.stride = stride
+        self.debug = debug
+
+        self.skin_filter = bob.ip.skincolorfilter.SkinColorFilter()
+
+    def __call__(self, frames, annotations):
+        """
+        Computes the pulse signal for the given frame sequence.
+
+        **Parameters:**
+
+        frames: :py:class:`bob.bio.video.utils.FrameContainer`
+            Video data stored in the FrameContainer, see ``bob.bio.video.utils.FrameContainer``
+            for further details.
+
+        annotations: :py:class:`dict`
+            A dictionary containing the face bounding box for each frame,
+            keyed by frame index:
+            ``{'N': {'topleft': (row, col), 'bottomright': (row, col)}}``.
+
+        **Returns:**
+
+        pulse: numpy.array of size nb_frames
+            The pulse signal.
+        """
+        video = frames.as_array()
+        nb_frames = video.shape[0]
+
+        # the result -> the pulse signal
+        output_data = numpy.zeros(nb_frames, dtype='float64')
+
+        # store the eigenvalues and the eigenvectors at each frame
+        eigenvalues = numpy.zeros((3, nb_frames), dtype='float64')
+        eigenvectors = numpy.zeros((3, 3, nb_frames), dtype='float64')
+
+        counter = 0
+        previous_bbox = None
+        previous_skin_pixels = None
+
+        for i, frame in enumerate(video):
+
+            logger.debug("Processing frame %d/%d...", i, nb_frames)
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(frame, 2), 2))
+                pyplot.show()
+
+            # get the face
+            try:
+                topleft = annotations[str(i)]['topleft']
+                bottomright = annotations[str(i)]['bottomright']
+                size = (bottomright[0] - topleft[0], bottomright[1] - topleft[1])
+                bbox = bob.ip.facedetect.BoundingBox(topleft, size)
+                face = crop_face(frame, bbox, bbox.size[1])
+            except (KeyError, ZeroDivisionError):
+                logger.warning("No annotations ... running face detection")
+                try:
+                    bbox, quality = bob.ip.facedetect.detect_single_face(frame)
+                    face = crop_face(frame, bbox, bbox.size[1])
+                except Exception:
+                    # fall back on the bounding box of the previous frame
+                    bbox = previous_bbox
+                    face = crop_face(frame, bbox, bbox.size[1])
+                    logger.warning("No detection, using bounding box from previous frame ...")
+
+            if self.debug:
+                from matplotlib import pyplot
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(face, 2), 2))
+                pyplot.show()
+
+            # skin filter
+            if counter == 0 or self.skin_init:
+                self.skin_filter.estimate_gaussian_parameters(face)
+                logger.debug("Skin color parameters:\nmean\n{0}\ncovariance\n{1}".format(self.skin_filter.mean, self.skin_filter.covariance))
+
+            skin_mask = self.skin_filter.get_skin_mask(face, self.skin_threshold)
+            skin_pixels = face[:, skin_mask]
+            skin_pixels = skin_pixels.astype('float64') / 255.0
+
+            if self.debug:
+                from matplotlib import pyplot
+                skin_mask_image = numpy.copy(face)
+                skin_mask_image[:, skin_mask] = 255
+                pyplot.title("skin pixels in frame {0}".format(i))
+                pyplot.imshow(numpy.rollaxis(numpy.rollaxis(skin_mask_image, 2), 2))
+                pyplot.show()
+
+            # no skin pixels have been detected ... use the previous ones
+            if skin_pixels.shape[1] == 0:
+                skin_pixels = previous_skin_pixels
+                logger.warning("No skin pixels detected, using the previous ones")
+
+            # build the correlation matrix C and get its eigenvectors and eigenvalues
+            eigenvalues[:, counter], eigenvectors[:, :, counter] = get_eigen(skin_pixels)
+
+            if self.debug:
+                plot_eigenvectors(skin_pixels, eigenvectors[:, :, counter])
+
+            # build P over the sliding window and add it to the pulse signal
+            if counter >= self.stride:
+                tau = counter - self.stride
+                p = build_P(counter, self.stride, eigenvectors, eigenvalues)
+                output_data[tau:counter] += (p - numpy.mean(p))
+
+            previous_bbox = bbox
+            previous_skin_pixels = skin_pixels
+            counter += 1
+
+        if self.debug:
+            import matplotlib.pyplot as plt
+            fig = plt.figure()
+            ax = fig.add_subplot(111)
+            ax.plot(range(nb_frames), output_data)
+            plt.show()
+
+        return output_data
diff --git a/bob/pad/face/preprocessor/__init__.py b/bob/pad/face/preprocessor/__init__.py
index 1ad3d4aafbc55206ed80d70bf618f78521a11567..ced731f794d72bd6c5d85656516eb27926df345e 100644
--- a/bob/pad/face/preprocessor/__init__.py
+++ b/bob/pad/face/preprocessor/__init__.py
@@ -3,7 +3,9 @@
 from .ImageFaceCrop import ImageFaceCrop
 from .FrameDifference import FrameDifference
 from .VideoSparseCoding import VideoSparseCoding
-from .CopyVideo import CopyVideo
+from .Li import Li
+from .Chrom import Chrom
+from .SSR import SSR
 
 def __appropriate__(*args):
     """Says object was actually declared here, and not in the import module.
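With the three classes exported from ``bob.pad.face.preprocessor``, a pulse-based PAD experiment only needs to instantiate one of them as its preprocessor. A minimal configuration sketch (the parameter values below are the defaults from this diff; how the object is wired into an experiment depends on the bob.pad framework configuration in use):

```python
from bob.pad.face.preprocessor import Chrom

# one pulse signal per video; per-frame bounding boxes come from the
# database annotations when available, face detection otherwise
preprocessor = Chrom(
    skin_threshold=0.5,  # skin color probability threshold
    skin_init=False,     # keep the skin model estimated on the first frame
    framerate=25,        # must match the framerate of the database videos
    bp_order=32,         # bandpass filter order
    window_size=0,       # 0 disables the overlap-add procedure
    motion=0.0,          # 0 keeps the whole sequence
)
```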