Skip to content
Snippets Groups Projects
Commit b64c606a authored by Ketan Kotwal's avatar Ketan Kotwal
Browse files

script to annotate both databases

parent 805dfa9b
No related branches found
No related tags found
No related merge requests found
#
# script to generate face annotations for NMAD directly over NIR data
# @ Ketan Kotwal
# Script to generate face annotations for NIR data
# @author: Ketan Kotwal
#
#----------------------------------------------------------
#------------------------------------------------------------------------------
# imports
from bob.pad.face.preprocessor.FaceCropAlign import detect_face_landmarks_in_image
from bob.ip.color import rgb_to_gray
from bob.ip.facelandmarks import detect_landmarks
import bob.bio.video
import bob.io.base
from bob.bio.video import FrameContainer
from bob.io.base import create_directories_safe
import numpy as np
import json, h5py
import json
import os, sys
from bob.paper.makeup_aim.database import MLFPDatabase
#------------------------------------------------------------------------------
ORIGINAL_DIRECTORY = "/idiap/temp/kkotwal/nmad_experiments/mlfp_int/"
class Annotator:
#----------------------------------------------------------
def __init__(self):
pass
class AnnotationGenerator:
def __init__(self, input_directory, annotation_directory):
self.input_directory = input_directory.rstrip("/")
self.annotation_directory = annotation_directory.rstrip("/")
if not os.path.exists(self.annotation_directory):
os.makedirs(self.annotation_directory)
self.file_objects = self._load_db(self)
print("Input directory: {}".format(self.input_directory))
print("Annotation directory: {}".format(self.annotation_directory))
#--------------------------------------
def _load_db(self, protocol):
database = MLFPDatabase(original_directory = ORIGINAL_DIRECTORY,
protocol = "grandtest", annotation_directory = None)
return database.objects(protocol="grandtest")
#----------------------------------------------------------
#------------------------------------------------------------------------------
def find_annotations(self, image):
# if image is grayscale, convert to 3 channel for face detection
if len(image.shape) == 2:
image = np.repeat(image[:, :, np.newaxis], 3, axis=2)
image = np.transpose(image, (2,0,1))
else:
print("\t\tNot altering dimensions")
# find annotations using MTCNN
frame_annotations = detect_face_landmarks_in_image(image, method="mtcnn")
if frame_annotations:
print(" --> Found using MTCNN")
logger.debug(" --> Found using MTCNN")
return frame_annotations
# else, find annotations using dlib
frame_annotations = detect_face_landmarks_in_image(image, method="dlib")
if frame_annotations:
print(" --> Found using dlib")
return frame_annotations
# else, find annotations using landmark detection routines from bob
frame_annotations = self.bob_annotate(image.astype(np.uint8))
if frame_annotations:
print(" --> Found using bob routine")
logger.debug(" --> Found using dlib")
return frame_annotations
# else, return empty dictionary with warning
print(" --> Could not find annotations")
logger.warning(" --> Could not find annotations")
return {}
#----------------------------------------------------------
def _get_eye_pos(self, lm):
"""
This function returns the locations of left and right eyes
"""
left_eye_t = (lm[36, :] + lm[39, :]) / 2.0
right_eye_t = (lm[42, :] + lm[45, :]) / 2.0
right_eye = (int(left_eye_t[1]), int(left_eye_t[0]))
left_eye = (int(right_eye_t[1]), int(right_eye_t[0]))
return right_eye, left_eye
#--------------------------------------
#------------------------------------------------------------------------------
def bob_annotate(self, image):
def normalize_image(self, image, n_sigma=3.0): # or 4.0
image = rgb_to_gray(image)
lm1 = detect_landmarks(image, 1)
lm = lm1[0].landmarks
bounding_box = lm1[0].bounding_box
annotations = {}
assert(len(image.shape)==2)
image = image.astype(np.float64)
if lm is not None:
lm = np.array(lm)
lm = np.vstack((lm[:, 1], lm[:, 0])).T
right_eye, left_eye = self._get_eye_pos(lm)
# use central region of image to determine parameters for normalization
h, w = image.shape
region = image[int(0.25*h):int(0.75*h), int(0.25*w):int(0.75*w)]
# calculate median values
med = np.median(region)
mad = np.median(np.abs(region - med))
image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0
points = []
for i in range(lm.shape[0]):
points.append((int(lm[i, 0]), int(lm[i, 1])))
# Clamping to 0-255
image_n = np.maximum(image_n, 0)
image_n = np.minimum(image_n, 255)
annotations['topleft'] = bounding_box.topleft
annotations['bottomright'] = bounding_box.bottomright
annotations['landmarks'] = points
annotations['leye'] = left_eye
annotations['reye'] = right_eye
image_n = image_n.astype(np.uint8)
return annotations
return image_n
#--------------------------------------
#------------------------------------------------------------------------------
def normalize_image(self, image, n_sigma=4.0, norm_method="MAD"):
def process(self, fc):
face = image[image.shape[0]>>2:3*image.shape[0]>>2, \
image.shape[1]>>2:3*image.shape[1]>>2]
"""
fc: FrameContainer
returns:
annotations: dictionary where each frame index is a key.
"""
annotations = {}
prev_index = None
for index, frame, _ in fc:
face = face.astype(np.float64)
image = image.astype(np.float64)
image = self.normalize_image(frame)
assert(len(image.shape)==2)
try:
frame_annotations = self.find_annotations(image)
if norm_method=='STD':
mu = np.mean(face)
std = np.std(face)
image_n=((image-mu+n_sigma*std)/(2.0*n_sigma*std))*255.0
except Exception as e:
logger.error("\tException: {}".format(e))
# copy annotations of previous frame
if (prev_index is not None):
frame_annotations = annotations[prev_index]
logger.warning("\tCopying annotations of previous frame")
else:
frame_annotations = {}
logger.warning("\tSetting empty annotations")
if norm_method=='MAD':
annotations[str(index)] = frame_annotations
prev_index = str(index)
return annotations
med = np.median(face)
mad = np.median(np.abs(face - med))
image_n = ((image-med+n_sigma*mad)/(2.0*n_sigma*mad))*255.0
#------------------------------------------------------------------------------
if norm_method=='MINMAX':
t_min = np.min(face)
t_max = np.max(face)
image_n = ((image-t_min)/(t_max-t_min))*255.0
#------------------------------------------------------------------------------
# Clamping to 0-255
image_n=np.maximum(image_n, 0)
image_n=np.minimum(image_n, 255)
class AnnotationGenerator:
image_n = image_n.astype(np.uint8)
def __init__(self, database = None, annotation_directory = None):
return image_n
self.database = database
#----------------------------------------------------------
self.annotation_directory = annotation_directory
def process_image(self, filename):
if not os.path.exists(self.annotation_directory):
os.makedirs(self.annotation_directory)
logger.warning("Annotation directory created at: {}"\
.format(self.annotation_directory))
# load the image
fc = h5py.File(os.path.join(ORIGINAL_DIRECTORY, filename.path + ".hdf5"))
annotations = {}
for i, frame in enumerate(fc.keys()):
print(" Frame {:02d}".format(i))
image = fc[frame]["array"].value
self.annotator = Annotator()
try:
tmp_annotations = self.find_annotations(image)
#------------------------------------------------------------------------------
except Exception as e:
print(" Exception: {}".format(e))
def process_video(self, filename):
if (i > 0) and (bool(annotations[str(i-1)])):
tmp_annotations = annotations[str(i-1)]
print(" Copying annotations of previous frame")
else:
tmp_annotations = {}
print(" Annotations for previous frame do not exist. Setting empty annotations")
# load the video into framecontainer
fc = filename.load(directory = self.database.original_directory,\
extension = self.database.original_extension)
annotations[str(i)] = tmp_annotations
# obtain the annotations
annotations = self.annotator.process(fc)
# save the annotations as json.
json_filepath = os.path.join(self.annotation_directory, filename.path + ".json")
bob.io.base.create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False)
create_directories_safe(directory=os.path.split(json_filepath)[0], dryrun=False)
with open(json_filepath, "w+") as json_file:
json_file.write(json.dumps(annotations))
fc.close()
return
#--------------------------------------
#------------------------------------------------------------------------------
def run(self, job_index):
total = len(self.file_objects)
print("Found {} files to be annotated".format(total))
# collect the files to be processed
self.filelist = self.database.objects()
total = len(self.filelist)
logger.info("Files to be annotated: {}".format(total))
# calculate split indices if computation is parallel
if(job_index != -1):
num_jobs = 32
num_jobs = 32
file_range = np.linspace(0, total, num_jobs+1, dtype=np.int)
start_id = file_range[job_index]
end_id = file_range[job_index+1]
start_index = file_range[job_index]
end_index = file_range[job_index+1]
else:
start_id = 0
end_id = total
print("Processing Job Index: {} (Files: {} to {})".format(job_index, start_id, end_id))
start_index = 0
end_index = total
for i, f in enumerate(self.file_objects[start_id:end_id]):
print("Processing Job Index: {} (Files: {} to {})".format(job_index, start_index, end_index))
print("[{:03d}/{:03d}] Sample: {}".format(i+1, total, f.path))
# process each video in the given range
for idx, f in enumerate(self.filelist[start_index:end_index]):
logger.info("[{:03d}/{:03d}] Sample: {}".format(idx+1, total, f.path))
json_filepath = os.path.join(self.annotation_directory, f.path + ".json")
if not os.path.exists(json_filepath):
self.process_image(f)
self.process_video(f)
else:
print("Annotations exist for {}. Skipping".format(f.path))
logger.info("Annotations exist: {}. Skipping".format(f.path))
return
#------------------------------------------------------------------------------
#----------------------------------------------------------
def main():
job_index = int(float(sys.argv[1]))
print("# Job Index: {}".format(job_index))
if len(sys.argv) < 3:
print("Usage: {} <database> <annotation-directory> [<job-index>]"\
.format(__name__))
exit(0)
database = sys.argv[1]
annotation_directory = sys.argv[2]
print("Database: {}. Annotation directory: {}".format(database,\
annotation_directory))
input_directory = ORIGINAL_DIRECTORY #sys.argv[1]
annotation_directory = "/idiap/temp/kkotwal/nmad_experiments/annotations/mlfp"
if len(sys.argv) >= 4:
job_index = int(float(sys.argv[3]))
print("# Job Index: {}".format(job_index))
else:
job_index = -1
ag = AnnotationGenerator(input_directory, annotation_directory)
ag = AnnotationGenerator(database, annotation_directory)
ag.run(job_index)
#----------------------------------------------------------
#------------------------------------------------------------------------------
if __name__ == "__main__":
main()
#----------------------------------------------------------
#------------------------------------------------------------------------------
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment