Commit ffbe30af authored by Amir MOHAMMADI's avatar Amir MOHAMMADI
Browse files

Merge branch 'vuln-csv' into 'master'

Vulnerability framework - CSV datasets

See merge request !106
parents bf618295 e9fbd67a
Pipeline #51429 failed with stage
in 239 minutes and 27 seconds
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Yannick Dayer <yannick.dayer@idiap.ch>
"""Replay-mobile CSV database interface configuration
The Replay-Mobile Database for face spoofing consists of video clips of
photo and video attack attempts under different lighting conditions.
The vulnerability analysis pipeline uses single frames extracted from the
videos to be accepted by most face recognition systems.
Feed this file (defined as resource: ``replaymobile-img``) to ``bob bio pipelines`` as
configuration:
$ bob bio pipelines vanilla-biometrics -v --write-metadata-scores replaymobile-img inception-resnetv2-msceleb
$ bob bio pipelines vanilla-biometrics -v --write-metadata-scores my_config/protocol.py replaymobile-img inception-resnetv2-msceleb
"""
from bob.bio.face.database.replaymobile import ReplayMobileBioDatabase
default_protocol = "grandtest"
if "protocol" not in locals():
protocol = default_protocol
database = ReplayMobileBioDatabase(
protocol=protocol,
)
#!/usr/bin/env python
from bob.bio.face.database import ReplayMobileBioDatabase
from bob.bio.base.pipelines.vanilla_biometrics import DatabaseConnector
from bob.extension import rc
replay_mobile_directory = rc["bob.db.replay_mobile.directory"]
database = DatabaseConnector(
ReplayMobileBioDatabase(
original_directory=replay_mobile_directory,
original_extension=".mov",
protocol="grandtest-licit",
),
annotation_type="bounding-box",
fixed_positions=None,
)
#!/usr/bin/env python
from bob.bio.face.database import ReplayMobileBioDatabase
from bob.bio.base.pipelines.vanilla_biometrics import DatabaseConnector
from bob.extension import rc
replay_mobile_directory = rc["bob.db.replay_mobile.directory"]
database = DatabaseConnector(
ReplayMobileBioDatabase(
original_directory=replay_mobile_directory,
original_extension=".mov",
protocol="grandtest-spoof",
),
annotation_type="bounding-box",
fixed_positions=None,
# Only compare with spoofs from the same target identity
allow_scoring_with_all_biometric_references=False,
)
#!/usr/bin/env python #!/usr/bin/env python
# vim: set fileencoding=utf-8 : # Yannick Dayer <yannick.dayer@idiap.ch>
""" The Replay-Mobile Database for face spoofing implementation of from bob.bio.base.database import CSVDataset, CSVToSampleLoaderBiometrics
bob.bio.base.database.BioDatabase interface.""" from bob.pipelines.sample_loaders import AnnotationsLoader
from bob.pipelines.sample import DelayedSample
from .database import FaceBioFile from bob.db.base.annotations import read_annotation_file
from bob.bio.base.database import BioDatabase from bob.extension.download import get_file
from bob.io.video import reader
from bob.extension import rc from bob.extension import rc
from sklearn.pipeline import make_pipeline
import functools
import os.path
import logging
import numpy
class ReplayMobileBioFile(FaceBioFile): logger = logging.getLogger(__name__)
"""FaceBioFile implementation of the Replay Mobile Database"""
def __init__(self, f):
super(ReplayMobileBioFile, self).__init__(client_id=f.client_id, path=f.path, file_id=f.id)
self._f = f
def load(self, directory=None, extension=None): def load_frame_from_file_replaymobile(file_name, frame, should_flip):
if extension in (None, '.mov'): """Loads a single frame from a video file for replay-mobile.
return self._f.load(directory, extension)
else: This function uses bob's video reader utility that does not load the full
return super(ReplayMobileBioFile, self).load(directory, extension) video in memory to just access one frame.
Parameters
----------
file_name: str
The video file to load the frames from
@property frame: None or list of int
def annotations(self): The index of the frame to load.
return self._f.annotations
capturing device: str
'mobile' devices' frames will be flipped vertically.
Other devices' frames will not be flipped.
class ReplayMobileBioDatabase(BioDatabase): Returns
-------
images: 3D numpy array
The frame of the video in bob format (channel, height, width)
""" """
ReplayMobile database implementation of :py:class:`bob.bio.base.database.BioDatabase` interface. logger.debug(f"Reading frame {frame} from '{file_name}'")
It is an extension of an SQL-based database interface, which directly talks to ReplayMobile database, for video_reader = reader(file_name)
verification experiments (good to use in bob.bio.base framework). image = video_reader[frame]
# Image captured by the 'mobile' device are flipped vertically.
# (Images were captured horizontally and bob.io.video does not read the
# metadata correctly, whether it was on the right or left side)
if should_flip:
image = numpy.flip(image, 2)
# Convert to bob format (channel, height, width)
image = numpy.transpose(image, (0, 2, 1))
return image
class ReplayMobileCSVFrameSampleLoader(CSVToSampleLoaderBiometrics):
"""A loader transformer returning a specific frame of a video file.
This is specifically tailored for replay-mobile. It uses a specific loader
that processes the `should_flip` metadata to correctly orient the frames.
""" """
def __init__(self, max_number_of_frames=None, def __init__(
annotation_directory=None, self,
annotation_extension='.json', dataset_original_directory="",
annotation_type='json', extension="",
original_directory=rc['bob.db.replaymobile.directory'], reference_id_equal_subject_id=True,
original_extension='.mov', ):
name='replay-mobile', super().__init__(
**kwargs): data_loader=None,
from bob.db.replaymobile.verificationprotocol import Database as LowLevelDatabase extension=extension,
self._db = LowLevelDatabase( dataset_original_directory=dataset_original_directory,
max_number_of_frames,
original_directory=original_directory,
original_extension=original_extension,
annotation_directory=annotation_directory,
annotation_extension=annotation_extension,
annotation_type=annotation_type,
) )
self.reference_id_equal_subject_id = reference_id_equal_subject_id
self.references_list = []
# call base class constructors to open a session to the database def convert_row_to_sample(self, row, header):
super(ReplayMobileBioDatabase, self).__init__( """Creates a sample given a row of the CSV protocol definition."""
name=name, fields = dict([[str(h).lower(), r] for h, r in zip(header, row)])
original_directory=original_directory,
original_extension=original_extension,
annotation_directory=annotation_directory,
annotation_extension=annotation_extension,
annotation_type=annotation_type,
**kwargs)
self._kwargs['max_number_of_frames'] = max_number_of_frames
@property if self.reference_id_equal_subject_id:
def original_directory(self): fields["subject_id"] = fields["reference_id"]
return self._db.original_directory else:
if "subject_id" not in fields:
raise ValueError(f"`subject_id` not available in {header}")
if "should_flip" not in fields:
raise ValueError(f"`should_flip` not available in {header}")
if "purpose" not in fields:
raise ValueError(f"`purpose` not available in {header}")
@original_directory.setter kwargs = {k: fields[k] for k in fields.keys() - {"id", "should_flip"}}
def original_directory(self, value):
self._db.original_directory = value
@property # Retrieve the references list
def original_extension(self): if (
return self._db.original_extension fields["purpose"].lower() == "enroll"
and fields["reference_id"] not in self.references_list
):
self.references_list.append(fields["reference_id"])
# Set the references list in the probes for vanilla-biometrics
if fields["purpose"].lower() != "enroll":
if fields["attack_type"]:
# Attacks are only compare to their target (no `spoof_neg`)
kwargs["references"] = [fields["reference_id"]]
else:
kwargs["references"] = self.references_list
# One row leads to multiple samples (different frames)
return DelayedSample(
functools.partial(
load_frame_from_file_replaymobile,
file_name=os.path.join(
self.dataset_original_directory, fields["path"] + self.extension
),
frame=int(fields["frame"]),
should_flip=fields["should_flip"] == "TRUE",
),
key=fields["id"],
**kwargs,
)
@original_extension.setter
def original_extension(self, value):
self._db.original_extension = value
@property def read_frame_annotation_file_replaymobile(file_name, frame, annotations_type="json"):
def annotation_directory(self): """Returns the bounding-box for one frame of a video file of replay-mobile.
return self._db.annotation_directory
@annotation_directory.setter Given an annnotation file location and a frame number, returns the bounding
def annotation_directory(self, value): box coordinates corresponding to the frame.
self._db.annotation_directory = value
@property The replay-mobile annotation files are composed of 4 columns and N rows for
def annotation_extension(self): N frames of the video:
return self._db.annotation_extension
@annotation_extension.setter 120 230 40 40
def annotation_extension(self, value): 125 230 40 40
self._db.annotation_extension = value ...
<x> <y> <w> <h>
@property Parameters
def annotation_type(self): ----------
return self._db.annotation_type
@annotation_type.setter file_name: str
def annotation_type(self, value): The annotation file name (relative to annotations_path).
self._db.annotation_type = value
def protocol_names(self): frame: int
return self._db.protocols() The video frame index.
"""
logger.debug(f"Reading annotation file '{file_name}', frame {frame}.")
def groups(self): video_annotations = read_annotation_file(
return self._db.groups() file_name, annotation_type=annotations_type
)
# read_annotation_file returns an ordered dict with str keys as frame number
frame_annotations = video_annotations[str(frame)]
if frame_annotations is None:
logger.warning(
f"Annotation for file '{file_name}' at frame {frame} was 'null'."
)
return frame_annotations
def annotations(self, myfile):
"""Will return the bounding box annotation of nth frame of the video."""
return myfile.annotations
def model_ids_with_protocol(self, groups=None, protocol=None, **kwargs): class FrameBoundingBoxAnnotationLoader(AnnotationsLoader):
return self._db.model_ids_with_protocol(groups, protocol, **kwargs) """A transformer that adds bounding-box to a sample from annotations files.
def objects(self, groups=None, protocol=None, purposes=None, model_ids=None, **kwargs): Parameters
return [ReplayMobileBioFile(f) for f in self._db.objects(groups, protocol, purposes, model_ids, **kwargs)] ----------
def arrange_by_client(self, files): annotation_directory: str or None
client_files = {} """
for file in files:
if str(file.client_id) not in client_files:
client_files[str(file.client_id)] = []
client_files[str(file.client_id)].append(file)
files_by_clients = [] def __init__(
for client in sorted(client_files.keys()): self, annotation_directory=None, annotation_extension=".json", **kwargs
files_by_clients.append(client_files[client]) ):
return files_by_clients super().__init__(
annotation_directory=annotation_directory,
annotation_extension=annotation_extension,
**kwargs,
)
def transform(self, X):
"""Adds the bounding-box annotations to a series of samples."""
if self.annotation_directory is None:
return None
annotated_samples = []
for x in X:
# Adds the annotations as delayed_attributes, loading them when needed
annotated_samples.append(
DelayedSample(
x._load,
parent=x,
delayed_attributes=dict(
annotations=functools.partial(
read_frame_annotation_file_replaymobile,
file_name=f"{self.annotation_directory}:{x.path}{self.annotation_extension}",
frame=int(x.frame),
annotations_type=self.annotation_type,
)
),
)
)
return annotated_samples
class ReplayMobileBioDatabase(CSVDataset):
"""Database interface that loads a csv definition for replay-mobile
Looks for the protocol definition files (structure of CSV files). If not
present, downloads them.
Then sets the data and annotation paths from __init__ parameters or from
the configuration (``bob config`` command).
Parameters
----------
protocol_name: str
The protocol to use. Must be a sub-folder of ``protocol_definition_path``
protocol_definition_path: str or None
Specifies a path where to fetch the database definition from.
(See :py:func:`bob.extension.download.get_file`)
If None: Downloads the file in the path from ``bob_data_folder`` config.
If None and the config does not exist: Downloads the file in ``~/bob_data``.
data_path: str or None
Overrides the config-defined data location.
If None: uses the ``bob.db.replaymobile.directory`` config.
If None and the config does not exist, set as cwd.
annotation_path: str or None
Specifies a path where the annotation files are located.
If None: Downloads the files to the path poited by the
``bob.db.replaymobile.annotation_directory`` config.
If None and the config does not exist: Downloads the file in ``~/bob_data``.
"""
def __init__(
self,
protocol="grandtest",
protocol_definition_path=None,
data_path=None,
data_extension=".mov",
annotations_path=None,
annotations_extension=".json",
**kwargs,
):
if protocol_definition_path is None:
# Downloading database description files if it is not specified
proto_def_name = "bio-face-replaymobile-img-3a584a97.tar.gz"
proto_def_urls = [
f"https://www.idiap.ch/software/bob/data/bob/bob.bio.face/{proto_def_name}",
f"http://www.idiap.ch/software/bob/data/bob/bob.bio.face/{proto_def_name}",
]
protocol_definition_path = get_file(
filename=proto_def_name,
urls=proto_def_urls,
cache_subdir="datasets",
file_hash="3a584a97",
)
if data_path is None:
data_path = rc.get("bob.db.replaymobile.directory", "")
if data_path == "":
logger.warning(
"Raw data path is not configured. Please set "
"'bob.db.replaymobile.directory' with the 'bob config set' command. "
"Will now attempt with current directory."
)
if annotations_path is None:
annot_name = "annotations-replaymobile-mtcnn-9cd6e452.tar.xz"
annot_urls = [
f"https://www.idiap.ch/software/bob/data/bob/bob.pad.face/{annot_name}",
f"http://www.idiap.ch/software/bob/data/bob/bob.pad.face/{annot_name}",
]
annotations_path = get_file(
filename=annot_name,
urls=annot_urls,
cache_subdir="annotations",
file_hash="9cd6e452",
)
logger.info(
f"Database: Will read CSV protocol definitions in '{protocol_definition_path}'."
)
logger.info(f"Database: Will read raw data files in '{data_path}'.")
logger.info(f"Database: Will read annotation files in '{annotations_path}'.")
super().__init__(
name="replaymobile-img",
protocol=protocol,
dataset_protocol_path=protocol_definition_path,
csv_to_sample_loader=make_pipeline(
ReplayMobileCSVFrameSampleLoader(
dataset_original_directory=data_path,
extension=data_extension,
),
FrameBoundingBoxAnnotationLoader(
annotation_directory=annotations_path,
annotation_extension=annotations_extension,
),
),
fetch_probes=False,
**kwargs,
)
self.annotation_type = "eyes-center"
self.fixed_positions = None
...@@ -26,6 +26,7 @@ from bob.bio.base.test.utils import db_available ...@@ -26,6 +26,7 @@ from bob.bio.base.test.utils import db_available
from bob.bio.base.test.test_database_implementations import check_database from bob.bio.base.test.test_database_implementations import check_database
import bob.core import bob.core
from bob.extension.download import get_file from bob.extension.download import get_file
from nose.plugins.skip import SkipTest
logger = bob.core.log.setup("bob.bio.face") logger = bob.core.log.setup("bob.bio.face")
...@@ -271,46 +272,36 @@ def test_replay_spoof(): ...@@ -271,46 +272,36 @@ def test_replay_spoof():
) )
@db_available("replaymobile") def test_replaymobile():
def test_replaymobile_licit():
database = bob.bio.base.load_resource( database = bob.bio.base.load_resource(
"replaymobile-img-licit", "database", preferred_package="bob.bio.face" "replaymobile-img", "database", preferred_package="bob.bio.face"
) )
samples = database.all_samples(groups=("dev", "eval"))
assert len(samples) == 8300, len(samples)
sample = samples[0]
assert hasattr(sample, "annotations")
assert "reye" in sample.annotations
assert "leye" in sample.annotations
assert hasattr(sample, "path")
assert hasattr(sample, "frame")
assert len(database.references()) == 16
assert len(database.references(group="eval")) == 12
assert len(database.probes()) == 4160
assert len(database.probes(group="eval")) == 3020
try: try:
check_database(database, groups=("dev", "eval")) assert sample.annotations == {
except IOError as e: "bottomright": [785, 395],
pytest.skip( "topleft": [475, 167],
"The database could not be queried; probably the db.sql3 file is missing. Here is the error: '%s'" "leye": [587, 336],
% e "reye": [588, 238],
) "mouthleft": [705, 252],
try: "mouthright": [706, 326],
_check_annotations(database, topleft=True, limit_files=20) "nose": [643, 295],
except IOError as e: }
pytest.skip( assert sample.data.shape == (3, 1280, 720)
"The annotations could not be queried; probably the annotation files are missing. Here is the error: '%s'" assert sample.data[0, 0, 0] == 87
% e except RuntimeError as e:
) raise SkipTest(e)
@db_available("replaymobile")
def test_replaymobile_spoof():
database = bob.bio.base.load_resource(
"replaymobile-img-spoof", "database", preferred_package="bob.bio.face"
)
try:
check_database(database, groups=("dev", "eval"))
except IOError as e:
pytest.skip(
"The database could not be queried; probably the db.sql3 file is missing. Here is the error: '%s'"
% e
)
try:
_check_annotations(database, topleft=True, limit_files=20)
except IOError as e:
pytest.skip(
"The annotations could not be queried; probably the annotation files are missing. Here is the error: '%s'"
% e
)
def test_ijbc(): def test_ijbc():
......
...@@ -26,6 +26,7 @@ requirements: ...@@ -26,6 +26,7 @@ requirements:
- bob.core - bob.core
- bob.io.base - bob.io.base
- bob.io.image - bob.io.image
- bob.io.video
- bob.math - bob.math
- bob.sp - bob.sp
- bob.ip.base - bob.ip.base
......
...@@ -105,10 +105,7 @@ setup( ...@@ -105,10 +105,7 @@ setup(
"mobio-all = bob.bio.face.config.database.mobio_all:database", "mobio-all = bob.bio.face.config.database.mobio_all:database",
"multipie = bob.bio.face.config.database.multipie:database", "multipie = bob.bio.face.config.database.multipie:database",
"multipie-pose = bob.bio.face.config.database.multipie_pose:database", "multipie-pose = bob.bio.face.config.database.multipie_pose:database",
"replay-img-licit = bob.bio.face.config.database.replay:replay_licit", "replaymobile-img = bob.bio.face.config.database.replaymobile:database",
"replay-img-spoof = bob.bio.face.config.database.replay:replay_spoof",
"replaymobile-img-licit = bob.bio.face.config.database.replaymobile:replaymobile_licit",
"replaymobile-img-spoof = bob.bio.face.config.database.replaymobile:replaymobile_spoof",
"fargo = bob.bio.face.config.database.fargo:database",