Commit bc881775 authored by Tiago de Freitas Pereira

Added ArcFace model

parent f5ecbee7
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
"""
Models copied from
https://github.com/luckycallor/InsightFace-tensorflow/blob/master/backbones/utils.py
"""
import os
import tensorflow as tf
import tensorflow.contrib.slim as slim  # tf.contrib is only available in TensorFlow 1.x
from bob.extension import rc
from collections import namedtuple
import multiprocessing
import numpy
from bob.io.image import to_matplotlib
ARCFACE_MODELPATH_KEY = "bob.ip.tensorflow_extractor.arcface_modelpath"
_semaphore = multiprocessing.Semaphore()
class ArcFace(object):
"""
Feature extractor using tensorflow
"""
def __init__(self):
"""Loads the tensorflow model
Parameters
----------
checkpoint_filename: str
Path of your checkpoint. If the .meta file is providede the last checkpoint will be loaded.
model :
input_tensor: tf.Tensor used as a data entrypoint. It can be a **tf.placeholder**, the
result of **tf.train.string_input_producer**, etc
graph :
A tf.Tensor containing the operations to be executed
"""
self.image_size = 112
self._clean_unpicklables()
def _clean_unpicklables(self):
self.graph = None
self.session = None
def load_model(self):
backbone_type = "resnet_v2_m_50"
checkpoint_filename = self.get_modelpath()
self.input_tensor = tf.compat.v1.placeholder(
dtype=tf.float32,
shape=(1, self.image_size, self.image_size, 3),
name="input_image",
)
self.graph = init_network(self.input_tensor)
# Initializing the variables of the current graph
self.session = tf.compat.v1.Session()
self.session.run(tf.compat.v1.global_variables_initializer())
# Loading the last checkpoint and overwriting the current variables
saver = tf.compat.v1.train.Saver()
if os.path.splitext(checkpoint_filename)[1] == ".meta":
saver.restore(
self.session,
tf.train.latest_checkpoint(os.path.dirname(checkpoint_filename)),
)
elif os.path.isdir(checkpoint_filename):
saver.restore(self.session, tf.train.latest_checkpoint(checkpoint_filename))
else:
saver.restore(self.session, checkpoint_filename)
def _check_feature(self, img):
img = numpy.ascontiguousarray(img)
assert img.shape[-1] == self.image_size
assert img.shape[-2] == self.image_size
        # Bob images are (C, H, W); convert to the (H, W, C) layout expected
        # by the network input placeholder
        img = to_matplotlib(img)
        # https://github.com/luckycallor/InsightFace-tensorflow/blob/master/evaluate.py#L42
        img = img / 127.5 - 1.0  # scale pixel values from [0, 255] to [-1, 1]
        img = img.reshape((1, self.image_size, self.image_size, 3))
return img
def __del__(self):
tf.compat.v1.reset_default_graph()
def __call__(self, data):
"""
        Forwards the data through the loaded neural network.

        Parameters
        ----------
        data : numpy.ndarray
            Input image, in Bob's ``(C, H, W)`` format.
Returns
-------
numpy.ndarray
The features.
"""
data = self._check_feature(data)
with _semaphore:
if self.session is None:
self.load_model()
features = self.session.run(
self.graph, feed_dict={self.input_tensor: data}
)
return features.flatten()
@staticmethod
def get_modelpath():
"""
Get default model path.
First we try the to search this path via Global Configuration System.
If we can not find it, we set the path in the directory
`<project>/data`
"""
# Priority to the RC path
model_path = rc[ARCFACE_MODELPATH_KEY]
if model_path is None:
import pkg_resources
model_path = pkg_resources.resource_filename(__name__, "data/arcface/")
return model_path
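    # A hedged sketch of how to point the extractor at a custom checkpoint
    # directory via bob.extension's Global Configuration System (assuming
    # the `bob config` CLI shipped with bob.extension is available; the
    # path below is hypothetical):
    #
    #     bob config set bob.ip.tensorflow_extractor.arcface_modelpath /path/to/arcface/checkpoints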
def __setstate__(self, d):
# Handling unpicklable objects
self.__dict__ = d
def __getstate__(self):
# Handling unpicklable objects
with _semaphore:
self._clean_unpicklables()
return self.__dict__
def init_network(input_tensor):
with tf.variable_scope("embd_extractor", reuse=False):
arg_sc = resnet_arg_scope()
with slim.arg_scope(arg_sc):
net, _ = resnet_v2_m_50(input_tensor, is_training=False, return_raw=True)
net = slim.batch_norm(net, activation_fn=None, is_training=False)
net = slim.dropout(net, keep_prob=1, is_training=False)
net = slim.flatten(net)
net = slim.fully_connected(net, 512, normalizer_fn=None, activation_fn=None)
net = slim.batch_norm(
net, scale=False, activation_fn=None, is_training=False
)
# end_points['embds'] = net
return net
def resnet_v2_m_50(
inputs,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
spatial_squeeze=True,
reuse=None,
scope="resnet_v2_50",
):
"""ResNet-50 model of [1]. See resnet_v2() for arg and return description."""
blocks = [
resnet_v2_block("block1", base_depth=16, num_units=3, stride=2),
resnet_v2_block("block2", base_depth=32, num_units=4, stride=2),
resnet_v2_block("block3", base_depth=64, num_units=14, stride=2),
resnet_v2_block("block4", base_depth=128, num_units=3, stride=2),
]
return resnet_v2_m(
inputs,
blocks,
num_classes,
is_training=is_training,
return_raw=return_raw,
global_pool=global_pool,
output_stride=output_stride,
include_root_block=True,
spatial_squeeze=spatial_squeeze,
reuse=reuse,
scope=scope,
)
def resnet_v2_block(scope, base_depth, num_units, stride):
return Block(
scope,
block,
[{"depth": base_depth * 4, "stride": stride}]
+ (num_units - 1) * [{"depth": base_depth * 4, "stride": 1}],
)
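# For example, resnet_v2_block("block1", base_depth=16, num_units=3, stride=2)
# (as used in resnet_v2_m_50 above) produces a Block whose `args` list is:
#
#     [{"depth": 64, "stride": 2},
#      {"depth": 64, "stride": 1},
#      {"depth": 64, "stride": 1}]
#
# i.e. the first unit downsamples and the remaining units preserve resolution.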
class Block(namedtuple("Block", ["scope", "unit_fn", "args"])):
"""A named tuple describing a ResNet block.
Its parts are:
scope: The scope of the `Block`.
unit_fn: The ResNet unit function which takes as input a `Tensor` and returns another `Tensor` with the output of the ResNet unit.
args: A list of length equal to the number of units in the `Block`. The list contains one (depth, depth_bottleneck, stride) tuple for each unit in the block to serve as argument to unit_fn.
"""
pass
def resnet_v2_m(
inputs,
blocks,
num_classes=None,
is_training=True,
return_raw=True,
global_pool=True,
output_stride=None,
include_root_block=True,
spatial_squeeze=True,
reuse=None,
scope=None,
):
with tf.variable_scope(scope, "resnet_v2", [inputs], reuse=reuse) as sc:
end_points_collection = sc.original_name_scope + "_end_points"
with slim.arg_scope(
[slim.conv2d, bottleneck, stack_blocks_dense],
outputs_collections=end_points_collection,
):
with slim.arg_scope([slim.batch_norm], is_training=is_training):
net = inputs
if include_root_block:
if output_stride is not None:
if output_stride % 4 != 0:
raise ValueError(
"The output_stride needs to be a multiple of 4."
)
                        output_stride //= 4  # integer division: keep the stride an int
with slim.arg_scope(
[slim.conv2d], activation_fn=None, normalizer_fn=None
):
net = conv2d_same(net, 64, 3, stride=1, scope="conv1")
# net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool1')
net = stack_blocks_dense(net, blocks, output_stride)
end_points = slim.utils.convert_collection_to_dict(
end_points_collection
)
if return_raw:
return net, end_points
net = slim.batch_norm(net, activation_fn=tf.nn.relu, scope="postnorm")
end_points[sc.name + "/postnorm"] = net
if global_pool:
net = tf.reduce_mean(net, [1, 2], name="pool5", keep_dims=True)
end_points["global_pool"] = net
if num_classes:
net = slim.conv2d(
net,
num_classes,
[1, 1],
activation_fn=None,
normalizer_fn=None,
scope="logits",
)
end_points[sc.name + "/logits"] = net
if spatial_squeeze:
net = tf.squeeze(net, [1, 2], name="SpatialSqueeze")
end_points[sc.name + "/spatial_squeeze"] = net
end_points["predictions"] = slim.softmax(net, scope="predictions")
return net, end_points
def conv2d_same(inputs, num_outputs, kernel_size, stride, rate=1, scope=None):
if stride == 1:
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=1,
rate=rate,
padding="SAME",
scope=scope,
)
else:
kernel_size_effective = kernel_size + (kernel_size - 1) * (rate - 1)
pad_total = kernel_size_effective - 1
pad_beg = pad_total // 2
pad_end = pad_total - pad_beg
inputs = tf.pad(
inputs, [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]]
) # zero padding
return slim.conv2d(
inputs,
num_outputs,
kernel_size,
stride=stride,
rate=rate,
padding="VALID",
scope=scope,
)
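# Worked example of the padding arithmetic above: with kernel_size=3 and
# rate=1, kernel_size_effective = 3 and pad_total = 2, so one row/column of
# zeros is added on each side. Padding explicitly and convolving with
# padding='VALID' makes the result independent of the input size parity,
# unlike padding='SAME' with stride > 1.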
@slim.add_arg_scope
def stack_blocks_dense(
net,
blocks,
output_stride=None,
store_non_strided_activations=False,
outputs_collections=None,
):
current_stride = 1
rate = 1
for block in blocks:
with tf.variable_scope(block.scope, "block", [net]) as sc:
block_stride = 1
for i, unit in enumerate(block.args):
if store_non_strided_activations and i == len(block.args) - 1:
block_stride = unit.get("stride", 1)
unit = dict(unit, stride=1)
with tf.variable_scope("unit_%d" % (i + 1), values=[net]):
if output_stride is not None and current_stride == output_stride:
net = block.unit_fn(net, rate=rate, **dict(unit, stride=1))
rate *= unit.get("stride", 1)
else:
net = block.unit_fn(net, rate=1, **unit)
current_stride *= unit.get("stride", 1)
if output_stride is not None and current_stride > output_stride:
raise ValueError(
"The target output_stride cannot be reached."
)
net = slim.utils.collect_named_outputs(outputs_collections, sc.name, net)
if output_stride is not None and current_stride == output_stride:
rate *= block_stride
else:
net = subsample(net, block_stride)
current_stride *= block_stride
if output_stride is not None and current_stride > output_stride:
raise ValueError("The target output_stride cannot be reached.")
if output_stride is not None and current_stride != output_stride:
raise ValueError("The target output_stride cannot be reached.")
return net
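# Note on `output_stride`: once the accumulated `current_stride` reaches it,
# remaining units are applied with stride 1 while the dilation `rate` grows
# instead (atrous convolution), so feature maps stop shrinking spatially but
# the receptive field keeps expanding.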
def block(inputs, depth, stride, rate=1, outputs_collections=None, scope=None):
with tf.variable_scope(scope, "block_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = conv2d_same(preact, depth, 3, stride, rate=rate, scope="conv1")
residual = slim.conv2d(
residual,
depth,
[3, 3],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv2",
)
# residual = slim.conv2d(residual, depth, [1, 1], stride=1, normalizer_fn=None, activation_fn=None, scope='conv3')
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
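# Unlike `bottleneck` below (a 1x1 -> 3x3 -> 1x1 stack with a reduced inner
# depth), this unit stacks two 3x3 convolutions at the full depth; it is the
# unit function wired into resnet_v2_block above.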
@slim.add_arg_scope
def bottleneck(
inputs,
depth,
depth_bottleneck,
stride,
rate=1,
outputs_collections=None,
scope=None,
):
with tf.variable_scope(scope, "bottleneck_v2", [inputs]) as sc:
depth_in = slim.utils.last_dimension(inputs.get_shape(), min_rank=4)
preact = slim.batch_norm(inputs, activation_fn=tf.nn.leaky_relu, scope="preact")
if depth == depth_in:
shortcut = subsample(inputs, stride, "shortcut")
else:
shortcut = slim.conv2d(
preact,
depth,
[1, 1],
stride=stride,
normalizer_fn=None,
activation_fn=None,
scope="shortcut",
)
residual = slim.conv2d(
preact, depth_bottleneck, [1, 1], stride=1, scope="conv1"
)
residual = conv2d_same(
residual, depth_bottleneck, 3, stride, rate=rate, scope="conv2"
)
residual = slim.conv2d(
residual,
depth,
[1, 1],
stride=1,
normalizer_fn=None,
activation_fn=None,
scope="conv3",
)
output = shortcut + residual
return slim.utils.collect_named_outputs(outputs_collections, sc.name, output)
def subsample(inputs, factor, scope=None):
if factor == 1:
return inputs
else:
return slim.max_pool2d(
inputs, [1, 1], stride=factor, scope=scope
) # padding='VALID'
def resnet_arg_scope(
weight_decay=0.0001,
batch_norm_decay=0.9,
batch_norm_epsilon=2e-5,
batch_norm_scale=True,
activation_fn=tf.nn.leaky_relu,
use_batch_norm=True,
batch_norm_updates_collections=tf.GraphKeys.UPDATE_OPS,
):
batch_norm_params = {
"decay": batch_norm_decay,
"epsilon": batch_norm_epsilon,
"scale": batch_norm_scale,
"updates_collections": batch_norm_updates_collections,
"fused": None, # Use fused batch norm if possible.
"param_regularizers": {"gamma": slim.l2_regularizer(weight_decay)},
}
with slim.arg_scope(
[slim.conv2d],
weights_regularizer=slim.l2_regularizer(weight_decay),
weights_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
activation_fn=activation_fn,
normalizer_fn=slim.batch_norm if use_batch_norm else None,
normalizer_params=batch_norm_params,
):
with slim.arg_scope([slim.batch_norm], **batch_norm_params):
with slim.arg_scope([slim.max_pool2d], padding="SAME") as arg_sc:
return arg_sc
@@ -12,7 +12,7 @@ def get_config():
from .FaceNet import FaceNet
from .MTCNN import MTCNN
from .Extractor import Extractor
from .ArcFace import ArcFace
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
...
@@ -3,6 +3,7 @@ import bob.io.image
from bob.io.base.test_utils import datafile
import numpy
import json
from .ArcFace import ArcFace
numpy.random.seed(10)
@@ -16,6 +17,14 @@ def test_facenet():
assert output.size == 128, output.shape
def test_arcface():
    extractor = ArcFace()
    # a random uint8 test image in Bob's (C, H, W) format
    data = numpy.random.randint(0, 256, size=(3, 112, 112), dtype="uint8")
output = extractor(data)
assert output.size == 512, output.shape
def test_mtcnn():
test_image = datafile("mtcnn/test_image.png", __name__)
ref_numbers = datafile("mtcnn/mtcnn.hdf5", __name__)
...