diff --git a/bob/paper/nir_patch_pooling/script/annotate_database.py b/bob/paper/nir_patch_pooling/script/annotate_database.py
index 4e4db2abbdfafc1a9775c683f3eddd36edf2e5d7..071084e90b1f2dd1048d086b877e81546ff89f91 100755
--- a/bob/paper/nir_patch_pooling/script/annotate_database.py
+++ b/bob/paper/nir_patch_pooling/script/annotate_database.py
@@ -1,246 +1,219 @@
 #
-# script to generate face annotations for NMAD directly over NIR data
-# @ Ketan Kotwal
+# Script to generate face annotations for NIR data
+# @author: Ketan Kotwal
 #
-#----------------------------------------------------------
+#------------------------------------------------------------------------------
 
 # imports
 from bob.pad.face.preprocessor.FaceCropAlign import detect_face_landmarks_in_image
-from bob.ip.color import rgb_to_gray
-from bob.ip.facelandmarks import detect_landmarks
-import bob.bio.video
-import bob.io.base
+from bob.bio.video import FrameContainer
+from bob.io.base import create_directories_safe
 import numpy as np
-import json, h5py
+import json
 import os, sys
-from bob.paper.makeup_aim.database import MLFPDatabase
+#------------------------------------------------------------------------------
-ORIGINAL_DIRECTORY = "/idiap/temp/kkotwal/nmad_experiments/mlfp_int/"
+class Annotator:
-#----------------------------------------------------------
+    def __init__(self):
+        pass
-class AnnotationGenerator:
-
-    def __init__(self, input_directory, annotation_directory):
-
-        self.input_directory = input_directory.rstrip("/")
-        self.annotation_directory = annotation_directory.rstrip("/")
-        if not os.path.exists(self.annotation_directory):
-            os.makedirs(self.annotation_directory)
-
-        self.file_objects = self._load_db(self)
-
-        print("Input directory: {}".format(self.input_directory))
-        print("Annotation directory: {}".format(self.annotation_directory))
-
-#--------------------------------------
-
-    def _load_db(self, protocol):
-
-        database = MLFPDatabase(original_directory = ORIGINAL_DIRECTORY,
-            protocol = "grandtest", annotation_directory = None)
-
-        return database.objects(protocol="grandtest")
-
-#----------------------------------------------------------
+#------------------------------------------------------------------------------
 
     def find_annotations(self, image):
 
+        # if image is grayscale, convert to 3 channel for face detection
         if len(image.shape) == 2:
             image = np.repeat(image[:, :, np.newaxis], 3, axis=2)
             image = np.transpose(image, (2,0,1))
-        else:
-            print("\t\tNot altering dimensions")
 
         # find annotations using MTCNN
         frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn")
         if frame_annotations:
-            print(" --> Found using MTCNN")
+            logger.debug(" --> Found using MTCNN")
             return frame_annotations
 
         # else, find annotations using dlib
         frame_annotations = detect_face_landmarks_in_image(image, method="dlib")
         if frame_annotations:
-            print(" --> Found using dlib")
-            return frame_annotations
-
-        # else, find annotations using landmark detection routines from bob
-        frame_annotations = self.bob_annotate(image.astype(np.uint8))
-        if frame_annotations:
-            print(" --> Found using bob routine")
+            logger.debug(" --> Found using dlib")
             return frame_annotations
 
         # else, return empty dictionary with warning
-        print(" --> Could not find annotations")
+        logger.warning(" --> Could not find annotations")
         return {}
 
-#----------------------------------------------------------
-
-    def _get_eye_pos(self, lm):
-        """
-        This function returns the locations of left and right eyes
-        """
-
-        left_eye_t = (lm[36, :] + lm[39, :]) / 2.0
-        right_eye_t = (lm[42, :] + lm[45, :]) / 2.0
-
-        right_eye = (int(left_eye_t[1]), int(left_eye_t[0]))
-        left_eye = (int(right_eye_t[1]), int(right_eye_t[0]))
-
-        return right_eye, left_eye
-
-#--------------------------------------
+#------------------------------------------------------------------------------
-    def bob_annotate(self, image):
+    def normalize_image(self, image, n_sigma=3.0): # or 4.0
-        image = rgb_to_gray(image)
-        lm1 = detect_landmarks(image, 1)
-        lm = lm1[0].landmarks
-        bounding_box = lm1[0].bounding_box
-        annotations = {}
+        assert(len(image.shape)==2)
+
+        image = image.astype(np.float64)
-        if lm is not None:
-            lm = np.array(lm)
-            lm = np.vstack((lm[:, 1], lm[:, 0])).T
-            right_eye, left_eye = self._get_eye_pos(lm)
+        # use central region of image to determine parameters for normalization
+        h, w = image.shape
+        region = image[int(0.25*h):int(0.75*h), int(0.25*w):int(0.75*w)]
+
+        # calculate median values
+        med = np.median(region)
+        mad = np.median(np.abs(region - med))
+        image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0
-            points = []
-            for i in range(lm.shape[0]):
-                points.append((int(lm[i, 0]), int(lm[i, 1])))
+        # Clamping to 0-255
+        image_n = np.maximum(image_n, 0)
+        image_n = np.minimum(image_n, 255)
-            annotations['topleft'] = bounding_box.topleft
-            annotations['bottomright'] = bounding_box.bottomright
-            annotations['landmarks'] = points
-            annotations['leye'] = left_eye
-            annotations['reye'] = right_eye
+        image_n = image_n.astype(np.uint8)
-        return annotations
+        return image_n
-#--------------------------------------
+#------------------------------------------------------------------------------
-    def normalize_image(self, image, n_sigma=4.0, norm_method="MAD"):
+    def process(self, fc):
-        face = image[image.shape[0]>>2:3*image.shape[0]>>2, \
-            image.shape[1]>>2:3*image.shape[1]>>2]
+        """
+        fc: FrameContainer
+        returns:
+        annotations: dictionary where each frame index is a key.
+        """
+
+        annotations = {}
+        prev_index = None
+
+        for index, frame, _ in fc:
-        face = face.astype(np.float64)
-        image = image.astype(np.float64)
+            image = self.normalize_image(frame)
-        assert(len(image.shape)==2)
+            try:
+                frame_annotations = self.find_annotations(image)
-        if norm_method=='STD':
-
-            mu = np.mean(face)
-            std = np.std(face)
-            image_n=((image-mu+n_sigma*std)/(2.0*n_sigma*std))*255.0
+            except Exception as e:
+                logger.error("\tException: {}".format(e))
+
+                # copy annotations of previous frame
+                if (prev_index is not None):
+
+                    frame_annotations = annotations[prev_index]
+                    logger.warning("\tCopying annotations of previous frame")
+
+                else:
+
+                    frame_annotations = {}
+                    logger.warning("\tSetting empty annotations")
-        if norm_method=='MAD':
+            annotations[str(index)] = frame_annotations
+            prev_index = str(index)
+
+        return annotations
-            med = np.median(face)
-            mad = np.median(np.abs(face - med))
-            image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0
+#------------------------------------------------------------------------------
-        if norm_method=='MINMAX':
-
-            t_min = np.min(face)
-            t_max = np.max(face)
-            image_n = ((image-t_min)/(t_max-t_min))*255.0
+#------------------------------------------------------------------------------
-        # Clamping to 0-255
-        image_n=np.maximum(image_n, 0)
-        image_n=np.minimum(image_n, 255)
+class AnnotationGenerator:
-        image_n = image_n.astype(np.uint8)
+    def __init__(self, database = None, annotation_directory = None):
-        return image_n
+        self.database = database
-#----------------------------------------------------------
+        self.annotation_directory = annotation_directory
-    def process_image(self, filename):
+        if not os.path.exists(self.annotation_directory):
+            os.makedirs(self.annotation_directory)
+            logger.warning("Annotation directory created at: {}"\
+                .format(self.annotation_directory))
-        # load the image
-        fc = h5py.File(os.path.join(ORIGINAL_DIRECTORY, filename.path + ".hdf5"))
-
-        annotations = {}
-        for i, frame in enumerate(fc.keys()):
-            print(" Frame {:02d}".format(i))
-            image = fc[frame]["array"].value
+        self.annotator = Annotator()
-            try:
-                tmp_annotations = self.find_annotations(image)
+#------------------------------------------------------------------------------
-            except Exception as e:
-                print(" Exception: {}".format(e))
+    def process_video(self, filename):
-                if (i > 0) and (bool(annotations[str(i-1)])):
-                    tmp_annotations = annotations[str(i-1)]
-                    print(" Copying annotations of previous frame")
-                else:
-                    tmp_annotations = {}
-                    print(" Annotations for previous frame do not exist. Setting empty annotations")
+        # load the video into framecontainer
+        fc = filename.load(directory = self.database.original_directory,\
+            extension = self.database.original_extension)
-            annotations[str(i)] = tmp_annotations
+        # obtain the annotations
+        annotations = self.annotator.process(fc)
+        # save the annotations as json.
         json_filepath = os.path.join(self.annotation_directory, filename.path + ".json")
-        bob.io.base.create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False)
+
+        create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False)
         with open(json_filepath, "w+") as json_file:
             json_file.write(json.dumps(annotations))
 
-        fc.close()
-        return
-#--------------------------------------
-
+#------------------------------------------------------------------------------
+
     def run(self, job_index):
 
-        total = len(self.file_objects)
-        print("Found {} files to be annotated".format(total))
+        # collect the files to be processed
+        self.filelist = self.database.objects()
+        total = len(self.filelist)
+        logger.info("Files to be annotated: {}".format(total))
+
+        # calculate split indices if computation is parallel
         if(job_index != -1):
-            num_jobs = 32
+            num_jobs = 32
             file_range = np.linspace(0, total, num_jobs+1, dtype=np.int)
-
-            start_id = file_range[job_index]
-            end_id = file_range[job_index+1]
+            start_index = file_range[job_index]
+            end_index = file_range[job_index+1]
         else:
-            start_id = 0
-            end_id = total
-
-        print("Processing Job Index: {} (Files: {} to {})".format(job_index, start_id, end_id))
+            start_index = 0
+            end_index = total
 
-        for i, f in enumerate(self.file_objects[start_id:end_id]):
+        print("Processing Job Index: {} (Files: {} to {})".format(job_index, start_index, end_index))
 
-            print("[{:03d}/{:03d}] Sample: {}".format(i+1, total, f.path))
+        # process each video in the given range
+        for idx, f in enumerate(self.filelist[start_index:end_index]):
+            logger.info("[{:03d}/{:03d}] Sample: {}".format(idx+1, total, f.path))
             json_filepath = os.path.join(self.annotation_directory, f.path + ".json")
+
             if not os.path.exists(json_filepath):
-                self.process_image(f)
+                self.process_video(f)
+
             else:
-                print("Annotations exist for {}. Skipping".format(f.path))
+                logger.info("Annotations exist: {}. Skipping".format(f.path))
+
+        return
+
+#------------------------------------------------------------------------------
-#----------------------------------------------------------
 
 def main():
 
-    job_index = int(float(sys.argv[1]))
-    print("# Job Index: {}".format(job_index))
+    if len(sys.argv) < 3:
+        print("Usage: {} <database> <annotation-directory> [<job-index>]"\
+            .format(__name__))
+        exit(0)
+
+    database = sys.argv[1]
+    annotation_directory = sys.argv[2]
+    print("Database: {}. Annotation directory: {}".format(database,\
+        annotation_directory))
 
-    input_directory = ORIGINAL_DIRECTORY #sys.argv[1]
-    annotation_directory = "/idiap/temp/kkotwal/nmad_experiments/annotations/mlfp"
+    if len(sys.argv) >= 4:
+        job_index = int(float(sys.argv[3]))
+        print("# Job Index: {}".format(job_index))
+    else:
+        job_index = -1
 
-    ag = AnnotationGenerator(input_directory, annotation_directory)
+    ag = AnnotationGenerator(database, annotation_directory)
     ag.run(job_index)
 
-#----------------------------------------------------------
+#------------------------------------------------------------------------------
 
 if __name__ == "__main__":
     main()
 
-#----------------------------------------------------------
+#------------------------------------------------------------------------------
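
Note on the patched script: the new code calls a module-level `logger` (via `logger.debug`, `logger.info`, `logger.warning`, `logger.error`) and hands the `<database>` command-line argument straight to `AnnotationGenerator`, which then expects an object exposing `objects()`, `original_directory`, and `original_extension`. Neither the logger setup nor the string-to-database resolution appears in the hunk above. The sketch below shows one way that glue could look; the `resolve_database` helper and the use of `bob.bio.base.load_resource` are illustrative assumptions, not part of this patch.

    # Minimal sketch of the glue the patched script appears to assume
    # (not part of the diff above).
    import logging

    # module-level logger referenced throughout the patched script
    logger = logging.getLogger(__name__)


    def resolve_database(name):
        # Hypothetical helper: turn the <database> CLI argument into an object
        # providing .objects(), .original_directory and .original_extension,
        # as used by AnnotationGenerator.run() and process_video().
        # Assumption: the name refers to a registered bob database resource.
        from bob.bio.base import load_resource
        return load_resource(name, "database")

Under these assumptions, main() would call something like database = resolve_database(sys.argv[1]) before constructing AnnotationGenerator, and configure logging (e.g. logging.basicConfig(level=logging.INFO)) so that the logger calls in the patch actually produce output.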