diff --git a/bob/paper/nir_patch_pooling/database/mlfp.py b/bob/paper/nir_patch_pooling/database/mlfp.py index b1c84acdeac7d2a50b5eddc836fe1f212239eb92..3b6473a5885c72b7150651a678492ca346c8bb89 100644 --- a/bob/paper/nir_patch_pooling/database/mlfp.py +++ b/bob/paper/nir_patch_pooling/database/mlfp.py @@ -92,40 +92,46 @@ class MLFPDatabase(FileListPadDatabase): def annotations(self, f): + """ + Returns annotations for a given file object ``f``. + Annotations must be precomputed. + + **Parameters:** + + ``f`` : :py:class:`object` + An instance of file object defined above. + + **Returns:** + + ``annotations`` : :py:class:`dict` + A dictionary containing annotations for + each frame in the video. + Dictionary structure: + ``annotations = {'1': frame1_dict, '2': frame1_dict, ...}``. + Where + ``frameN_dict`` contains coordinates of the + face bounding box and landmarks in frame N. + """ + if self.annotation_directory is None: raise ValueError("Annotation Directory is not provided.") file_path = os.path.join(self.annotation_directory, f.path + ".json") - # if annotations exist, load from file_path if os.path.isfile(file_path): with open(file_path, "r") as json_file: annotations = json.load(json_file) - - else: - - # find and save annotations - annotations = {} - video = f.load(directory=self.original_directory, extension=self.original_extension) - - for idx, image in enumerate(video.as_array()): - frame_annotations = detect_face_landmarks_in_image(image, method=self.landmark_detect_method) - if frame_annotations: - annotations[str(idx)] = frame_annotations - - # save to file_path - create_directories_safe(directory=os.path.split(file_path)[0], dryrun=False) - with open(file_path, 'w+') as json_file: - json_file.write(json.dumps(annotations)) + + if not annotations: # if dictionary is empty + logger.warning("Empty annotations for %s", f.path) + return None + return annotations - if not annotations: # if dictionary is empty - logger.warning("Empty annotations for {}".format(f.path)) + else: + logger.warning("Annotation file for %s does not exist. (Overall path: %s)", f.path, file_path) return None - return annotations - - #------------------------------------------------------------------------------ diff --git a/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py b/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py index 3c32ef19e495303d6fe7efc5e38be942fa2d0b5c..ef4ba4171ac34577b8979d826aa809c152d2d102 100755 --- a/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py +++ b/bob/paper/nir_patch_pooling/script/convert_mlfp_database.py @@ -17,17 +17,23 @@ frames_per_video = 20 class MLFPConvertor: - def __init__(self, input_directory, output_directory): + def __init__(self, input_directory, output_directory, annotation_directory): self.input_directory = input_directory + self.output_directory = output_directory if not os.path.exists(self.output_directory): os.makedirs(self.output_directory) + self.annotation_directory = annotation_directory + if not os.path.exists(self.annotation_directory): + os.makedirs(self.annotation_directory) + self.file_objects = self.load_db(self.input_directory) print("Input directory: {}".format(self.input_directory)) - print("output directory: {}".format(self.output_directory)) + print("Output directory: {}".format(self.output_directory)) + print("Annotation_directory: {}".format(self.annotation_directory)) #------------------------------------------------------------------------------ @@ -44,7 +50,93 @@ class MLFPConvertor: return file_list #------------------------------------------------------------------------------ + + def normalize_image(self, image, n_sigma=3.0): # or 4.0 + + assert(len(image.shape)==2) + + image = image.astype(np.float64) + + # use central region of image to determine parameters for normalization + h, w = image.shape + region = image[int(0.25*h):int(0.75*h), int(0.25*w):int(0.75*w)] + + # calculate median values + med = np.median(region) + mad = np.median(np.abs(region - med)) + image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0 + + # Clamping to 0-255 + image_n = np.maximum(image_n, 0) + image_n = np.minimum(image_n, 255) + + image_n = image_n.astype(np.uint8) + + return image_n + +#------------------------------------------------------------------------------ + + def annotate(self, fc): + + # find anotations for each frame in framecontainer + # if the annotations are not found, set the previous ones + + annotations = {} + prev_index = None + for index, frame, _ in fc: + + image = self.normalize_image(frame) + + try: + frame_annotations = self.find_annotations(image) + + except Exception as e: + logger.error("\tException: {}".format(e)) + + # copy annotations of previous frame + if (prev_index is not None): + frame_annotations = annotations[prev_index] + logger.warning("\tCopying annotations of previous frame") + print("\tCopying annotations of previous frame") + + else: + frame_annotations = {} + logger.warning("\tSetting empty annotations") + + annotations[str(index)] = frame_annotations + prev_index = str(index) + + return annotations + +#------------------------------------------------------------------------------ + + def find_annotations(self, image): + + # if image is grayscale, convert to 3 channel for face detection + if len(image.shape) == 2: + print (image.shape) + image = np.repeat(image[:, :, np.newaxis], 3, axis=2) + image = np.transpose(image, (2,0,1)) + + # find annotations using MTCNN + frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn") + if frame_annotations: + logger.debug(" --> Found using MTCNN") + return frame_annotations + + # else, find annotations using dlib + frame_annotations = detect_face_landmarks_in_image(image, method="dlib") + if frame_annotations: + logger.debug(" --> Found using dlib") + return frame_annotations + + # else, return empty dictionary with warning + logger.warning(" --> Could not find annotations") + return {} + +#------------------------------------------------------------------------------ + def process_file(self, filename): # load the file @@ -53,10 +145,13 @@ class MLFPConvertor: # select first frames_per_video frames and add to framecontainer fc_bob = FrameContainer() + for idx, frame in enumerate(fc[:frames_per_video]): frame = frame[0]/256.0 fc_bob.add(idx, frame.astype(np.uint8), None) + # find annotations for FC + annotations = self.annotate(fc_bob) # save fc to hdf file out_filepath = os.path.join(self.output_directory, filename + ".hdf5") @@ -66,6 +161,13 @@ class MLFPConvertor: fc_bob.save(f_out) del f_out + # save annotations + json_filepath = os.path.join(self.annotation_directory, filename + ".json") + create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False) + + with open(json_filepath, "w+") as json_file: + json_file.write(json.dumps(annotations)) + return #------------------------------------------------------------------------------ @@ -94,8 +196,9 @@ def main(): input_directory = sys.argv[1] output_directory = sys.argv[2] + annotation_directory = sys.argv[3] - m_conv = MLFPConvertor(input_directory, output_directory) + m_conv = MLFPConvertor(input_directory, output_directory, annotation_directory) m_conv.run() #------------------------------------------------------------------------------ @@ -107,6 +210,3 @@ if __name__ == "__main__": #---------------------------------------------------------- - - -