Refactoring helpers for building preprocessing pipelines to reduce code duplication

The eyes-center cropped positions remain hardcoded for the 112x112 embedding transformer
parent b2843c75
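The refactor extracts the face-cropper resolution and the pipeline assembly into two helpers that every baseline's `load` now composes. A minimal sketch of the new structure, condensed from the diffs below (the `...` marks the baseline-specific body):

def get_cropper(annotation_type, fixed_positions=None):
    # Resolve the face cropper once; shared across baselines
    face_cropper, transform_extra_arguments = crop_80x64(
        annotation_type, fixed_positions, color_channel="gray"
    )
    return face_cropper, transform_extra_arguments

def get_pipeline(face_cropper, transform_extra_arguments):
    # Baseline-specific: build the transformer and algorithm around the cropper
    ...

def load(annotation_type, fixed_positions=None):
    face_cropper, transform_extra_arguments = get_cropper(annotation_type, fixed_positions)
    return get_pipeline(face_cropper, transform_extra_arguments)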
-from bob.bio.face.embeddings import FaceNetSanderberg
+from bob.bio.face.embeddings import FaceNetSanderberg_20170512_110547
 from bob.bio.face.config.baseline.helpers import embedding_transformer_160x160
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
@@ -17,7 +17,7 @@ else:
 def load(annotation_type, fixed_positions=None):
     transformer = embedding_transformer_160x160(
-        FaceNetSanderberg(), annotation_type, fixed_positions
+        FaceNetSanderberg_20170512_110547(), annotation_type, fixed_positions
     )
     algorithm = Distance()
@@ -24,15 +24,14 @@ else:
     annotation_type = None
     fixed_positions = None

-def load(annotation_type, fixed_positions=None):
-    ####### SOLVING THE FACE CROPPER TO BE USED ##########
+def get_cropper(annotation_type, fixed_positions=None):
     # Cropping
     face_cropper, transform_extra_arguments = crop_80x64(
         annotation_type, fixed_positions, color_channel="gray"
     )
+    return face_cropper, transform_extra_arguments
+
+def get_pipeline(face_cropper, transform_extra_arguments):
     preprocessor = bob.bio.face.preprocessor.INormLBP(
         face_cropper=face_cropper, dtype=np.float64
     )
@@ -79,6 +78,11 @@ def load(annotation_type, fixed_positions=None):
     algorithm = BioAlgorithmLegacy(gabor_jet, base_dir=tempdir)
     return VanillaBiometricsPipeline(transformer, algorithm)

+def load(annotation_type, fixed_positions=None):
+    ####### SOLVING THE FACE CROPPER TO BE USED ##########
+    face_cropper, transform_extra_arguments = get_cropper(annotation_type, fixed_positions)
+    return get_pipeline(face_cropper, transform_extra_arguments)
+
 pipeline = load(annotation_type, fixed_positions)
 transformer = pipeline.transformer
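With this split in place, a variant can reuse `get_pipeline` on top of a differently configured cropper. A hedged sketch (the "eyes-center" override below is illustrative, not part of this commit):

# Illustrative reuse: build the same pipeline around another cropper
face_cropper, extra_args = get_cropper("eyes-center", fixed_positions=None)
pipeline = get_pipeline(face_cropper, extra_args)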
-from bob.bio.face.embeddings import InceptionResnetv1_CasiaWebFace
+from bob.bio.face.embeddings import InceptionResnetv1_Casia_CenterLoss_2018
 from bob.bio.face.config.baseline.helpers import embedding_transformer_160x160
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
@@ -15,7 +15,7 @@ else:
 def load(annotation_type, fixed_positions=None):
     transformer = embedding_transformer_160x160(
-        InceptionResnetv1_CasiaWebFace(), annotation_type, fixed_positions
+        InceptionResnetv1_Casia_CenterLoss_2018(), annotation_type, fixed_positions
     )
     algorithm = Distance()
-from bob.bio.face.embeddings import InceptionResnetv1_MsCeleb
+from bob.bio.face.embeddings import InceptionResnetv1_MsCeleb_CenterLoss_2018
 from bob.bio.face.config.baseline.helpers import embedding_transformer_160x160
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
@@ -16,7 +16,7 @@ else:
 def load(annotation_type, fixed_positions=None):
     transformer = embedding_transformer_160x160(
-        InceptionResnetv1_MsCeleb(), annotation_type, fixed_positions
+        InceptionResnetv1_MsCeleb_CenterLoss_2018(), annotation_type, fixed_positions
     )
     algorithm = Distance()
-from bob.bio.face.embeddings import InceptionResnetv2_CasiaWebFace
+from bob.bio.face.embeddings import InceptionResnetv2_Casia_CenterLoss_2018
 from bob.bio.face.config.baseline.helpers import embedding_transformer_160x160
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
@@ -16,7 +16,7 @@ else:
 def load(annotation_type, fixed_positions=None):
     transformer = embedding_transformer_160x160(
-        InceptionResnetv2_CasiaWebFace(), annotation_type, fixed_positions
+        InceptionResnetv2_Casia_CenterLoss_2018(), annotation_type, fixed_positions
     )
     algorithm = Distance()
-from bob.bio.face.embeddings import InceptionResnetv2_MsCeleb
+from bob.bio.face.embeddings import InceptionResnetv2_MsCeleb_CenterLoss_2018
 from bob.bio.face.config.baseline.helpers import embedding_transformer_160x160
 from bob.bio.base.pipelines.vanilla_biometrics import (
     Distance,
@@ -16,7 +16,7 @@ else:
 def load(annotation_type, fixed_positions=None):
     transformer = embedding_transformer_160x160(
-        InceptionResnetv2_MsCeleb(), annotation_type, fixed_positions
+        InceptionResnetv2_MsCeleb_CenterLoss_2018(), annotation_type, fixed_positions
     )
     algorithm = Distance()
@@ -21,15 +21,14 @@ else:
     annotation_type = None
     fixed_positions = None

-def load(annotation_type, fixed_positions=None):
-    ####### SOLVING THE FACE CROPPER TO BE USED ##########
+def get_cropper(annotation_type, fixed_positions=None):
     # Cropping
     face_cropper, transform_extra_arguments = crop_80x64(
         annotation_type, fixed_positions, color_channel="gray"
     )
+    return face_cropper, transform_extra_arguments
+
+def get_pipeline(face_cropper, transform_extra_arguments):
     preprocessor = bob.bio.face.preprocessor.TanTriggs(
         face_cropper=face_cropper, dtype=np.float64
     )
@@ -56,7 +55,6 @@ def load(annotation_type, fixed_positions=None):
     )

     ### BIOMETRIC ALGORITHM
     histogram = bob.bio.face.algorithm.Histogram(
         distance_function=bob.math.histogram_intersection,
@@ -69,5 +67,10 @@ def load(annotation_type, fixed_positions=None):
     return VanillaBiometricsPipeline(transformer, algorithm)

+def load(annotation_type, fixed_positions=None):
+    ####### SOLVING THE FACE CROPPER TO BE USED ##########
+    face_cropper, transform_extra_arguments = get_cropper(annotation_type, fixed_positions)
+    return get_pipeline(face_cropper, transform_extra_arguments)
+
 pipeline = load(annotation_type, fixed_positions)
 transformer = pipeline.transformer
\ No newline at end of file
from bob.extension import rc
from bob.bio.face.embeddings.tf2_inception_resnet import InceptionResnetv2
from bob.bio.face.preprocessor import FaceCrop
from bob.bio.face.config.baseline.helpers import (
    embedding_transformer_default_cropping,
    embedding_transformer,
)
from sklearn.pipeline import make_pipeline
from bob.pipelines.wrappers import wrap
from bob.bio.base.pipelines.vanilla_biometrics import (
    Distance,
    VanillaBiometricsPipeline,
)

if "database" in locals():
    annotation_type = database.annotation_type
    fixed_positions = database.fixed_positions
else:
    annotation_type = None
    fixed_positions = None

def load(annotation_type, fixed_positions=None):
    CROPPED_IMAGE_SIZE = (160, 160)
    CROPPED_POSITIONS = embedding_transformer_default_cropping(
        CROPPED_IMAGE_SIZE, annotation_type=annotation_type
    )

    extractor_path = rc["bob.bio.face.tf2.casia-webface-inception-v2"]
    embedding = InceptionResnetv2(checkpoint_path=extractor_path)

    transformer = embedding_transformer(
        CROPPED_IMAGE_SIZE, embedding, annotation_type, CROPPED_POSITIONS, fixed_positions
    )

    algorithm = Distance()
    return VanillaBiometricsPipeline(transformer, algorithm)

pipeline = load(annotation_type, fixed_positions)
transformer = pipeline.transformer
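A rough smoke test of the loaded pipeline's transformer; this is a hedged sketch, the sample below is hypothetical and assumes `bob.pipelines` Sample/SampleSet wrappers plus eye annotations:

import numpy as np
import bob.pipelines as mario

# Hypothetical 400x400 RGB image in bob's channels-first layout
image = np.random.randint(0, 255, size=(3, 400, 400), dtype=np.uint8)
annotations = {"leye": (115, 267), "reye": (115, 132)}
sample_set = mario.SampleSet(
    [mario.Sample(image, key="subject-1", annotations=annotations)],
    key="subject-1",
)
features = transformer.transform([sample_set])  # cropped, then embedded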
-from .facenet_sanderberg import FaceNetSanderberg
-from .idiap_inception_resnet import (
-    InceptionResnetv2_MsCeleb,
-    InceptionResnetv2_CasiaWebFace,
-    InceptionResnetv1_MsCeleb,
-    InceptionResnetv1_CasiaWebFace
-)
-from .arface import ArcFace_InsightFaceTF
+import os
+import bob.extension.download
+
+def download_model(model_path, urls, zip_file="model.tar.gz"):
+    """
+    Download and unzip a model from some URL.
+
+    Parameters
+    ----------
+
+    model_path: str
+        Path where the model is supposed to be stored
+
+    urls: list
+        List of paths where the model is stored
+
+    zip_file: str
+        File name after the download
+    """
+    if not os.path.exists(model_path):
+        os.makedirs(model_path, exist_ok=True)
+        zip_file = os.path.join(model_path, zip_file)
+        bob.extension.download.download_and_unzip(urls, zip_file)
+
+from .tf2_inception_resnet import (
+    InceptionResnet,
+    InceptionResnetv2_MsCeleb_CenterLoss_2018,
+    InceptionResnetv2_Casia_CenterLoss_2018,
+    InceptionResnetv1_MsCeleb_CenterLoss_2018,
+    InceptionResnetv1_Casia_CenterLoss_2018,
+    FaceNetSanderberg_20170512_110547
+)
 # gets sphinx autodoc done right - don't remove it
 def __appropriate__(*args):
@@ -26,11 +51,11 @@ def __appropriate__(*args):
 __appropriate__(
-    FaceNetSanderberg,
-    InceptionResnetv2_MsCeleb,
-    InceptionResnetv2_CasiaWebFace,
-    InceptionResnetv1_MsCeleb,
-    InceptionResnetv1_CasiaWebFace,
-    ArcFace_InsightFaceTF
+    InceptionResnet,
+    InceptionResnetv2_MsCeleb_CenterLoss_2018,
+    InceptionResnetv1_MsCeleb_CenterLoss_2018,
+    InceptionResnetv2_Casia_CenterLoss_2018,
+    InceptionResnetv1_Casia_CenterLoss_2018,
+    FaceNetSanderberg_20170512_110547
 )
 __all__ = [_ for _ in dir() if not _.startswith("_")]
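A quick, hedged usage sketch of the new `download_model` helper; the path and URL below are placeholders, not real endpoints:

# Hypothetical: fetch and unpack a model archive into a local cache directory
model_path = "/tmp/models/my_model"                    # placeholder path
urls = ["https://example.com/models/my_model.tar.gz"]  # placeholder URL
download_model(model_path, urls, zip_file="my_model.tar.gz")
# If model_path did not exist before, it now contains the unpacked checkpoint.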
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
"""
Wrapper for the free FaceNet variant:
https://github.com/davidsandberg/facenet
Model 20170512-110547
"""
from __future__ import division
from sklearn.base import TransformerMixin, BaseEstimator
import os
import re
import logging
import numpy as np
from bob.ip.color import gray_to_rgb
from bob.io.image import to_matplotlib
from bob.extension import rc
import bob.extension.download
import bob.io.base
from sklearn.utils import check_array
logger = logging.getLogger(__name__)
FACENET_MODELPATH_KEY = "bob.bio.face.facenet_sanderberg_modelpath"
def prewhiten(img):
    """Standardizes an image to zero mean and unit variance; the standard
    deviation is floored at 1/sqrt(img.size) so flat images do not cause a
    division by zero."""
    mean = np.mean(img)
    std = np.std(img)
    std_adj = np.maximum(std, 1.0 / np.sqrt(img.size))
    y = np.multiply(np.subtract(img, mean), 1 / std_adj)
    return y
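# Worked example (illustrative): for img = [[0., 2.], [4., 6.]],
# mean = 3.0 and std = sqrt(5) ~ 2.236, which exceeds the floor
# 1/sqrt(4) = 0.5, so prewhiten(img) = (img - 3.0) / 2.236.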
def get_model_filenames(model_dir):
    # code from https://github.com/davidsandberg/facenet
    files = os.listdir(model_dir)
    meta_files = [s for s in files if s.endswith(".meta")]
    if len(meta_files) == 0:
        raise ValueError("No meta file found in the model directory (%s)" % model_dir)
    elif len(meta_files) > 1:
        raise ValueError(
            "There should not be more than one meta file in the model "
            "directory (%s)" % model_dir
        )
    meta_file = meta_files[0]
    max_step = -1
    for f in files:
        step_str = re.match(r"(^model-[\w\- ]+.ckpt-(\d+))", f)
        if step_str is not None and len(step_str.groups()) >= 2:
            step = int(step_str.groups()[1])
            if step > max_step:
                max_step = step
                ckpt_file = step_str.groups()[0]
    return meta_file, ckpt_file
class FaceNetSanderberg(TransformerMixin, BaseEstimator):
    """Wrapper for the free FaceNet variant:
    https://github.com/davidsandberg/facenet

    And for a preprocessor you can use::

        from bob.bio.face.preprocessor import FaceCrop

        # This is the size of the image that this model expects
        CROPPED_IMAGE_HEIGHT = 160
        CROPPED_IMAGE_WIDTH = 160

        # eye positions for frontal images
        RIGHT_EYE_POS = (46, 53)
        LEFT_EYE_POS = (46, 107)

        # Crops the face using eye annotations
        preprocessor = FaceCrop(
            cropped_image_size=(CROPPED_IMAGE_HEIGHT, CROPPED_IMAGE_WIDTH),
            cropped_positions={'leye': LEFT_EYE_POS, 'reye': RIGHT_EYE_POS},
            color_channel='rgb'
        )
    """
    def __init__(
        self,
        model_path=rc[FACENET_MODELPATH_KEY],
        image_size=160,
        layer_name="embeddings:0",
        **kwargs,
    ):
        super(FaceNetSanderberg, self).__init__()
        self.model_path = model_path
        self.image_size = image_size
        self.layer_name = layer_name
        self.loaded = False
        self._clean_unpicklables()

    def _clean_unpicklables(self):
        self.session = None
        self.embeddings = None
        self.graph = None
        self.images_placeholder = None
        self.phase_train_placeholder = None

    def _check_feature(self, img):
        img = check_array(img, allow_nd=True)

        def _convert(img):
            assert img.shape[-2] == self.image_size
            assert img.shape[-3] == self.image_size
            img = prewhiten(img)
            return img

        if img.ndim == 3:
            # Bob channels-first (C x H x W) -> channels-last (H x W x C)
            if img.shape[0] == 3:
                img = np.moveaxis(img, 0, -1)
            return _convert(img)[None, ...]  # add a batch axis
        elif img.ndim == 4:
            if img.shape[1] == 3:
                img = np.moveaxis(img, 1, -1)
            return _convert(img)
        else:
            raise ValueError(f"Image shape {img.shape} not supported")
    def load_model(self):
        import tensorflow as tf

        tf.compat.v1.reset_default_graph()

        session_conf = tf.compat.v1.ConfigProto(
            intra_op_parallelism_threads=1, inter_op_parallelism_threads=1
        )

        self.graph = tf.Graph()
        self.session = tf.compat.v1.Session(graph=self.graph, config=session_conf)

        if self.model_path is None:
            self.model_path = self.get_modelpath()

        if not os.path.exists(self.model_path):
            bob.io.base.create_directories_safe(FaceNetSanderberg.get_modelpath())
            zip_file = os.path.join(
                FaceNetSanderberg.get_modelpath(), "20170512-110547.zip"
            )
            urls = [
                # This link only works in Idiap CI to save bandwidth.
                "http://www.idiap.ch/private/wheels/gitlab/"
                "facenet_model2_20170512-110547.zip",
                # this link to dropbox would work for everybody
                "https://www.dropbox.com/s/"
                "k7bhxe58q7d48g7/facenet_model2_20170512-110547.zip?dl=1",
            ]
            bob.extension.download.download_and_unzip(urls, zip_file)

        # code from https://github.com/davidsandberg/facenet
        model_exp = os.path.expanduser(self.model_path)

        with self.graph.as_default():
            if os.path.isfile(model_exp):
                logger.info("Model filename: %s" % model_exp)
                with tf.compat.v1.gfile.FastGFile(model_exp, "rb") as f:
                    graph_def = tf.compat.v1.GraphDef()
                    graph_def.ParseFromString(f.read())
                    tf.import_graph_def(graph_def, name="")
            else:
                logger.info("Model directory: %s" % model_exp)
                meta_file, ckpt_file = get_model_filenames(model_exp)

                logger.info("Metagraph file: %s" % meta_file)
                logger.info("Checkpoint file: %s" % ckpt_file)

                saver = tf.compat.v1.train.import_meta_graph(
                    os.path.join(model_exp, meta_file)
                )
                saver.restore(self.session, os.path.join(model_exp, ckpt_file))

        # Get input and output tensors
        self.images_placeholder = self.graph.get_tensor_by_name("input:0")
        self.embeddings = self.graph.get_tensor_by_name(self.layer_name)
        self.phase_train_placeholder = self.graph.get_tensor_by_name("phase_train:0")

        logger.info("Successfully loaded the model.")
        self.loaded = True
    def transform(self, X, **kwargs):
        def _transform(X):
            images = self._check_feature(X)
            if not self.loaded:
                self.load_model()

            feed_dict = {
                self.images_placeholder: images,
                self.phase_train_placeholder: False,
            }
            features = self.session.run(self.embeddings, feed_dict=feed_dict)
            return features

        return [_transform(i) for i in X]
    @staticmethod
    def get_modelpath():
        """
        Get the default model path.

        First, we try to resolve the path via the global configuration
        system; if it is not set there, we fall back to the directory
        `<project>/data` inside this package.
        """
        # Priority to the RC path
        model_path = rc[FACENET_MODELPATH_KEY]

        if model_path is None:
            import pkg_resources

            model_path = pkg_resources.resource_filename(
                __name__, "data/FaceNet/20170512-110547"
            )

        return model_path
    def __setstate__(self, d):
        # Handling unpicklable objects
        self.__dict__ = d

    def __getstate__(self):
        import tensorflow as tf

        # Handling unpicklable objects: drop the TF session/graph/tensors
        # before pickling; they are rebuilt lazily by load_model().
        d = self.__dict__
        d.pop("session", None)
        d.pop("embeddings", None)
        d.pop("graph", None)
        d.pop("images_placeholder", None)
        d.pop("phase_train_placeholder", None)
        tf.compat.v1.reset_default_graph()
        self.loaded = False
        return d

    def _more_tags(self):
        return {"stateless": True, "requires_fit": False}

    def fit(self, X, y=None):
        return self
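A hedged usage sketch of the wrapper above (assumes the checkpoint is locally available or downloadable; the random face stands in for a pre-cropped 160x160 RGB image):

import numpy as np

extractor = FaceNetSanderberg()
face = np.random.rand(3, 160, 160)           # bob channels-first layout
embedding = extractor.transform([face])[0]   # one (1, 128) embedding per image

# Pickling works because __getstate__ drops the TF session; it is
# rebuilt lazily by load_model() on the next transform call.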
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
from sklearn.base import TransformerMixin, BaseEstimator
from .tensorflow_compat_v1 import TensorflowCompatV1
class InceptionResnetv2_MsCeleb(TensorflowCompatV1):
    """
    Inception Resnet v2 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV2.py

    This model was trained using the MS-Celeb-1M dataset.

    The input shape of this model is :math:`3 \times 160 \times 160`.
    The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples.
    """

    def __init__(self):
        from bob.learn.tensorflow.network import inception_resnet_v2_batch_norm

        bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v2_path"
        urls = [
            "https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/inception-v2_batchnorm_rgb.tar.gz"
        ]
        model_subdirectory = "idiap_inception_resnet_v2_path"

        checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
        self.download_model(checkpoint_filename, urls)

        input_shape = (1, 160, 160, 3)
        architecture_fn = inception_resnet_v2_batch_norm

        super().__init__(checkpoint_filename, input_shape, architecture_fn)


class InceptionResnetv2_CasiaWebFace(TensorflowCompatV1):
    """
    Inception Resnet v2 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV2.py

    This model was trained using the Casia-WebFace dataset.

    The input shape of this model is :math:`3 \times 160 \times 160`.
    The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples.
    """

    def __init__(self):
        """Loads the tensorflow model"""
        from bob.learn.tensorflow.network import inception_resnet_v2_batch_norm

        bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v2_casiawebface_path"
        urls = [
            "https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v2_casiawebface_path.tar.gz"
        ]
        model_subdirectory = "idiap_inception_resnet_v2_casiawebface_path"

        checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
        self.download_model(checkpoint_filename, urls)

        input_shape = (1, 160, 160, 3)
        architecture_fn = inception_resnet_v2_batch_norm

        super().__init__(checkpoint_filename, input_shape, architecture_fn)


class InceptionResnetv1_MsCeleb(TensorflowCompatV1):
    """
    Inception Resnet v1 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV1.py

    This model was trained using the MS-Celeb-1M dataset.

    The input shape of this model is :math:`3 \times 160 \times 160`.
    The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples.
    """

    def __init__(self):
        from bob.learn.tensorflow.network import inception_resnet_v1_batch_norm

        bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v1_msceleb_path"
        urls = [
            "https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v1_msceleb_path.tar.gz"
        ]
        model_subdirectory = "idiap_inception_resnet_v1_msceleb_path"

        checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
        self.download_model(checkpoint_filename, urls)

        input_shape = (1, 160, 160, 3)
        architecture_fn = inception_resnet_v1_batch_norm

        super().__init__(checkpoint_filename, input_shape, architecture_fn)


class InceptionResnetv1_CasiaWebFace(TensorflowCompatV1):
    """
    Inception Resnet v1 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV1.py

    This model was trained using the Casia-WebFace dataset.

    The input shape of this model is :math:`3 \times 160 \times 160`.
    The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples.
    """

    def __init__(self):
        """Loads the tensorflow model"""
        from bob.learn.tensorflow.network import inception_resnet_v1_batch_norm

        bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v1_casiawebface_path"
        urls = [
            "https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v1_casiawebface_path.tar.gz"
        ]
        model_subdirectory = "idiap_inception_resnet_v1_casiawebface_path"

        checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
        self.download_model(checkpoint_filename, urls)

        input_shape = (1, 160, 160, 3)
        architecture_fn = inception_resnet_v1_batch_norm

        super().__init__(checkpoint_filename, input_shape, architecture_fn)
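As a hedged sketch, these classes act as sklearn-style transformers once the checkpoint is available, assuming transform accepts a list of channels-first images as in the FaceNet wrapper above:

import numpy as np

embedder = InceptionResnetv1_MsCeleb()      # downloads the checkpoint if needed
face = np.random.rand(3, 160, 160)          # one pre-cropped RGB face
embedding = embedder.transform([face])[0]   # a 128-dimensional embedding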
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
import pkg_resources
import bob.extension.download
from bob.extension import rc
from sklearn.base import TransformerMixin, BaseEstimator
import numpy as np
import logging
from sklearn.utils import check_array
from bob.pipelines.sample import SampleBatch
import copy
logger = logging.getLogger(__name__)
class TensorflowCompatV1(TransformerMixin, BaseEstimator):
    """
    Tensorflow v1 compatible set of transformers.

    Parameters
    ----------

    checkpoint_filename: str
        Path of your checkpoint. If the .meta file is provided, the last
        checkpoint will be loaded.

    input_shape: tuple
        Input shape of the tensorflow neural network

    architecture_fn:
        A callable that builds the tensorflow architecture to be executed
    """

    def __init__(self, checkpoint_filename, input_shape, architecture_fn):
        """Stores the model configuration; the tensorflow graph itself is
        loaded lazily on the first call to transform."""
        self.checkpoint_filename = checkpoint_filename
        self.input_shape = input_shape
        self.architecture_fn = architecture_fn
        self.loaded = False
    def transform(self, X):
        """
        Forward the data with the loaded neural network.

        Parameters
        ----------
        X : numpy.ndarray
            Input Data

        Returns
        -------
        numpy.ndarray
            The features.
        """

        def _transform(data):
            data = check_array(data, allow_nd=True)

            # Bob delivers images as `N x C x H x W`; the models expect
            # channels-last input, so we reshape accordingly.

            # If ndim == 3 we add a batch axis
            if data.ndim == 3:
                data = data[None, ...]

            # Making sure it's channels-last and has three channels
            if data.ndim == 4:
                # Swap the channel axis if the data is in bob format (N x C x H x W)
                if data.shape[1] == 3:
                    data = np.moveaxis(data, 1, -1)

            if data.shape != self.input_shape:
                raise ValueError(
                    f"Image shape {data.shape} not supported. Expected {self.input_shape}"
                )

            if not self.loaded:
                self.load_model()

            return self.session.run(self.embedding, feed_dict={self.input_tensor: data})
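To make the layout handling above concrete, here is a standalone numpy sketch of the same conversion (no tensorflow required):

import numpy as np

data = np.random.rand(3, 160, 160)       # one bob-style C x H x W image
data = data[None, ...]                   # (1, 3, 160, 160): add the batch axis
data = np.moveaxis(data, 1, -1)          # (1, 160, 160, 3): channels-last
assert data.shape == (1, 160, 160, 3)    # matches input_shape of these models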