diff --git a/bob/bio/base/pipelines/vanilla_biometrics/__init__.py b/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
index 0c04f2b34156259d3e297f8c611b6e626c873ca7..58a34e1639781ff5f65506ec9fcf26ff45643a78 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/__init__.py
@@ -2,8 +2,14 @@ from .pipelines import VanillaBiometricsPipeline
 from .biometric_algorithms import Distance
 from .score_writers import FourColumnsScoreWriter, CSVScoreWriter
-from .wrappers import BioAlgorithmCheckpointWrapper, BioAlgorithmDaskWrapper, dask_vanilla_biometrics, checkpoint_vanilla_biometrics
+from .wrappers import (
+    BioAlgorithmCheckpointWrapper,
+    BioAlgorithmDaskWrapper,
+    dask_vanilla_biometrics,
+    checkpoint_vanilla_biometrics,
+    dask_get_partition_size,
+)
 from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper, ZTNormCheckpointWrapper
-from .legacy import BioAlgorithmLegacy, DatabaseConnector
\ No newline at end of file
+from .legacy import BioAlgorithmLegacy, DatabaseConnector
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
index 0928ba0197c6770448673fb87db7b22825cf39f0..cf0d1092951d80f15ebc01daeecbfa79b242ad0b 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
@@ -74,13 +74,14 @@ class DatabaseConnector(Database):
         allow_scoring_with_all_biometric_references=True,
         annotation_type="eyes-center",
         fixed_positions=None,
-        ** kwargs,
+        **kwargs,
     ):
         self.database = database
-        self.allow_scoring_with_all_biometric_references = allow_scoring_with_all_biometric_references
+        self.allow_scoring_with_all_biometric_references = (
+            allow_scoring_with_all_biometric_references
+        )
         self.annotation_type = annotation_type
-        self.fixed_positions=fixed_positions
-
+        self.fixed_positions = fixed_positions
 
     def background_model_samples(self):
         """Returns :py:class:`Sample`'s to train a background model (group
@@ -234,8 +235,14 @@ class BioAlgorithmLegacy(BioAlgorithm):
     @base_dir.setter
     def base_dir(self, v):
         self._base_dir = v
-        self.biometric_reference_dir = os.path.join(self._base_dir, "biometric_references")
+        self.biometric_reference_dir = os.path.join(
+            self._base_dir, "biometric_references"
+        )
         self.score_dir = os.path.join(self._base_dir, "scores")
+        if self.projector_file is not None:
+            self.projector_file = os.path.join(
+                self._base_dir, os.path.basename(self.projector_file)
+            )
 
     def load_legacy_background_model(self):
         # Loading background model
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
index 15eb02dd37f3f6df3c472816d663f3fe7159e217..08df4bda677ad0667bee327924a97490226dd099 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
@@ -11,8 +11,14 @@ import h5py
 import cloudpickle
 from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper
 from .legacy import BioAlgorithmLegacy
-from bob.bio.base.transformers import PreprocessorTransformer, ExtractorTransformer, AlgorithmTransformer
+from bob.bio.base.transformers import (
+    PreprocessorTransformer,
+    ExtractorTransformer,
+    AlgorithmTransformer,
+)
 from bob.pipelines.wrappers import SampleWrapper
+from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
+
 
 class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     """Wrapper used to checkpoint enrolled and Scoring samples.
@@ -259,6 +265,28 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
 
     return pipeline
 
+def dask_get_partition_size(cluster, n_objects):
+    """
+    Heuristic that suggests a value for dask's ``partition_size``.
+    The heuristic is simple: given the maximum number of workers that can run in a queue
+    (not the number of workers currently running) and the total number of objects to be processed, return ``n_objects / n_max_workers``.
+
+    Parameters
+    ----------
+
+    cluster: :any:`bob.pipelines.distributed.SGEMultipleQueuesCluster`
+        Cluster of the type :any:`bob.pipelines.distributed.SGEMultipleQueuesCluster`
+
+    n_objects: int
+        Number of objects to be processed
+
+    """
+    if not isinstance(cluster, SGEMultipleQueuesCluster):
+        return None
+
+    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
+    return n_objects // max_jobs if n_objects > max_jobs else 1
+
 
 def checkpoint_vanilla_biometrics(pipeline, base_dir):
     """
@@ -269,7 +297,7 @@
     ----------
 
     pipeline: :any:`VanillaBiometrics`
-       Vanilla Biometrics based pipeline to be dasked
+       Vanilla Biometrics based pipeline to be checkpointed
 
     base_dir: str
        Path to store biometric references and scores
@@ -280,29 +308,37 @@ def checkpoint_vanilla_biometrics(pipeline, base_dir):
     for i, name, estimator in sk_pipeline._iter():
 
         # If they are legacy objects, we need to hook their load/save functions
-        save_func=None
-        load_func=None
+        save_func = None
+        load_func = None
 
         if not isinstance(estimator, SampleWrapper):
-            raise ValueError(f"{estimator} needs to be the type `SampleWrapper` to be checkpointed")
+            raise ValueError(
+                f"{estimator} needs to be the type `SampleWrapper` to be checkpointed"
+            )
 
         if isinstance(estimator.estimator, PreprocessorTransformer):
             save_func = estimator.estimator.instance.write_data
             load_func = estimator.estimator.instance.read_data
-        elif any([isinstance(estimator.estimator, ExtractorTransformer),
-                  isinstance(estimator.estimator, AlgorithmTransformer)]):
+        elif any(
+            [
+                isinstance(estimator.estimator, ExtractorTransformer),
+                isinstance(estimator.estimator, AlgorithmTransformer),
+            ]
+        ):
             save_func = estimator.estimator.instance.write_feature
             load_func = estimator.estimator.instance.read_feature
 
         wraped_estimator = bob.pipelines.wrap(
-            ["checkpoint"], estimator, features_dir=os.path.join(base_dir, name),
+            ["checkpoint"],
+            estimator,
+            features_dir=os.path.join(base_dir, name),
             load_func=load_func,
-            save_func=save_func
+            save_func=save_func,
         )
 
         sk_pipeline.steps[i] = (name, wraped_estimator)
 
-    if isinstance(pipeline.biometric_algorithm, BioAlgorithmLegacy):
+    if isinstance(pipeline.biometric_algorithm, BioAlgorithmLegacy):
         pipeline.biometric_algorithm.base_dir = base_dir
     else:
         pipeline.biometric_algorithm = BioAlgorithmCheckpointWrapper(
diff --git a/bob/bio/base/script/pipelines.py b/bob/bio/base/script/pipelines.py
new file mode 100644
index 0000000000000000000000000000000000000000..b17201c83680937a11e8ff917a1aa51b69e965a8
--- /dev/null
+++ b/bob/bio/base/script/pipelines.py
@@ -0,0 +1,11 @@
+import click
+import pkg_resources
+from click_plugins import with_plugins
+from bob.extension.scripts.click_helper import AliasedGroup
+
+
+@with_plugins(pkg_resources.iter_entry_points("bob.bio.pipelines.cli"))
+@click.group(cls=AliasedGroup)
+def pipelines():
+    """Pipelines commands."""
+    pass
diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py
index f54d3f644194c4dbd578769a7e72543881fe4817..fe4338fbdfa86142aedc0fd509b1bcdbe8f48ef5 100644
--- a/bob/bio/base/script/vanilla_biometrics.py
+++ b/bob/bio/base/script/vanilla_biometrics.py
@@ -21,15 +21,66 @@ from bob.bio.base.pipelines.vanilla_biometrics import (
     VanillaBiometricsPipeline,
     BioAlgorithmCheckpointWrapper,
     BioAlgorithmDaskWrapper,
-    ZTNormPipeline,
-    ZTNormDaskWrapper,
-    ZTNormCheckpointWrapper,
+    checkpoint_vanilla_biometrics,
+    dask_vanilla_biometrics,
+    dask_get_partition_size,
 )
 
 from dask.delayed import Delayed
+import pkg_resources
+from bob.extension.config import load as chain_load
+from bob.pipelines.utils import isinstance_nested
+
 
 logger = logging.getLogger(__name__)
 
 
+def get_resource_filename(resource_name, group):
+    """
+    Get the file name of a resource.
+
+
+    Parameters
+    ----------
+    resource_name: str
+        Name of the resource to be searched
+
+    group: str
+        Entry point group
+
+    Returns
+    -------
+    filename: str
+        The entry point file name
+
+    """
+
+    # Check if it's already a path
+    if os.path.exists(resource_name):
+        return resource_name
+
+    # If it's a resource, get the path of this resource
+    resources = [r for r in pkg_resources.iter_entry_points(group)]
+
+    # if resource_name not in [r.name for r in resources]:
+    #     raise ValueError(f"Resource not found: `{resource_name}`")
+
+    for r in resources:
+        if r.name == resource_name:
+            resource = r
+            break
+    else:
+        raise ValueError(f"Resource not found: `{resource_name}`")
+
+    # TODO: This gets the root path only;
+    # I don't know how to get the file name
+    return (
+        pkg_resources.resource_filename(
+            resource.module_name, resource.module_name.split(".")[-1]
+        )
+        + ".py"
+    )
+
+
 EPILOG = """\b
@@ -65,30 +116,37 @@ TODO: Work out this help
 """
 
 
+def compute_scores(result, dask_client):
+    if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
+        if dask_client is not None:
+            result = result.compute(scheduler=dask_client)
+        else:
+            logger.warning("`dask_client` not set. Your pipeline will run locally")
+            result = result.compute(scheduler="single-threaded")
+    return result
+
+
+def post_process_scores(pipeline, scores, path):
+    written_scores = pipeline.write_scores(scores)
+    return pipeline.post_process(written_scores, path)
+
+
 @click.command(
-    entry_point_group="bob.pipelines.config", cls=ConfigCommand, epilog=EPILOG,
+    entry_point_group="bob.bio.pipeline.config", cls=ConfigCommand, epilog=EPILOG,
 )
 @click.option(
-    "--pipeline",
-    "-p",
-    required=True,
-    cls=ResourceOption,
-    entry_point_group="bob.pipelines.pipeline",
-    help="Feature extraction algorithm",
+    "--pipeline", "-p", required=True, help="Vanilla biometrics pipeline",
 )
 @click.option(
     "--database",
     "-d",
     required=True,
-    cls=ResourceOption,
-    entry_point_group="bob.bio.database",  # This should be linked to bob.bio.base
     help="Biometric Database connector (class that implements the methods: `background_model_samples`, `references` and `probes`)",
 )
 @click.option(
     "--dask-client",
     "-l",
     required=False,
-    cls=ResourceOption,
     help="Dask client for the execution of the pipeline.",
 )
 @click.option(
@@ -107,11 +165,8 @@ TODO: Work out this help
     default="results",
     help="Name of output directory",
 )
-@click.option("--ztnorm", is_flag=True, help="If set, run an experiment with ZTNorm")
 @verbosity_option(cls=ResourceOption)
-def vanilla_biometrics(
-    pipeline, database, dask_client, groups, output, ztnorm, **kwargs
-):
+def vanilla_biometrics(pipeline, database, dask_client, groups, output, **kwargs):
     """Runs the simplest biometrics pipeline.
     Such pipeline consists into three sub-pipelines.
@@ -156,78 +211,45 @@ def vanilla_biometrics(
 
     """
 
-    def _compute_scores(result, dask_client):
-        if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
-            if dask_client is not None:
-                result = result.compute(scheduler=dask_client)
-            else:
-                logger.warning("`dask_client` not set. Your pipeline will run locally")
-                result = result.compute(scheduler="single-threaded")
-        return result
-
-    def _post_process_scores(pipeline, scores, path):
-        writed_scores = pipeline.write_scores(scores)
-        return pipeline.post_process(writed_scores, path)
-
-    def _merge_references_ztnorm(biometric_references,probes,zprobes,treferences):
-        treferences_sub = [t.subject for t in treferences]
-        biometric_references_sub = [t.subject for t in biometric_references]
-
-        for i in range(len(zprobes)):
-            probes[i].references += treferences_sub
-
-        for i in range(len(zprobes)):
-            zprobes[i].references = biometric_references_sub + treferences_sub
-
-        return probes, zprobes
-
-    def _is_dask_checkpoint(pipeline):
-        """
-        Check if a VanillaPipeline has daskable and checkpointable algorithms
-        """
-        is_dask = False
-        is_checkpoint = False
-        algorithm = pipeline.biometric_algorithm
-        base_dir = ""
-
-        while True:
-            if isinstance(algorithm, BioAlgorithmDaskWrapper):
-                is_dask = True
-
-            if isinstance(algorithm, BioAlgorithmCheckpointWrapper):
-                is_checkpoint = True
-                base_dir = algorithm.base_dir
-
-            if hasattr(algorithm, "biometric_algorithm"):
-                algorithm = algorithm.biometric_algorithm
-            else:
-                break
-        return is_dask, is_checkpoint, base_dir
-
     if not os.path.exists(output):
         os.makedirs(output, exist_ok=True)
 
-    # Patching the pipeline in case of ZNorm
-    if ztnorm:
-        pipeline = ZTNormPipeline(pipeline)
-        is_dask, is_checkpoint, base_dir = _is_dask_checkpoint(
-            pipeline.vanilla_biometrics_pipeline
-        )
-
-        if is_checkpoint:
-            pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
-                pipeline.ztnorm_solver, os.path.join(base_dir, "normed-scores")
-            )
-
-        if is_dask:
-            pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)
-
-
+    # It's necessary to chain load the two resources together
+    pipeline_config = get_resource_filename(pipeline, "bob.bio.pipeline")
+    database_config = get_resource_filename(database, "bob.bio.database")
+    vanilla_pipeline = chain_load([database_config, pipeline_config])
+    dask_client = chain_load([dask_client]).dask_client
+
+    # Picking the resources
+    database = vanilla_pipeline.database
+    pipeline = vanilla_pipeline.pipeline
+
+    # Check if it's already checkpointed
+    if not isinstance_nested(
+        pipeline.biometric_algorithm,
+        "biometric_algorithm",
+        BioAlgorithmCheckpointWrapper,
+    ):
+        pipeline = checkpoint_vanilla_biometrics(pipeline, output)
+
+    background_model_samples = database.background_model_samples()
 
     for group in groups:
 
         score_file_name = os.path.join(output, f"scores-{group}")
         biometric_references = database.references(group=group)
+        probes = database.probes(group=group)
+
+        if dask_client is not None and not isinstance_nested(
+            pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
+        ):
+
+            n_objects = (
+                len(background_model_samples) + len(biometric_references) + len(probes)
+            )
+            pipeline = dask_vanilla_biometrics(
+                pipeline,
+                partition_size=dask_get_partition_size(dask_client.cluster, n_objects),
+            )
 
         logger.info(f"Running vanilla biometrics for group {group}")
 
         allow_scoring_with_all_biometric_references = (
@@ -236,68 +258,15 @@ def vanilla_biometrics(
             else False
         )
 
-        if ztnorm:
-            zprobes = database.zprobes()
-            probes = database.probes(group=group)
-            treferences = database.treferences()
-
-            probes, zprobes = _merge_references_ztnorm(biometric_references,probes,zprobes,treferences)
-
-            raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores = pipeline(
-                database.background_model_samples(),
-                biometric_references,
-                probes,
-                zprobes,
-                treferences,
-                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-            )
-
-            def _build_filename(score_file_name, suffix):
-                return os.path.join(score_file_name, suffix)
-
-            # Running RAW_SCORES
-            raw_scores = _post_process_scores(
-                pipeline, raw_scores, _build_filename(score_file_name, "raw_scores")
-            )
-
-            _ = _compute_scores(raw_scores, dask_client)
-
-            # Z-SCORES
-            z_normed_scores = _post_process_scores(
-                pipeline,
-                z_normed_scores,
-                _build_filename(score_file_name, "z_normed_scores"),
-            )
-            _ = _compute_scores(z_normed_scores, dask_client)
-
-            # T-SCORES
-            t_normed_scores = _post_process_scores(
-                pipeline,
-                t_normed_scores,
-                _build_filename(score_file_name, "t_normed_scores"),
-            )
-            _ = _compute_scores(t_normed_scores, dask_client)
-
-            # ZT-SCORES
-            zt_normed_scores = _post_process_scores(
-                pipeline,
-                zt_normed_scores,
-                _build_filename(score_file_name, "zt_normed_scores"),
-            )
-            _ = _compute_scores(zt_normed_scores, dask_client)
-
-        else:
-
-            result = pipeline(
-                database.background_model_samples(),
-                biometric_references,
-                database.probes(group=group),
-                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-            )
+        result = pipeline(
+            background_model_samples,
+            biometric_references,
+            probes,
+            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+        )
 
-            post_processed_scores = _post_process_scores(
-                pipeline, result, score_file_name
-            )
-            _ = _compute_scores(post_processed_scores, dask_client)
+        post_processed_scores = post_process_scores(pipeline, result, score_file_name)
+        _ = compute_scores(post_processed_scores, dask_client)
 
     if dask_client is not None:
         dask_client.shutdown()
diff --git a/bob/bio/base/test/test_database_implementations.py b/bob/bio/base/test/test_database_implementations.py
index 17e21147e90ad7b24630be7eba076d5ec8d5f9c3..db19203325df1cb2add6b79536f587a474f9b7e8 100644
--- a/bob/bio/base/test/test_database_implementations.py
+++ b/bob/bio/base/test/test_database_implementations.py
@@ -13,39 +13,41 @@ from bob.bio.base.database import BioDatabase, ZTBioDatabase
 def check_database(database, groups=('dev',), protocol=None, training_depends=False, models_depend=False, skip_train=False, check_zt=False):
-    assert isinstance(database, BioDatabase)
+    database_legacy = database.database
+    assert isinstance(database_legacy, BioDatabase)
     # load the directories
     if 'HOME' in os.environ:
-        database.replace_directories(os.path.join(os.environ['HOME'], '.bob_bio_databases.txt'))
+        database_legacy.replace_directories(os.path.join(os.environ['HOME'], '.bob_bio_databases.txt'))
 
     if protocol:
-        database.protocol = protocol
+        database_legacy.protocol = protocol
     if protocol is None:
-        protocol = database.protocol
+        protocol = database_legacy.protocol
 
-    assert len(database.all_files(add_zt_files=check_zt)) > 0
+    assert len(database_legacy.all_files(add_zt_files=check_zt)) > 0
     if not skip_train:
-        assert len(database.training_files('train_extractor')) > 0
-        assert len(database.arrange_by_client(database.training_files('train_enroller'))) > 0
+        assert len(database_legacy.training_files('train_extractor')) > 0
+        assert len(database_legacy.arrange_by_client(database_legacy.training_files('train_enroller'))) > 0
 
     for group in groups:
-        model_ids = database.model_ids_with_protocol(group, protocol=protocol)
+        model_ids = database_legacy.model_ids_with_protocol(group, protocol=protocol)
         assert len(model_ids) > 0
-        assert database.client_id_from_model_id(model_ids[0], group) is not None
-        assert len(database.enroll_files(model_ids[0], group)) > 0
-        assert len(database.probe_files(model_ids[0], group)) > 0
+        assert database_legacy.client_id_from_model_id(model_ids[0], group) is not None
+        assert len(database_legacy.enroll_files(model_ids[0], group)) > 0
+        assert len(database_legacy.probe_files(model_ids[0], group)) > 0
 
-    assert database.training_depends_on_protocol == training_depends
-    assert database.models_depend_on_protocol == models_depend
+    assert database_legacy.training_depends_on_protocol == training_depends
+    assert database_legacy.models_depend_on_protocol == models_depend
 
 
 def check_database_zt(database, groups=('dev', 'eval'), protocol=None, training_depends=False, models_depend=False):
+    database_legacy = database.database
     check_database(database, groups, protocol, training_depends, models_depend, check_zt=True)
-    assert isinstance(database, ZTBioDatabase)
+    assert isinstance(database_legacy, ZTBioDatabase)
     for group in groups:
-        t_model_ids = database.t_model_ids(group)
+        t_model_ids = database_legacy.t_model_ids(group)
         assert len(t_model_ids) > 0
-        assert database.client_id_from_model_id(t_model_ids[0], group) is not None
-        assert len(database.t_enroll_files(t_model_ids[0], group)) > 0
-        assert len(database.z_probe_files(group)) > 0
+        assert database_legacy.client_id_from_model_id(t_model_ids[0], group) is not None
+        assert len(database_legacy.t_enroll_files(t_model_ids[0], group)) > 0
+        assert len(database_legacy.z_probe_files(group)) > 0
diff --git a/bob/bio/base/transformers/__init__.py b/bob/bio/base/transformers/__init__.py
index 4893224a141f179011ceea7b35e2df51af3d94c4..8e88c0758985523eba641dea99b75f1d74db5bfb 100644
--- a/bob/bio/base/transformers/__init__.py
+++ b/bob/bio/base/transformers/__init__.py
@@ -4,7 +4,7 @@ from pkgutil import extend_path
 __path__ = extend_path(__path__, __name__)
 from collections import defaultdict
 
-def split_X_by_y(X, y):
+def split_X_by_y(X, y):
     training_data = defaultdict(list)
     for x1, y1 in zip(X, y):
         training_data[y1].append(x1)
diff --git a/setup.py b/setup.py
index ce5f07a172398d964ad40f0e8ff6afe64d9f3d71..6c9c9445162c73218a95b91890ef4c0914c69b5f 100644
--- a/setup.py
+++ b/setup.py
@@ -132,6 +132,7 @@ setup(
         'evaluate = bob.bio.base.script.commands:evaluate',
         'baseline = bob.bio.base.script.baseline:baseline',
         'sort = bob.bio.base.script.sort:sort',
+        'pipelines = bob.bio.base.script.pipelines:pipelines',
       ],
 
       # annotators
@@ -140,9 +141,8 @@ setup(
       ],
 
       # run pipelines
-      'bob.pipelines.cli':[
+      'bob.bio.pipelines.cli':[
         'vanilla-biometrics = bob.bio.base.script.vanilla_biometrics:vanilla_biometrics',
-        'vanilla-biometrics-template = bob.bio.base.script.vanilla_biometrics:vanilla_biometrics_template'
       ],
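
Below is a minimal, illustrative sketch (not part of the patch) of the rule that `dask_get_partition_size` applies when the CLI wraps the pipeline with `dask_vanilla_biometrics`. The figures are hypothetical and only demonstrate the `n_objects // max_jobs` heuristic introduced in wrappers.py.

    # Hypothetical figures: a queue allowing 48 concurrent SGE jobs and
    # 6000 samples in total (training + references + probes).
    n_objects = 6000
    max_jobs = 48  # would come from cluster.sge_job_spec["default"]["max_jobs"]

    # Same rule as dask_get_partition_size: one partition per potential worker,
    # falling back to a partition size of 1 when there are fewer objects than workers.
    partition_size = n_objects // max_jobs if n_objects > max_jobs else 1
    print(partition_size)  # 125 samples per dask partition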