Commit 179c75a3 authored by Tiago de Freitas Pereira

Merge branch 'arcface' into 'master'

Arcface from InsightFace

See merge request !79
parents 14fa2b41 c1339770
Pipeline #45495 canceled with stages in 4 minutes and 52 seconds
@@ -14,3 +14,4 @@ dist
record.txt
build/
bob/bio/face/embeddings/data
+*.DS_Store
-from bob.bio.face.embeddings import ArcFace_InsightFaceTF
+from bob.bio.face.embeddings import ArcFaceInsightFace
from bob.bio.face.config.baseline.helpers import embedding_transformer_112x112
from bob.bio.base.pipelines.vanilla_biometrics import (
Distance,
@@ -16,7 +16,7 @@ else:
def load(annotation_type, fixed_positions=None):
transformer = embedding_transformer_112x112(
-        ArcFace_InsightFaceTF(), annotation_type, fixed_positions
+        ArcFaceInsightFace(), annotation_type, fixed_positions, color_channel="rgb"
)
algorithm = Distance()
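For reference, a minimal usage sketch of the renamed extractor, mirroring the call in this config (the "eyes-center" annotation type is an illustrative choice, not part of this diff):

from bob.bio.face.embeddings import ArcFaceInsightFace
from bob.bio.face.config.baseline.helpers import embedding_transformer_112x112
from bob.bio.base.pipelines.vanilla_biometrics import Distance

# Mirrors the baseline config above
transformer = embedding_transformer_112x112(
    ArcFaceInsightFace(), annotation_type="eyes-center", fixed_positions=None,
    color_channel="rgb",
)
algorithm = Distance()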
@@ -111,7 +111,7 @@ def legacy_default_cropping(cropped_image_size, annotation_type):
return cropped_positions
-def embedding_transformer(cropped_image_size, embedding, annotation_type, cropped_positions, fixed_positions=None):
+def embedding_transformer(cropped_image_size, embedding, annotation_type, cropped_positions, fixed_positions=None, color_channel="rgb"):
"""
    Creates a pipeline composed of a FaceCropper and an embedding extractor.
    This transformer is suited for FaceNet-based architectures
@@ -120,8 +120,6 @@ def embedding_transformer(cropped_image_size, embedding, annotation_type, croppe
This will resize images to the requested `image_size`
"""
color_channel = "rgb"
face_cropper = face_crop_solver(
cropped_image_size,
color_channel=color_channel,
@@ -142,7 +140,7 @@ def embedding_transformer(cropped_image_size, embedding, annotation_type, croppe
return transformer
-def embedding_transformer_160x160(embedding, annotation_type, fixed_positions):
+def embedding_transformer_160x160(embedding, annotation_type, fixed_positions, color_channel="rgb"):
"""
    Creates a pipeline composed of a FaceCropper and an embedding extractor.
    This transformer is suited for FaceNet-based architectures
@@ -153,10 +151,10 @@ def embedding_transformer_160x160(embedding, annotation_type, fixed_positions):
"""
cropped_positions = embedding_transformer_default_cropping((160, 160), annotation_type)
-    return embedding_transformer((160, 160), embedding, annotation_type, cropped_positions, fixed_positions)
+    return embedding_transformer((160, 160), embedding, annotation_type, cropped_positions, fixed_positions, color_channel=color_channel)
-def embedding_transformer_112x112(embedding, annotation_type, fixed_positions):
+def embedding_transformer_112x112(embedding, annotation_type, fixed_positions, color_channel="rgb"):
"""
    Creates a pipeline composed of a FaceCropper and an embedding extractor.
    This transformer is suited for FaceNet-based architectures
@@ -166,7 +164,6 @@ def embedding_transformer_112x112(embedding, annotation_type, fixed_positions):
"""
cropped_image_size = (112, 112)
if annotation_type == "eyes-center":
# Hard coding eye positions for backward consistency
cropped_positions = {'leye': (32, 77), 'reye': (32, 34)}
@@ -174,7 +171,7 @@ def embedding_transformer_112x112(embedding, annotation_type, fixed_positions):
# Will use default
cropped_positions = embedding_transformer_default_cropping(cropped_image_size, annotation_type)
-    return embedding_transformer(cropped_image_size, embedding, annotation_type, cropped_positions, fixed_positions)
+    return embedding_transformer(cropped_image_size, embedding, annotation_type, cropped_positions, fixed_positions, color_channel=color_channel)
def crop_80x64(annotation_type, fixed_positions=None, color_channel="gray"):
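Since `color_channel` is now a parameter instead of a hard-coded value, callers can forward a different channel to the face cropper. A hedged sketch (the gray-channel choice is purely illustrative; most of the embedding models here expect RGB input):

# Illustrative only: override the color channel forwarded to face_crop_solver.
transformer = embedding_transformer_160x160(
    FaceNetSanderberg_20170512_110547(),
    annotation_type="eyes-center",
    fixed_positions=None,
    color_channel="gray",  # was hard-coded to "rgb" before this change
)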
@@ -34,6 +34,8 @@ from .tf2_inception_resnet import (
FaceNetSanderberg_20170512_110547
)
+from .mxnet_models import ArcFaceInsightFace
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
@@ -56,6 +58,7 @@ __appropriate__(
InceptionResnetv1_MsCeleb_CenterLoss_2018,
InceptionResnetv2_Casia_CenterLoss_2018,
InceptionResnetv1_Casia_CenterLoss_2018,
-    FaceNetSanderberg_20170512_110547
+    FaceNetSanderberg_20170512_110547,
+    ArcFaceInsightFace
)
__all__ = [_ for _ in dir() if not _.startswith("_")]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import os
from sklearn.base import TransformerMixin, BaseEstimator
from .tensorflow_compat_v1 import TensorflowCompatV1
from bob.io.image import to_matplotlib
import numpy as np
from sklearn.utils import check_array
class ArcFace_InsightFaceTF(TensorflowCompatV1):
"""
Models copied from
https://github.com/luckycallor/InsightFace-tensorflow/blob/master/backbones/utils.py
The input shape of this model is :math:`3 \times 112 \times 112`
The output embedding is :math:`n \times 512`, where :math:`n` is the number of samples
"""
def __init__(self):
bob_rc_variable = "bob.bio.face.arcface_tf_path"
urls = [
"https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/arcface_insight_tf.tar.gz"
]
model_subdirectory = "arcface_tf_path"
checkpoint_filename = self.get_modelpath(bob_rc_variable, model_subdirectory)
self.download_model(checkpoint_filename, urls)
input_shape = (1, 112, 112, 3)
architecture_fn = init_network
super().__init__(checkpoint_filename, input_shape, architecture_fn)
def transform(self, data):
# https://github.com/luckycallor/InsightFace-tensorflow/blob/master/evaluate.py#L42
data = check_array(data, allow_nd=True)
        data = data / 127.5 - 1.0  # scale [0, 255] pixel values to [-1, 1]
return super().transform(data)
def load_model(self):
self.input_tensor = tf.compat.v1.placeholder(
dtype=tf.float32, shape=self.input_shape, name="input_image",
)
prelogits = self.architecture_fn(self.input_tensor)
self.embedding = prelogits
# Initializing the variables of the current graph
self.session = tf.compat.v1.Session()
self.session.run(tf.compat.v1.global_variables_initializer())
# Loading the last checkpoint and overwriting the current variables
saver = tf.compat.v1.train.Saver()
        if os.path.splitext(self.checkpoint_filename)[1] == ".meta":
            # a .meta graph file: restore the latest checkpoint found in the
            # directory that contains it
            saver.restore(
                self.session,
                tf.train.latest_checkpoint(os.path.dirname(self.checkpoint_filename)),
            )
        elif os.path.isdir(self.checkpoint_filename):
            # a directory: restore the latest checkpoint stored inside it
            saver.restore(
                self.session, tf.train.latest_checkpoint(self.checkpoint_filename)
            )
        else:
            # a direct path to a specific checkpoint prefix
            saver.restore(self.session, self.checkpoint_filename)
self.loaded = True
###########################
# CODE COPIED FROM
# https://github.com/luckycallor/InsightFace-tensorflow/blob/master/backbones/utils.py
###########################
import tensorflow as tf
import tensorflow.contrib.slim as slim
from collections import namedtuple
def init_network(input_tensor):
with tf.variable_scope("embd_extractor", reuse=False):
arg_sc = resnet_arg_scope()
with slim.arg_scope(arg_sc):
net, _ = resnet_v2_m_50(input_tensor, is_training=False, return_raw=True)
net = slim.batch_norm(net, activation_fn=None, is_training=False)
net = slim.dropout(net, keep_prob=1, is_training=False)
net = slim.flatten(net)
net = slim.fully_connected(net, 512, normalizer_fn=None, activation_fn=None)
net = slim.batch_norm(
net, scale=False, activation_fn=None, is_training=False
)
# end_points['embds'] = net
return net
def resnet_v2_m_50(
inputs,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
spatial_squeeze=True,
reuse=None,
scope="resnet_v2_50",
):
"""ResNet-50 model of [1]. See resnet_v2() for arg and return description."""
blocks = [
resnet_v2_block("block1", base_depth=16, num_units=3, stride=2),
resnet_v2_block("block2", base_depth=32, num_units=4, stride=2),
resnet_v2_block("block3", base_depth=64, num_units=14, stride=2),
resnet_v2_block("block4", base_depth=128, num_units=3, stride=2),
]
return resnet_v2_m(
inputs,
blocks,
num_classes,
is_training=is_training,
return_raw=return_raw,
global_pool=global_pool,
output_stride=output_stride,
include_root_block=True,
spatial_squeeze=spatial_squeeze,
reuse=reuse,
scope=scope,
)
def resnet_v2_block(scope, base_depth, num_units, stride):
return Block(
scope,
block,
[{"depth": base_depth * 4, "stride": stride}]
+ (num_units - 1) * [{"depth": base_depth * 4, "stride": 1}],
)
class Block(namedtuple("Block", ["scope", "unit_fn", "args"])):
"""A named tuple describing a ResNet block.
Its parts are:
scope: The scope of the `Block`.
unit_fn: The ResNet unit function which takes as input a `Tensor` and returns another `Tensor` with the output of the ResNet unit.
    args: A list of length equal to the number of units in the `Block`. The list contains one dictionary of keyword arguments for each unit in the block, to be passed to unit_fn.
"""
pass
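# Added illustration (not in the original source): resnet_v2_block("block1",
# base_depth=16, num_units=3, stride=2) produces
#   Block("block1", block, [{"depth": 64, "stride": 2},
#                           {"depth": 64, "stride": 1},
#                           {"depth": 64, "stride": 1}])
# i.e. the strided unit comes first and the remaining units keep stride 1.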
def resnet_v2_m(
inputs,
blocks,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
include_root_block=True,
spatial_squeeze=True,
reuse=None,
scope=None,
):
with tf.variable_scope(scope, "resnet_v2", [inputs], reuse=reuse) as sc:
end_points_collection = sc.original_name_scope + "_end_points"
with slim.arg_scope(
[slim.conv2d, bottleneck, stack_blocks_dense],
outputs_collections=end_points_collection,
):
with slim.arg_scope([slim.batch_norm], is_training=is_training):
net = inputs
if include_root_block:
if output_stride is not None:
if output_stride % 4 != 0:
raise ValueError(
"The output_stride needs to be a multiple of 4."
)
output_stride /= 4
with slim.arg_scope(
[slim.conv2d], activation_fn=None, normalizer_fn=None
):
net = conv2d_same(net, 64, 3, stride=1, scope="conv1")
# net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool1')
net = stack_blocks_dense(net, blocks, output_stride)
end_points = slim.utils.convert_collection_to_dict(
end_points_collection
)
if return_raw:
return net, end_points
net = slim.batch_norm(net, activation_fn=tf.nn.relu, scope="postnorm")
end_points[sc.name + "/postnorm"] = net
if global_pool:
net = tf.reduce_mean(net, [1, 2], name="pool5", keep_dims=True)
end_points["global_pool"] = net
if num_classes:
net = slim.conv2d(
net,
num_classes,
[1, 1],
activation_fn=None,
normalizer_fn=None,
scope="logits",
)
end_points[sc.name + "/logits"] = net
if spatial_squeeze:
net = tf.squeeze(net, [1, 2], name="SpatialSqueeze")
end_points[sc.name + "/spatial_squeeze"] = net
end_points["predictions"] = slim.softmax(net, scope="predictions")
return net, end_points
def conv2d_same(inputs, num_outputs, kernel_size, stride, rate=1, scope=None):
if stride == 1:
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=1,
rate=rate,
padding="SAME",
scope=scope,
)
else:
kernel_size_effective = kernel_size + (kernel_size - 1) * (rate - 1)
pad_total = kernel_size_effective - 1
pad_beg = pad_total // 2
pad_end = pad_total - pad_beg
inputs = tf.pad(
inputs, [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]]
) # zero padding
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=stride,
rate=rate,
padding="VALID",
scope=scope,
)
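# Added note (not in the original source): for kernel_size=3 and rate=1 the
# effective kernel is 3, so pad_total=2 and one row/column of zeros is added
# on each side. Padding explicitly and then convolving with padding="VALID"
# gives the same output size as padding="SAME" for any stride, but makes the
# result independent of the input size parity.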
@slim.add_arg_scope
def stack_blocks_dense(
net,
blocks,
output_stride=None,
store_non_strided_activations=False,
outputs_collections=None,
):
current_stride = 1
rate = 1
for block in blocks:
with tf.variable_scope(block.scope, "block", [net]) as sc:
block_stride = 1
for i, unit in enumerate(block.args):
if store_non_strided_activations and i == len(block.args) - 1:
block_stride = unit.get("stride", 1)
unit = dict(unit, stride=1)
with tf.variable_scope("unit_%d" % (i + 1), values=[net]):
if output_stride is not None and current_stride == output_stride:
net = block.unit_fn(net, rate=rate, **dict(unit, stride=1))
rate *= unit.get("stride", 1)
else:
net = block.unit_fn(net, rate=1, **unit)
current_stride *= unit.get("stride", 1)
if output_stride is not None and current_stride > output_stride:
raise ValueError(
"The target output_stride cannot be reached."
)
net = slim.utils.collect_named_outputs(outputs_collections, sc.name, net)
if output_stride is not None and current_stride == output_stride:
rate *= block_stride
else:
net = subsample(net, block_stride)
current_stride *= block_stride
if output_stride is not None and current_stride > output_stride:
raise ValueError("The target output_stride cannot be reached.")
if output_stride is not None and current_stride != output_stride:
raise ValueError("The target output_stride cannot be reached.")
return net
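# Added note (not in the original source): once current_stride reaches the
# requested output_stride, subsequent units run with stride 1 and multiply the
# atrous `rate` instead, so the spatial resolution stops shrinking while the
# receptive field keeps growing (the usual atrous-convolution trick for dense
# prediction with ResNets).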
def block(inputs, depth, stride, rate=1, outputs_collections=None, scope=None):
with tf.variable_scope(scope, "block_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = conv2d_same(preact, depth, 3, stride, rate=rate, scope="conv1")
residual = slim.conv2d(
residual,
depth,
[3, 3],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv2",
)
# residual = slim.conv2d(residual, depth, [1, 1], stride=1, normalizer_fn=None, activation_fn=None, scope='conv3')
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
@slim.add_arg_scope
def bottleneck(
inputs,
depth,
depth_bottleneck,
stride,
rate=1,
outputs_collections=None,
scope=None,
):
with tf.variable_scope(scope, "bottleneck_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = slim.conv2d(
preact, depth_bottleneck, [1, 1], stride=1, scope="conv1"
)
residual = conv2d_same(
residual, depth_bottleneck, 3, stride, rate=rate, scope="conv2"
)
residual = slim.conv2d(
residual,
depth,
[1, 1],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv3",
)
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
def subsample(inputs, factor, scope=None):
if factor == 1:
return inputs
else:
return slim.max_pool2d(
inputs, [1, 1], stride=factor, scope=scope
) # padding='VALID'
def resnet_arg_scope(
weight_decay=0.0001,
batch_norm_decay=0.9,
batch_norm_epsilon=2e-5,
batch_norm_scale=True,
activation_fn=tf.nn.leaky_relu,
use_batch_norm=True,
batch_norm_updates_collections=tf.GraphKeys.UPDATE_OPS,
):
batch_norm_params = {
"decay": batch_norm_decay,
"epsilon": batch_norm_epsilon,
"scale": batch_norm_scale,
"updates_collections": batch_norm_updates_collections,
"fused": None, # Use fused batch norm if possible.
"param_regularizers": {"gamma": slim.l2_regularizer(weight_decay)},
}
with slim.arg_scope(
[slim.conv2d],
weights_regularizer=slim.l2_regularizer(weight_decay),
weights_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
activation_fn=activation_fn,
normalizer_fn=slim.batch_norm if use_batch_norm else None,
normalizer_params=batch_norm_params,
):
with slim.arg_scope([slim.batch_norm], **batch_norm_params):
with slim.arg_scope([slim.max_pool2d], padding="SAME") as arg_sc:
return arg_sc
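For orientation, a hedged TF1-style construction sketch (it mirrors what load_model above does; the placeholder shape comes from the input_shape set in ArcFace_InsightFaceTF.__init__ and is not asserted by the upstream code):

images = tf.compat.v1.placeholder(tf.float32, shape=(1, 112, 112, 3), name="input_image")
embeddings = init_network(images)  # a (1, 512) embedding tensor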
"""
Load and predict using MXNet-based checkpoints.
"""
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.utils import check_array
import numpy as np
from bob.bio.face.embeddings import download_model
import pkg_resources
import os
from bob.extension import rc
class ArcFaceInsightFace(TransformerMixin, BaseEstimator):
"""
    ArcFace from InsightFace.
Model and source code taken from the repository
https://github.com/deepinsight/insightface/blob/master/python-package/insightface/model_zoo/face_recognition.py
"""
def __init__(self, use_gpu=False, **kwargs):
super().__init__(**kwargs)
self.model = None
self.use_gpu = use_gpu
internal_path = pkg_resources.resource_filename(
__name__, os.path.join("data", "arcface_insightface"),
)
checkpoint_path = (
internal_path
if rc["bob.bio.face.models.ArcFaceInsightFace"] is None
else rc["bob.bio.face.models.ArcFaceInsightFace"]
)
urls = [
"https://www.idiap.ch/software/bob/data/bob/bob.bio.face/master/mxnet/arcface_r100_v1_mxnet.tar.gz"
]
download_model(checkpoint_path, urls, "arcface_r100_v1_mxnet.tar.gz")
self.checkpoint_path = checkpoint_path
def load_model(self):
import mxnet as mx
sym, arg_params, aux_params = mx.model.load_checkpoint(
os.path.join(self.checkpoint_path, "model"), 0
)
all_layers = sym.get_internals()
sym = all_layers["fc1_output"]
        # Bind the symbol to an executor and load the pretrained parameters
ctx = mx.gpu() if self.use_gpu else mx.cpu()
model = mx.mod.Module(symbol=sym, context=ctx, label_names=None)
data_shape = (1, 3, 112, 112)
model.bind(data_shapes=[("data", data_shape)])
model.set_params(arg_params, aux_params)
        # Warm-up: run one forward pass so the executor allocates its buffers
        # before the first real call to transform()
        data = mx.nd.zeros(shape=data_shape)
        db = mx.io.DataBatch(data=(data,))
        model.forward(db, is_train=False)
        model.get_outputs()[0].asnumpy()
self.model = model
def transform(self, X):
import mxnet as mx
if self.model is None:
self.load_model()
X = check_array(X, allow_nd=True)
        X = mx.nd.array(X)  # channels-first RGB crops, shape (n, 3, 112, 112)
db = mx.io.DataBatch(data=(X,))
self.model.forward(db, is_train=False)
return self.model.get_outputs()[0].asnumpy()
def __getstate__(self):
# Handling unpicklable objects
d = self.__dict__.copy()
d["model"] = None
return d
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
@@ -49,7 +49,7 @@ class InceptionResnet(TransformerMixin, BaseEstimator):
if self.preprocessor is not None: