Commit 0a328a44 authored by Yannick DAYER's avatar Yannick DAYER

[py] Ported the 'annotate' command to pipelines.

Modified the 'annotate' command to use transformers and Dask.
Added a 'SaveAnnotationsWrapper' inheriting from 'CheckpointWrapper'.
Fixed tests for annotator and FailSafe annotator.
Ported the dummy annotators.
Ported the dummy database.
parent ee8a62a7
Pipeline #44662 passed with stage
in 29 minutes and 53 seconds
@@ -34,31 +34,33 @@ class FailSafe(Annotator):
        self.required_keys = list(required_keys)
        self.only_required_keys = only_required_keys
    def annotate(self, sample, **kwargs):
        if 'annotations' not in kwargs or kwargs['annotations'] is None:
            kwargs['annotations'] = {}
        for annotator in self.annotators:
            try:
                annotations = annotator(sample, **kwargs)
            except Exception:
                logger.debug(
                    "The annotator `%s' failed to annotate!", annotator,
                    exc_info=True)
                annotations = None
            if not annotations:
                logger.debug(
                    "Annotator `%s' returned empty annotations.", annotator)
            else:
                logger.debug("Annotator `%s' succeeded!", annotator)
            kwargs['annotations'].update(annotations or {})
            # check if we have all the required annotations
            if all(key in kwargs['annotations'] for key in self.required_keys):
                break
        else:  # this else is for the for loop
            # we don't want to return half of the annotations
            kwargs['annotations'] = None
        if self.only_required_keys:
            for key in list(kwargs['annotations'].keys()):
                if key not in self.required_keys:
                    del kwargs['annotations'][key]
        return kwargs['annotations']
    def transform(self, samples, **kwargs):
        for sample in samples:
            if 'annotations' not in kwargs or kwargs['annotations'] is None:
                kwargs['annotations'] = {}
            for annotator in self.annotators:
                try:
                    sample = annotator([sample], **kwargs)[0]
                except Exception:
                    logger.debug(
                        "The annotator `%s' failed to annotate!", annotator,
                        exc_info=True)
                    sample.annotations = None
                if not sample.annotations:
  • This argument was called samples, but it predates bob.pipelines samples. These are not bob.pipelines Samples and shouldn't work on those by default.

  • Shouldn't FailSafe be a Transformer that accepts Samples from bob.pipelines?

  • Hey, sorry guys, but I don't understand this discussion.

    In my understanding our annotators should be regular scikit-learn Transformers that accept raw data as input and not Sample objects. Am I right?

  • You're right.

  • Yeah, I was wrong about that. The Sample objects should not be passed down to the Transformers.

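For reference, a minimal sketch of the design this thread converges on: the annotator as a plain scikit-learn transformer that consumes raw data (arrays), not Sample objects. The class name and the bounding-box logic below are made up for illustration.

from sklearn.base import BaseEstimator, TransformerMixin

class RawDataAnnotator(TransformerMixin, BaseEstimator):
    """Illustrative annotator: raw arrays in, annotation dicts out."""

    def fit(self, X, y=None):
        # annotators are stateless; nothing to fit
        return self

    def transform(self, images):
        # one annotation dictionary per raw image
        return [self.annotate_one(img) for img in images]

    def annotate_one(self, image):
        # placeholder: a real annotator would run a detector here
        h, w = image.shape[:2]
        return {"topleft": (0, 0), "bottomright": (h, w)}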
                    logger.debug(
                        "Annotator `%s' returned empty annotations.", annotator)
                else:
                    logger.debug("Annotator `%s' succeeded!", annotator)
                kwargs['annotations'].update(sample.annotations or {})
                # check if we have all the required annotations
                if all(key in kwargs['annotations'] for key in self.required_keys):
                    break
            else:  # this else is for the for loop
                # we don't want to return half of the annotations
                kwargs['annotations'] = None
            if self.only_required_keys and kwargs['annotations'] is not None:
                for key in list(kwargs['annotations'].keys()):
                    if key not in self.required_keys:
                        del kwargs['annotations'][key]
            sample.annotations = kwargs['annotations']
        return samples
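For context, a usage sketch of the ported FailSafe; the two annotator instances are hypothetical stand-ins for real annotator transformers.

from bob.bio.base.annotator import FailSafe

# Try a precise annotator first, fall back to a coarser one, and keep
# only the eye positions in the final annotations.
failsafe = FailSafe(
    annotators=[precise_annotator, fallback_annotator],
    required_keys=["leye", "reye"],
    only_required_keys=True,
)
annotated_samples = failsafe.transform(samples)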
from bob.pipelines import CheckpointWrapper, SampleSet
from bob.pipelines.wrappers import _frmt
from os.path import dirname, isfile, expanduser, join
from os import makedirs
import logging
import json
logger = logging.getLogger(__name__)
class SaveAnnotationsWrapper(CheckpointWrapper):
    """
    A specialization of bob.pipelines.CheckpointWrapper that saves annotations.

    Saves :py:attr:`~bob.pipelines.Sample.annotations` to the disk instead of
    :py:attr:`~bob.pipelines.Sample.data` (the default in
    :py:class:`~bob.pipelines.CheckpointWrapper`).

    The annotations of each sample are "dumped" with json in a file
    corresponding to the one in the original dataset (following the same path
    structure, i.e. using the :py:attr:`~bob.pipelines.Sample.key` attribute of
    each sample).

    Parameters
    ----------
    estimator: Annotator Transformer
        Transformer that places a sample's annotations in
        :py:attr:`~bob.pipelines.Sample.annotations`.
    annotations_dir: str
        The root path where the annotations will be saved.
    extension: str
        The extension of the annotations files [default: ``.json``].
    save_func: function
        The function used to save each sample [default: :py:func:`json.dump`].
    overwrite: bool
        When ``True``, overwrites any existing file. Otherwise, skips samples
        for which an annotation file with the same ``key`` exists.
    """
    def __init__(
        self,
        estimator,
        annotations_dir,
        extension=".json",
        save_func=None,
        overwrite=False,
        **kwargs,
    ):
        save_func = save_func or self._save_json
        super(SaveAnnotationsWrapper, self).__init__(
            estimator=estimator,
            features_dir=annotations_dir,
            extension=extension,
            save_func=save_func,
            **kwargs,
        )
        self.overwrite = overwrite
    def save(self, sample):
        """
        Saves one sample's annotations to a file on disk.

        Overrides :py:meth:`bob.pipelines.CheckpointWrapper.save`

        Parameters
        ----------
        sample: :py:class:`~bob.pipelines.Sample`
            One sample containing an :py:attr:`~bob.pipelines.Sample.annotations`
            attribute.
        """
        path = self.make_path(sample)
        makedirs(dirname(path), exist_ok=True)
        try:
            self.save_func(sample.annotations, path)
        except Exception as e:
            raise RuntimeError(
                f"Could not save annotations of {sample}\n"
                f"(annotations are: {sample.annotations})\n"
                f"during {self}.save"
            ) from e
    def _checkpoint_transform(self, samples, method_name):
        """
        Checks if a transform needs to be saved to the disk.

        Overrides :py:meth:`bob.pipelines.CheckpointWrapper._checkpoint_transform`
        """
        # Transform either samples or samplesets
        method = getattr(self.estimator, method_name)
        logger.debug(f"{_frmt(self)}.{method_name}")

        # if features_dir is None, just transform all samples at once
        if self.features_dir is None:
            return method(samples)

        def _transform_samples(samples):
            paths = [self.make_path(s) for s in samples]
            should_compute_list = [
                p is None or not isfile(p) or self.overwrite for p in paths
            ]
            skipped_count = should_compute_list.count(False)
            if skipped_count != 0:
                logger.info(f"Skipping {skipped_count} already existing files.")
            # call method on non-checkpointed samples
            non_existing_samples = [
                s
                for s, should_compute in zip(samples, should_compute_list)
                if should_compute
            ]
            # non_existing_samples could be empty
            computed_features = []
            if non_existing_samples:
                computed_features = method(non_existing_samples)
            # return computed features and checkpointed features
            features, com_feat_index = [], 0
            for s, p, should_compute in zip(samples, paths, should_compute_list):
                if should_compute:
                    feat = computed_features[com_feat_index]
                    com_feat_index += 1
                    # save the computed feature
                    if p is not None:
                        self.save(feat)
                        feat = self.load(s, p)
                    features.append(feat)
                else:
                    features.append(self.load(s, p))
            return features

        if isinstance(samples[0], SampleSet):
            return [
                SampleSet(_transform_samples(s.samples), parent=s) for s in samples
            ]
        else:
            return _transform_samples(samples)
    def _save_json(self, annot, path):
        """
        Saves the annotations in JSON format in the file ``path``.

        This is the default ``save_func``, used when none is passed to
        :py:class:`~bob.bio.base.annotator.SaveAnnotationsWrapper`.

        Parameters
        ----------
        annot: dict
            Any dictionary (containing annotations, for example).
        path: str
            A filename pointing to an existing directory.
        """
        logger.debug(f"Writing annotations '{annot}' to file '{path}'.")
        with open(path, "w") as f:
            json.dump(annot, f, indent=1, allow_nan=False)
\ No newline at end of file
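A usage sketch of the wrapper above; the annotator instance, the output directory, and the sample key in the read-back example are illustrative.

from bob.bio.base.annotator import SaveAnnotationsWrapper

# Each sample's annotations end up in {annotations_dir}/{sample.key}.json
saving_annotator = SaveAnnotationsWrapper(
    annotator,
    annotations_dir="/tmp/annotations",
    overwrite=False,  # skip samples whose annotation file already exists
)
annotated_samples = saving_annotator.transform(samples)

# The saved files are plain JSON and can be read back with json.load:
import json
with open("/tmp/annotations/subject1/sample1.json") as f:
    annotations = json.load(f)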
from .Annotator import Annotator
from .FailSafe import FailSafe
from .Callable import Callable
from .SaveAnnotationsWrapper import SaveAnnotationsWrapper
# gets sphinx autodoc done right - don't remove it
@@ -26,6 +27,7 @@ __appropriate__(
    Annotator,
    FailSafe,
    Callable,
    SaveAnnotationsWrapper,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
import bob.bio.base.annotator
import logging
import numpy
import time
logger = logging.getLogger(__name__)
class DummyAnnotator(bob.bio.base.annotator.Annotator):
    def __init__(self, **kwargs):
        super(DummyAnnotator, self).__init__(**kwargs)

    def transform(self, samples, **kwargs):
        for s in samples:
            logger.debug(f"Annotating sample: {s.key}")
            s.annotations = {
                "time": time.localtime(),
                "rand": list(numpy.random.uniform(0, 1, 2)),
            }
            time.sleep(0.1)
        return samples


annotator = DummyAnnotator()
\ No newline at end of file
@@ -56,18 +56,34 @@ class BioFile(bob.db.base.File, _ReprMixin):
        self.annotation_extension = annotation_extension or ".json"
        self.annotation_type = annotation_type or "json"

    def load(self):
    def load(self, original_directory=None, original_extension=None):
        """Loads the data at the specified location and using the given extension.

        Override it if you need to load differently.

        Parameters
        ----------
        original_directory: str (optional)
            The path to the root of the dataset structure.
            If ``None``, will try to use ``self.original_directory``.
        original_extension: str (optional)
            The filename extension of every file in the dataset.
            If ``None``, will try to use ``self.original_extension``.

        Returns
        -------
        object
            The loaded data (normally :py:class:`numpy.ndarray`).
        """
        if original_directory is None:
            original_directory = self.original_directory
        if original_extension is None:
            original_extension = self.original_extension

        # get the path
        path = self.make_path(
            self.original_directory or "", self.original_extension or ""
            original_directory or "", original_extension or ""
        )
        return bob.io.base.load(path)
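A short example of the new signature; the paths are illustrative.

# Uses the file's own original_directory/original_extension when set:
data = biofile.load()

# Or overrides them explicitly, e.g. when the dataset was moved:
data = biofile.load(
    original_directory="/datasets/my_database",
    original_extension=".pgm",
)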
@@ -136,7 +136,7 @@ class DatabaseConnector(Database):
        return retval

    def probes(self, group):
    def probes(self, group="dev"):
        """Returns :py:class:`Probe`'s to score biometric references
@@ -201,14 +201,14 @@ def check_valid_pipeline(vanilla_pipeline):
    else:
        raise ValueError(
            f"VanillaBiometricsPipeline.transformer should be instance of either `sklearn.pipeline.Pipeline` or"
            "sklearn.base.BaseEstimator, not {vanilla_pipeline.transformer}"
            f"sklearn.base.BaseEstimator, not {vanilla_pipeline.transformer}"
        )

    ## Checking the Biometric algorithm
    if not isinstance(vanilla_pipeline.biometric_algorithm, BioAlgorithm):
        raise ValueError(
            f"VanillaBiometricsPipeline.biometric_algorithm should be instance of `BioAlgorithm`"
            "not {vanilla_pipeline.biometric_algorithm}"
            f"not {vanilla_pipeline.biometric_algorithm}"
        )

    return True
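Both hunks above fix the same pitfall: with implicit string concatenation, each fragment that interpolates a variable needs its own f prefix, otherwise the placeholder is emitted literally. A minimal illustration:

name = "MyTransformer"
msg = (
    f"got {name}, "
    "not {name}"  # missing f prefix: "{name}" stays literal
)
assert msg == "got MyTransformer, not {name}"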
"""A script to help annotate databases.
"""
import logging
import json
import click
import functools
from os.path import dirname, isfile, expanduser
from os import makedirs
from bob.extension.scripts.click_helper import (
verbosity_option,
ConfigCommand,
ResourceOption,
log_parameters,
)
from bob.pipelines import wrap, ToDaskBag
from bob.bio.base.annotator import SaveAnnotationsWrapper
logger = logging.getLogger(__name__)
@@ -23,8 +21,8 @@ def annotate_common_options(func):
    required=True,
    cls=ResourceOption,
    entry_point_group="bob.bio.annotator",
    help="A callable that takes the database and a sample (biofile) "
    "of the database and returns the annotations in a dictionary.",
    help="A Transformer instance that takes a series of samples and returns "
    "the modified samples with annotations as a dictionary.",
)
@click.option(
    "--output-dir",
@@ -40,6 +38,15 @@ def annotate_common_options(func):
    cls=ResourceOption,
    help="Whether to overwrite existing annotations.",
)
@click.option(
    "--dask-client",
    "-l",
    "dask_client",
    entry_point_group="dask.client",
    help="Dask client for the execution of the pipeline. If not specified, "
    "uses a single-threaded, local Dask client.",
    cls=ResourceOption,
)
@functools.wraps(func)
def wrapper(*args, **kwds):
    return func(*args, **kwds)
@@ -62,18 +69,21 @@ Examples:
    required=True,
    cls=ResourceOption,
    entry_point_group="bob.bio.database",
    help="""The database that you want to annotate.""",
    help="Biometric Database (class that implements the methods: "
    "`background_model_samples`, `references` and `probes`).",
)
@annotate_common_options
@click.option(
    "--database-directories-file",
    cls=ResourceOption,
    default=expanduser("~/.bob_bio_databases.txt"),
    help="(Deprecated) To support loading of old databases.",
    "--groups",
    "-g",
    multiple=True,
    default=["dev", "eval"],
    show_default=True,
    help="Biometric Database group that will be annotated.",
)
@annotate_common_options
@verbosity_option(cls=ResourceOption)
def annotate(
    database, annotator, output_dir, force, database_directories_file, **kwargs
    database, groups, annotator, output_dir, force, dask_client, **kwargs
):
    """Annotates a database.
@@ -82,21 +92,50 @@ def annotate(
"""
log_parameters(logger)
# Some databases need their original_directory to be replaced
database.replace_directories(database_directories_file)
# Wrapping that will save each sample at {output_dir}/{sample.key}.json
annotator = SaveAnnotationsWrapper(
annotator,
annotations_dir=output_dir,
overwrite=force,
)
biofiles = database.objects(groups=None, protocol=database.protocol)
samples = sorted(biofiles)
# Allows reception of Dask Bags
annotator = wrap(["dask"], annotator)
def reader(biofile):
return annotator.read_original_data(
biofile, database.original_directory, database.original_extension
)
# Transformer that splits the samples into several Dask Bags
to_dask_bags = ToDaskBag()
def make_path(biofile, output_dir):
return biofile.make_path(output_dir, ".json")
return annotate_generic(samples, reader, make_path, annotator, output_dir, force)
logger.debug("Retrieving background model samples from database.")
background_model_samples = database.background_model_samples()
logger.debug("Retrieving references and probes samples from database.")
references_samplesets = []
probes_samplesets = []
for group in groups:
references_samplesets.extend(database.references(group=group))
probes_samplesets.extend(database.probes(group=group))
# Unravels all samples in one list (no SampleSets)
samples = background_model_samples
samples.extend([sample for r in references_samplesets for sample in r.samples])
samples.extend([sample for p in probes_samplesets for sample in p.samples])
# Sets the scheduler to local if no dask_client is specified
if dask_client is not None:
scheduler=dask_client
else:
scheduler="single-threaded"
logger.info(f"Saving annotations in {output_dir}.")
logger.info(f"Annotating {len(samples)} samples...")
dask_bags = to_dask_bags.transform(samples)
annotator.transform(dask_bags).compute(scheduler=scheduler)
if dask_client is not None:
logger.info("Shutdown workers...")
dask_client.shutdown()
logger.info("Done.")
@click.command(
@@ -107,86 +146,51 @@ Examples:

        $ bob bio annotate-samples -vvv config.py -a <annotator> -o /tmp/annotations

    You have to define samples, reader, and make_path in a python file (config.py) as in
    examples.
    You have to define ``samples`` in a python file (config.py) as in examples.
    """,
)
@click.option(
    "--samples",
    required=True,
    cls=ResourceOption,
    help="A list of all samples that you want to annotate. The list must be sorted or "
    "deterministic in subsequent calls. This is needed so that this script works "
    "correctly on the grid.",
)
@click.option(
    "--reader",
    required=True,
    cls=ResourceOption,
    help="A function with the signature of ``data = reader(sample)`` which takes a "
    "sample and returns the loaded data. The data is given to the annotator.",
)
@click.option(
    "--make-path",
    required=True,
    cls=ResourceOption,
    help="A function with the signature of ``path = make_path(sample, output_dir)`` "
    "which takes a sample and output_dir and returns the unique path for that sample "
    "to be saved in output_dir. The extension of the path must be '.json'.",
    help="A list of all samples that you want to annotate.",
)
@annotate_common_options
@verbosity_option(cls=ResourceOption)
def annotate_samples(
    samples, reader, make_path, annotator, output_dir, force, **kwargs
    samples, make_path, annotator, output_dir, force, dask_client, **kwargs
):
"""Annotates a list of samples.
This command is very similar to ``bob bio annotate`` except that it works without a
database interface. You only need to provide a list of **sorted** samples to be
annotated and two functions::
This command is very similar to ``bob bio annotate`` except that it works
without a database interface.
"""
log_parameters(logger, ignore=("samples",))
# Wrapping that will save each sample at {output_dir}/{sample.key}.json
annotator = SaveAnnotationsWrapper(
annotator,
annotations_dir=output_dir,
overwrite=force,
)
def reader(sample):
# load data from sample here
# for example:
data = bob.io.base.load(sample)
# data will be given to the annotator
return data
# Allows reception of Dask Bags
annotator = wrap(["dask"], annotator)
def make_path(sample, output_dir):
# create a unique path for this sample in the output_dir
# for example:
return os.path.join(output_dir, str(sample) + ".json")
# Transformer that splits the samples into several Dask Bags
to_dask_bags = ToDaskBag()
Please note that your samples must be a list and must be sorted!
"""
log_parameters(logger, ignore=("samples",))
logger.debug("len(samples): %d", len(samples))
return annotate_generic(samples, reader, make_path, annotator, output_dir, force)
def annotate_generic(samples, reader, make_path, annotator, output_dir, force):
    total = len(samples)
    logger.info("Saving annotations in %s", output_dir)
    logger.info("Annotating %d samples ...", total)
    for i, sample in enumerate(samples):
        outpath = make_path(sample, output_dir)
        if not outpath.endswith(".json"):
            outpath += ".json"
        if isfile(outpath):
            if force:
                logger.info("Overwriting the annotations file `%s'", outpath)
            else:
                logger.info("The annotation `%s' already exists", outpath)
                continue
        logger.info(
            "Extracting annotations for sample %d out of %d: %s", i + 1, total, outpath
        )
        data = reader(sample)
        annot = annotator(data)
        makedirs(dirname(outpath), exist_ok=True)
        with open(outpath, "w") as f:
            json.dump(annot, f, indent=1, allow_nan=False)
    if dask_client is not None:
        scheduler = dask_client
    else:
        scheduler = "single-threaded"

    logger.info(f"Saving annotations in {output_dir}")
    logger.info(f"Annotating {len(samples)} samples...")
    dask_bags = to_dask_bags.transform(samples)
    annotator.transform(dask_bags).compute(scheduler=scheduler)

    if dask_client is not None:
        logger.info("Shutting down workers...")
        dask_client.shutdown()
    logger.info("Done.")
from random import random
from bob.bio.base.annotator import FailSafe, Callable
from bob.bio.base.annotator import FailSafe, Annotator
def simple_annotator(image, **kwargs):
    return {