Commit 023c6c00 authored by Guillaume HEUSCH

Merge branch 'preproc_update' into 'master'

MC preprocessor

See merge request !81
parents 495035df d9ca395b
Pipeline #26055 passed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 6 14:14:28 2018
@author: Olegs Nikisins
"""
# =============================================================================
# Import what is needed here:
from bob.bio.base.preprocessor import Preprocessor
import numpy as np
# =============================================================================
class BlockPatch(Preprocessor, object):
"""
This class extracts patches from the ROI in the input image.
The ROI/block to extract patches from is defined by the top-left and
bottom-right coordinates of a bounding box. Patches are extracted at the
locations of the nodes of a uniform grid, whose cell size is defined by
the ``step`` parameter.
Patches are square, and the number of extracted patches is equal to the
number of grid nodes: all patches fully contained in the ROI are extracted.
If no ROI is defined, the entire image is considered the ROI.
**Parameters:**
``patch_size`` : :py:class:`int`
The size of the square patch to extract from image.
The dimensionality of extracted patches:
``num_channels x patch_size x patch_size``, where ``num_channels`` is
the number of channels in the input image.
``step`` : :py:class:`int`
Defines the size of the cell of the uniform grid to extract patches
from. Patches will be extracted from the locations of the grid nodes.
``use_annotations_flag`` : bool
A flag defining if annotations should be used in the call method.
If ``False``, patches from the whole image will be extracted.
If ``True``, patches from the ROI defined by the annotations will be
extracted.
Default: ``True``.
"""
# ==========================================================================
def __init__(self, patch_size,
step,
use_annotations_flag = True):
super(BlockPatch, self).__init__(patch_size=patch_size,
step=step,
use_annotations_flag=use_annotations_flag)
self.patch_size = patch_size
self.step = step
self.use_annotations_flag = use_annotations_flag
# ==========================================================================
def __call__(self, image, annotations=None):
"""
Extract patches from the ROI in the input image. The ROI is defined
by the ``annotations`` argument. If annotations are not given, patches
are extracted from the whole image.
**Parameters:**
``image`` : 2D to ND :py:class:`numpy.ndarray`
Input image (gray-scale, RGB or multi-spectral).
The expected dimensionality of the image is:
``num_channels x w x h``.
``annotations`` : [] or None
A list containing annotations of bounding box defining ROI.
``annotations[0] = [x_top_left, y_top_left]``
``annotations[1] = [x_bottom_right, y_bottom_right]``
Non-integer annotations are also allowed.
**Returns:**
``patches_array`` : 2D :py:class:`numpy.ndarray`
An array containing flattened patches. To get a list of patches
with original dimensions you can do the following:
``patches_reconstructed = [patch.reshape(n_channels, patch_size, patch_size) for patch in patches_array]``.
"""
if not self.use_annotations_flag:
annotations = None # don't use annotations
"""
Get the ROI:
"""
if annotations is not None:
x1 = np.max([0, np.int(annotations[0][0])])
x2 = np.min([np.int(annotations[1][0]), image.shape[-1]])
y1 = np.max([0, np.int(annotations[0][1])])
y2 = np.min([np.int(annotations[1][1]), image.shape[-2]])
if len(image.shape) == 2: # for gray-scale images
roi = image[y1:y2, x1:x2]
elif len(image.shape) == 3: # for multi-spectral images
roi = image[:,y1:y2, x1:x2]
else: # input data of higher dimensions is not handled
return None
else: # if annotations are not defined
roi = image
"""
Get patches from ROI:
"""
n_blocks_x = np.int((roi.shape[-1] - self.patch_size)/self.step + 1) # Number of full patches horizontally
n_blocks_y = np.int((roi.shape[-2] - self.patch_size)/self.step + 1) # Number of full patches vertically
patch_indices_x = np.arange(n_blocks_x)*self.step # Horizontal starting indices of the patches
patch_indices_y = np.arange(n_blocks_y)*self.step # Vertical starting indices of the patches
# Function to get vertical blocks from image, given starting indices of the blocks:
get_vert_block = lambda im, x_vec : [im[:, x:x+self.patch_size] if len(im.shape)==2 else im[:, :, x:x+self.patch_size] for x in x_vec]
# Function to get horizontal blocks from image, given starting indices of the blocks:
get_hor_block = lambda im, y_vec : [im[y:y+self.patch_size, :] if len(im.shape)==2 else im[:, y:y+self.patch_size, :] for y in y_vec]
# Get all the patches from ROI, patches are returned row-wise:
patches = [hor_block for vert_block in get_vert_block(roi, patch_indices_x) for hor_block in get_hor_block(vert_block, patch_indices_y)]
if not patches: # if no patches can be computed
return None
patches_array = np.vstack([np.ndarray.flatten(patch) for patch in patches])
return patches_array
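# =============================================================================
# Minimal usage sketch (illustration only, not part of the original module).
# It assumes the package dependencies are installed and uses random data in
# place of a real 3 x 128 x 128 face crop. With patch_size=64 and step=32 the
# grid has ((128 - 64) / 32 + 1) ** 2 = 9 nodes, so 9 patches of
# 3 * 64 * 64 = 12288 values each are returned (matching the unit tests below).
if __name__ == "__main__":

    preprocessor = BlockPatch(patch_size=64, step=32, use_annotations_flag=False)

    test_image = np.random.rand(3, 128, 128)  # random stand-in for a cropped face

    patches = preprocessor(test_image)

    print(patches.shape)  # (9, 12288)

    # recover patches with their original dimensions:
    patches_reconstructed = [p.reshape(3, 64, 64) for p in patches]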
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 22 15:38:28 2018
@author: Olegs Nikisins
"""
# =============================================================================
# Import what is needed here:
from bob.bio.base.preprocessor import Preprocessor
import bob.bio.video
import numpy as np
from bob.bio.video.utils import FrameSelector
from bob.bio.video.preprocessor import Wrapper
import bob.ip.dlib
import cv2
from skimage import morphology
from bob.bio.video.utils.FrameContainer import FrameContainer
# =============================================================================
def get_face_contour_mask(image, filt_size = None, erode_flag = False, crop_flag = False):
"""
This function computes the binary mask for the facial region using
landmarks of the face contour. The following steps are executed:
1. Facial landmarks are detected in the input facial image.
2. Binary mask of the face region is computed using landmarks
corresponding to face contour.
3. Optionally, the mask is cropped as if it had been filtered with a
filter of size ``(filt_size x filt_size)``.
4. Optionally, binary erosion is applied to the mask (implemented by
dilating the inverted mask with a disk structuring element of radius
``filt_size``).
**Parameters:**
``image`` : 2D to ND :py:class:`numpy.ndarray`
Input image (gray-scale, RGB or multi-spectral).
The expected dimensionality of the image is:
``num_channels x w x h``.
``filt_size`` : int
If given, the binary mask can be cropped as if it had been filtered
with a filter of size ``(filt_size x filt_size)``.
Also, the mask can be eroded with a disk of radius ``filt_size``.
``erode_flag`` : bool
If set to ``True``, the binary mask will be eroded with a disk
structuring element of radius ``filt_size``.
``crop_flag`` : bool
If set to ``True``, the binary mask will be cropped as if it had
been filtered with a filter of size ``(filt_size x filt_size)``.
**Returns:**
``mask`` : 2D :py:class:`numpy.ndarray`
A binary mask of the face region.
"""
# landmarks
detector = bob.ip.dlib.DlibLandmarkExtraction()
points = detector(image)
# mask_rgb = 1-np.transpose(np.any(image==255, axis=0)).astype(np.int)
if points is not None:
temp_mask = np.zeros((image.shape[-2], image.shape[-1]))
# face_contour = points[0:27] # this is a complete contour
# face_contour = points[:16] # this is a lower half-face
face_contour = points[:16] # this is a lower half-face
# extend the contour vertically to the top of the image, starting from the lower half-face:
face_contour = [(0, face_contour[0][1])] + face_contour + [(0, face_contour[-1][1])]
hull = cv2.convexHull(np.array(face_contour))
mask = cv2.drawContours(temp_mask, [hull], 0, 1, -1)
# mask = mask*mask_rgb
else:
mask = np.ones((image.shape[-2], image.shape[-1]))
# mask = mask*mask_rgb
if filt_size is not None and erode_flag:
selem = morphology.disk(filt_size)
# invert and dilate the mask to obtain erosion without black border:
mask = morphology.binary_dilation(1-mask, selem=selem)
# invert the mask back
mask = 1-mask
if filt_size is not None and crop_flag:
start = np.int(np.floor(filt_size/2.))
end = np.int(np.ceil(filt_size/2.))
mask = mask[start:mask.shape[0]-end+1, start:mask.shape[1]-end+1]
return mask
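# =============================================================================
# Usage sketch for get_face_contour_mask() (illustration only). Assuming an
# aligned RGB face crop ``face`` of shape (3, 128, 128) and a working
# bob.ip.dlib installation, the following returns a binary mask of the
# lower-face region extended to the top of the image:
#
#     mask = get_face_contour_mask(face, filt_size=8, erode_flag=True, crop_flag=True)
#
# With crop_flag=True the (128, 128) mask is cropped to (121, 121), i.e. as if
# it had been filtered with an 8 x 8 kernel; with erode_flag=True the face
# region is additionally shrunk using a disk structuring element built from
# filt_size. If no landmarks are detected, the mask is all ones.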
class VideoFaceCropAlignBlockPatch(Preprocessor, object):
"""
This class first detects, crops and aligns the face in all input
channels, and then extracts patches from the ROI in the cropped faces.
The computation flow is the following:
1. Detect, crop and align facial region in all input channels.
2. Concatenate all channels forming a single multi-channel video data.
3. Extract multi-channel patches from the ROI of the multi-channel video data.
4. Vectorize extracted patches.
**Parameters:**
``preprocessors`` : :py:class:`dict`
A dictionary containing preprocessors for all channels. Dictionary
structure is the following:
``{channel_name_1: bob.bio.video.preprocessor.Wrapper, ``
``channel_name_2: bob.bio.video.preprocessor.Wrapper, ...}``
Note: video, not image, preprocessors are expected.
``channel_names`` : [str]
A list of channel names. Channels will be processed in this order.
``return_multi_channel_flag`` : bool
If this flag is set to ``True``, multi-channel video data will be
returned. Otherwise, patches extracted from the ROI of the video are
returned.
Default: ``False``.
``block_patch_preprocessor`` : object
An instance of the ``bob.pad.face.preprocessor.BlockPatch`` class,
which is used to extract multi-spectral patches from ROI of the facial
region.
``get_face_contour_mask_dict`` : dict or None
Kwargs for the ``get_face_contour_mask()`` function. See description
of this function for more details. If not ``None``, a binary mask of
the face will be computed. Patches outside of the mask are set to zero.
Default: None
``append_mask_flag`` : bool
If set to ``True``, the mask will be flattened and concatenated to the
output array of patches. NOTE: make sure the extractor is capable of
handling this case if you set this flag to ``True``.
Default: ``False``
``feature_extractor`` : object
An instance of the feature extractor to be applied to the patches.
Default is ``None``, meaning that **patches** are returned by the
preprocessor, and no feature extraction is applied.
Defining a ``feature_extractor`` instance can be useful, for example,
when saving the patches would take too much memory.
Note that ``feature_extractor`` must be able to process
FrameContainers.
Default: ``None``
"""
# =========================================================================
def __init__(self, preprocessors,
channel_names,
return_multi_channel_flag = False,
block_patch_preprocessor = None,
get_face_contour_mask_dict = None,
append_mask_flag = False,
feature_extractor = None):
super(VideoFaceCropAlignBlockPatch, self).__init__(preprocessors = preprocessors,
channel_names = channel_names,
return_multi_channel_flag = return_multi_channel_flag,
block_patch_preprocessor = block_patch_preprocessor,
get_face_contour_mask_dict = get_face_contour_mask_dict,
append_mask_flag = append_mask_flag,
feature_extractor = feature_extractor)
self.preprocessors = preprocessors
self.channel_names = channel_names
self.return_multi_channel_flag = return_multi_channel_flag
self.block_patch_preprocessor = block_patch_preprocessor
self.get_face_contour_mask_dict = get_face_contour_mask_dict
self.append_mask_flag = append_mask_flag
self.feature_extractor = feature_extractor
# =========================================================================
def __call__(self, frames, annotations):
"""
This method first detects, crops and aligns the face in all input
channels, and then extracts patches from the ROI in the cropped
faces.
The computation flow is the following:
1. Detect, crop and align facial region in all input channels.
2. Concatenate all channels forming a single multi-channel video data.
3. Extract multi-channel patches from the ROI of the multi-channel video
data.
4. Vectorize extracted patches.
5. If ``feature_extractor`` is defined, the extractor will be applied
to the patches. By default, no extractor is applied.
**Parameters:**
``frames`` : :py:class:`dict`
A dictionary containing FrameContainers for multiple channels.
``annotations`` : :py:class:`dict`
A dictionary containing annotations for
each frame in the video.
Dictionary structure (non-SWIR channels):
``annotations = {'1': frame1_dict, '2': frame2_dict, ...}``.
Where
``frameN_dict`` contains coordinates of the
face bounding box and landmarks in frame N.
Also, the ``annotations`` dictionary is expected to have a key named
``face_roi``. This key points to annotations defining the ROI in the
facial region. The ROI is annotated as follows:
``annotations['face_roi'][0] = [x_top_left, y_top_left]``
``annotations['face_roi'][1] = [x_bottom_right, y_bottom_right]``
**Returns:**
FrameContainer
Contains either multi-channel preprocessed data, or patches
extracted from this data. The output is controlled by
``return_multi_channel_flag`` of this class.
"""
# If the input is a FrameContainer, convert it to a dictionary keyed by self.channel_names:
if isinstance(frames, FrameContainer):
frames = dict(zip(self.channel_names, [frames]))
# Preprocess all channels:
preprocessed = [self.preprocessors[channel](frames[channel], annotations) for channel in self.channel_names]
if None in preprocessed:
return None
# convert all channels to arrays:
preprocessed_arrays = [item.as_array() for item in preprocessed]
# Convert arrays of dimensionality 3 to 4 if necessary:
preprocessed_arrays = [np.expand_dims(item, axis=1) if len(item.shape)==3 else item for item in preprocessed_arrays]
# Concatenate streams channel-wise:
preprocessed_arrays = np.concatenate(preprocessed_arrays, axis=1)
# Convert to frame container:
preprocessed_fc = bob.bio.video.FrameContainer() # initialize the FrameContainer
[preprocessed_fc.add(idx, item) for idx, item in enumerate(preprocessed_arrays)]
if self.return_multi_channel_flag:
return preprocessed_fc
if self.block_patch_preprocessor is not None:
frame_selector = FrameSelector(selection_style = "all")
video_block_patch = Wrapper(preprocessor = self.block_patch_preprocessor,
frame_selector = frame_selector)
else:
return None
if 'face_roi' in annotations: # if ROI annotations are given
roi_annotations={}
roi_annotations['0'] = annotations['face_roi']
else: # extract patches from the whole image
roi_annotations = None
patches = video_block_patch(frames = preprocessed_fc, annotations = roi_annotations)
# compute face masks if needed:
if self.get_face_contour_mask_dict is not None:
patches_masked = bob.bio.video.FrameContainer() # initialize the FrameContainer
for idx, (frame, frame_patches) in enumerate(zip(preprocessed_arrays, patches)):
# here we assume that first three slices 0:3 correspond to RGB image:
mask = get_face_contour_mask(image = frame[0:3, :, :], **self.get_face_contour_mask_dict)
if mask is not None:
mask = mask.flatten()
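# frame_patches is one entry of the ``patches`` FrameContainer, i.e. a
# (frame_id, data, quality) tuple; frame_patches[1] is the 2D array of
# flattened patches of the current frame: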
if self.append_mask_flag:
patches_masked.add(idx, np.c_[frame_patches[1], mask])
else:
patches_masked.add(idx, np.transpose(np.transpose(frame_patches[1])*mask))
patches = patches_masked
# Features can be extracted in the preprocessing stage, if feature extractor is given.
# For example, this can be used, when memory needed for saving the patches is too big.
if self.feature_extractor is not None:
features = self.feature_extractor(patches)
return features
return patches
# =========================================================================
def write_data(self, frames, file_name):
"""
Writes the given data (that has been generated using the __call__
function of this class) to file. This method overwrites the write_data()
method of the Preprocessor class.
**Parameters:**
``frames`` :
data returned by the __call__ method of the class.
``file_name`` : :py:class:`str`
name of the file.
"""
self.preprocessors[self.channel_names[0]].write_data(frames, file_name)
# =========================================================================
def read_data(self, file_name):
"""
Reads the preprocessed data from file.
This method overwrites the read_data() method of the Preprocessor class.
**Parameters:**
``file_name`` : :py:class:`str`
name of the file.
**Returns:**
``frames`` : :py:class:`bob.bio.video.FrameContainer`
Frames stored in the frame container.
"""
frames = self.preprocessors[self.channel_names[0]].read_data(file_name)
return frames
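# =============================================================================
# Configuration sketch (illustration only): if saving raw patches takes too
# much space, an extractor can already be applied in the preprocessing stage
# via the ``feature_extractor`` argument. The extractor must accept the
# FrameContainer of flattened patches; ``SomeFrameContainerExtractor`` below
# is a hypothetical placeholder, not a class of this package, and BlockPatch
# is assumed to be imported from bob.pad.face.preprocessor:
#
#     preprocessor = VideoFaceCropAlignBlockPatch(
#         preprocessors=preprocessors,          # dict of per-channel video preprocessors
#         channel_names=['color'],              # processing order of the channels
#         return_multi_channel_flag=False,      # return patches, not the stacked video
#         block_patch_preprocessor=BlockPatch(patch_size=64, step=32, use_annotations_flag=False),
#         feature_extractor=SomeFrameContainerExtractor())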
from .FaceCropAlign import FaceCropAlign
from .FrameDifference import FrameDifference
from .VideoSparseCoding import VideoSparseCoding
from .VideoFaceCropAlignBlockPatch import VideoFaceCropAlignBlockPatch
from .BlockPatch import BlockPatch
from .LiPulseExtraction import LiPulseExtraction
from .Chrom import Chrom
......@@ -33,5 +35,7 @@ __appropriate__(
Chrom,
SSR,
PPGSecure,
VideoFaceCropAlignBlockPatch,
BlockPatch,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
......@@ -42,6 +42,12 @@ from ..preprocessor.FaceCropAlign import detect_face_landmarks_in_image
from bob.bio.video.preprocessor import Wrapper
from ..preprocessor import VideoFaceCropAlignBlockPatch
from bob.bio.video.utils import FrameSelector
from ..preprocessor import BlockPatch
def test_detect_face_landmarks_in_image_mtcnn():
......@@ -214,6 +220,95 @@ def test_video_face_crop():
assert np.sum(faces[-1][1]) == 1238664
# =============================================================================
def test_video_face_crop_align_block_patch():
"""
Test VideoFaceCropAlignBlockPatch preprocessor.
"""
# =========================================================================
# prepare the test data:
image = load(datafile('test_image.png', 'bob.pad.face.test'))
annotations = None
video, annotations = convert_image_to_video_data(image, annotations, 2)
mc_video = {}
mc_video["color_1"] = video
mc_video["color_2"] = video
mc_video["color_3"] = video
# =========================================================================
# Initialize the VideoFaceCropAlignBlockPatch.
# names of the channels to process:
_channel_names = ['color_1', 'color_2', 'color_3']
# dictionary containing preprocessors for all channels:
_preprocessors = {}
"""
All channels are color, so preprocessors for all of them are identical.
"""
FACE_SIZE = 128 # The size of the resulting face
RGB_OUTPUT_FLAG = False # BW output
USE_FACE_ALIGNMENT = True # use annotations
MAX_IMAGE_SIZE = None # no limiting here
FACE_DETECTION_METHOD = "mtcnn" # use ANNOTATIONS
MIN_FACE_SIZE = 50 # skip small faces
_image_preprocessor = FaceCropAlign(face_size = FACE_SIZE,
rgb_output_flag = RGB_OUTPUT_FLAG,
use_face_alignment = USE_FACE_ALIGNMENT,
max_image_size = MAX_IMAGE_SIZE,
face_detection_method = FACE_DETECTION_METHOD,
min_face_size = MIN_FACE_SIZE)
_frame_selector = FrameSelector(selection_style = "all")
_preprocessor_rgb = Wrapper(preprocessor = _image_preprocessor,
frame_selector = _frame_selector)
_preprocessors[_channel_names[0]] = _preprocessor_rgb
_preprocessors[_channel_names[1]] = _preprocessor_rgb
_preprocessors[_channel_names[2]] = _preprocessor_rgb
"""
The instance of the BlockPatch preprocessor.
"""
PATCH_SIZE = 64
STEP = 32
_block_patch = BlockPatch(patch_size = PATCH_SIZE,
step = STEP,
use_annotations_flag = False)
preprocessor = VideoFaceCropAlignBlockPatch(preprocessors = _preprocessors,
channel_names = _channel_names,
return_multi_channel_flag = True,
block_patch_preprocessor = _block_patch)
# =========================================================================
# pre-process the data and assert the result:
data_preprocessed = preprocessor(frames = mc_video, annotations = annotations)
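# with return_multi_channel_flag = True, each of the 2 frames is a stacked
# 3 x 128 x 128 array: one gray-scale 128 x 128 face crop per input channel
# (rgb_output_flag is False), concatenated channel-wise: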
assert len(data_preprocessed) == 2
assert data_preprocessed[0][1].shape == (3, 128, 128)
assert data_preprocessed[1][1].shape == (3, 128, 128)
preprocessor.return_multi_channel_flag = False # now extract patches
data_preprocessed = preprocessor(frames = mc_video, annotations = annotations)
assert len(data_preprocessed) == 2
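# the 128 x 128 face with PATCH_SIZE = 64 and STEP = 32 gives a 3 x 3 grid,
# i.e. 9 patches of 3 * 64 * 64 = 12288 flattened values per frame: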
assert data_preprocessed[0][1].shape == (9, 12288)
assert data_preprocessed[1][1].shape == (9, 12288)
#==============================================================================
def test_frame_difference():
"""
......@@ -388,7 +483,7 @@ def test_preprocessor_LiPulseExtraction():
image = load(datafile('test_image.png', 'bob.pad.face.test'))
annotations = {'topleft': (95, 155), 'bottomright': (215, 265)}
video, annotations = convert_image_to_video_data(image, annotations, 100)
preprocessor = LiPulseExtraction(debug=False)
pulse = preprocessor(video, annotations)
assert pulse.shape == (100, 3)
......@@ -401,7 +496,7 @@ def test_preprocessor_Chrom():
image = load(datafile('test_image.png', 'bob.pad.face.test'))
annotations = {'topleft': (95, 155), 'bottomright': (215, 265)}
video, annotations = convert_image_to_video_data(image, annotations, 100)
preprocessor = Chrom(debug=False)
pulse = preprocessor(video, annotations)
assert pulse.shape[0] == 100
......@@ -414,7 +509,7 @@ def test_preprocessor_PPGSecure():
image = load(datafile('test_image.png', 'bob.pad.face.test'))
annotations = {'topleft': (456, 212), 'bottomright': (770, 500)}
video, annotations = convert_image_to_video_data(image, annotations, 100)
preprocessor = PPGPreprocessor(debug=False)
pulse = preprocessor(video, annotations)
assert pulse.shape == (100, 5)
......@@ -423,29 +518,29 @@ def test_preprocessor_PPGSecure():
def test_preprocessor_SSR():
""" Test the pulse extraction using SSR algorithm.
"""
image = load(datafile('test_image.png', 'bob.pad.face.test'))
annotations = {'topleft': (95, 155), 'bottomright': (215, 265)}
video, annotations = convert_image_to_video_data(image, annotations, 100)
preprocessor = SSR(debug=False)
pulse = preprocessor(video, annotations)
assert pulse.shape[0] == 100
def test_extractor_LTSS():
""" Test Long Term Spectrum Statistics (LTSS) Feature Extractor
""" Test Long Term Spectrum Statistics (LTSS) Feature Extractor
"""
# "pulse" in 3 color channels
data = np.random.random((200, 3))
extractor = LTSS(concat=True)
features = extractor(data)
# n = number of FFT coefficients (default is 64)
# (n/2 + 1) * 2 (mean and std) * 3 (colors channels)
assert features.shape[0] == 33*2*3
extractor = LTSS(concat=False)
features = extractor(data)
# only one "channel" is considered
......@@ -453,23 +548,23 @@ def test_extractor_LTSS():
def test_extractor_LiSpectralFeatures():
""" Test Li's ICPR 2016 Spectral Feature Extractor
""" Test Li's ICPR 2016 Spectral Feature Extractor
"""
# "pulse" in 3 color channels
data = np.random.random((200, 3))
extractor = LiSpectralFeatures()
features = extractor(data)
assert features.shape[0] == 6
def test_extractor_PPGSecure():
""" Test PPGSecure Spectral Feature Extractor
""" Test PPGSecure Spectral Feature Extractor
"""
# 5 "pulses"
# 5 "pulses"
data = np.random.random((200, 5))
extractor = PPGExtractor()
features = extractor(data)
# n = number of FFT coefficients (default is 32)
......
......@@ -42,6 +42,8 @@ requirements:
- six
- numpy >=1.11
- scikit-learn
- scikit-image
- opencv
test:
imports:
......
......@@ -17,3 +17,4 @@ bob.learn.libsvm
bob.learn.linear
scikit-learn
bob.rppg.base >= 2.0.0
scikit-image