Commit acc2f34d authored by Tiago de Freitas Pereira

Solved issue when the function is not picklable

parent c840367f
Pipeline #37644 failed with stage in 7 minutes and 18 seconds
@@ -274,7 +274,7 @@ class SampleLoaderAnnotated(SampleLoader):
             try:
                 # preprocessing is required, and checkpointing, do it now
                 data = func(s.data, annotations=s.annotations)
-            except:
+            except:
                 data = func(s.data)
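The try/except in this hunk ducks between two preprocessor call signatures: annotated first, plain second. A minimal sketch of the same fallback, assuming the preprocessor raises TypeError when it does not accept an annotations keyword (the helper name is hypothetical, and TypeError is a narrower net than the bare except used above):

    def call_with_optional_annotations(func, data, annotations):
        # Try the annotated signature first; fall back to the plain one
        # when `func` does not accept an `annotations` keyword.
        try:
            return func(data, annotations=annotations)
        except TypeError:
            return func(data)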
@@ -294,15 +294,17 @@ class SampleLoaderAnnotated(SampleLoader):
                 # because we are checkpointing, we return a DelayedSample
                 # instead of normal (preloaded) sample. This allows the next
                 # phase to avoid loading it would it be unnecessary (e.g. next
-                # phase is already check-pointed)
+                # phase is already check-pointed)
+                #reader = bob.io.base.load
                 reader = (
                     getattr(func, "read_data")
                     if hasattr(func, "read_data")
                     else getattr(func, "read_feature")
                 )
+                reader = reader.__func__  # The reader object might not be picklable
                 samples.append(
                     DelayedSample(
-                        functools.partial(reader, candidate), parent=s
+                        functools.partial(reader, None, candidate), parent=s
                     )
                 )
             else:
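The hunk above is the heart of the commit: reader is a bound method, and pickling a bound method also pickles the instance it is bound to, so any unpicklable attribute on the preprocessor (a file handle, a lock, a C extension object) breaks serialization. Taking reader.__func__ extracts the plain underlying function, which pickles by qualified name, and functools.partial(reader, None, candidate) then passes None in place of self; this is safe only as long as the reader never touches self. A standalone sketch of the failure and the fix (the Preprocessor class here is hypothetical):

    import functools
    import os
    import pickle

    class Preprocessor:
        # Hypothetical stand-in for a loader whose instance is not picklable.
        def __init__(self):
            self.handle = open(os.devnull)  # file objects cannot be pickled

        def read_data(self, path):
            # `self` is never used, so None can safely stand in for it
            return "loaded:" + path

    bound = Preprocessor().read_data
    try:
        pickle.dumps(bound)  # pickles the instance too, and fails
    except TypeError as error:
        print("bound method not picklable:", error)

    # Mirrors reader = reader.__func__; functools.partial(reader, None, candidate)
    job = functools.partial(bound.__func__, None, "sample.hdf5")
    print(pickle.loads(pickle.dumps(job))())  # -> loaded:sample.hdf5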
@@ -107,8 +107,9 @@ class SampleLoader:
                     if hasattr(func, "read_data")
                     else getattr(func, "read_feature")
                 )
+                reader = reader.__func__  # The reader object might not be picklable
                 samples.append(
-                    DelayedSample(functools.partial(reader, candidate), parent=s)
+                    DelayedSample(functools.partial(reader, None, candidate), parent=s)
                 )
             else:
                 # if checkpointing is not required, load the data and preprocess it
@@ -9,7 +9,7 @@ import functools
 import bob.io.base
 from bob.pipelines.sample.sample import DelayedSample, SampleSet, Sample
+import numpy


 class DatabaseConnector:
     """Wraps a bob.bio.base database and generates conforming samples
@@ -316,17 +316,17 @@ class AlgorithmAdaptor:
             self.load()
             if self.model.requires_projector_training:
                 return self.model.enroll(
-                    [self.model.project(s.data) for s in k.samples]
+                    numpy.array([self.model.project(s.data) for s in k.samples])
                 )
             else:
-                return [s.data for s in k.samples]
+                return self.model.enroll(numpy.array([s.data for s in k.samples]))

         def write_enrolled(self, k, path):
             self.model.write_model(k, path)

     model = _CachedModel(self.algorithm, path)

-    retval = []
+    retval = []
     for k in references:
         if checkpoint is not None:
             candidate = os.path.join(os.path.join(checkpoint, k.path + ".hdf5"))
@@ -334,7 +334,7 @@ class AlgorithmAdaptor:
                 # create new checkpoint
                 bob.io.base.create_directories_safe(os.path.dirname(candidate))
                 enrolled = model.enroll(k)
-                model.model.write_model(enrolled, candidate)
+                model.model.write_model(enrolled, candidate)
             retval.append(
                 DelayedSample(
                     functools.partial(model.model.read_model, candidate), parent=k
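The enroll changes above wrap the per-sample features in a single numpy array, which is what the earlier +import numpy hunk supports. A small sketch of the resulting shape, with hypothetical feature vectors:

    import numpy

    # Hypothetical per-sample features: one 1D vector per sample in the SampleSet.
    features = [numpy.ones(5), numpy.zeros(5), numpy.full(5, 2.0)]

    stacked = numpy.array(features)  # shape (n_samples, n_features)
    assert stacked.shape == (3, 5)
    # model.enroll(stacked) now receives one 2D array rather than a list of arrays.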
@@ -108,7 +108,7 @@ def biometric_pipeline(
     ## Create biometric samples
     biometric_references = create_biometric_reference(background_model,references,loader,algorithm,npartitions,checkpoints)

-    ## Scores all probes
+    ## Scores all probes
     return compute_scores(background_model, biometric_references, probes, loader, algorithm, npartitions, checkpoints)
@@ -354,12 +354,13 @@ def compute_scores(
     db = dask.bag.from_sequence(probes, npartitions=npartitions)
     db = db.map_partitions(loader, checkpoints.get("probes", {}))

     ## TODO: Here, we are sending all computed biometric references to all
     ## probes. It would be more efficient if only the models related to each
     ## probe are sent to the probing split. An option would be to use caching
     ## and allow the ``score`` function above to load the required data from
     ## the disk, directly. A second option would be to generate named delays
-    ## for each model and then associate them here.
-    all_references = dask.delayed(list)(references)
+    ## for each model and then associate them here.
+    all_references = dask.delayed(list)(references)

     return db.map_partitions(algorithm.score, all_references, background_model)
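dask.delayed(list)(references) materializes the reference collection as one task whose full result is handed to every probe partition by map_partitions, which is exactly the all-to-all shipping the TODO wants to avoid. A self-contained sketch of the pattern on toy data (the score function is a hypothetical placeholder, not bob's scorer):

    import dask
    import dask.bag

    def score(probe_partition, references, background_model):
        # Placeholder: one score per (probe, reference) pair.
        return [p * r + background_model
                for p in probe_partition for r in references]

    probes = dask.bag.from_sequence([1, 2, 3, 4], npartitions=2)
    all_references = dask.delayed(list)([10, 20])  # one shared task

    print(probes.map_partitions(score, all_references, 0).compute())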