Commit a950ce94 authored by Tiago de Freitas Pereira
Moved transformers module to embeddings

parent e4649226
2 merge requests: !66 Adding some baselines as transformers, !64 Dask pipelines
from .facenet_sanderberg import FaceNetSanderberg
from .idiap_inception_resnet import (
InceptionResnetv2_MsCeleb,
InceptionResnetv2_CasiaWebFace,
InceptionResnetv1_MsCeleb,
InceptionResnetv1_CasiaWebFace
)
from .arface import ArcFace_InsightFaceTF
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is shortened.
Parameters:
*args: An iterable of objects to modify
Resolves `Sphinx referencing issues
<https://github.com/sphinx-doc/sphinx/issues/3048>`_
"""
for obj in args:
obj.__module__ = __name__
__appropriate__(
FaceNetSanderberg,
InceptionResnetv2_MsCeleb,
InceptionResnetv2_CasiaWebFace,
InceptionResnetv1_MsCeleb,
InceptionResnetv1_CasiaWebFace,
ArcFace_InsightFaceTF
)
__all__ = [_ for _ in dir() if not _.startswith("_")]
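A minimal usage sketch, assuming the pretrained model files are available locally (they are downloaded on first use): every class exported above is a scikit-learn transformer, so feature extraction is a single transform call.

    import numpy as np
    from bob.bio.face.embeddings import FaceNetSanderberg

    embedder = FaceNetSanderberg()  # the TensorFlow graph is loaded lazily
    image = np.random.randint(0, 256, (3, 160, 160), dtype="uint8")  # C x H x W
    embedding = embedder.transform(image)  # numpy array with 128 values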
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
from sklearn.base import TransformerMixin, BaseEstimator
from .tensorflow_compat_v1 import TensorflowCompatV1
from bob.io.image import to_matplotlib
import numpy as np
from sklearn.utils import check_array
class ArcFace_InsightFaceTF(TensorflowCompatV1):
"""
Models copied from
https://github.com/luckycallor/InsightFace-tensorflow/blob/master/backbones/utils.py
The input shape of this model is :math:`3 \times 112 \times 112`
The output embedding is :math:`n \times 512`, where :math:`n` is the number of samples
"""
def __init__(self):
bob_rc_variable = "bob.bio.face.arcface_tf_path"
urls = [
"https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/arcface_insight_tf.tar.gz"
]
model_subdirectory = "arcface_tf_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 112, 112, 3)
architecture_fn = init_network
super().__init__(checkpoint_filename, input_shape, architecture_fn)
def transform(self, data):
# https://github.com/luckycallor/InsightFace-tensorflow/blob/master/evaluate.py#L42
data = check_array(data, allow_nd=True)
data = data / 127.5 - 1.0
return super().transform(data)
def load_model(self):
self.input_tensor = tf.compat.v1.placeholder(
dtype=tf.float32, shape=self.input_shape, name="input_image",
)
prelogits = self.architecture_fn(self.input_tensor)
self.embedding = prelogits
# Initializing the variables of the current graph
self.session = tf.compat.v1.Session()
self.session.run(tf.compat.v1.global_variables_initializer())
# Loading the last checkpoint and overwriting the current variables
saver = tf.compat.v1.train.Saver()
if os.path.splitext(self.checkpoint_filename)[1] == ".meta":
saver.restore(
self.session,
tf.train.latest_checkpoint(os.path.dirname(self.checkpoint_filename)),
)
elif os.path.isdir(self.checkpoint_filename):
saver.restore(
self.session, tf.train.latest_checkpoint(self.checkpoint_filename)
)
else:
saver.restore(self.session, self.checkpoint_filename)
self.loaded = True
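# The scaling in `transform` above maps uint8 pixels into [-1, 1], matching the
# referenced evaluation code: 0 -> 0/127.5 - 1 = -1.0, 127.5 -> 0.0 and
# 255 -> 1.0. A quick sanity check (illustrative snippet):
#
#   import numpy as np
#   pixels = np.array([0.0, 127.5, 255.0])
#   assert np.allclose(pixels / 127.5 - 1.0, [-1.0, 0.0, 1.0])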
###########################
# CODE COPIED FROM
# https://github.com/luckycallor/InsightFace-tensorflow/blob/master/backbones/utils.py
###########################
import tensorflow as tf
import tensorflow.contrib.slim as slim
from collections import namedtuple
def init_network(input_tensor):
with tf.variable_scope("embd_extractor", reuse=False):
arg_sc = resnet_arg_scope()
with slim.arg_scope(arg_sc):
net, _ = resnet_v2_m_50(input_tensor, is_training=False, return_raw=True)
net = slim.batch_norm(net, activation_fn=None, is_training=False)
net = slim.dropout(net, keep_prob=1, is_training=False)
net = slim.flatten(net)
net = slim.fully_connected(net, 512, normalizer_fn=None, activation_fn=None)
net = slim.batch_norm(
net, scale=False, activation_fn=None, is_training=False
)
# end_points['embds'] = net
return net
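# Shape trace for one 112 x 112 x 3 input, derived from the blocks defined
# below: conv1 keeps 112 x 112 (stride 1), then block1..block4 each halve the
# resolution (112 -> 56 -> 28 -> 14 -> 7) while the depth reaches
# 128 * 4 = 512, so slim.flatten sees 7 * 7 * 512 = 25088 values before the
# final 512-unit fully connected embedding layer.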
def resnet_v2_m_50(
inputs,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
spatial_squeeze=True,
reuse=None,
scope="resnet_v2_50",
):
"""ResNet-50 model of [1]. See resnet_v2() for arg and return description."""
blocks = [
resnet_v2_block("block1", base_depth=16, num_units=3, stride=2),
resnet_v2_block("block2", base_depth=32, num_units=4, stride=2),
resnet_v2_block("block3", base_depth=64, num_units=14, stride=2),
resnet_v2_block("block4", base_depth=128, num_units=3, stride=2),
]
return resnet_v2_m(
inputs,
blocks,
num_classes,
is_training=is_training,
return_raw=return_raw,
global_pool=global_pool,
output_stride=output_stride,
include_root_block=True,
spatial_squeeze=spatial_squeeze,
reuse=reuse,
scope=scope,
)
def resnet_v2_block(scope, base_depth, num_units, stride):
return Block(
scope,
block,
[{"depth": base_depth * 4, "stride": stride}]
+ (num_units - 1) * [{"depth": base_depth * 4, "stride": 1}],
)
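# For example, resnet_v2_block("block1", base_depth=16, num_units=3, stride=2)
# produces args == [{"depth": 64, "stride": 2}, {"depth": 64, "stride": 1},
# {"depth": 64, "stride": 1}]: one strided unit followed by num_units - 1
# stride-1 units, all at depth base_depth * 4.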
class Block(namedtuple("Block", ["scope", "unit_fn", "args"])):
"""A named tuple describing a ResNet block.
Its parts are:
scope: The scope of the `Block`.
unit_fn: The ResNet unit function which takes as input a `Tensor` and returns another `Tensor` with the output of the ResNet unit.
args: A list of length equal to the number of units in the `Block`. The list contains one (depth, depth_bottleneck, stride) tuple for each unit in the block to serve as argument to unit_fn.
"""
pass
def resnet_v2_m(
inputs,
blocks,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
include_root_block=True,
spatial_squeeze=True,
reuse=None,
scope=None,
):
with tf.variable_scope(scope, "resnet_v2", [inputs], reuse=reuse) as sc:
end_points_collection = sc.original_name_scope + "_end_points"
with slim.arg_scope(
[slim.conv2d, bottleneck, stack_blocks_dense],
outputs_collections=end_points_collection,
):
with slim.arg_scope([slim.batch_norm], is_training=is_training):
net = inputs
if include_root_block:
if output_stride is not None:
if output_stride % 4 != 0:
raise ValueError(
"The output_stride needs to be a multiple of 4."
)
output_stride /= 4
with slim.arg_scope(
[slim.conv2d], activation_fn=None, normalizer_fn=None
):
net = conv2d_same(net, 64, 3, stride=1, scope="conv1")
# net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool1')
net = stack_blocks_dense(net, blocks, output_stride)
end_points = slim.utils.convert_collection_to_dict(
end_points_collection
)
if return_raw:
return net, end_points
net = slim.batch_norm(net, activation_fn=tf.nn.relu, scope="postnorm")
end_points[sc.name + "/postnorm"] = net
if global_pool:
net = tf.reduce_mean(net, [1, 2], name="pool5", keep_dims=True)
end_points["global_pool"] = net
if num_classes:
net = slim.conv2d(
net,
num_classes,
[1, 1],
activation_fn=None,
normalizer_fn=None,
scope="logits",
)
end_points[sc.name + "/logits"] = net
if spatial_squeeze:
net = tf.squeeze(net, [1, 2], name="SpatialSqueeze")
end_points[sc.name + "/spatial_squeeze"] = net
end_points["predictions"] = slim.softmax(net, scope="predictions")
return net, end_points
def conv2d_same(inputs, num_outputs, kernel_size, stride, rate=1, scope=None):
if stride == 1:
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=1,
rate=rate,
padding="SAME",
scope=scope,
)
else:
kernel_size_effective = kernel_size + (kernel_size - 1) * (rate - 1)
pad_total = kernel_size_effective - 1
pad_beg = pad_total // 2
pad_end = pad_total - pad_beg
inputs = tf.pad(
inputs, [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]]
) # zero padding
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=stride,
rate=rate,
padding="VALID",
scope=scope,
)
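# Worked example of the explicit padding above: with kernel_size=3 and rate=1,
# kernel_size_effective = 3 + (3 - 1) * (1 - 1) = 3, so pad_total = 2 and
# pad_beg = pad_end = 1; with rate=2 the effective kernel grows to 5 and the
# padding to (2, 2). Padding explicitly and convolving with 'VALID' keeps the
# padding fixed regardless of the input size, unlike plain 'SAME' padding.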
@slim.add_arg_scope
def stack_blocks_dense(
net,
blocks,
output_stride=None,
store_non_strided_activations=False,
outputs_collections=None,
):
current_stride = 1
rate = 1
for block in blocks:
with tf.variable_scope(block.scope, "block", [net]) as sc:
block_stride = 1
for i, unit in enumerate(block.args):
if store_non_strided_activations and i == len(block.args) - 1:
block_stride = unit.get("stride", 1)
unit = dict(unit, stride=1)
with tf.variable_scope("unit_%d" % (i + 1), values=[net]):
if output_stride is not None and current_stride == output_stride:
net = block.unit_fn(net, rate=rate, **dict(unit, stride=1))
rate *= unit.get("stride", 1)
else:
net = block.unit_fn(net, rate=1, **unit)
current_stride *= unit.get("stride", 1)
if output_stride is not None and current_stride > output_stride:
raise ValueError(
"The target output_stride cannot be reached."
)
net = slim.utils.collect_named_outputs(outputs_collections, sc.name, net)
if output_stride is not None and current_stride == output_stride:
rate *= block_stride
else:
net = subsample(net, block_stride)
current_stride *= block_stride
if output_stride is not None and current_stride > output_stride:
raise ValueError("The target output_stride cannot be reached.")
if output_stride is not None and current_stride != output_stride:
raise ValueError("The target output_stride cannot be reached.")
return net
def block(inputs, depth, stride, rate=1, outputs_collections=None, scope=None):
with tf.variable_scope(scope, "block_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = conv2d_same(preact, depth, 3, stride, rate=rate, scope="conv1")
residual = slim.conv2d(
residual,
depth,
[3, 3],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv2",
)
# residual = slim.conv2d(residual, depth, [1, 1], stride=1, normalizer_fn=None, activation_fn=None, scope='conv3')
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
@slim.add_arg_scope
def bottleneck(
inputs,
depth,
depth_bottleneck,
stride,
rate=1,
outputs_collections=None,
scope=None,
):
with tf.variable_scope(scope, "bottleneck_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = slim.conv2d(
preact, depth_bottleneck, [1, 1], stride=1, scope="conv1"
)
residual = conv2d_same(
residual, depth_bottleneck, 3, stride, rate=rate, scope="conv2"
)
residual = slim.conv2d(
residual,
depth,
[1, 1],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv3",
)
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
def subsample(inputs, factor, scope=None):
if factor == 1:
return inputs
else:
return slim.max_pool2d(
inputs, [1, 1], stride=factor, scope=scope
) # padding='VALID'
def resnet_arg_scope(
weight_decay=0.0001,
batch_norm_decay=0.9,
batch_norm_epsilon=2e-5,
batch_norm_scale=True,
activation_fn=tf.nn.leaky_relu,
use_batch_norm=True,
batch_norm_updates_collections=tf.GraphKeys.UPDATE_OPS,
):
batch_norm_params = {
"decay": batch_norm_decay,
"epsilon": batch_norm_epsilon,
"scale": batch_norm_scale,
"updates_collections": batch_norm_updates_collections,
"fused": None, # Use fused batch norm if possible.
"param_regularizers": {"gamma": slim.l2_regularizer(weight_decay)},
}
with slim.arg_scope(
[slim.conv2d],
weights_regularizer=slim.l2_regularizer(weight_decay),
weights_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
activation_fn=activation_fn,
normalizer_fn=slim.batch_norm if use_batch_norm else None,
normalizer_params=batch_norm_params,
):
with slim.arg_scope([slim.batch_norm], **batch_norm_params):
with slim.arg_scope([slim.max_pool2d], padding="SAME") as arg_sc:
return arg_sc
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
"""
Wrapper for the free FaceNet variant:
https://github.com/davidsandberg/facenet
Model 20170512-110547
"""
from __future__ import division
from sklearn.base import TransformerMixin, BaseEstimator
import os
import re
import logging
import numpy as np
from bob.ip.color import gray_to_rgb
from bob.io.image import to_matplotlib
from bob.extension import rc
import bob.extension.download
import bob.io.base
from sklearn.utils import check_array
logger = logging.getLogger(__name__)
FACENET_MODELPATH_KEY = "bob.bio.face.facenet_sanderberg_modelpath"
def prewhiten(img):
mean = np.mean(img)
std = np.std(img)
std_adj = np.maximum(std, 1.0 / np.sqrt(img.size))
y = np.multiply(np.subtract(img, mean), 1 / std_adj)
return y
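# prewhiten standardizes an image to zero mean and (roughly) unit variance,
# clamping the std at 1/sqrt(img.size) so near-constant images are not blown
# up. An illustrative sanity check:
#
#   x = np.random.rand(160, 160, 3)
#   y = prewhiten(x)
#   assert abs(y.mean()) < 1e-6 and abs(y.std() - 1.0) < 1e-6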
def get_model_filenames(model_dir):
# code from https://github.com/davidsandberg/facenet
files = os.listdir(model_dir)
meta_files = [s for s in files if s.endswith(".meta")]
if len(meta_files) == 0:
raise ValueError("No meta file found in the model directory (%s)" % model_dir)
elif len(meta_files) > 1:
raise ValueError(
"There should not be more than one meta file in the model "
"directory (%s)" % model_dir
)
meta_file = meta_files[0]
max_step = -1
for f in files:
step_str = re.match(r"(^model-[\w\- ]+.ckpt-(\d+))", f)
if step_str is not None and len(step_str.groups()) >= 2:
step = int(step_str.groups()[1])
if step > max_step:
max_step = step
ckpt_file = step_str.groups()[0]
return meta_file, ckpt_file
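# For example, a model directory containing
#   model-20170512-110547.meta
#   model-20170512-110547.ckpt-250000.index  (step number is illustrative)
# yields meta_file == "model-20170512-110547.meta" and
# ckpt_file == "model-20170512-110547.ckpt-250000"; when several checkpoints
# are present, the one with the highest step wins.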
class FaceNetSanderberg(TransformerMixin, BaseEstimator):
"""Wrapper for the free FaceNet variant:
https://github.com/davidsandberg/facenet
For a preprocessor, you can use::
from bob.bio.face.preprocessor import FaceCrop
# This is the size of the image that this model expects
CROPPED_IMAGE_HEIGHT = 160
CROPPED_IMAGE_WIDTH = 160
# eye positions for frontal images
RIGHT_EYE_POS = (46, 53)
LEFT_EYE_POS = (46, 107)
# Crops the face using eye annotations
preprocessor = FaceCrop(
cropped_image_size=(CROPPED_IMAGE_HEIGHT, CROPPED_IMAGE_WIDTH),
cropped_positions={'leye': LEFT_EYE_POS, 'reye': RIGHT_EYE_POS},
color_channel='rgb'
)
"""
def __init__(
self,
model_path=rc[FACENET_MODELPATH_KEY],
image_size=160,
layer_name="embeddings:0",
**kwargs,
):
super(FaceNetSanderberg, self).__init__()
self.model_path = model_path
self.image_size = image_size
self.layer_name = layer_name
self.loaded = False
self._clean_unpicklables()
def _clean_unpicklables(self):
self.session = None
self.embeddings = None
self.graph = None
self.images_placeholder = None
self.phase_train_placeholder = None
def _check_feature(self, img):
img = check_array(img, allow_nd=True)
def _convert(img):
assert img.shape[-2] == self.image_size
assert img.shape[-3] == self.image_size
img = prewhiten(img)
return img
if img.ndim == 3:
if img.shape[0] == 3:
img = np.moveaxis(img, 0, -1)
return _convert(img)[None, ...] # Adding another axis
elif img.ndim == 4:
if img.shape[1] == 3:
img = np.moveaxis(img, 1, -1)
return _convert(img)
else:
raise ValueError(f"Image shape {img.shape} not supported")
def load_model(self):
import tensorflow as tf
tf.compat.v1.reset_default_graph()
session_conf = tf.compat.v1.ConfigProto(
intra_op_parallelism_threads=1, inter_op_parallelism_threads=1
)
self.graph = tf.Graph()
self.session = tf.compat.v1.Session(graph=self.graph, config=session_conf)
if self.model_path is None:
self.model_path = self.get_modelpath()
if not os.path.exists(self.model_path):
bob.io.base.create_directories_safe(FaceNetSanderberg.get_modelpath())
zip_file = os.path.join(
FaceNetSanderberg.get_modelpath(), "20170512-110547.zip"
)
urls = [
# This link only works in Idiap CI to save bandwidth.
"http://www.idiap.ch/private/wheels/gitlab/"
"facenet_model2_20170512-110547.zip",
# this link to dropbox would work for everybody
"https://www.dropbox.com/s/"
"k7bhxe58q7d48g7/facenet_model2_20170512-110547.zip?dl=1",
]
bob.extension.download.download_and_unzip(urls, zip_file)
# code from https://github.com/davidsandberg/facenet
model_exp = os.path.expanduser(self.model_path)
with self.graph.as_default():
if os.path.isfile(model_exp):
logger.info("Model filename: %s" % model_exp)
with tf.compat.v1.gfile.FastGFile(model_exp, "rb") as f:
graph_def = tf.compat.v1.GraphDef()
graph_def.ParseFromString(f.read())
tf.import_graph_def(graph_def, name="")
else:
logger.info("Model directory: %s" % model_exp)
meta_file, ckpt_file = get_model_filenames(model_exp)
logger.info("Metagraph file: %s" % meta_file)
logger.info("Checkpoint file: %s" % ckpt_file)
saver = tf.compat.v1.train.import_meta_graph(
os.path.join(model_exp, meta_file)
)
saver.restore(self.session, os.path.join(model_exp, ckpt_file))
# Get input and output tensors
self.images_placeholder = self.graph.get_tensor_by_name("input:0")
self.embeddings = self.graph.get_tensor_by_name(self.layer_name)
self.phase_train_placeholder = self.graph.get_tensor_by_name("phase_train:0")
logger.info("Successfully loaded the model.")
self.loaded = True
def transform(self, X, **kwargs):
def _transform(X):
images = self._check_feature(X)
if not self.loaded:
self.load_model()
feed_dict = {
self.images_placeholder: images,
self.phase_train_placeholder: False,
}
features = self.session.run(self.embeddings, feed_dict=feed_dict)
return features
if isinstance(X, list):
return [_transform(i) for i in X]
else:
return _transform(X)
@staticmethod
def get_modelpath():
"""
Get default model path.
First, we try to find this path via the Global Configuration System.
If it cannot be found, we fall back to the directory
`<project>/data`.
"""
# Priority to the RC path
model_path = rc[FACENET_MODELPATH_KEY]
if model_path is None:
import pkg_resources
model_path = pkg_resources.resource_filename(
__name__, "data/FaceNet/20170512-110547"
)
return model_path
def __setstate__(self, d):
# Handling unpicklable objects
self.__dict__ = d
self.loaded = False
def __getstate__(self):
import tensorflow as tf
# Handling unpicklable objects
d = self.__dict__.copy()  # copy so the live instance keeps its session
d.pop("session", None)
d.pop("embeddings", None)
d.pop("graph", None)
d.pop("images_placeholder", None)
d.pop("phase_train_placeholder", None)
tf.compat.v1.reset_default_graph()
return d
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
def fit(self, X, y=None):
return self
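The __getstate__/__setstate__ pair above is what makes the transformer picklable despite the TensorFlow session it holds: session, graph and placeholders are dropped on serialization and the model is lazily reloaded on the next transform call. A minimal round-trip sketch, assuming the model files are in place:

    import pickle
    from bob.bio.face.embeddings import FaceNetSanderberg

    embedder = FaceNetSanderberg()
    restored = pickle.loads(pickle.dumps(embedder))
    assert restored.loaded is False  # the graph is rebuilt on first use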
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
from sklearn.base import TransformerMixin, BaseEstimator
from .tensorflow_compat_v1 import TensorflowCompatV1
class InceptionResnetv2_MsCeleb(TensorflowCompatV1):
"""
Inception Restnet v2 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV2.py
This model was trained using the MsCeleb 1M dataset
The input shape of this model is :math:`3 \times 160 \times 160`
The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples
"""
def __init__(self):
from bob.learn.tensorflow.network import inception_resnet_v2_batch_norm
bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v2_path"
urls = ["https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/inception-v2_batchnorm_rgb.tar.gz"]
model_subdirectory = "idiap_inception_resnet_v2_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 160, 160, 3)
architecture_fn = inception_resnet_v2_batch_norm
super().__init__(checkpoint_filename, input_shape, architecture_fn)
class InceptionResnetv2_CasiaWebFace(TensorflowCompatV1):
"""
Inception Restnet v2 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV2.py
This model was trained using the Casia WebFace
The input shape of this model is :math:`3 \times 160 \times 160`
The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples
"""
def __init__(self):
"""Loads the tensorflow model
"""
from bob.learn.tensorflow.network import inception_resnet_v2_batch_norm
bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v2_casiawebface_path"
urls = ["https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v2_casiawebface_path.tar.gz"]
model_subdirectory = "idiap_inception_resnet_v2_casiawebface_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 160, 160, 3)
architecture_fn = inception_resnet_v2_batch_norm
super().__init__(checkpoint_filename, input_shape, architecture_fn)
class InceptionResnetv1_MsCeleb(TensorflowCompatV1):
"""
Inception Restnet v1 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV1.py
This model was trained using the MsCeleb 1M dataset
The input shape of this model is :math:`3 \times 160 \times 160`
The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples
"""
def __init__(self):
from bob.learn.tensorflow.network import inception_resnet_v1_batch_norm
bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v1_msceleb_path"
urls = ["https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v1_msceleb_path.tar.gz"]
model_subdirectory = "idiap_inception_resnet_v1_msceleb_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 160, 160, 3)
architecture_fn = inception_resnet_v1_batch_norm
super().__init__(checkpoint_filename, input_shape, architecture_fn)
class InceptionResnetv1_CasiaWebFace(TensorflowCompatV1):
"""
Inception Restnet v1 model from https://gitlab.idiap.ch/bob/bob.learn.tensorflow/-/blob/1e40a68bfbbb3dd8813c48d50b2f23ff7a399956/bob/learn/tensorflow/network/InceptionResnetV1.py
This model was trained using the Casia WebFace
The input shape of this model is :math:`3 \times 160 \times 160`
The output embedding is :math:`n \times 128`, where :math:`n` is the number of samples
"""
def __init__(self):
"""Loads the tensorflow model
"""
from bob.learn.tensorflow.network import inception_resnet_v1_batch_norm
bob_rc_variable = "bob.bio.face.idiap_inception_resnet_v1_casiawebface_path"
urls = ["https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/idiap_inception_resnet_v1_casiawebface_path.tar.gz"]
model_subdirectory = "idiap_inception_resnet_v1_casiawebface_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 160, 160, 3)
architecture_fn = inception_resnet_v1_batch_norm
super().__init__(checkpoint_filename, input_shape, architecture_fn)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
import pkg_resources
import bob.extension.download
import bob.io.base  # used by download_model for create_directories_safe
from bob.extension import rc
from sklearn.base import TransformerMixin, BaseEstimator
import numpy as np
import logging
from sklearn.utils import check_array
logger = logging.getLogger(__name__)
class TensorflowCompatV1(TransformerMixin, BaseEstimator):
"""
Tensorflow v1 compatible set of transformers.
Parameters
----------
checkpoint_filename: str
Path of your checkpoint. If the ``.meta`` file is provided, the last checkpoint will be loaded.
input_shape: tuple
Input shape for the TensorFlow neural network
architecture_fn :
A callable that builds the network graph and returns the operations to be executed
"""
def __init__(self, checkpoint_filename, input_shape, architecture_fn):
"""Loads the tensorflow model
"""
self.checkpoint_filename = checkpoint_filename
self.input_shape = input_shape
self.architecture_fn = architecture_fn
self.loaded = False
def transform(self, data):
"""
Forward the data with the loaded neural network
Parameters
----------
data : numpy.ndarray
Input data
Returns
-------
numpy.ndarray
The features.
"""
data = check_array(data, allow_nd=True)
# THE INPUT SHAPE FOR THESE MODELS
# IS `N x C x H x W`
# If ndim == 3, we add a batch axis
if data.ndim == 3:
data = data[None, ...]
# Making sure it's channels-last and has three channels
if data.ndim == 4:
# Swap the second dimension if the input is in bob format (N x C x H x W)
if data.shape[1] == 3:
data = np.moveaxis(data, 1, -1)
if data.shape != self.input_shape:
raise ValueError(
f"Image shape {data.shape} not supported. Expected {self.input_shape}"
)
if not self.loaded:
self.load_model()
return self.session.run(self.embedding, feed_dict={self.input_tensor: data})
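# Shape handling in `transform`, illustrated: a bob-format batch of shape
# (1, 3, 160, 160) becomes the channels-last (1, 160, 160, 3) expected by the
# placeholder, e.g.:
#
#   x = np.zeros((1, 3, 160, 160))
#   assert np.moveaxis(x, 1, -1).shape == (1, 160, 160, 3)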
def load_model(self):
import tensorflow as tf
logger.info(f"Loading model `{self.checkpoint_filename}`")
tf.compat.v1.reset_default_graph()
self.input_tensor = tf.compat.v1.placeholder(tf.float32, shape=self.input_shape)
# Taking the embedding
prelogits = self.architecture_fn(
tf.stack(
[
tf.image.per_image_standardization(i)
for i in tf.unstack(self.input_tensor)
]
),
mode=tf.estimator.ModeKeys.PREDICT,
)[0]
self.embedding = tf.nn.l2_normalize(prelogits, axis=1, name="embedding")
# Initializing the variables of the current architecture_fn
self.session = tf.compat.v1.Session()
self.session.run(tf.compat.v1.global_variables_initializer())
# Loading the last checkpoint and overwriting the current variables
saver = tf.compat.v1.train.Saver()
if os.path.splitext(self.checkpoint_filename)[1] == ".meta":
saver.restore(
self.session,
tf.train.latest_checkpoint(os.path.dirname(self.checkpoint_filename)),
)
elif os.path.isdir(self.checkpoint_filename):
saver.restore(
self.session, tf.train.latest_checkpoint(self.checkpoint_filename)
)
else:
saver.restore(self.session, self.checkpoint_filename)
self.loaded = True
def __setstate__(self, d):
# Handling unpicklable objects
self.__dict__ = d
self.loaded = False
def __getstate__(self):
import tensorflow as tf
# Handling unpicklable objects
d = self.__dict__.copy()  # copy so the live instance keeps its session
d.pop("session", None)
d.pop("input_tensor", None)
d.pop("embedding", None)
tf.compat.v1.reset_default_graph()
return d
# def __del__(self):
# tf.compat.v1.reset_default_graph()
def get_modelpath(self, bob_rc_variable, model_subdirectory):
"""
Get default model path.
First, we try to find this path via the Global Configuration System.
If it cannot be found, we fall back to the directory
`<project>/data`.
"""
# Priority to the RC path
model_path = rc[bob_rc_variable]
if model_path is None:
model_path = pkg_resources.resource_filename(
__name__, os.path.join("data", model_subdirectory)
)
return model_path
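# The lookup above can be steered from the command line through the Global
# Configuration System, e.g. (key and path are illustrative):
#
#   bob config set bob.bio.face.arcface_tf_path /path/to/checkpoints
#
# otherwise the packaged default under data/<model_subdirectory> is used.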
def download_model(self, model_path, urls, zip_file="model.tar.gz"):
"""
Download and unzip a model from some URL.
Parameters
----------
model_path: str
Path where the model is supposed to be stored
urls: list
List of URLs to download the model from
zip_file: str
File name after the download
"""
if not os.path.exists(model_path):
bob.io.base.create_directories_safe(model_path)
zip_file = os.path.join(model_path, zip_file)
bob.extension.download.download_and_unzip(urls, zip_file)
def fit(self, X, y=None):
return self
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
from bob.bio.face.preprocessor import FaceDetect, FaceCrop, Scale
from skimage.transform import resize
import numpy as np
def face_crop_solver(
cropped_image_size,
color_channel="rgb",
cropped_positions=None,
fixed_positions=None,
use_face_detector=False,
):
"""
Decide which face cropper to use.
"""
if use_face_detector:
return FaceDetect(
face_cropper="face-crop-eyes", use_flandmark=True
)
else:
# If there are no cropped positions, just resize
if cropped_positions is None:
return Scale(cropped_image_size)
else:
# Crops the face using the given positions (no face detection)
return FaceCrop(
cropped_image_size=cropped_image_size,
cropped_positions=cropped_positions,
color_channel=color_channel,
fixed_positions=fixed_positions,
)
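A short sketch of the three branches above (sizes and eye positions are illustrative):

    # 1. detector-based cropping
    cropper = face_crop_solver((160, 160), use_face_detector=True)
    # 2. plain resize when no positions are known
    cropper = face_crop_solver((160, 160))
    # 3. cropping around known eye positions
    cropper = face_crop_solver(
        (160, 160), cropped_positions={"leye": (46, 107), "reye": (46, 53)}
    )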
import bob.bio.face
import numpy as np
from bob.pipelines import Sample, wrap
def test_facenet():
from bob.bio.face.embeddings import FaceNetSanderberg
np.random.seed(10)
transformer = FaceNetSanderberg()
# Raw data
data = np.random.rand(3, 160, 160).astype("uint8")
output = transformer.transform(data)
assert output.size == 128, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 128, output.shape
def test_idiap_inceptionv2_msceleb():
from bob.bio.face.embeddings import InceptionResnetv2_MsCeleb
np.random.seed(10)
transformer = InceptionResnetv2_MsCeleb()
data = np.random.rand(3, 160, 160).astype("uint8")
output = transformer.transform(data)
assert output.size == 128, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 128, output.shape
def test_idiap_inceptionv2_casia():
from bob.bio.face.embeddings import InceptionResnetv2_CasiaWebFace
np.random.seed(10)
transformer = InceptionResnetv2_CasiaWebFace()
data = np.random.rand(3, 160, 160).astype("uint8")
output = transformer.transform(data)
assert output.size == 128, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 128, output.shape
def test_idiap_inceptionv1_msceleb():
from bob.bio.face.embeddings import InceptionResnetv1_MsCeleb
np.random.seed(10)
transformer = InceptionResnetv1_MsCeleb()
data = np.random.rand(3, 160, 160).astype("uint8")
output = transformer.transform(data)
assert output.size == 128, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 128, output.shape
def test_idiap_inceptionv1_casia():
from bob.bio.face.embeddings import InceptionResnetv1_CasiaWebFace
np.random.seed(10)
transformer = InceptionResnetv1_CasiaWebFace()
data = np.random.rand(3, 160, 160).astype("uint8")
output = transformer.transform(data)
assert output.size == 128, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 128, output.shape
def test_arface_insight_tf():
import tensorflow as tf
tf.compat.v1.reset_default_graph()
from bob.bio.face.embeddings import ArcFace_InsightFaceTF
np.random.seed(10)
transformer = ArcFace_InsightFaceTF()
data = np.random.rand(3, 112, 112).astype("uint8")
output = transformer.transform(data)
assert output.size == 512, output.shape
# Sample Batch
sample = Sample(data)
transformer_sample = wrap(["sample"], transformer)
output = [s.data for s in transformer_sample.transform([sample])][0]
assert output.size == 512, output.shape