diff --git a/bob/bio/base/__init__.py b/bob/bio/base/__init__.py
index a2c2e098cd431ac6554e7abed3531e0bb956a301..fece5d108fd0c281ef7e5df3e669e0716776dcfc 100644
--- a/bob/bio/base/__init__.py
+++ b/bob/bio/base/__init__.py
@@ -4,7 +4,7 @@
 from . import preprocessor
 from . import extractor
 from . import algorithm
 from . import annotator
-from . import processor
+from . import mixins
 from . import script
 from . import test
diff --git a/bob/bio/base/config/baselines/pca_atnt.py b/bob/bio/base/config/baselines/pca_atnt.py
index 7766f67f89bb014999bb2c7052616689105eebfa..1be261ad4559321999d1f459db2395136dc1c9c6 100644
--- a/bob/bio/base/config/baselines/pca_atnt.py
+++ b/bob/bio/base/config/baselines/pca_atnt.py
@@ -1,11 +1,28 @@
-from bob.bio.base.pipelines.vanilla_biometrics.legacy import DatabaseConnector, AlgorithmAdaptor
+#from bob.bio.base.pipelines.vanilla_biometrics.legacy import DatabaseConnector, AlgorithmAdaptor
 import bob.db.atnt
-
+from bob.bio.base.pipelines.vanilla_biometrics.legacy import DatabaseConnector
 
 database = DatabaseConnector(bob.db.atnt.Database(), protocol="Default")
 
 preprocessor = "face-detect"
 
-extractor = 'linearize'
+from sklearn.pipeline import Pipeline, make_pipeline
+from sklearn.decomposition import PCA
+
+from bob.pipelines.mixins import CheckpointMixin, SampleMixin
+from bob.bio.base.mixins import CheckpointSampleLinearize
+
+class CheckpointSamplePCA(CheckpointMixin, SampleMixin, PCA):
+    """
+    Enables SAMPLE and CHECKPOINTING handling for https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
+    """
+    pass
+
+#extractor = make_pipeline([CheckpointSampleLinearize(), CheckpointSamplePCA()])
+from bob.pipelines.mixins import dask_it
+extractor = Pipeline(steps=[('0', CheckpointSampleLinearize(features_dir="./example/extractor0")),
+                            ('1', CheckpointSamplePCA(features_dir="./example/extractor1", model_path="./example/pca.pkl"))])
+#extractor = dask_it(extractor)
 
-algorithm = 'pca'
+from bob.bio.base.pipelines.vanilla_biometrics.comparator import DistanceComparator
+algorithm = DistanceComparator()
diff --git a/bob/bio/base/processor/__init__.py b/bob/bio/base/mixins/__init__.py
similarity index 61%
rename from bob/bio/base/processor/__init__.py
rename to bob/bio/base/mixins/__init__.py
index 729af155f8f3f72cd1e3805f4a6efe40e2787dd7..62acd81c13d30ee5eb3adb9526d466727af2bfda 100644
--- a/bob/bio/base/processor/__init__.py
+++ b/bob/bio/base/mixins/__init__.py
@@ -1,2 +1,2 @@
 from .linearize import Linearize, SampleLinearize, CheckpointSampleLinearize
-from .pca import CheckpointSamplePCA, SamplePCA
+#from .pca import CheckpointSamplePCA, SamplePCA
diff --git a/bob/bio/base/processor/linearize.py b/bob/bio/base/mixins/linearize.py
similarity index 95%
rename from bob/bio/base/processor/linearize.py
rename to bob/bio/base/mixins/linearize.py
index 4815f468050cdc55f3c88034bb086827f74c48b0..9d676a476e7116fc6c181a866eda69ade8e46446 100644
--- a/bob/bio/base/processor/linearize.py
+++ b/bob/bio/base/mixins/linearize.py
@@ -3,7 +3,7 @@
 # @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
 
-from bob.pipelines.processor import CheckpointMixin, SampleMixin
+from bob.pipelines.mixins import CheckpointMixin, SampleMixin
 from sklearn.base import TransformerMixin
 from sklearn.utils.validation import check_array, check_is_fitted
 import numpy
diff --git a/bob/bio/base/processor/pca.py b/bob/bio/base/mixins/pca.py
similarity index 100%
rename from bob/bio/base/processor/pca.py
rename to bob/bio/base/mixins/pca.py
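Note on the new baseline config: pca_atnt.py now builds the extractor as a scikit-learn Pipeline whose steps gain Sample handling and checkpointing through the bob.pipelines mixins, instead of naming a registered 'linearize'/'pca' resource. A minimal sketch of the same pattern applied to a different estimator follows; the wrapper class and paths are hypothetical, and it assumes CheckpointMixin/SampleMixin compose with any scikit-learn transformer the way they do with PCA above and accept the features_dir/model_path arguments shown in the config:

```python
from sklearn.preprocessing import StandardScaler
from bob.pipelines.mixins import CheckpointMixin, SampleMixin


class CheckpointSampleScaler(CheckpointMixin, SampleMixin, StandardScaler):
    """Hypothetical wrapper mirroring CheckpointSamplePCA above."""
    pass


# Drop-in step for the extractor Pipeline, checkpointed like the PCA step:
scaler = CheckpointSampleScaler(
    features_dir="./example/scaler",    # hypothetical checkpoint directory
    model_path="./example/scaler.pkl",  # hypothetical fitted-model path
)
```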
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/comparator.py b/bob/bio/base/pipelines/vanilla_biometrics/comparator.py
index 9dafd25757e6997bcb90c903fd9a98ccdf727d17..5d9a3abea1276d80e9c1c214b1f5bdce52ff29c6 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/comparator.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/comparator.py
@@ -18,42 +18,8 @@ class Comparator(object):
     def __init__(self):
         pass
 
-    def _stack_samples_2_ndarray(self, samplesets, stack_per_sampleset=False):
-        """
-        Stack a set of :py:class:`bob.pipelines.sample.sample.SampleSet`
-        and convert them to :py:class:`numpy.ndarray`
-
-        Parameters
-        ----------
-
-        samplesets: :py:class:`bob.pipelines.sample.sample.SampleSet`
-            Set of samples to be stackted
-
-        stack_per_sampleset: bool
-            If true will return a list of :py:class:`numpy.ndarray`, each one for a sample set
-
-        """
-
-        if stack_per_sampleset:
-            # TODO: Make it more efficient
-            all_data = []
-            for sampleset in samplesets:
-                all_data.append(
-                    numpy.array([sample.data for sample in sampleset.samples])
-                )
-            return all_data
-        else:
-            return numpy.array(
-                [
-                    sample.data
-                    for sampleset in samplesets
-                    for sample in sampleset.samples
-                ]
-            )
-
-
-    def enroll_samples(
-        self, references, background_model=None, checkpoint=None, *args, **kwargs
+    def _enroll_samples(
+        self, biometric_references, extractor=None, checkpoint=None, *args, **kwargs
     ):
         """This method should implement the sub-pipeline 1 of the Vanilla Biometrics Pipeline :ref:`_vanilla-pipeline-1`.
@@ -61,7 +27,7 @@ class Comparator(object):
         Parameters
         ----------
 
-        references : list
+        biometric_references : list
             A list of :py:class:`SampleSet` objects to be used for
             creating biometric references.  The sets must be identified
             with a unique id and a path, for eventual checkpointing.
@@ -82,42 +48,13 @@ class Comparator(object):
         """
 
         retval = []
-        for k in references:
+        for k in biometric_references:
             # compute on-the-fly
             data = [s.data for s in k.samples]
             retval.append(Sample(self.enroll(data), parent=k))
         return retval
 
-    def write_biometric_reference(self, biometric_reference, filename):
-        """Writes the enrolled model to the given file.
-        In this base class implementation:
-
-        - If the given model has a 'save' attribute, it calls ``model.save(bob.io.base.HDF5File(model_file), 'w')``.
-          In this case, the given model_file might be either a file name or a :py:class:`bob.io.base.HDF5File`.
-        - Otherwise, it uses :py:func:`bob.io.base.save` to do that.
-
-        If you have a different format, please overwrite this function.
-
-        **Parameters:**
-
-        model : object
-            A model as returned by the :py:meth:`enroll` function, which should be written.
-
-        model_file : str or :py:class:`bob.io.base.HDF5File`
-            The file open for writing, or the file name to write to.
-        """
-        import h5py
-
-        with h5py.File(filename, "w") as f:
-            f.create_dataset("biometric_reference", data=biometric_reference)
-
-    def read_biometric_reference(self, filename):
-        import h5py
-
-        with h5py.File(filename, "r") as f:
-            data = f["biometric_reference"].value
-        return data
 
     def enroll(self, data, **kwargs):
         """
@@ -134,7 +71,7 @@ class Comparator(object):
 
         raise NotImplemented("Please, implement me")
 
-    def score_samples(self, probes, references, background_model=None, *args, **kwargs):
+    def _score_samples(self, probes, biometric_references, extractor=None, *args, **kwargs):
         """Scores a new sample against multiple (potential) references
 
         Parameters
@@ -144,12 +81,12 @@ class Comparator(object):
             A list of :py:class:`SampleSet` objects to be used for
             scoring the input references
 
-        references : list
+        biometric_references : list
            A list of :py:class:`Sample` objects to be used for
            scoring the input probes, must have an ``id`` attribute that
            will be used to cross-reference which probes need to be scored.
 
-        background_model :
+        extractor :
            Path pointing to stored model on disk
 
        *args, **kwargs :
@@ -176,7 +113,7 @@ class Comparator(object):
             for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)):
                 # each sub-probe in the probe needs to be checked
                 subprobe_scores = []
-                for ref in [r for r in references if r.key in p.references]:
+                for ref in [r for r in biometric_references if r.key in p.references]:
                     subprobe_scores.append(
                         Sample(self.score(ref.data, s), parent=ref)
                     )
@@ -214,7 +151,7 @@
 import scipy.spatial.distance
 from sklearn.utils.validation import check_array
 
 class DistanceComparator(Comparator):
-    def __init__(self,distance_function = scipy.spatial.distance.euclidean,factor=1):
+    def __init__(self,distance_function = scipy.spatial.distance.euclidean,factor=-1):
         self.distance_function = distance_function
         self.factor = factor
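The only functional change in DistanceComparator is the default factor going from 1 to -1, i.e. distances are negated so that a smaller distance yields a larger score, matching the "higher score = closer match" convention of the score files written by the CLI below. A self-contained illustration of that convention (plain numpy/scipy; the score() method itself is not part of this diff, so the multiplication by factor is an assumption drawn from the constructor):

```python
import numpy
import scipy.spatial.distance

model = numpy.array([0.0, 0.0, 1.0])
genuine_probe = numpy.array([0.1, 0.0, 0.9])
impostor_probe = numpy.array([1.0, 1.0, 0.0])

factor = -1  # new default in DistanceComparator


def score(reference, probe):
    # Negated Euclidean distance: closer probes get higher scores.
    return factor * scipy.spatial.distance.euclidean(reference, probe)


# With the negation, the genuine comparison gets the higher score:
assert score(model, genuine_probe) > score(model, impostor_probe)
```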
- """ - import h5py - - with h5py.File(filename, "w") as f: - f.create_dataset("biometric_reference", data=biometric_reference) - - def read_biometric_reference(self, filename): - import h5py - - with h5py.File(filename, "r") as f: - data = f["biometric_reference"].value - return data def enroll(self, data, **kwargs): """ @@ -134,7 +71,7 @@ class Comparator(object): raise NotImplemented("Please, implement me") - def score_samples(self, probes, references, background_model=None, *args, **kwargs): + def _score_samples(self, probes, biometric_references, extractor=None, *args, **kwargs): """Scores a new sample against multiple (potential) references Parameters @@ -144,12 +81,12 @@ class Comparator(object): A list of :py:class:`SampleSet` objects to be used for scoring the input references - references : list + biometric_references : list A list of :py:class:`Sample` objects to be used for scoring the input probes, must have an ``id`` attribute that will be used to cross-reference which probes need to be scored. - background_model : + extractor : Path pointing to stored model on disk *args, **kwargs : @@ -176,7 +113,7 @@ class Comparator(object): for subprobe_id, (s, parent) in enumerate(zip(data, p.samples)): # each sub-probe in the probe needs to be checked subprobe_scores = [] - for ref in [r for r in references if r.key in p.references]: + for ref in [r for r in biometric_references if r.key in p.references]: subprobe_scores.append( Sample(self.score(ref.data, s), parent=ref) ) @@ -214,7 +151,7 @@ import scipy.spatial.distance from sklearn.utils.validation import check_array class DistanceComparator(Comparator): - def __init__(self,distance_function = scipy.spatial.distance.euclidean,factor=1): + def __init__(self,distance_function = scipy.spatial.distance.euclidean,factor=-1): self.distance_function = distance_function self.factor = factor diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py index 549b0156c228b5f1df9967d124d0e3f7b9449b76..791111b96ab5cb946e2e9def98c017aa27348358 100644 --- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py +++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py @@ -8,7 +8,7 @@ import copy import functools import bob.io.base -from bob.pipelines.sample.sample import DelayedSample, SampleSet, Sample +from bob.pipelines.sample import DelayedSample, SampleSet, Sample import numpy import logging import dask @@ -72,10 +72,11 @@ class DatabaseConnector: self.database.original_directory, self.database.original_extension, ), - id=k.id, + key=str(k.id), path=k.path, ) - ] + ], + key=str(k.client_id) ) for k in objects ] @@ -118,14 +119,14 @@ class DatabaseConnector: self.database.original_directory, self.database.original_extension, ), - id=k.id, + key=str(k.id), path=k.path, ) for k in objects ], - id=m, + key=m, path=str(m), - subject=objects[0].client_id, + subject=str(objects[0].client_id), ) ) @@ -173,13 +174,13 @@ class DatabaseConnector: self.database.original_directory, self.database.original_extension, ), - id=o.id, + key=str(o.id), path=o.path, ) ], - id=o.id, + key=str(o.client_id), path=o.path, - subject=o.client_id, + subject=str(o.client_id), references=[m], ) else: diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py index 767c1f64623450487f62eb00f975fe549e7562fa..2dd1eddc97d4df8d2a8290e30fa4a8a9c751babc 100644 --- a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py +++ 
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
index 767c1f64623450487f62eb00f975fe549e7562fa..2dd1eddc97d4df8d2a8290e30fa4a8a9c751babc 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/pipeline.py
@@ -10,357 +10,68 @@ for bob.bio experiments
 import dask.bag
 import dask.delayed
-
+from bob.pipelines.sample import samplesets_to_samples, transform_sample_sets
 
 def biometric_pipeline(
     background_model_samples,
-    references,
-    probes,
-    loader,
-    algorithm,
-    npartitions,
-    checkpoints,
+    biometric_reference_samples,
+    probe_samples,
+    extractor,
+    biometric_algorithm,
 ):
-    """Creates a simple pipeline for **biometric** experiments.
-
-    This contains the following steps:
-
-      1. Train background model (without labels)
-      2. Create biometric references (requires identity)
-      3. Scoring (requires probe/reference matching and probe identity)
-
-
-    Parameters
-    ----------
-
-    background_model_samples : list
-        List of samples to be used for training an background model. Elements
-        provided must conform to the :py:class:`.samples.Sample` API, or be a
-        delayed version of such.
-
-    references : list
-        List of references to be created in this biometric pipeline. Elements
-        provided must conform to the :py:class:`.samples.Reference` API, or be
-        a delayed version of such.
-
-    probes : list
-        List of probes to be scored in this biometric pipeline. Elements
-        provided must conform to the :py:class:`.samples.Probe` API, or be
-        a delayed version of such.
-
-    loader : object
-        An object that conforms to the :py:class:`.blocks.SampleLoader` API and
-        can load samples
-
-    algorithm : object
-        An object that conforms to the :py:class:`.blocks.AlgorithmAdaptor` API
-
-    npartitions : :py:class:`int`, optional
-        Number of partitions to use when processing this pipeline. Notice that
-        the number of partitions dictate how many preprocessor/feature
-        extraction/algorithms objects will be effectively initialized (that is,
-        will have their constructor called). Internally, we use
-        :py:class:`dask.bag`'s and :py:meth:`dask.bag.map_partitions` to
-        process one full partition in a single pass.
-
-    checkpoints : :py:class:`dict`
-        A dictionary that maps processing phase names to paths that will be
-        used to create checkpoints of the different processing phases in this
-        pipeline. Checkpointing may speed up your processing. Existing files
-        that have been previously check-pointed will not be recomputed.
-
-        Here is an example with all supported options for this pipeline:
-
-        .. code-block:: python
-
-            checkpoints = {
-                "background": {
-                    "preprocessor": os.path.join("background", "preprocessed"),
-                    "extractor": os.path.join("background", "extracted"),
-                    # at least, the next stage must be provided!
- "model": os.path.join("background", "model"), - }, - "references": { - "preprocessor": os.path.join("references", "preprocessed"), - "extractor": os.path.join("references", "extracted"), - "enrolled": os.path.join("references", "enrolled"), - }, - "probes": { - "preprocessor": os.path.join("probes", "preprocessed"), - "extractor": os.path.join("probes", "extracted"), - }, - } - - - - Returns - ------- - - scores: list - A delayed list of scores, that can be obtained by computing the graph - - """ ## Training background model (fit will return even if samples is ``None``, ## in which case we suppose the algorithm is not trainable in any way) - background_model = train_background_model(background_model_samples, loader, algorithm, npartitions, checkpoints) - - ## Create biometric samples - biometric_references = create_biometric_reference(background_model,references,loader,algorithm,npartitions,checkpoints) - - ## Scores all probes - return compute_scores(background_model, biometric_references, probes, loader, algorithm, npartitions, checkpoints) - - -def train_background_model( - background_model_samples, - loader, - algorithm, - npartitions, - checkpoints, -): - """ - Train background model (without labels) :ref:`_vanilla-pipeline-1` - - Parameters - ---------- - - background_model_samples : list - List of samples to be used for training an background model. Elements - provided must conform to the :py:class:`.samples.Sample` API, or be a - delayed version of such. - - loader : object - An object that conforms to the :py:class:`.blocks.SampleLoader` API and - can load samples - - algorithm : object - An object that conforms to the :py:class:`.blocks.AlgorithmAdaptor` API - - npartitions : :py:class:`int`, optional - Number of partitions to use when processing this pipeline. Notice that - the number of partitions dictate how many preprocessor/feature - extraction/algorithms objects will be effectively initialized (that is, - will have their constructor called). Internally, we use - :py:class:`dask.bag`'s and :py:meth:`dask.bag.map_partitions` to - process one full partition in a single pass. - - checkpoints : :py:class:`dict` - A dictionary that maps processing phase names to paths that will be - used to create checkpoints of the different processing phases in this - pipeline. Checkpointing may speed up your processing. Existing files - that have been previously check-pointed will not be recomputed. - - Here is an example with all supported options for this pipeline: - - .. code-block:: python - - checkpoints = { - "background": { - "preprocessor": os.path.join("background", "preprocessed"), - "extractor": os.path.join("background", "extracted"), - # at least, the next stage must be provided! 
- "model": os.path.join("background", "model"), - }, - "references": { - "preprocessor": os.path.join("references", "preprocessed"), - "extractor": os.path.join("references", "extracted"), - "enrolled": os.path.join("references", "enrolled"), - }, - "probes": { - "preprocessor": os.path.join("probes", "preprocessed"), - "extractor": os.path.join("probes", "extracted"), - }, - } - - """ - ## Training background model (fit will return even if samples is ``None``, - ## in which case we suppose the algorithm is not trainable in any way) - db = dask.bag.from_sequence( - background_model_samples, npartitions=npartitions - ) - db = db.map_partitions(loader, checkpoints.get("background", {})) - background_model = dask.delayed(algorithm.fit)( - db, checkpoints["background"]["model"] + extractor = train_background_model( + background_model_samples, + extractor ) - return background_model - - -def create_biometric_reference( - background_model, - references, - loader, - algorithm, - npartitions, - checkpoints, -): - """ - Create biometric references :ref:`_vanilla-pipeline-2` - - Parameters - ---------- - - background_model: dask.delayed - Trained background model (trained with :py:meth:`train_background_model`) - - references : list - List of references to be created in this biometric pipeline. Elements - provided must conform to the :py:class:`.samples.Reference` API, or be - a delayed version of such. - - loader : object - An object that conforms to the :py:class:`.blocks.SampleLoader` API and - can load samples - - algorithm : object - An object that conforms to the :py:class:`.blocks.AlgorithmAdaptor` API - - npartitions : :py:class:`int`, optional - Number of partitions to use when processing this pipeline. Notice that - the number of partitions dictate how many preprocessor/feature - extraction/algorithms objects will be effectively initialized (that is, - will have their constructor called). Internally, we use - :py:class:`dask.bag`'s and :py:meth:`dask.bag.map_partitions` to - process one full partition in a single pass. - - checkpoints : :py:class:`dict` - A dictionary that maps processing phase names to paths that will be - used to create checkpoints of the different processing phases in this - pipeline. Checkpointing may speed up your processing. Existing files - that have been previously check-pointed will not be recomputed. - - Here is an example with all supported options for this pipeline: - - .. code-block:: python - - checkpoints = { - "background": { - "preprocessor": os.path.join("background", "preprocessed"), - "extractor": os.path.join("background", "extracted"), - # at least, the next stage must be provided! 
- "model": os.path.join("background", "model"), - }, - "references": { - "preprocessor": os.path.join("references", "preprocessed"), - "extractor": os.path.join("references", "extracted"), - "enrolled": os.path.join("references", "enrolled"), - }, - "probes": { - "preprocessor": os.path.join("probes", "preprocessed"), - "extractor": os.path.join("probes", "extracted"), - }, - } - - Returns - ------- - - Return trained models - - - """ - - ## Enroll biometric references - db = dask.bag.from_sequence(references, npartitions=npartitions) - db = db.map_partitions(loader, checkpoints.get("references", {})) - references = db.map_partitions( - algorithm.enroll, - background_model, - checkpoints.get("references", {}).get("enrolled"), + ## Create biometric samples + biometric_references = create_biometric_reference( + biometric_reference_samples, extractor, biometric_algorithm ) - return references + ## Scores all probes + return compute_scores( + probe_samples, + biometric_references, + extractor, + biometric_algorithm, + ) -def compute_scores( - background_model, - references, - probes, - loader, - algorithm, - npartitions, - checkpoints, -): - """ Compute biometric scores :ref:`_vanilla-pipeline-2` - - Parameters - ---------- - - background_model: dask.delayed - Trained background model (trained with :py:meth:`train_background_model`) - - references: dask.delayed - Trained biometric references - - probes : list - List of probes to be scored in this biometric pipeline. Elements - provided must conform to the :py:class:`.samples.Probe` API, or be - a delayed version of such. - - loader : object - An object that conforms to the :py:class:`.blocks.SampleLoader` API and - can load samples - - algorithm : object - An object that conforms to the :py:class:`.blocks.AlgorithmAdaptor` API - - npartitions : :py:class:`int`, optional - Number of partitions to use when processing this pipeline. Notice that - the number of partitions dictate how many preprocessor/feature - extraction/algorithms objects will be effectively initialized (that is, - will have their constructor called). Internally, we use - :py:class:`dask.bag`'s and :py:meth:`dask.bag.map_partitions` to - process one full partition in a single pass. - - checkpoints : :py:class:`dict` - A dictionary that maps processing phase names to paths that will be - used to create checkpoints of the different processing phases in this - pipeline. Checkpointing may speed up your processing. Existing files - that have been previously check-pointed will not be recomputed. - - Here is an example with all supported options for this pipeline: - - .. code-block:: python - checkpoints = { - "background": { - "preprocessor": os.path.join("background", "preprocessed"), - "extractor": os.path.join("background", "extracted"), - # at least, the next stage must be provided! 
- "model": os.path.join("background", "model"), - }, - "references": { - "preprocessor": os.path.join("references", "preprocessed"), - "extractor": os.path.join("references", "extracted"), - "enrolled": os.path.join("references", "enrolled"), - }, - "probes": { - "preprocessor": os.path.join("probes", "preprocessed"), - "extractor": os.path.join("probes", "extracted"), - }, - } +def train_background_model(background_model_samples, extractor): + # TODO: Maybe here is supervised + X, y = samplesets_to_samples(background_model_samples) + extractor = extractor.fit(X, y=y) - Returns - ------- + return extractor - scores: list - A delayed list of scores, that can be obtained by computing the graph - """ +def create_biometric_reference( + biometric_reference_samples, extractor, biometric_algorithm +): + + biometric_reference_features = transform_sample_sets(extractor, biometric_reference_samples) + + # features is a list of SampleSets + biometric_references = biometric_algorithm._enroll_samples(biometric_reference_features) + + # models is a list of Samples + return biometric_references - ## Scores all probes - db = dask.bag.from_sequence(probes, npartitions=npartitions) - db = db.map_partitions(loader, checkpoints.get("probes", {})) +def compute_scores(probe_samples, biometric_references, extractor, algorithm): + # probes is a list of SampleSets + probe_features = transform_sample_sets(extractor, probe_samples) + # models is a list of Samples + # features is a list of SampleSets - ## TODO: Here, we are sending all computed biometric references to all - ## probes. It would be more efficient if only the models related to each - ## probe are sent to the probing split. An option would be to use caching - ## and allow the ``score`` function above to load the required data from - ## the disk, directly. A second option would be to generate named delays - ## for each model and then associate them here. 
diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py
index a523d6986ce768509159e29d3d9ed1ae729f2ec3..d0be19a650f18bdad55ce83dac688b6d8517060d 100644
--- a/bob/bio/base/script/vanilla_biometrics.py
+++ b/bob/bio/base/script/vanilla_biometrics.py
@@ -11,7 +11,7 @@
 import functools
 import click
 from bob.extension.scripts.click_helper import verbosity_option, ResourceOption, ConfigCommand
-from bob.pipelines.sample.sample import DelayedSample, Sample
+from bob.pipelines.sample import DelayedSample, Sample
 
 import logging
 logger = logging.getLogger(__name__)
@@ -57,14 +57,6 @@ TODO: Work out this help
     entry_point_group='bob.pipelines.config', cls=ConfigCommand, epilog=EPILOG,
 )
-@click.option(
-    "--preprocessor",
-    "-p",
-    required=True,
-    cls=ResourceOption,
-    entry_point_group="bob.bio.preprocessor",  # This should be linked to bob.bio.base
-    help="Data preprocessing algorithm",
-)
 @click.option(
     "--extractor",
     "-e",
@@ -92,14 +84,11 @@ TODO: Work out this help
 @click.option(
     "--dask-client",
     "-l",
-    required=True,
+    required=False,
     cls=ResourceOption,
     entry_point_group="bob.pipelines.client",  # This should be linked to bob.bio.base
     help="Dask client for the execution of the pipeline.",
 )
-@click.option(
-    "--checkpointing", "-c", is_flag=True, help="Save checkpoints in this experiment?"
-)
 @click.option(
     "--group",
     "-g",
@@ -117,12 +106,10 @@ TODO: Work out this help
 )
 @verbosity_option(cls=ResourceOption)
 def vanilla_biometrics(
-    preprocessor,
     extractor,
     algorithm,
     database,
     dask_client,
-    checkpointing,
     group,
     output,
     **kwargs
@@ -180,38 +167,6 @@ def vanilla_biometrics(
     if not os.path.exists(output):
         os.makedirs(output)
 
-    if checkpointing:
-        checkpoints = {
-            "background": {
-                "preprocessor": os.path.join(output, "background", "preprocessed"),
-                "extractor": os.path.join(output, "background", "extracted"),
-                # at least, the next stage must be provided!
-                "model": os.path.join(output, "background", "model"),
-            },
-            "references": {
-                "preprocessor": os.path.join(output, "references", "preprocessed"),
-                "extractor": os.path.join(output, "references", "extracted"),
-                "enrolled": os.path.join(output, "references", "enrolled"),
-            },
-            "probes": {
-                "preprocessor": os.path.join(output, "probes", "preprocessed"),
-                "extractor": os.path.join(output, "probes", "extracted"),
-                "scores": os.path.join(output, "probes", "scores"),
-            },
-
-        }
-
-
-    # Defines the processing pipeline for loading samples
-    # Can add any number of steps!
- pipeline = [("preprocessor",preprocessor), - ("extractor", extractor)] - - # Mechanism that loads samples - # from ..bob_bio.blocks import SampleLoader - from bob.bio.base.pipelines.vanilla_biometrics.annotated_legacy import SampleLoaderAnnotated as SampleLoader - loader = SampleLoader(pipeline) - for g in group: with open(os.path.join(output,f"scores-{g}"), "w") as f: @@ -221,21 +176,21 @@ def vanilla_biometrics( database.background_model_samples(), biometric_references, database.probes(group=g), - loader, + extractor, algorithm, - npartitions=len(dask_client.cluster.workers), - checkpoints=checkpoints, + ) - # result.visualize(os.path.join(output, "graph.pdf"), rankdir="LR") - result = result.compute(scheduler=dask_client) - #result = result.compute(scheduler="single-threaded") + + if dask_client is not None: + result = result.compute(scheduler=dask_client) + result = result.compute(scheduler="single-threaded") #import ipdb; ipdb.set_trace() for probe in result: for sample in probe.samples: if isinstance(sample, Sample): - line = "{0} {1} {2} {3}\n".format(reference.subject, probe.subject, probe.path, reference.data) + line = "{0} {1} {2} {3}\n".format(sample.key, probe.key, probe.path, sample.data) f.write(line) elif isinstance(sample, DelayedSample): lines = sample.load().readlines() @@ -243,7 +198,7 @@ def vanilla_biometrics( else: raise TypeError("The output of the pipeline is not writeble") - dask_client.shutdown() + #dask_client.shutdown()