diff --git a/bob/paper/nir_patch_pooling/config/mlfp.py b/bob/paper/nir_patch_pooling/config/mlfp.py index 79b1c84b388460ee19459e2eb98c43cb1f4060ce..bcd0e2aa5483ab41b903fcf87e5c488abb5b78ea 100644 --- a/bob/paper/nir_patch_pooling/config/mlfp.py +++ b/bob/paper/nir_patch_pooling/config/mlfp.py @@ -1,20 +1,23 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- """ -Database configuration for NIR data from MLFP database for detection of -mask-based presentation attacks + Database configuration for NIR data from MLFP database for detection of + mask-based presentation attacks. """ from bob.paper.nir_patch_pooling.database import MLFPDatabase from bob.extension import rc +PROTOCOL = "cv1" + +protocol = PROTOCOL + database = MLFPDatabase( - original_directory=rc["bob.db.mlfp.directory"], - annotation_directory=rc["bob.db.mlfp.annotation_directory"], - ) + original_directory=rc["bob.db.mlfp.directory"], + annotation_directory=rc["bob.db.mlfp.annotation_directory"], + protocol=PROTOCOL, + ) groups = ["train", "dev"] #------------------------------------------------------------------------------ - diff --git a/bob/paper/nir_patch_pooling/config/patch_pooling_lr.py b/bob/paper/nir_patch_pooling/config/patch_pooling_lr.py index d8f89b861231b3ef9ec9762f29834d074e015f2e..1ce053330241c2cb69db353310494022549090c1 100644 --- a/bob/paper/nir_patch_pooling/config/patch_pooling_lr.py +++ b/bob/paper/nir_patch_pooling/config/patch_pooling_lr.py @@ -1,15 +1,14 @@ # -*- coding: utf-8 -*- """ -Configuration file to run PatchPooling + LR classifier for Face PAD -toward detection of mask attacks in NIR. - + Configuration file to run PatchPooling + LR classifier for Face PAD + toward detection of mask attacks in NIR. """ #------------------------------------------------------------------------------ +import os sub_directory = "pooling_lr" - # define preprocessor: from bob.pad.face.preprocessor import FaceCropAlign @@ -22,20 +21,24 @@ FACE_SIZE = 128 RGB_OUTPUT_FLAG = False USE_FACE_ALIGNMENT = True ALIGNMENT_TYPE = "lightcnn" +MIN_FACE_SIZE = 50 NORMALIZATION_FUNCTION = _norm_func -NORMALIZATION_FUNCTION_KWARGS = {'n_sigma':4.0, 'norm_method':'MAD'} +NORMALIZATION_FUNCTION_KWARGS = {'n_sigma':3.0, 'norm_method':'MAD'} -_image_preprocessor = FaceCropAlign(face_size=FACE_SIZE, - rgb_output_flag=RGB_OUTPUT_FLAG, - use_face_alignment=USE_FACE_ALIGNMENT, - alignment_type=ALIGNMENT_TYPE, - normalization_function = NORMALIZATION_FUNCTION, - normalization_function_kwargs = NORMALIZATION_FUNCTION_KWARGS) +_image_preprocessor = FaceCropAlign( + face_size=FACE_SIZE, + rgb_output_flag=RGB_OUTPUT_FLAG, + use_face_alignment=USE_FACE_ALIGNMENT, + min_face_size=MIN_FACE_SIZE, + alignment_type=ALIGNMENT_TYPE, + #normalization_function = NORMALIZATION_FUNCTION, + #normalization_function_kwargs = NORMALIZATION_FUNCTION_KWARGS, + ) -_frame_selector = FrameSelector(selection_style = "all") +_frame_selector = FrameSelector(selection_style="all") -preprocessor = Wrapper(preprocessor = _image_preprocessor, - frame_selector = _frame_selector) +preprocessor = Wrapper(preprocessor=_image_preprocessor,\ + frame_selector=_frame_selector) #------------------------------------------------------------------------------ @@ -43,10 +46,9 @@ preprocessor = Wrapper(preprocessor = _image_preprocessor, from bob.paper.nir_patch_pooling.extractor import PatchPoolingCNN from bob.bio.video.extractor import Wrapper -from bob.extension import rc -import os +from bob.extension import rc as _rc -_model_directory = rc["lightcnn9.model_directory"] +_model_directory = _rc["lightcnn9.model_directory"] _model_name = "LightCNN_9Layers_checkpoint.pth.tar" _model_file = os.path.join(_model_directory, _model_name) @@ -55,8 +57,8 @@ if not os.path.exists(_model_file): the download instructions from README".format(_model_directory)) exit(0) -extractor = Wrapper(PatchPoolingCNN(model_file=_model_file), - frame_selector = _frame_selector) +extractor = Wrapper(PatchPoolingCNN(model_file=_model_file),\ + frame_selector=_frame_selector) #------------------------------------------------------------------------------ @@ -69,5 +71,3 @@ C = 1.0 algorithm = LogRegr(C=C, frame_level_scores_flag=True) #------------------------------------------------------------------------------ - - diff --git a/bob/paper/nir_patch_pooling/config/run.sh b/bob/paper/nir_patch_pooling/config/run.sh deleted file mode 100755 index 9659f6c7ce32068e7a5e85a45c4f29ce6f8bc45a..0000000000000000000000000000000000000000 --- a/bob/paper/nir_patch_pooling/config/run.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -set -ex - -echo "Run experiments from ICIP 2020 paper" -echo "-----------------------------------" - -expt="wmca" - -base_directory="/idiap/temp/kkotwal/test_icip/expt" - -# expt: WMCA -if [[ "${expt}" == "wmca" ]]; then - output_directory="${base_directory}/wmca" - db_config="wmca_mask.py" - protocol="grandtest" - -elif [[ "${expt}" == "mlfp" ]]; then - output_directory="${base_directory}/mlfp" - db_config="mlfp.py" - protocol="cv1" - -else - echo "Unknown expt" - exit -1 -fi - - -cmd="spoof.py ${db_config} patch_pooling_lr.py -s ${output_directory} -vv --grid idiap" -#echo ${cmd} -$cmd - -cmd="bob pad metrics -v -e ${output_directory}/${protocol}/scores/scores-{dev,eval}" -echo ${cmd} -#$cmd - -#-------------- diff --git a/bob/paper/nir_patch_pooling/config/wmca_mask.py b/bob/paper/nir_patch_pooling/config/wmca_mask.py index 6c0d475bbca1e4dcbd6a34feea2abb2c1e7f2dbe..2c86f0d9f02452939aa1d0930496cca60c853561 100644 --- a/bob/paper/nir_patch_pooling/config/wmca_mask.py +++ b/bob/paper/nir_patch_pooling/config/wmca_mask.py @@ -1,20 +1,19 @@ - +# """ -Configuration for PAD experiments using NIR channel of WMCA. -This specific protocol/database selects only *mask* attacks and bona-fide -presentations. + Configuration for PAD experiments using NIR channel of WMCA. + This specific protocol/database selects only *mask* attacks and bona-fide + presentations. """ from bob.paper.nir_patch_pooling.database import WMCAMask from bob.extension import rc - PROTOCOL = "grandtest" database = WMCAMask( - protocol = PROTOCOL, - original_directory = rc["bob.db.wmca_mask.directory"], - annotation_directory = rc["bob.db.wmca_mask.annotation_directory"], + protocol=PROTOCOL, + original_directory=rc["bob.db.wmca_mask.directory"], + annotation_directory=rc["bob.db.wmca_mask.annotation_directory"], ) groups = ["train", "dev", "eval"] @@ -22,5 +21,3 @@ groups = ["train", "dev", "eval"] protocol = PROTOCOL #------------------------------------------------------------------------------ - - diff --git a/bob/paper/nir_patch_pooling/database/mlfp.py b/bob/paper/nir_patch_pooling/database/mlfp.py index 56cf92523458ac08e62590aa0e27d54e27bdee82..1b0e006fe2d3bcb9b450cfb37ed2188d771a14c0 100644 --- a/bob/paper/nir_patch_pooling/database/mlfp.py +++ b/bob/paper/nir_patch_pooling/database/mlfp.py @@ -1,97 +1,83 @@ # -*- coding: utf-8 -*- """ -Implementation of database interface for MLFP dataset. -This protocol caters to only NIR subset of MLFP dataset. -@author: Ketan Kotwal - + Implementation of database interface for MLFP dataset. + This protocol caters to only NIR subset of MLFP dataset. """ # Imports +import json +import os +import logging +import pkg_resources + from bob.pad.base.database import FileListPadDatabase from bob.pad.face.database import VideoPadFile from bob.bio.video import FrameSelector, FrameContainer from bob.io.base import HDF5File -import json -import numpy as np -import os, h5py -import pkg_resources - -import logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) #------------------------------------------------------------------------------ class File(VideoPadFile): - + """Defines a File class for MLFP objects.""" + def __init__(self, attack_type, client_id, path, file_id=None): + """Initialize the file object with details.""" super(File, self).__init__(attack_type, client_id, path, file_id) #------------------------------------------------------------------------------ - def load(self, directory=None, extension=None, - frame_selector=FrameSelector(selection_style='all')): - - path = self.make_path(directory=directory, extension=extension) + def load(self, directory=None, extension=None,\ + frame_selector=FrameSelector(selection_style='all')): + """Load the file into FrameContainer.""" + + path = self.make_path(directory=directory, extension=extension) with HDF5File(path) as f: return FrameContainer(hdf5=f) - - ''' - hdf_file = h5py.File(path, "r") - - fc = FrameContainer() - frame_keys = list(hdf_file.keys()) - frame_keys.remove("FrameIndexes") - - for idx, frame_data in enumerate(frame_keys): - frame = hdf_file[frame_data]["array"].value - fc.add(idx, frame, None) - - hdf_file.close() - - return fc - ''' + #------------------------------------------------------------------------------ class MLFPDatabase(FileListPadDatabase): - """ - A high level implementation of the Database class for NIR content of the - MLFP dataset. - """ + """A high level implementation of the Database class for NIR content of the + MLFP dataset.""" def __init__( self, - name = "MLFP NIR", - original_directory = None, - original_extension = ".hdf5", - annotation_directory = None, - pad_file_class = File, + name="MLFP NIR", + original_directory=None, + original_extension=".hdf5", + annotation_directory=None, + protocol="cv1", + pad_file_class=File, **kwargs ): + """Instantiate the MLFP database.""" - - filelists_directory = pkg_resources.resource_filename( __name__, "/lists/mlfp/") + filelists_directory = pkg_resources.resource_filename(__name__, "/lists/mlfp/") self.filelists_directory = filelists_directory + self.protocol = protocol super(MLFPDatabase, self).__init__( - filelists_directory = filelists_directory, - name = name, - original_directory = original_directory, - original_extension = original_extension, - annotation_directory = annotation_directory, - pad_file_class = pad_file_class, - **kwargs, + filelists_directory=filelists_directory, + name=name, + original_directory=original_directory, + original_extension=original_extension, + annotation_directory=annotation_directory, + protocol=protocol, + pad_file_class=pad_file_class, + **kwargs ) self.annotation_directory = annotation_directory - logger.info("Dataset: {}".format(self.name)) + logger.info("Database: {}".format(self.name)) logger.info("Original directory: {}; Annotation directory: {}"\ .format(self.original_directory, self.annotation_directory)) @@ -99,32 +85,29 @@ class MLFPDatabase(FileListPadDatabase): def annotations(self, f): - """ - Returns annotations for a given file object ``f``. - Annotations must be precomputed. - """ + """Return annotations for a given file object ``f``. + Annotations must be precomputed.""" if self.annotation_directory is None: raise ValueError("Annotation Directory is not provided.") - file_path = os.path.join(self.annotation_directory, f.path + ".json") + file_path = os.path.join(self.annotation_directory, f.path+".json") - if os.path.isfile(file_path): + if os.path.isfile(file_path): with open(file_path, "r") as json_file: annotations = json.load(json_file) - + if not annotations: # if dictionary is empty logger.warning("Empty annotations for %s", f.path) return None return annotations - - else: + + else: logger.warning("Annotation file for %s does not exist.\ (Overall path: %s)", f.path, file_path) return None #------------------------------------------------------------------------------ - diff --git a/bob/paper/nir_patch_pooling/database/wmca_mask.py b/bob/paper/nir_patch_pooling/database/wmca_mask.py index 3824bcf87a1475479b1b41e397795a18ecc28f6e..d2c5d336453c40e09eee689fa4e75a4dee9f0c15 100644 --- a/bob/paper/nir_patch_pooling/database/wmca_mask.py +++ b/bob/paper/nir_patch_pooling/database/wmca_mask.py @@ -1,94 +1,89 @@ # -*- coding: utf-8 -*- -""" -Implementation of dataset interface of WMCA Masks for PAD. -@author: Ketan Kotwal - -""" +"""Implementation of dataset interface of WMCA Masks for PAD.""" # Imports -from bob.pad.base.database import FileListPadDatabase -from bob.pad.face.database.batl import BatlPadFile -from bob.db.batl.models import VideoFile -from bob.extension import rc -from bob.bio.video.utils import FrameSelector - import os import json - +import logging import pkg_resources -import logging +from bob.pad.base.database import FileListPadDatabase +from bob.pad.face.database.batl import BatlPadFile +from bob.db.batl.models import VideoFile + logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) #------------------------------------------------------------------------------ - class File(VideoFile): + """File class for WMCA objects.""" + def __init__(self, path, client_id, session_id, presenter_id, type_id, pai_id): + """Initialize the file objects with category and type details.""" super(File, self).__init__( - path = path, - client_id = client_id, - session_id = session_id, - presenter_id = presenter_id, - type_id = type_id, - pai_id = pai_id + path=path, + client_id=client_id, + session_id=session_id, + presenter_id=presenter_id, + type_id=type_id, + pai_id=pai_id ) self.id = path #------------------------------------------------------------------------------ - + class WMCAMask(FileListPadDatabase): - """ - A high level implementation of the Database class for the WMCA Mask PAD - database. - """ + """A high level implementation of the Database class for the WMCA Mask PAD + database.""" def __init__( self, - name = "WMCAMask", - original_directory = None, - original_extension = ".h5", - protocol = "grandtest", - annotation_directory = None, - pad_file_class = BatlPadFile, - low_level_pad_file_class = File, - **kwargs, - ): - + name="WMCAMask", + original_directory=None, + original_extension=".h5", + protocol="grandtest", + annotation_directory=None, + pad_file_class=BatlPadFile, + low_level_pad_file_class=File, + **kwargs): """ - **Parameters:** + Parameters + ----------- - ``original_directory`` : str or None + original_directory : str or None original directory refers to the location of WMCA (or WMCA Mask) - parent directory + parent directory. - ``original_extension`` : str or None - extension of original data + original_extension : str or None + extension of original data. - ``protocol`` : str + protocol : str (default: ``grandtest``) The protocol for which the clients should be retrieved. - Default: 'grandtest-nir-50'. + + annotation_directory : str or None + annotation directory refers to the location of precomputed + annotations. """ - filelists_directory = pkg_resources.resource_filename( __name__, "lists/wmca_mask/") + filelists_directory = pkg_resources.resource_filename(__name__, "lists/wmca_mask/") self.filelists_directory = filelists_directory # init the parent class using super. super(WMCAMask, self).__init__( - filelists_directory = filelists_directory, - name = name, - protocol = protocol, - original_directory = original_directory, - original_extension = original_extension, - pad_file_class = low_level_pad_file_class, - annotation_directory = annotation_directory, + filelists_directory=filelists_directory, + name=name, + protocol=protocol, + original_directory=original_directory, + original_extension=original_extension, + pad_file_class=low_level_pad_file_class, + annotation_directory=annotation_directory, **kwargs) self.low_level_pad_file_class = low_level_pad_file_class @@ -105,6 +100,7 @@ class WMCAMask(FileListPadDatabase): # override the _make_pad function in bob.pad.base def _make_pad(self, files): + """Convert the file to PAD format.""" video_pad_files = [] @@ -117,30 +113,31 @@ class WMCAMask(FileListPadDatabase): presenter_id = int(info.split("_")[2]) type_id = int(info.split("_")[3]) pai_id = int(info.split("_")[4]) - - video_file = self.low_level_pad_file_class(path = path, - client_id = client_id, session_id = session_id, - presenter_id = presenter_id, type_id = type_id, - pai_id = pai_id) - + + video_file = self.low_level_pad_file_class(path=path,\ + client_id=client_id, session_id=session_id,\ + presenter_id=presenter_id, type_id=type_id,\ + pai_id=pai_id) + video_pad_files.append(video_file) return video_pad_files #------------------------------------------------------------------------------ - def objects(self, groups=None, protocol=None, purposes=None, - model_ids=None, **kwargs): + def objects(self, groups=None, protocol=None, purposes=None,\ + model_ids=None, **kwargs): + """Retrieve file objects as per requirements.""" # default the parameters if the values are not provided if protocol is None: - protocol = self.protocol + protocol = self.protocol if groups is None: groups = ["train", "dev", "eval"] # obtain the file list using the parent class's functionality - files = super(WMCAMask, self).objects(groups=groups, protocol=protocol, + files = super(WMCAMask, self).objects(groups=groups, protocol=protocol,\ purposes=purposes, model_ids=model_ids, **kwargs) # create objects for each file where the class is BATLPadFile @@ -149,53 +146,36 @@ class WMCAMask(FileListPadDatabase): stream_type = "infrared" # nir => infrared num_frames = 50 - files = [self.pad_file_class(f=f, stream_type=stream_type, + files = [self.pad_file_class(f=f, stream_type=stream_type,\ max_frames=num_frames) for f in files] - return files + return files #------------------------------------------------------------------------------ def annotations(self, f): """ - Returns annotations for a given file object ``f``. - Annotations must be precomputed. - - **Parameters:** - - ``f`` : :py:class:`object` - An instance of file object defined above. - - **Returns:** - - ``annotations`` : :py:class:`dict` - A dictionary containing annotations for - each frame in the video. - Dictionary structure: - ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``. - Where - ``frameN_dict`` contains coordinates of the - face bounding box and landmarks in frame N. - """ + Return annotations for a given file object ``f``. + Annotations must be precomputed.""" if self.annotation_directory is None: raise ValueError("Annotation Directory is not provided.") - file_path = os.path.join(self.annotation_directory, f.f.path + ".json") + file_path = os.path.join(self.annotation_directory, f.f.path+".json") - if os.path.isfile(file_path): + if os.path.isfile(file_path): with open(file_path, "r") as json_file: annotations = json.load(json_file) - + if not annotations: # if dictionary is empty logger.warning("Empty annotations for %s", f.path) return None return annotations - - else: - logger.warning("Annotation file for %s does not exist. (Overall path: %s)", f.path, file_path) + + else: + logger.warning("Annotation file for %s does not exist", f.path) return None #------------------------------------------------------------------------------ diff --git a/bob/paper/nir_patch_pooling/extractor/patch_pooling_cnn.py b/bob/paper/nir_patch_pooling/extractor/patch_pooling_cnn.py index 33e807d19c227ca4ddcf0e7e8e5eccb7aba2ec5b..0a5ac37fde2d7ae7f5c33242910fe694e83fde32 100644 --- a/bob/paper/nir_patch_pooling/extractor/patch_pooling_cnn.py +++ b/bob/paper/nir_patch_pooling/extractor/patch_pooling_cnn.py @@ -1,93 +1,70 @@ # -*- coding: utf-8 -*- -""" -Implementation of Patch Pooling CNN feature extractor with LightCNN-9 backbone -@author: Ketan Kotwal -""" +""" Implementation of Patch Pooling CNN feature extractor with LightCNN-9 + backbone.""" # Imports -import numpy as np import torch +import logging +import numpy as np +from collections import OrderedDict + from PIL import Image import torch.nn.functional as F from torch.autograd import Variable import torchvision.transforms as transforms + from bob.paper.nir_patch_pooling.extractor import LightCNN9 from bob.bio.base.extractor import Extractor -import logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) #------------------------------------------------------------------------------ class PatchPoolingCNN(Extractor): + """Class to extract `patch pooled` features from the final convolutional + layer of LightCNN9 (MFM5 layer).""" - """ - The class implements extraction of patch pooled features from the final - convolutional layer of LightCNN9 (MFM5 layer). - """ - def __init__(self, model_file=None, patch_stride=4, num_classes=79077): - + """Initialize with default parameters.""" + Extractor.__init__(self, skip_extractor_training=True) self.network = LightCNN9Patch() self.patch_stride = patch_stride - - # load the model into network. + + # load the model into network. cp = torch.load(model_file, map_location="cpu") cp = cp["state_dict"] - + # checked if pre-trained model was saved using nn.DataParallel saved_with_nn_parallel = any("module" in x for x in list(cp.keys())) - ''' - saved_with_data_parallel = False - for k, v in cp.items(): - if("module" in k): - saved_with_data_parallel = True - break - ''' - # if DataParallel format, remove module term if(saved_with_nn_parallel): - from collections import OrderedDict new_state_dict = OrderedDict() - + for k, v in cp.items(): k_new = k[7:] new_state_dict[k_new] = v - + self.network.load_state_dict(new_state_dict) else: self.network.load_state_dict(cp) - + self.network.eval() # image pre-processing - self.data_transform = transforms.Compose([transforms.Resize(size=128), + self.data_transform = transforms.Compose([transforms.Resize(size=128),\ transforms.ToTensor()]) #------------------------------------------------------------------------------ def __call__(self, image): + """ Extract features from an image.""" - """ Extract features from an image - - Parameters - ---------- - image : 2D :py:class:`numpy.ndarray` (floats) - The grayscale image to extract the features from. Its size must be 128x128 - - Returns - ------- - feature : :py:class:`numpy.ndarray` (floats) - The extracted features as a 1d array of size 256 - - """ - # torchvision.transforms expect a numpy array of size HxWxC pil_image = Image.fromarray(image.astype(np.uint8)) input_image = self.data_transform(pil_image) @@ -96,7 +73,7 @@ class PatchPoolingCNN(Extractor): input_image = Variable(input_image) # obtain the features (to be pooled) from forward pass of network - _ , features = self.network(input_image) + _, features = self.network(input_image) # pool features through patch-level processing features = self.conv_to_patch(features) @@ -107,6 +84,7 @@ class PatchPoolingCNN(Extractor): #------------------------------------------------------------------------------ def conv_to_patch(self, features): + """Pooling of features over local patches.""" stride = self.patch_stride pooled_features = torch.zeros(1, stride*stride*features.shape[1]) @@ -122,31 +100,26 @@ class PatchPoolingCNN(Extractor): # normalize the vector of pooled features pooled_features = pooled_features/stride/stride - - return pooled_features -#------------------------------------------------------------------------------ + return pooled_features #------------------------------------------------------------------------------ -# class LightCNN9Patch: it inherits the LightCNN-9 class from bob, -# and returns the last conv layer features and embeddings. class LightCNN9Patch(LightCNN9): + """Implementation of Patch-formatted LightCNN0.""" def __init__(self): - + # do not change the init - super(LightCNN9Patch, self).__init__() + super(LightCNN9Patch, self).__init__() #------------------------------------------------------------------------------ def forward(self, x): - """ - Propagate data through the network, and return the output of final - conv layer. - """ + """Propagate data through the network, and return the output of final + conv layer.""" conv_out = self.features(x) x = conv_out.view(conv_out.size(0), -1) @@ -156,6 +129,3 @@ class LightCNN9Patch(LightCNN9): return out, conv_out #------------------------------------------------------------------------------ - - - diff --git a/bob/paper/nir_patch_pooling/script/annotate_database.py b/bob/paper/nir_patch_pooling/script/annotate_database.py index 013d3be8d8932206ef8a334e16259be9c14145e6..bab3cd3136ca366abf985630afd9b51814c7a790 100755 --- a/bob/paper/nir_patch_pooling/script/annotate_database.py +++ b/bob/paper/nir_patch_pooling/script/annotate_database.py @@ -1,142 +1,190 @@ -# -# Script to generate face annotations for NIR data -# @author: Ketan Kotwal -# -#------------------------------------------------------------------------------ +""" + Desc: Script to generate face annotations for NIR data + Author: Ketan Kotwal +""" +#------------------------------------------------------------------------------ # imports +import json +import sys +import os +import numpy as np from bob.pad.face.preprocessor.FaceCropAlign import detect_face_landmarks_in_image -from bob.bio.video import FrameContainer from bob.io.base import create_directories_safe -import numpy as np -import json -import os, sys -from bob.paper.nir_patch_pooling.config.wmca_mask import database as d1 +from bob.ip.color import rgb_to_gray +from bob.ip.facelandmarks import detect_landmarks +from bob.paper.nir_patch_pooling.config.wmca_mask import database as d_wmca +from bob.paper.nir_patch_pooling.config.mlfp import database as d_mlfp + +#------------------------------------------------------------------------------ + +def bob_annotate(image): + """Find face landmarks using bob's routine.""" + + image = rgb_to_gray(image) + lm_list = detect_landmarks(image, 1) + lm = lm_list[0].landmarks + bounding_box = lm_list[0].bounding_box + annotations = {} + + if lm is not None: + lm = np.array(lm) + lm = np.vstack((lm[:, 1], lm[:, 0])).T + right_eye, left_eye = _get_eye_pos(lm) + + points = [] + for i in range(lm.shape[0]): + points.append((int(lm[i, 0]), int(lm[i, 1]))) + + annotations['topleft'] = bounding_box.topleft + annotations['bottomright'] = bounding_box.bottomright + annotations['landmarks'] = points + annotations['leye'] = left_eye + annotations['reye'] = right_eye + + return annotations + #------------------------------------------------------------------------------ -class Annotator: +def _get_eye_pos(lm): + """Return the locations of left and right eyes.""" + + rr_c = (lm[36, :]+lm[39, :])/2.0 + ll_c = (lm[42, :]+lm[45, :])/2.0 - def __init__(self): - pass + right_eye = (int(rr_c[1]), int(rr_c[0])) + left_eye = (int(ll_c[1]), int(ll_c[0])) + + return right_eye, left_eye #------------------------------------------------------------------------------ - def find_annotations(self, image): +def find_annotations(image): + """Find face landmarks for image using variety of methods.""" + + # if image is grayscale, convert to 3 channel for face detection + if len(image.shape) == 2: + image = np.repeat(image[:, :, np.newaxis], 3, axis=2) + image = np.transpose(image, (2, 0, 1)) - # if image is grayscale, convert to 3 channel for face detection - if len(image.shape) == 2: - image = np.repeat(image[:, :, np.newaxis], 3, axis=2) - image = np.transpose(image, (2,0,1)) + # find annotations using MTCNN + frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn") + if frame_annotations: + return frame_annotations - # find annotations using MTCNN - frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn") - if frame_annotations: - #print(" --> Found using MTCNN") - return frame_annotations + # else, find annotations using dlib + frame_annotations = detect_face_landmarks_in_image(image, method="dlib") + if frame_annotations: + return frame_annotations - # else, find annotations using dlib - frame_annotations = detect_face_landmarks_in_image(image, method="dlib") - if frame_annotations: - print(" --> Found using dlib") - return frame_annotations + # else, find annotations using landmark detection routines from bob + #frame_annotations = bob_annotate(image.astype(np.uint8)) + #if frame_annotations: + # return frame_annotations - # else, return empty dictionary with warning - print(" --> Could not find annotations") - return None + # else, return empty dictionary with warning + print(" --> Could not find annotations") + return None #------------------------------------------------------------------------------ - def normalize_image(self, image, n_sigma=3.0): +def normalize_image(image, n_sigma=3.0, use_central_region=False): + """Normalize image using MAD.""" - assert(len(image.shape)==2) - - image = image.astype(np.float64) + assert(len(image.shape) == 2) + image = image.astype(np.float64) - # use central region of image to determine parameters for normalization + # use central region of image to determine parameters for normalization + if use_central_region: h, w = image.shape region = image[int(0.25*h):int(0.75*h), int(0.25*w):int(0.75*w)] - - # calculate median values - med = np.median(region) - mad = np.median(np.abs(region - med)) - image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0 + else: + region = image + + # calculate median values + med = np.median(region) + mad = np.median(np.abs(region - med)) + + image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0 + + # Clamping to 0-255 + image_n = np.maximum(image_n, 0) + image_n = np.minimum(image_n, 255) + + image_n = image_n.astype(np.uint8) + + return image_n - # Clamping to 0-255 - image_n = np.maximum(image_n, 0) - image_n = np.minimum(image_n, 255) +#------------------------------------------------------------------------------ + +class Annotator(object): + """A class to generate annotations for bob databases.""" - image_n = image_n.astype(np.uint8) + def __init__(self, database, annotation_directory, n_sigma=3.0,\ + use_central_region=False): + """Instantiate the Annotator class with database object (compatible to + bob), annotation directory, and parameters for image normalization.""" + + self.database = database + self.annotation_directory = annotation_directory + self.n_sigma = n_sigma + self.use_central_region = use_central_region - return image_n + if not os.path.exists(self.annotation_directory): + os.makedirs(self.annotation_directory) + print("Annotation directory created at:{}"\ + .format(self.annotation_directory)) #------------------------------------------------------------------------------ - def process(self, fc): - - annotations = {} + def process_frame(self, fc): + """Process the FrameContainer for annotations.""" + + annotations = {} prev_valid_index = None - + for index, frame, _ in fc: - image = self.normalize_image(frame) + image = normalize_image(frame, n_sigma=self.n_sigma,\ + use_central_region=self.use_central_region) try: - frame_annotations = self.find_annotations(image) + frame_annotations = find_annotations(image) except Exception as e: - print("\tException: {}".format(e)) + print("Exception: {}".format(e)) # copy annotations of previous frame if (prev_valid_index is not None): - frame_annotations = annotations[prev_valid_index] - print("\tCopying annotations of valid previous frame") - + print(" --> Copying annotations of valid previous frame") + else: - frame_annotations = None - print("\tSetting empty annotations") + print(" --> Setting empty annotations") if frame_annotations is not None: annotations[str(index)] = frame_annotations prev_valid_index = str(index) - - return annotations - -#------------------------------------------------------------------------------ - -#------------------------------------------------------------------------------ - -class AnnotationGenerator: - - def __init__(self, database = None, annotation_directory = None): - - self.database = database - - self.annotation_directory = annotation_directory - - if not os.path.exists(self.annotation_directory): - os.makedirs(self.annotation_directory) - print("Annotation directory created at: {}"\ - .format(self.annotation_directory)) - self.annotator = Annotator() + return annotations #------------------------------------------------------------------------------ - def process_video(self, filename): + def process_video(self, f): + """Processe a PAD video for annotations.""" # load the video into framecontainer - fc = filename.load(directory = self.database.original_directory,\ - extension = self.database.original_extension) + fc = f.load(directory=self.database.original_directory, extension=\ + self.database.original_extension) # obtain the annotations - annotations = self.annotator.process(fc) + annotations = self.process_frame(fc) # save the annotations as json. - json_filepath = os.path.join(self.annotation_directory, filename.path + ".json") - + json_filepath = os.path.join(self.annotation_directory, f.path + ".json") + create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False) with open(json_filepath, "w+") as json_file: json_file.write(json.dumps(annotations)) @@ -146,9 +194,10 @@ class AnnotationGenerator: #------------------------------------------------------------------------------ def run(self, job_index): + """Execute the annotation script over the entire dataset.""" # collect the files to be processed - self.filelist = self.database.objects() + self.filelist = self.database.objects(protocol=self.database.protocol) total = len(self.filelist) print("Files to be annotated: {}".format(total)) @@ -162,17 +211,18 @@ class AnnotationGenerator: start_index = 0 end_index = total - print("Processing Job Index: {} (Files: {} to {})".format(job_index, start_index, end_index)) + print("Processing Job Index: {} (Files: {} to {})".format(job_index,\ + start_index, end_index)) # process each video in the given range for idx, f in enumerate(self.filelist[start_index:end_index]): print("[{:03d}/{:03d}] Sample: {}".format(idx+1, total, f.path)) - json_filepath = os.path.join(self.annotation_directory, f.path + ".json") - - if not os.path.exists(json_filepath): + json_path = os.path.join(self.annotation_directory, f.path + ".json") + + if not os.path.exists(json_path): self.process_video(f) - + else: print("Annotations exist: {}. Skipping".format(f.path)) @@ -180,17 +230,47 @@ class AnnotationGenerator: #------------------------------------------------------------------------------ - def main(): + """A script to annotate the bob dataset. + + This scripts generates the annotations for frames in each video, stores + the annotations in json file (one per video) in the annotation_directory. + The number of frames selected per video can be set through the dataset + interface. + + Parameters + ---------- + database_name : str + The name of database. (``wmca`` or ``mlfp``) + annotation_directory : str + The desired location of annotation directory. + n_sigma : float + The sigma value for MAD normalization (default: 3.0) + use_central_region : bool + Set to True if face should be searched only within the central half + region of the frame. + + Returns + ------- + None + + """ if len(sys.argv) < 3: - print("Usage: {} <database> <annotation-directory> [<job-index>]"\ + print("Usage: {} <database_name> <annotation-directory> [<job-index>]"\ .format(__name__)) exit(0) - database = d1 # sys.argv[1] + if sys.argv[1] == "wmca": + database = d_wmca + elif sys.argv[1] == "mlfp": + database = d_mlfp + #database.protocol = "cv1" + else: + raise ValueError("database must be either wmca or mlfp.") + annotation_directory = sys.argv[2] - print("Database: {}. Annotation directory: {}".format(database,\ + print("Database: {}. Annotation directory: {}".format(database.name,\ annotation_directory)) if len(sys.argv) >= 4: @@ -199,9 +279,14 @@ def main(): else: job_index = -1 - ag = AnnotationGenerator(database, annotation_directory) - ag.run(job_index) - + if sys.argv[1] == "wmca": + an = Annotator(database, annotation_directory) + else: # for MLFP + an = Annotator(database, annotation_directory, n_sigma=4.0,\ + use_central_region=True) + + an.run(job_index) + #------------------------------------------------------------------------------ if __name__ == "__main__": @@ -209,8 +294,3 @@ if __name__ == "__main__": main() #------------------------------------------------------------------------------ - - - - - diff --git a/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py b/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py index d451f5fce41a910e0b3a5f3f197ae852db3e1447..65f997265a2493b5c072f6aaac726effdbb59c44 100755 --- a/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py +++ b/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py @@ -1,25 +1,28 @@ -# -# @ desc: script to convert MLFP (NIR) data into specific format -# @ Ketan Kotwal -# +""" + Desc: Script to convert MLFP (NIR) data into specific format compatible + to bob + Author: Ketan Kotwal +""" + #------------------------------------------------------------------------------ # imports +import os +import sys +import numpy as np import scipy.io as spio -import os, sys from bob.bio.video import FrameContainer from bob.io.base import create_directories_safe, HDF5File -import numpy as np -from bob.pad.face.preprocessor.FaceCropAlign import detect_face_landmarks_in_image -import json -frames_per_video = 20 +FRAMES_PER_VIDEO = 20 #------------------------------------------------------------------------------ -class MLFPConvertor: +class MLFPConvertor(object): + """A class to convert the NIR files MLFP dataset into bob compatible format.""" - def __init__(self, input_directory, output_directory, annotation_directory): + def __init__(self, input_directory, output_directory): + """Instantiate the class with input and output directories.""" self.input_directory = input_directory @@ -27,22 +30,19 @@ class MLFPConvertor: if not os.path.exists(self.output_directory): os.makedirs(self.output_directory) - self.annotation_directory = annotation_directory - if not os.path.exists(self.annotation_directory): - os.makedirs(self.annotation_directory) - - self.file_objects = self.load_db(self.input_directory) + self.file_objects = self._load_db(self.input_directory) print("Input directory: {}".format(self.input_directory)) print("Output directory: {}".format(self.output_directory)) - print("Annotation_directory: {}".format(self.annotation_directory)) #------------------------------------------------------------------------------ - def load_db(self, db_directory): - + def _load_db(self, db_directory): + """Create a list of files by walkthrough into input directory.""" + file_list = [] - for dirpath, dirs, files in os.walk(db_directory): + + for dirpath, _, files in os.walk(db_directory): for name in files: file_path = os.path.join(dirpath, name) @@ -52,133 +52,40 @@ class MLFPConvertor: return file_list -#------------------------------------------------------------------------------ - - def normalize_image(self, image, n_sigma=4.0): - - assert(len(image.shape)==2) - - image = image.astype(np.float64) - - # use central region of image to determine parameters for normalization - h, w = image.shape - region = image[int(0.25*h):int(0.75*h), int(0.25*w):int(0.75*w)] - - # calculate median values - med = np.median(region) - mad = np.median(np.abs(region - med)) - image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0 - - # Clamping to 0-255 - image_n = np.maximum(image_n, 0) - image_n = np.minimum(image_n, 255) - - image_n = image_n.astype(np.uint8) - - return image_n - -#------------------------------------------------------------------------------ - - def annotate(self, fc): - - # find anotations for each frame in framecontainer - # if the annotations are not found, set the previous ones - - annotations = {} - prev_index = None - - for index, frame, _ in fc: - - image = self.normalize_image(frame) - - try: - frame_annotations = self.find_annotations(image) - - except Exception as e: - print(" --> Exception: {}".format(e)) - - # copy annotations of previous frame - if (prev_index is not None): - frame_annotations = annotations[prev_index] - print(" --> Copying annotations of previous frame") - - else: - frame_annotations = None - print(" --> No annotations") - - if frame_annotations is not None: - annotations[str(index)] = frame_annotations - prev_index = str(index) - - return annotations - -#------------------------------------------------------------------------------ - - def find_annotations(self, image): - - # if image is grayscale, convert to 3 channel for face detection - if len(image.shape) == 2: - image = np.repeat(image[:, :, np.newaxis], 3, axis=2) - image = np.transpose(image, (2,0,1)) - - # find annotations using MTCNN - frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn") - if frame_annotations: - #print(" --> Found using MTCNN") - return frame_annotations - - # else, find annotations using dlib - frame_annotations = detect_face_landmarks_in_image(image, method="dlib") - if frame_annotations: - #print(" --> Found using dlib") - return frame_annotations - - # else, return empty dictionary with warning - print(" --> Could not find annotations") - return None - #------------------------------------------------------------------------------ def process_file(self, filename): + """Transform a .mat file into .hdf5 file containing FrameContainer.""" - # load the file + # load the file fc = spio.loadmat(os.path.join(self.input_directory, filename + ".mat")) fc = fc["IR"] # select first frames_per_video frames and add to framecontainer - fc_bob = FrameContainer() + fc_out = FrameContainer() - for idx, frame in enumerate(fc[:frames_per_video]): + for idx, frame in enumerate(fc[:FRAMES_PER_VIDEO]): frame = frame[0]/256.0 - fc_bob.add(idx, frame.astype(np.uint8), None) - - # find annotations for FC - annotations = self.annotate(fc_bob) + fc_out.add(idx, frame.astype(np.uint8), None) # save fc to hdf file out_filepath = os.path.join(self.output_directory, filename + ".hdf5") create_directories_safe(directory=os.path.split(out_filepath)[0], dryrun=False) - f_out = HDF5File(out_filepath, 'w') - fc_bob.save(f_out) - del f_out - - # save annotations - json_filepath = os.path.join(self.annotation_directory, filename + ".json") - create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False) - - with open(json_filepath, "w+") as json_file: - json_file.write(json.dumps(annotations)) + hdf_out = HDF5File(out_filepath, 'w') + fc_out.save(hdf_out) + del hdf_out return #------------------------------------------------------------------------------ - + def run(self): + """Execute the conversion script.""" total = len(self.file_objects) print("Files to be converted to .hdf5: {}".format(total)) - + for idx, f in enumerate(self.file_objects): print("[{:03d}/{:03d}] Sample: {}".format(idx+1, total, f)) @@ -189,26 +96,40 @@ class MLFPConvertor: else: print("File exist for {}. Skipping".format(f)) -#------------------------------------------------------------------------------ + return -#input_directory = "/idiap/resource/database/MLFP/NIR_Protocol" -#output_directory = "/idiap/temp/kkotwal/nmad_experiments/mlfp_int2/" +#------------------------------------------------------------------------------ def main(): + """A script to convert the MLFP dataset into bob compatible format. + + This script converts the MLFP dataset (only NIR files) into bob compatible + dataset format. The script can function properly only if the dataset + structure is unchanged. + + Parameters + ---------- + input_directory : str + The location of the MLFP dataset + output_directory : str + The desired location of converted data + + Returns + ------- + None + + """ input_directory = sys.argv[1] output_directory = sys.argv[2] - annotation_directory = sys.argv[3] - m_conv = MLFPConvertor(input_directory, output_directory, annotation_directory) + m_conv = MLFPConvertor(input_directory, output_directory) m_conv.run() - + #------------------------------------------------------------------------------ if __name__ == "__main__": main() -#---------------------------------------------------------- - - +#------------------------------------------------------------------------------ diff --git a/environment.yml b/environment.yml index a69284100fbd707cd3f9eeea6850cb2465b95d8a..72c22700c9c211fed20bd0ec8865d49931dd7ab8 100644 --- a/environment.yml +++ b/environment.yml @@ -3,34 +3,16 @@ channels: - https://www.idiap.ch/software/bob/conda - defaults dependencies: - - python=3.6.6 - - python-dateutil=2.7.3 - - numpy=1.14.5=py36hcd700cb_3 - - scipy=1.1.0=py36hfc37229_0 - - matplotlib=2.2.2 - - scikit-image - - pytorch - - torchvision - - bob=5.0.0=py36hc3b47e9_0 - - bob.bio.base=4.0.0 - - bob.bio.face=4.0.0 - - bob.bio.video - - bob.blitz - - bob.core=2.2.1 + - python=3.7.9 + - numpy=1.17.4 + - pytorch=1.4.0 + - torchvision=0.2.1 + - bob.pad.face=2.2.1 + - bob.bio.video=3.4.4 - bob.db.base - bob.extension - - bob.io.base - - bob.io.image - - bob.io.video - - bob.ip.dlib - - bob.ip.facedetect - - bob.ip.facelandmarks - - bob.ip.flandmark - - bob.ip.mtcnn - - bob.learn.linear - - bob.math - - bob.measure - - bob.pad.base - - bob.pad.face + - bob.io.base=3.0.6 + - bob.ip.mtcnn=1.0.3 + - bob.ip.dlib=1.0.7 - bob.buildout