Commit ac4e3f1c authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Allow the user to set the value of the partition_size and dask.n_workers

[sphinx] Polished documentation

[py] Fixed rebasing
parent cf557040
Pipeline #45942 passed with stage in 6 minutes and 5 seconds
@@ -17,7 +17,10 @@ from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper, ZTNormCheckpointWrapper,
from .legacy import BioAlgorithmLegacy, DatabaseConnector
from .vanilla_biometrics import execute_vanilla_biometrics
from .vanilla_biometrics import (
execute_vanilla_biometrics,
execute_vanilla_biometrics_ztnorm,
)
# gets sphinx autodoc done right - don't remove it
@@ -13,6 +13,11 @@ from bob.bio.base.pipelines.vanilla_biometrics import (
)
from bob.pipelines.utils import isinstance_nested
from bob.bio.base.pipelines.vanilla_biometrics import (
ZTNormPipeline,
ZTNormCheckpointWrapper,
)
logger = logging.getLogger(__name__)
@@ -40,6 +45,8 @@ def execute_vanilla_biometrics(
output,
write_metadata_scores,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
):
"""
@@ -102,11 +109,19 @@ def execute_vanilla_biometrics(
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
# Scaling up
if dask_n_workers is not None:
dask_client.cluster.scale(dask_n_workers)
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
partition_size = dask_get_partition_size(dask_client.cluster, n_objects)
partition_size = (
dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is None
else dask_partition_size
)
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size)
logger.info(f"Running vanilla biometrics for group {group}")
@@ -125,3 +140,189 @@ def execute_vanilla_biometrics(
post_processed_scores = post_process_scores(pipeline, result, score_file_name)
_ = compute_scores(post_processed_scores, dask_client)
def execute_vanilla_biometrics_ztnorm(
pipeline,
database,
dask_client,
groups,
output,
consider_genuines,
write_metadata_scores,
ztnorm_cohort_proportion,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
):
"""
Function that executes the Vanilla Biometrics pipeline with ZTNorm.
This is called when using the ``bob bio pipelines vanilla-biometrics-ztnorm``
command.
This function is also callable from a script without fear of interrupting the running
Dask instance, allowing multiple experiments to be chained while keeping the
workers alive.
Parameters
----------
pipeline: Instance of :py:class:`~bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
A constructed vanilla-biometrics pipeline.
database: Instance of :py:class:`~bob.bio.base.pipelines.vanilla_biometrics.abstract_class.Database`
A database interface instance.
dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
A Dask client instance used to run the experiment in parallel on multiple machines, or locally. Basic configs can be found in ``bob.pipelines.config.distributed``.
groups: list of str
Groups of the dataset that will be requested from the database interface.
output: str
Path where the results and checkpoints will be saved to.
write_metadata_scores: bool
Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.
checkpoint: bool
Whether checkpoint files will be created for every step of the pipelines.
dask_partition_size: int
If using Dask, this option defines the size of each dask bag partition. Use this option if the current heuristic that sets this value doesn't suit your experiment (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).
dask_n_workers: int
If using Dask, this option defines the number of workers with which to start your experiment. Dask automatically scales the number of workers up and down according to the current task load. Use this option if the default number of workers doesn't suit your experiment.
ztnorm_cohort_proportion: float
Sets the proportion of samples used for t-norm and z-norm. Sometimes you don't want to use all the t/z samples for normalization.
consider_genuines: bool
If set, genuine scores will be considered in the ZT score normalization.
"""
def _merge_references_ztnorm(biometric_references, probes, zprobes, treferences):
treferences_sub = [t.subject for t in treferences]
biometric_references_sub = [t.subject for t in biometric_references]
for i in range(len(probes)):
probes[i].references += treferences_sub
for i in range(len(zprobes)):
zprobes[i].references = biometric_references_sub + treferences_sub
return probes, zprobes
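A toy illustration of what the merge does, using `SimpleNamespace` stand-ins for the real sample-set objects (attribute names match the code above; the subject labels are made up): probes gain the t-norm cohort as extra references, while z-probes score against the enrolled references plus the t-norm cohort.

```python
from types import SimpleNamespace

biometric_references = [SimpleNamespace(subject="ref-1")]
treferences = [SimpleNamespace(subject="tref-1")]
probes = [SimpleNamespace(subject="probe-1", references=["ref-1"])]
zprobes = [SimpleNamespace(subject="zprobe-1", references=[])]

treferences_sub = [t.subject for t in treferences]
biometric_references_sub = [t.subject for t in biometric_references]

for probe in probes:
    probe.references += treferences_sub          # ['ref-1', 'tref-1']
for zprobe in zprobes:
    zprobe.references = biometric_references_sub + treferences_sub
```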
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
if write_metadata_scores:
pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
else:
pipeline.score_writer = FourColumnsScoreWriter(os.path.join(output, "./tmp"))
# Check if it's already checkpointed
if checkpoint and not is_checkpointed(pipeline):
pipeline = checkpoint_vanilla_biometrics(pipeline, output)
# Patching the pipeline in case of ZNorm and checkpointing it
pipeline = ZTNormPipeline(pipeline)
if checkpoint:
pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
pipeline.ztnorm_solver, os.path.join(output, "normed-scores")
)
background_model_samples = database.background_model_samples()
zprobes = database.zprobes(proportion=ztnorm_cohort_proportion)
treferences = database.treferences(proportion=ztnorm_cohort_proportion)
for group in groups:
score_file_name = os.path.join(output, f"scores-{group}")
biometric_references = database.references(group=group)
probes = database.probes(group=group)
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
# Scaling up
if dask_n_workers is not None:
dask_client.cluster.scale(dask_n_workers)
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
partition_size = (
dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is None
else dask_partition_size
)
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
database.allow_scoring_with_all_biometric_references
if hasattr(database, "allow_scoring_with_all_biometric_references")
else False
)
if consider_genuines:
z_probes_cpy = copy.deepcopy(zprobes)
zprobes += copy.deepcopy(treferences)
treferences += z_probes_cpy
probes, zprobes = _merge_references_ztnorm(
biometric_references, probes, zprobes, treferences
)
(
raw_scores,
z_normed_scores,
t_normed_scores,
zt_normed_scores,
s_normed_scores,
) = pipeline(
background_model_samples,
biometric_references,
probes,
zprobes,
treferences,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
def _build_filename(score_file_name, suffix):
return os.path.join(score_file_name, suffix)
# Running RAW_SCORES
raw_scores = post_process_scores(
pipeline, raw_scores, _build_filename(score_file_name, "raw_scores")
)
_ = compute_scores(raw_scores, dask_client)
# Z-SCORES
z_normed_scores = post_process_scores(
pipeline,
z_normed_scores,
_build_filename(score_file_name, "z_normed_scores"),
)
_ = compute_scores(z_normed_scores, dask_client)
# T-SCORES
t_normed_scores = post_process_scores(
pipeline,
t_normed_scores,
_build_filename(score_file_name, "t_normed_scores"),
)
_ = compute_scores(t_normed_scores, dask_client)
# S-SCORES
s_normed_scores = post_process_scores(
pipeline,
s_normed_scores,
_build_filename(score_file_name, "s_normed_scores"),
)
_ = compute_scores(s_normed_scores, dask_client)
# ZT-SCORES
zt_normed_scores = post_process_scores(
pipeline,
zt_normed_scores,
_build_filename(score_file_name, "zt_normed_scores"),
)
_ = compute_scores(zt_normed_scores, dask_client)
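The five score streams above all follow the same post-process/compute pattern; for reference, a hedged, behavior-equivalent consolidation of those blocks (all names reused from the surrounding code):

```python
# Equivalent loop over the five score streams produced by the pipeline.
for scores, suffix in [
    (raw_scores, "raw_scores"),
    (z_normed_scores, "z_normed_scores"),
    (t_normed_scores, "t_normed_scores"),
    (s_normed_scores, "s_normed_scores"),
    (zt_normed_scores, "zt_normed_scores"),
]:
    scores = post_process_scores(
        pipeline, scores, _build_filename(score_file_name, suffix)
    )
    _ = compute_scores(scores, dask_client)
```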
@@ -114,6 +114,24 @@ It is possible to do it via configuration file
help="If set, it will checkpoint all steps of the pipeline. Checkpoints will be saved in `--output`.",
cls=ResourceOption,
)
@click.option(
"--dask-partition-size",
"-s",
help="If using Dask, this option defines the size of each dask.bag.partition."
"Use this option if the current heuristic that sets this value doesn't suit your experiment."
"(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
default=None,
type=int,
)
@click.option(
"--dask-n-workers",
"-n",
help="If using Dask, this option defines the number of workers to start your experiment."
"Dask automatically scales up/down the number of workers due to the current load of tasks to be solved."
"Use this option if the current amount of workers set to start an experiment doesn't suit you.",
default=None,
type=int,
)
@verbosity_option(cls=ResourceOption)
def vanilla_biometrics(
pipeline,
@@ -123,6 +141,8 @@ def vanilla_biometrics(
output,
write_metadata_scores,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
):
"""Runs the simplest biometrics pipeline.
@@ -189,7 +209,9 @@ def vanilla_biometrics(
output,
write_metadata_scores,
checkpoint,
dask_partition_size,
dask_n_workers,
**kwargs,
)
logger.info("Experiment finished !!!!!")
logger.info("Experiment finished !")
@@ -14,28 +14,8 @@ from bob.extension.scripts.click_helper import (
)
import logging
<<<<<<< HEAD
import os
from bob.bio.base.pipelines.vanilla_biometrics import (
BioAlgorithmDaskWrapper,
ZTNormPipeline,
ZTNormCheckpointWrapper,
checkpoint_vanilla_biometrics,
dask_vanilla_biometrics,
dask_get_partition_size,
FourColumnsScoreWriter,
CSVScoreWriter,
is_checkpointed,
)
from bob.pipelines.utils import isinstance_nested
from .vanilla_biometrics import (
compute_scores,
post_process_scores,
)
import copy
=======
from bob.bio.base.pipelines.vanilla_biometrics import execute_vanilla_biometrics_ztnorm
>>>>>>> Moved the ZTNorm code from the CLI command to the API
logger = logging.getLogger(__name__)