Commit 445338eb authored by Tiago de Freitas Pereira

Cleaning up vanilla biometrics

parent fe38ce3f
2 merge requests: !192 Redoing baselines, !180 [dask] Preparing bob.bio.base for dask pipelines
Pipeline #40518 passed
......@@ -2,8 +2,14 @@ from .pipelines import VanillaBiometricsPipeline
from .biometric_algorithms import Distance
from .score_writers import FourColumnsScoreWriter, CSVScoreWriter
from .wrappers import BioAlgorithmCheckpointWrapper, BioAlgorithmDaskWrapper, dask_vanilla_biometrics, checkpoint_vanilla_biometrics
from .wrappers import (
BioAlgorithmCheckpointWrapper,
BioAlgorithmDaskWrapper,
dask_vanilla_biometrics,
checkpoint_vanilla_biometrics,
dask_get_partition_size,
)
from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper, ZTNormCheckpointWrapper
from .legacy import BioAlgorithmLegacy, DatabaseConnector
\ No newline at end of file
from .legacy import BioAlgorithmLegacy, DatabaseConnector
......@@ -74,13 +74,14 @@ class DatabaseConnector(Database):
allow_scoring_with_all_biometric_references=True,
annotation_type="eyes-center",
fixed_positions=None,
** kwargs,
**kwargs,
):
self.database = database
self.allow_scoring_with_all_biometric_references = allow_scoring_with_all_biometric_references
self.allow_scoring_with_all_biometric_references = (
allow_scoring_with_all_biometric_references
)
self.annotation_type = annotation_type
self.fixed_positions=fixed_positions
self.fixed_positions = fixed_positions
def background_model_samples(self):
"""Returns :py:class:`Sample`'s to train a background model (group
......@@ -234,8 +235,14 @@ class BioAlgorithmLegacy(BioAlgorithm):
@base_dir.setter
def base_dir(self, v):
self._base_dir = v
self.biometric_reference_dir = os.path.join(self._base_dir, "biometric_references")
self.biometric_reference_dir = os.path.join(
self._base_dir, "biometric_references"
)
self.score_dir = os.path.join(self._base_dir, "scores")
if self.projector_file is not None:
self.projector_file = os.path.join(
self._base_dir, os.path.basename(self.projector_file)
)
def load_legacy_background_model(self):
# Loading background model
......
......@@ -11,8 +11,14 @@ import h5py
import cloudpickle
from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper
from .legacy import BioAlgorithmLegacy
from bob.bio.base.transformers import PreprocessorTransformer, ExtractorTransformer, AlgorithmTransformer
from bob.bio.base.transformers import (
PreprocessorTransformer,
ExtractorTransformer,
AlgorithmTransformer,
)
from bob.pipelines.wrappers import SampleWrapper
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
class BioAlgorithmCheckpointWrapper(BioAlgorithm):
"""Wrapper used to checkpoint enrolled and Scoring samples.
......@@ -259,6 +265,28 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
return pipeline
def dask_get_partition_size(cluster, n_objects):
"""
Heuristic that suggests a value for dask's partition_size.
The heuristic is simple: given the maximum number of workers that can run
in a queue (not the number of workers currently running) and the total number of objects to be processed, it returns n_objects/n_max_workers.
Parameters
----------
cluster: :any:`bob.pipelines.distributed.SGEMultipleQueuesCluster`
Cluster of the type :any:`bob.pipelines.distributed.SGEMultipleQueuesCluster`
n_objects: int
Number of objects to be processed
"""
if not isinstance(cluster, SGEMultipleQueuesCluster):
return None
max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
return n_objects // max_jobs if n_objects > max_jobs else 1
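A minimal usage sketch of this heuristic (the max_jobs value of 48 and the object count are illustrative assumptions, and creating the cluster requires access to an SGE grid):
# Hypothetical usage sketch, not part of this commit.
from dask.distributed import Client
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.bio.base.pipelines.vanilla_biometrics import (
    dask_get_partition_size,
    dask_vanilla_biometrics,
)

cluster = SGEMultipleQueuesCluster()  # only works where SGE is available
client = Client(cluster)

n_objects = 10_000  # e.g. training samples + biometric references + probes
partition_size = dask_get_partition_size(cluster, n_objects)
# If the "default" queue allows max_jobs == 48, this gives 10_000 // 48 == 208
# objects per dask partition; the pipeline would then be wrapped with
# dask_vanilla_biometrics(pipeline, partition_size=partition_size).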
def checkpoint_vanilla_biometrics(pipeline, base_dir):
"""
......@@ -269,7 +297,7 @@ def checkpoint_vanilla_biometrics(pipeline, base_dir):
----------
pipeline: :any:`VanillaBiometrics`
Vanilla Biometrics based pipeline to be dasked
Vanilla Biometrics based pipeline to be checkpointed
base_dir: str
Path to store biometric references and scores
......@@ -280,29 +308,37 @@ def checkpoint_vanilla_biometrics(pipeline, base_dir):
for i, name, estimator in sk_pipeline._iter():
# If they are legacy objects, we need to hook their load/save functions
save_func=None
load_func=None
save_func = None
load_func = None
if not isinstance(estimator, SampleWrapper):
raise ValueError(f"{estimator} needs to be the type `SampleWrapper` to be checkpointed")
raise ValueError(
f"{estimator} needs to be the type `SampleWrapper` to be checkpointed"
)
if isinstance(estimator.estimator, PreprocessorTransformer):
save_func = estimator.estimator.instance.write_data
load_func = estimator.estimator.instance.read_data
elif any([isinstance(estimator.estimator, ExtractorTransformer),
isinstance(estimator.estimator, AlgorithmTransformer)]):
elif any(
[
isinstance(estimator.estimator, ExtractorTransformer),
isinstance(estimator.estimator, AlgorithmTransformer),
]
):
save_func = estimator.estimator.instance.write_feature
load_func = estimator.estimator.instance.read_feature
wraped_estimator = bob.pipelines.wrap(
["checkpoint"], estimator, features_dir=os.path.join(base_dir, name),
["checkpoint"],
estimator,
features_dir=os.path.join(base_dir, name),
load_func=load_func,
save_func=save_func
save_func=save_func,
)
sk_pipeline.steps[i] = (name, wraped_estimator)
if isinstance(pipeline.biometric_algorithm, BioAlgorithmLegacy):
if isinstance(pipeline.biometric_algorithm, BioAlgorithmLegacy):
pipeline.biometric_algorithm.base_dir = base_dir
else:
pipeline.biometric_algorithm = BioAlgorithmCheckpointWrapper(
......
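A sketch of how the checkpointing helper above is meant to be used; the transformer, the VanillaBiometricsPipeline constructor arguments and the output directory are assumptions for illustration, not taken from this commit:
# Hypothetical usage sketch (all names below are illustrative).
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
import bob.pipelines
from bob.bio.base.pipelines.vanilla_biometrics import (
    VanillaBiometricsPipeline,
    Distance,
    checkpoint_vanilla_biometrics,
)

# Every step must be a SampleWrapper, hence the ["sample"] wrap.
transformer = make_pipeline(bob.pipelines.wrap(["sample"], FunctionTransformer()))
pipeline = VanillaBiometricsPipeline(transformer, Distance())
pipeline = checkpoint_vanilla_biometrics(pipeline, base_dir="./results/checkpoints")
# Features are written under ./results/checkpoints/<step-name>/ and the
# biometric references and scores under ./results/checkpoints/ as well.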
import click
import pkg_resources
from click_plugins import with_plugins
from bob.extension.scripts.click_helper import AliasedGroup
@with_plugins(pkg_resources.iter_entry_points("bob.bio.pipelines.cli"))
@click.group(cls=AliasedGroup)
def pipelines():
"""Pipelines commands."""
pass
......@@ -21,15 +21,66 @@ from bob.bio.base.pipelines.vanilla_biometrics import (
VanillaBiometricsPipeline,
BioAlgorithmCheckpointWrapper,
BioAlgorithmDaskWrapper,
ZTNormPipeline,
ZTNormDaskWrapper,
ZTNormCheckpointWrapper,
checkpoint_vanilla_biometrics,
dask_vanilla_biometrics,
dask_get_partition_size,
)
from dask.delayed import Delayed
import pkg_resources
from bob.extension.config import load as chain_load
from bob.pipelines.utils import isinstance_nested
logger = logging.getLogger(__name__)
def get_resource_filename(resource_name, group):
"""
Get the file name of a resource.
Parameters
----------
resource_name: str
Name of the resource to be searched
group: str
Entry point group
Returns
-------
filename: str
The entrypoint file name
"""
# Check if it's already a path
if os.path.exists(resource_name):
return resource_name
# If it's a resource get the path of this resource
resources = [r for r in pkg_resources.iter_entry_points(group)]
# if resource_name not in [r.name for r in resources]:
# raise ValueError(f"Resource not found: `{resource_name}`")
for r in resources:
if r.name == resource_name:
resource = r
break
else:
raise ValueError(f"Resource not found: `{resource_name}`")
# TODO: This gets only the root path;
# I don't know how to get the filename
return (
pkg_resources.resource_filename(
resource.module_name, resource.module_name.split(".")[-1]
)
+ ".py"
)
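A hypothetical example of how this helper is used together with chain_load (the resource name "my-database" is illustrative; the actual usage in this command is shown further down):
# Hypothetical sketch: resolve an entry-point name (or a plain path) to a
# config file and chain-load it. "my-database" is an illustrative name.
filename = get_resource_filename("my-database", "bob.bio.database")
config = chain_load([filename])
database = config.database  # the loaded config module is expected to define `database`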
EPILOG = """\b
......@@ -65,30 +116,37 @@ TODO: Work out this help
"""
def compute_scores(result, dask_client):
if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
if dask_client is not None:
result = result.compute(scheduler=dask_client)
else:
logger.warning("`dask_client` not set. Your pipeline will run locally")
result = result.compute(scheduler="single-threaded")
return result
def post_process_scores(pipeline, scores, path):
written_scores = pipeline.write_scores(scores)
return pipeline.post_process(written_scores, path)
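A sketch of how these two helpers are combined once the pipeline has produced its (possibly dask-lazy) scores; `pipeline`, the sample lists and `dask_client` are assumed to exist as in the command below:
# Hypothetical sketch of the write-then-compute flow used by the command below.
scores = pipeline(
    background_model_samples,
    biometric_references,
    probes,
    allow_scoring_with_all_biometric_references=True,
)
written = post_process_scores(pipeline, scores, os.path.join(output, "scores-dev"))
_ = compute_scores(written, dask_client)  # dask_client=None falls back to local execution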
@click.command(
entry_point_group="bob.pipelines.config", cls=ConfigCommand, epilog=EPILOG,
entry_point_group="bob.bio.pipeline.config", cls=ConfigCommand, epilog=EPILOG,
)
@click.option(
"--pipeline",
"-p",
required=True,
cls=ResourceOption,
entry_point_group="bob.pipelines.pipeline",
help="Feature extraction algorithm",
"--pipeline", "-p", required=True, help="Vanilla biometrics pipeline",
)
@click.option(
"--database",
"-d",
required=True,
cls=ResourceOption,
entry_point_group="bob.bio.database", # This should be linked to bob.bio.base
help="Biometric Database connector (class that implements the methods: `background_model_samples`, `references` and `probes`)",
)
@click.option(
"--dask-client",
"-l",
required=False,
cls=ResourceOption,
help="Dask client for the execution of the pipeline.",
)
@click.option(
......@@ -107,11 +165,8 @@ TODO: Work out this help
default="results",
help="Name of output directory",
)
@click.option("--ztnorm", is_flag=True, help="If set, run an experiment with ZTNorm")
@verbosity_option(cls=ResourceOption)
def vanilla_biometrics(
pipeline, database, dask_client, groups, output, ztnorm, **kwargs
):
def vanilla_biometrics(pipeline, database, dask_client, groups, output, **kwargs):
"""Runs the simplest biometrics pipeline.
Such a pipeline consists of three sub-pipelines.
......@@ -156,78 +211,45 @@ def vanilla_biometrics(
"""
def _compute_scores(result, dask_client):
if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
if dask_client is not None:
result = result.compute(scheduler=dask_client)
else:
logger.warning("`dask_client` not set. Your pipeline will run locally")
result = result.compute(scheduler="single-threaded")
return result
def _post_process_scores(pipeline, scores, path):
writed_scores = pipeline.write_scores(scores)
return pipeline.post_process(writed_scores, path)
def _merge_references_ztnorm(biometric_references,probes,zprobes,treferences):
treferences_sub = [t.subject for t in treferences]
biometric_references_sub = [t.subject for t in biometric_references]
for i in range(len(zprobes)):
probes[i].references += treferences_sub
for i in range(len(zprobes)):
zprobes[i].references = biometric_references_sub + treferences_sub
return probes, zprobes
def _is_dask_checkpoint(pipeline):
"""
Check if a VanillaPipeline has daskable and checkpointable algorithms
"""
is_dask = False
is_checkpoint = False
algorithm = pipeline.biometric_algorithm
base_dir = ""
while True:
if isinstance(algorithm, BioAlgorithmDaskWrapper):
is_dask = True
if isinstance(algorithm, BioAlgorithmCheckpointWrapper):
is_checkpoint = True
base_dir = algorithm.base_dir
if hasattr(algorithm, "biometric_algorithm"):
algorithm = algorithm.biometric_algorithm
else:
break
return is_dask, is_checkpoint, base_dir
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
# Patching the pipeline in case of ZNorm
if ztnorm:
pipeline = ZTNormPipeline(pipeline)
is_dask, is_checkpoint, base_dir = _is_dask_checkpoint(
pipeline.vanilla_biometrics_pipeline
)
if is_checkpoint:
pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
pipeline.ztnorm_solver, os.path.join(base_dir, "normed-scores")
)
if is_dask:
pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)
# It's necessary to chain load 2 resources together
pipeline_config = get_resource_filename(pipeline, "bob.bio.pipeline")
database_config = get_resource_filename(database, "bob.bio.database")
vanilla_pipeline = chain_load([database_config, pipeline_config])
dask_client = chain_load([dask_client]).dask_client
# Picking the resources
database = vanilla_pipeline.database
pipeline = vanilla_pipeline.pipeline
# Check if it's already checkpointed
if not isinstance_nested(
pipeline.biometric_algorithm,
"biometric_algorithm",
BioAlgorithmCheckpointWrapper,
):
pipeline = checkpoint_vanilla_biometrics(pipeline, output)
background_model_samples = database.background_model_samples()
for group in groups:
score_file_name = os.path.join(output, f"scores-{group}")
biometric_references = database.references(group=group)
probes = database.probes(group=group)
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
n_objects = (
len(background_model_samples) + len(biometric_references) + len(probes)
)
pipeline = dask_vanilla_biometrics(
pipeline,
partition_size=dask_get_partition_size(dask_client.cluster, n_objects),
)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
......@@ -236,68 +258,15 @@ def vanilla_biometrics(
else False
)
if ztnorm:
zprobes = database.zprobes()
probes = database.probes(group=group)
treferences = database.treferences()
probes, zprobes = _merge_references_ztnorm(biometric_references,probes,zprobes,treferences)
raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores = pipeline(
database.background_model_samples(),
biometric_references,
probes,
zprobes,
treferences,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
def _build_filename(score_file_name, suffix):
return os.path.join(score_file_name, suffix)
# Running RAW_SCORES
raw_scores = _post_process_scores(
pipeline, raw_scores, _build_filename(score_file_name, "raw_scores")
)
_ = _compute_scores(raw_scores, dask_client)
# Z-SCORES
z_normed_scores = _post_process_scores(
pipeline,
z_normed_scores,
_build_filename(score_file_name, "z_normed_scores"),
)
_ = _compute_scores(z_normed_scores, dask_client)
# T-SCORES
t_normed_scores = _post_process_scores(
pipeline,
t_normed_scores,
_build_filename(score_file_name, "t_normed_scores"),
)
_ = _compute_scores(t_normed_scores, dask_client)
# ZT-SCORES
zt_normed_scores = _post_process_scores(
pipeline,
zt_normed_scores,
_build_filename(score_file_name, "zt_normed_scores"),
)
_ = _compute_scores(zt_normed_scores, dask_client)
else:
result = pipeline(
database.background_model_samples(),
biometric_references,
database.probes(group=group),
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
result = pipeline(
background_model_samples,
biometric_references,
probes,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
post_processed_scores = _post_process_scores(
pipeline, result, score_file_name
)
_ = _compute_scores(post_processed_scores, dask_client)
post_processed_scores = post_process_scores(pipeline, result, score_file_name)
_ = compute_scores(post_processed_scores, dask_client)
if dask_client is not None:
dask_client.shutdown()
......@@ -13,39 +13,41 @@ from bob.bio.base.database import BioDatabase, ZTBioDatabase
def check_database(database, groups=('dev',), protocol=None, training_depends=False, models_depend=False, skip_train=False, check_zt=False):
assert isinstance(database, BioDatabase)
database_legacy = database.database
assert isinstance(database_legacy, BioDatabase)
# load the directories
if 'HOME' in os.environ:
database.replace_directories(os.path.join(os.environ['HOME'], '.bob_bio_databases.txt'))
database_legacy.replace_directories(os.path.join(os.environ['HOME'], '.bob_bio_databases.txt'))
if protocol:
database.protocol = protocol
database_legacy.protocol = protocol
if protocol is None:
protocol = database.protocol
protocol = database_legacy.protocol
assert len(database.all_files(add_zt_files=check_zt)) > 0
assert len(database_legacy.all_files(add_zt_files=check_zt)) > 0
if not skip_train:
assert len(database.training_files('train_extractor')) > 0
assert len(database.arrange_by_client(database.training_files('train_enroller'))) > 0
assert len(database_legacy.training_files('train_extractor')) > 0
assert len(database_legacy.arrange_by_client(database_legacy.training_files('train_enroller'))) > 0
for group in groups:
model_ids = database.model_ids_with_protocol(group, protocol=protocol)
model_ids = database_legacy.model_ids_with_protocol(group, protocol=protocol)
assert len(model_ids) > 0
assert database.client_id_from_model_id(model_ids[0], group) is not None
assert len(database.enroll_files(model_ids[0], group)) > 0
assert len(database.probe_files(model_ids[0], group)) > 0
assert database_legacy.client_id_from_model_id(model_ids[0], group) is not None
assert len(database_legacy.enroll_files(model_ids[0], group)) > 0
assert len(database_legacy.probe_files(model_ids[0], group)) > 0
assert database.training_depends_on_protocol == training_depends
assert database.models_depend_on_protocol == models_depend
assert database_legacy.training_depends_on_protocol == training_depends
assert database_legacy.models_depend_on_protocol == models_depend
def check_database_zt(database, groups=('dev', 'eval'), protocol=None, training_depends=False, models_depend=False):
database_legacy = database.database
check_database(database, groups, protocol, training_depends, models_depend, check_zt=True)
assert isinstance(database, ZTBioDatabase)
assert isinstance(database_legacy, ZTBioDatabase)
for group in groups:
t_model_ids = database.t_model_ids(group)
t_model_ids = database_legacy.t_model_ids(group)
assert len(t_model_ids) > 0
assert database.client_id_from_model_id(t_model_ids[0], group) is not None
assert len(database.t_enroll_files(t_model_ids[0], group)) > 0
assert len(database.z_probe_files(group)) > 0
assert database_legacy.client_id_from_model_id(t_model_ids[0], group) is not None
assert len(database_legacy.t_enroll_files(t_model_ids[0], group)) > 0
assert len(database_legacy.z_probe_files(group)) > 0
......@@ -4,7 +4,7 @@ from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
from collections import defaultdict
def split_X_by_y(X, y):
def split_X_by_y(X, y):
training_data = defaultdict(list)
for x1, y1 in zip(X, y):
training_data[y1].append(x1)
......
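An illustration of the per-label split performed by this helper (the sample values are made up; the helper's return statement is elided in the hunk above, so only the grouping itself is shown):
# Illustrative data only.
from collections import defaultdict

X = ["a1", "a2", "b1", "c1", "c2"]
y = ["A", "A", "B", "C", "C"]
training_data = defaultdict(list)
for x1, y1 in zip(X, y):
    training_data[y1].append(x1)
# training_data == {"A": ["a1", "a2"], "B": ["b1"], "C": ["c1", "c2"]}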
......@@ -132,6 +132,7 @@ setup(
'evaluate = bob.bio.base.script.commands:evaluate',
'baseline = bob.bio.base.script.baseline:baseline',
'sort = bob.bio.base.script.sort:sort',
'pipelines = bob.bio.base.script.pipelines:pipelines',
],
# annotators
......@@ -140,9 +141,8 @@ setup(
],
# run pipelines
'bob.pipelines.cli':[
'bob.bio.pipelines.cli':[
'vanilla-biometrics = bob.bio.base.script.vanilla_biometrics:vanilla_biometrics',
'vanilla-biometrics-template = bob.bio.base.script.vanilla_biometrics:vanilla_biometrics_template'
],
......
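The renamed entry-point group is what the @with_plugins decorator on the pipelines group iterates over; a small illustrative sketch of that discovery:
# Illustrative sketch: list the commands registered under the new group.
import pkg_resources

for entry_point in pkg_resources.iter_entry_points("bob.bio.pipelines.cli"):
    print(entry_point.name)  # e.g. "vanilla-biometrics", "vanilla-biometrics-template"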