Commit f994c1af authored by Tiago de Freitas Pereira

[dask] Preparing bob.bio.base for dask pipelines

[ci] Accepting private packages TEMPORARILY

[conda] Removed entry-points

[ci] Reverting commit
parent 8c353346
@@ -3,10 +3,7 @@ from . import database
from . import preprocessor
from . import extractor
from . import algorithm
from . import tools
from . import grid # only one file, not complete directory
from . import annotator
from . import baseline
from . import script
from . import test
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
from .. import resource_keys, load_resource
def search_preprocessor(db_name, keys):
"""
Wrapper that searches for a preprocessor key matching the given database name.
If none is found, the ``"default"`` key is returned.
"""
for k in keys:
if db_name.startswith(k):
return k
else:
return "default"
def get_available_databases():
"""
Get all the available databases through the database entry-points
"""
available_databases = dict()
all_databases = resource_keys('database', strip=[])
for database in all_databases:
try:
database_entry_point = load_resource(database, 'database')
available_databases[database] = dict()
# Checking if the database has data for the ZT normalization
available_databases[database]["has_zt"] = hasattr(database_entry_point, "zobjects") and hasattr(database_entry_point, "tobjects")
available_databases[database]["groups"] = []
# Searching for database groups
try:
groups = list(database_entry_point.groups()) or ["dev"]
for g in ["dev", "eval"]:
available_databases[database]["groups"] += [g] if g in groups else []
except Exception:
# In case the method groups is not implemented
available_databases[database]["groups"] = ["dev"]
except Exception:
pass
return available_databases
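# A minimal usage sketch (assumptions: at least one ``database`` entry point is
# registered, and the preprocessor keys below are hypothetical placeholders for
# a caller-side preprocessor registry). It lists the available databases and
# picks a database-specific preprocessor key, falling back to "default".
if __name__ == "__main__":
    preprocessor_keys = ["mobio", "atnt"]  # hypothetical registry keys
    for name, info in get_available_databases().items():
        key = search_preprocessor(name, preprocessor_keys)
        print(name, "groups:", info["groups"], "has_zt:", info["has_zt"], "preprocessor:", key)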
class Baseline(object):
"""
Base class to define baselines
A Baseline is composed of the triplet
:any:`bob.bio.base.preprocessor.Preprocessor`,
:any:`bob.bio.base.extractor.Extractor`, and
:any:`bob.bio.base.algorithm.Algorithm`
Attributes
----------
name : str
Name of the baseline. This name will be displayed in the command line
interface.
preprocessors : dict
Dictionary containing all possible preprocessors
extractor : str
Registered resource or a config file containing the feature extractor
algorithm : str
Registered resource or a config file containing the algorithm
"""
def __init__(self, name, preprocessors, extractor, algorithm, **kwargs):
super(Baseline, self).__init__(**kwargs)
self.name = name
self.preprocessors = preprocessors
self.extractor = extractor
self.algorithm = algorithm
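# A minimal sketch of declaring a baseline (all resource names below are
# hypothetical placeholders, not resources shipped with this package). The
# ``preprocessors`` dictionary maps database-name prefixes, plus a "default"
# fallback key, to registered preprocessor resources; ``search_preprocessor``
# above selects the matching key for a given database name.
my_baseline = Baseline(
    name="my-baseline",
    preprocessors={"default": "my-preprocessor", "mobio": "my-mobio-preprocessor"},
    extractor="my-extractor",
    algorithm="my-algorithm",
)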
from .Baseline import Baseline, search_preprocessor, get_available_databases
def get_config():
"""Returns a string containing the configuration information.
"""
import bob.extension
return bob.extension.get_config(__name__)
# gets sphinx autodoc done right - don't remove it
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is
shortened. Parameters:
*args: An iterable of objects to modify
Resolves `Sphinx referencing issues
<https://github.com/sphinx-doc/sphinx/issues/3048>`_
"""
for obj in args:
obj.__module__ = __name__
__appropriate__(
Baseline,
)
__all__ = [_ for _ in dir() if not _.startswith('_')]
import bob.bio.base
# define a queue with demanding parameters
grid = bob.bio.base.grid.Grid(
training_queue = '32G',
# preprocessing
preprocessing_queue = '4G-io-big',
# feature extraction
extraction_queue = '8G-io-big',
# feature projection
projection_queue = '8G-io-big',
# model enrollment
enrollment_queue = '8G-io-big',
# scoring
scoring_queue = '8G-io-big'
)
import bob.bio.base
# define a queue with demanding parameters
grid = bob.bio.base.grid.Grid(
training_queue = 'GPU',
# preprocessing
preprocessing_queue = '4G',
# feature extraction
extraction_queue = 'GPU',
# feature projection
projection_queue = '4G',
# model enrollment
enrollment_queue = '4G',
# scoring
scoring_queue = '4G'
)
import bob.bio.base
# define the queue using all the default parameters
grid = bob.bio.base.grid.Grid()
import bob.bio.base
# define a local queue running 4 parallel processes
grid = bob.bio.base.grid.Grid(
grid_type = 'local',
number_of_parallel_processes = 4
)
# define a queue that is highly parallelized
grid_p8 = bob.bio.base.grid.Grid(
grid_type = 'local',
number_of_parallel_processes = 8
)
# define a queue that is highly parallelized
grid_p16 = bob.bio.base.grid.Grid(
grid_type = 'local',
number_of_parallel_processes = 16
)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Manuel Guenther <Manuel.Guenther@idiap.ch>
# @date: Tue Oct 2 12:12:39 CEST 2012
import six
PREDEFINED_QUEUES = {
'default' : {},
'2G' : {'queue' : 'all.q', 'memfree' : '2G'},
'4G' : {'queue' : 'all.q', 'memfree' : '4G'},
'4G-q1d' : {'queue' : 'q1d', 'memfree' : '4G'},
'4G-io-big' : {'queue' : 'q1d', 'memfree' : '4G', 'io_big' : True},
'8G' : {'queue' : 'q1d', 'memfree' : '8G'},
'8G-io-big' : {'queue' : 'q1d', 'memfree' : '8G', 'io_big' : True},
'16G' : {'queue' : 'q1dm', 'memfree' : '16G', 'pe_opt' : 'pe_mth 2', 'hvmem' : '8G'},
'16G-io-big' : {'queue' : 'q1dm', 'memfree' : '16G', 'pe_opt' : 'pe_mth 2', 'hvmem' : '8G', 'io_big' : True},
'32G' : {'queue' : 'q1dm', 'memfree' : '32G', 'pe_opt' : 'pe_mth 4', 'hvmem' : '8G', 'io_big' : True},
'64G' : {'queue' : 'q1dm', 'memfree' : '56G', 'pe_opt' : 'pe_mth 8', 'hvmem' : '7G', 'io_big' : True},
'Week' : {'queue' : 'q1wm', 'memfree' : '32G', 'pe_opt' : 'pe_mth 4', 'hvmem' : '8G'},
'GPU' : {'queue' : 'gpu'}
}
from . import utils
class Grid (object):
This class defines the options that are required to submit parallel jobs to the SGE grid, or jobs to the local queue.
If the given ``grid_type`` is ``'sge'`` (the default), this configuration is set up to submit algorithms to the SGE grid.
In this setup, specific SGE queues can be specified for different steps of the tool chain, and different numbers of parallel processes can be specified for each step.
Currently, only the SGE at Idiap_ is tested and supported; for other SGE installations we cannot assure compatibility.
If the given ``grid_type`` is ``'local'``, this configuration is set up to run using a local scheduler on a single machine.
In this case, only the ``number_of_parallel_processes`` and ``scheduler_sleep_time`` options will be taken into account.
**Parameters:**
grid_type : one of ``('sge', 'local')``
The type of submission system that should be used.
Currently, only SGE and local submissions are supported.
number_of_preprocessing_jobs, number_of_extraction_jobs, number_of_projection_jobs, number_of_enrollment_jobs, number_of_scoring_jobs : int
Only valid if ``grid_type = 'sge'``.
The number of parallel processes that should be executed for preprocessing, extraction, projection, enrollment or scoring.
training_queue, preprocessing_queue, extraction_queue, projection_queue, enrollment_queue, scoring_queue : str or dict
Only valid if ``grid_type = 'sge'``.
SGE queues that should be used for training, preprocessing, extraction, projection, enrollment or scoring.
The queue can be defined using a dictionary of keywords that will be passed directly to the :py:func:`gridtk.tools.qsub` function, or one of our :py:data:`PREDEFINED_QUEUES`, which are adapted for Idiap_.
number_of_parallel_processes : int
Only valid if ``grid_type = 'local'``.
The number of parallel processes with which preprocessing, extraction, projection, enrollment and scoring should be executed.
scheduler_sleep_time : float
The time (in seconds) that the local scheduler will sleep between its iterations.
"""
def __init__(
self,
# grid type, currently supported 'local' and 'sge'
grid_type = 'sge',
# parameters for the splitting of jobs into array jobs; ignored by the local scheduler
number_of_preprocessing_jobs = 32,
number_of_extraction_jobs = 32,
number_of_projection_jobs = 32,
number_of_enrollment_jobs = 32,
number_of_scoring_jobs = 32,
# queue setup for the SGE grid (only used if grid = 'sge', the default)
training_queue = '8G',
preprocessing_queue = 'default',
extraction_queue = 'default',
projection_queue = 'default',
enrollment_queue = 'default',
scoring_queue = 'default',
# setup of the local submission and execution of job (only used if grid = 'local')
number_of_parallel_processes = 1,
scheduler_sleep_time = 1.0 # sleep time for scheduler in seconds
):
self.grid_type = grid_type
if self.is_local():
self._kwargs = dict(grid_type=grid_type, number_of_parallel_processes=number_of_parallel_processes, scheduler_sleep_time=scheduler_sleep_time)
else:
self._kwargs = dict(
grid_type=grid_type,
number_of_preprocessing_jobs=number_of_preprocessing_jobs, number_of_extraction_jobs=number_of_extraction_jobs, number_of_projection_jobs=number_of_projection_jobs, number_of_enrollment_jobs=number_of_enrollment_jobs, number_of_scoring_jobs=number_of_scoring_jobs,
training_queue=training_queue, preprocessing_queue=preprocessing_queue, extraction_queue=extraction_queue, projection_queue=projection_queue, enrollment_queue=enrollment_queue, scoring_queue=scoring_queue
)
# the numbers
if self.is_local():
self.number_of_preprocessing_jobs = number_of_parallel_processes
self.number_of_extraction_jobs = number_of_parallel_processes
self.number_of_projection_jobs = number_of_parallel_processes
self.number_of_enrollment_jobs = number_of_parallel_processes
self.number_of_scoring_jobs = number_of_parallel_processes
else:
self.number_of_preprocessing_jobs = number_of_preprocessing_jobs
self.number_of_extraction_jobs = number_of_extraction_jobs
self.number_of_projection_jobs = number_of_projection_jobs
self.number_of_enrollment_jobs = number_of_enrollment_jobs
self.number_of_scoring_jobs = number_of_scoring_jobs
# the queues
self.training_queue = self.queue(training_queue)
self.preprocessing_queue = self.queue(preprocessing_queue)
self.extraction_queue = self.queue(extraction_queue)
self.projection_queue = self.queue(projection_queue)
self.enrollment_queue = self.queue(enrollment_queue)
self.scoring_queue = self.queue(scoring_queue)
# the local setup
self.number_of_parallel_processes = number_of_parallel_processes
self.scheduler_sleep_time = scheduler_sleep_time
def __str__(self):
"""Converts this grid configuration into a string, which contains the complete set of parameters."""
return utils.pretty_print(self, self._kwargs)
def queue(self, params):
"""queue(params) -> dict
This helper function translates the given queue parameters to grid options.
When the given ``params`` are a dictionary already, they are simply returned.
If ``params`` is a string, it is used to index :py:data:`PREDEFINED_QUEUES`.
If ``params`` is ``None``, or the ``grid_type`` is ``'local'``, an empty dictionary is returned.
"""
if self.is_local():
return {}
if isinstance(params, six.string_types) and params in PREDEFINED_QUEUES:
return PREDEFINED_QUEUES[params]
elif isinstance(params, dict):
return params
elif params is None:
return {}
else:
raise ValueError("The given queue parameters '%s' are neither one of the predefined queues nor a dictionary of values." % str(params))
def is_local(self):
"""Returns whether this grid setup should use the local submission or the SGE grid."""
return self.grid_type == 'local'
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
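# A short usage sketch (assumptions: an SGE installation where the queue names
# in PREDEFINED_QUEUES exist, e.g. the Idiap grid; the explicit dictionary
# below shows arbitrary qsub keywords). Queues may be given either as a
# predefined name or as a dictionary; local grids ignore queue settings.
sge_grid = Grid(
    training_queue='16G',  # predefined queue name, resolved via PREDEFINED_QUEUES
    extraction_queue={'queue': 'q1d', 'memfree': '8G', 'io_big': True},  # explicit keywords
)
assert sge_grid.training_queue == PREDEFINED_QUEUES['16G']

local_grid = Grid(grid_type='local', number_of_parallel_processes=4)
assert local_grid.is_local() and local_grid.queue('16G') == {}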
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
"""Re-usable blocks for legacy bob.bio.base algorithms"""
import os
import copy
import functools
import bob.io.base
from .blocks import DatabaseConnector, SampleLoader
from bob.pipelines.sample.sample import SampleSet, DelayedSample, Sample
class DatabaseConnectorAnnotated(DatabaseConnector):
"""Wraps a bob.bio.base database and generates conforming samples for datasets
that has annotations
This connector allows wrapping generic bob.bio.base datasets and generate
samples that conform to the specifications of biometric pipelines defined
in this package.
Parameters
----------
database : object
An instantiated version of a bob.bio.base.Database object
protocol : str
The name of the protocol to generate samples from.
To be plugged at :py:meth:`bob.db.base.Database.objects`.
"""
def __init__(self, database, protocol):
super(DatabaseConnectorAnnotated, self).__init__(database, protocol)
def background_model_samples(self):
"""Returns :py:class:`Sample`'s to train a background model (group
``world``).
Returns
-------
samples : list
List of samples conforming to the pipeline API for background
model training. See, e.g., :py:func:`.pipelines.first`.
"""
# TODO: This should be organized by client
retval = []
objects = self.database.objects(protocol=self.protocol, groups="world")
return [
SampleSet(
[
DelayedSample(
load=functools.partial(
k.load,
self.database.original_directory,
self.database.original_extension,
),
id=k.id,
path=k.path,
annotations=self.database.annotations(k)
)
]
)
for k in objects
]
def references(self, group="dev"):
"""Returns :py:class:`Reference`'s to enroll biometric references
Parameters
----------
group : :py:class:`str`, optional
A ``group`` to be plugged at
:py:meth:`bob.db.base.Database.objects`
Returns
-------
references : list
List of samples conforming to the pipeline API for the creation of
biometric references. See, e.g., :py:func:`.pipelines.first`.
"""
retval = []
for m in self.database.model_ids_with_protocol(protocol=self.protocol, groups=group):
objects = self.database.objects(
protocol=self.protocol,
groups=group,
model_ids=(m,),
purposes="enroll",
)
retval.append(
SampleSet(
[
DelayedSample(
load=functools.partial(
k.load,
self.database.original_directory,
self.database.original_extension,
),
id=k.id,
path=k.path,
annotations=self.database.annotations(k)
)
for k in objects
],
id=m,
path=str(m),
subject=objects[0].client_id,
)
)
return retval
def probes(self, group):
"""Returns :py:class:`Probe`'s to score biometric references
Parameters
----------
group : str
A ``group`` to be plugged at
:py:meth:`bob.db.base.Database.objects`
Returns
-------
probes : list
List of samples conforming to the pipeline API for the creation of
biometric probes. See, e.g., :py:func:`.pipelines.first`.
"""
probes = dict()
for m in self.database.model_ids_with_protocol(protocol=self.protocol, groups=group):
# Getting all the probe objects from a particular biometric
# reference
objects = self.database.objects(
protocol=self.protocol,
groups=group,
model_ids=(m,),
purposes="probe",
)
# Creating probe samples
for o in objects:
if o.id not in probes:
probes[o.id] = SampleSet(
[
DelayedSample(
load=functools.partial(
o.load,
self.database.original_directory,
self.database.original_extension,
),
id=o.id,
path=o.path,
annotations=self.database.annotations(o)
)
],
id=o.id,
path=o.path,
subject=o.client_id,
references=[m],
)
else:
probes[o.id].references.append(m)
return list(probes.values())
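# A minimal usage sketch (hypothetical: "my-annotated-db" stands for any
# registered bob.bio.base database resource that provides annotations and a
# "default" protocol). The connector turns database objects into SampleSet
# lists for background training, enrollment and probing.
if __name__ == "__main__":
    from bob.bio.base import load_resource

    database = DatabaseConnectorAnnotated(
        load_resource("my-annotated-db", "database"), protocol="default"
    )
    training = database.background_model_samples()   # SampleSets for background training
    references = database.references(group="dev")    # SampleSets to enroll references
    probes = database.probes(group="dev")            # SampleSets to score against references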
class SampleLoaderAnnotated(SampleLoader):
"""Adaptor for loading, preprocessing and feature extracting samples that uses annotations
This adaptor class wraps around sample:
.. code-block:: text
[loading [-> preprocessing [-> extraction]]]
The input sample object must obey the following (minimal) API:
* attribute ``id``: Contains a unique (stringifiable) identifier for
processed samples
* attribute ``data``: Contains the data for this sample
Optional checkpointing is also implemented for each of the stages,
independently. You may checkpoint just the preprocessing, the feature
extraction, or both.
Parameters
----------
pipeline : :py:class:`list` of (:py:class:`str`, callable)
A list of pairs in which the first entry is the name of each processing
step in the pipeline and the second entry is a default-constructible
:py:class:`bob.bio.base.preprocessor.Preprocessor` or
:py:class:`bob.bio.base.extractor.Extractor`, in any order. Each
of these objects must be a Python type that can be instantiated and
used through its ``__call__()`` interface to process a single entry of
a sample. For Python types that you may want to plug in, but that do not
offer a default constructor that you like, pass the result of
:py:func:`functools.partial` instead.
"""
def __init__(self, pipeline):
super(SampleLoaderAnnotated, self).__init__(pipeline)
def _handle_step(self, sset, func, checkpoint):
"""Handles a single step in the pipeline, with optional checkpointing
Parameters
----------
sset : SampleSet
The original sample set to be processed (delayed or pre-loaded)
func : callable
The processing function to call for processing **each** sample in
the set, if need be
checkpoint : str, None
An optional string that may point to a directory that will be used
for checkpointing the processing phase in question
Returns
-------
r : SampleSet
The prototype processed sample. If no checkpointing is required, this
will be of type :py:class:`Sample`. Otherwise, it will be a
:py:class:`DelayedSample`
"""
if checkpoint is not None:
samples = [] # processed samples
for s in sset.samples:
# there can be a checkpoint for the data to be processed
candidate = os.path.join(checkpoint, s.path + ".hdf5")
if not os.path.exists(candidate):
# TODO: Fix this on bob.bio.base
try:
# preprocessing is required, and checkpointing, do it now
data = func(