Commit 6de057b4 authored by Tiago de Freitas Pereira

Checkpoint the scores with pickle and attach the score writer to the pipeline

parent d458eba4
Branch: master
Merge requests: !188 Score normalizations, !180 [dask] Preparing bob.bio.base for dask pipelines
Pipeline #39691 passed
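The core of this change is easiest to see in isolation: scores are computed once, checkpointed to disk with pickle, and re-read lazily on later runs. A minimal, self-contained sketch of that pattern follows; the function and path names are illustrative, not taken from the codebase.

import functools
import os
import pickle


def checkpoint_scores(compute_fn, path, force=False):
    # Compute scores once, cache them as a pickle, and return a
    # zero-argument loader that defers the disk read until called,
    # much like the DelayedSample wrapping in the diff below.
    if force or not os.path.exists(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(compute_fn(), f)

    def _load(p):
        with open(p, "rb") as f:
            return pickle.load(f)

    return functools.partial(_load, path)


loader = checkpoint_scores(lambda: [0.1, 0.9], "/tmp/score_cache/probe1.pkl")
print(loader())  # [0.1, 0.9], read back from the pickle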
@@ -131,7 +131,6 @@ class BioAlgorithm(metaclass=ABCMeta):
     ):
         """Given one sampleset for probing, compute the scores and return a sample set with the scores
         """
-        scores_biometric_references = []
         if allow_scoring_with_all_biometric_references:
             # Optimized scoring
@@ -291,17 +290,10 @@ class ScoreWriter(metaclass=ABCMeta):
     for :any:`BioAlgorithm`
     """

-    def __init__(self, extension=".txt"):
+    def __init__(self, path, extension=".txt"):
+        self.path = path
         self.extension = extension

     @abstractmethod
     def write(self, sampleset, path):
         pass

-    @abstractmethod
-    def read(self, path):
-        pass
-
-    @abstractmethod
-    def concatenate_write_scores(self, sampleset, path):
-        pass
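With read() and concatenate_write_scores() gone, the base class contract is narrower: the writer owns its output location and write() does the whole job. Below is a hypothetical subclass under this contract (stand-in names, not part of the commit); since the abstract write() above still lists a stale path argument, the sketch accepts and ignores it.

import os
from abc import ABCMeta, abstractmethod


class ScoreWriter(metaclass=ABCMeta):
    # Mirrors the base class after this commit: the output path is
    # bound at construction time instead of being passed to write().
    def __init__(self, path, extension=".txt"):
        self.path = path
        self.extension = extension

    @abstractmethod
    def write(self, sampleset, path):
        pass


class PlainScoreWriter(ScoreWriter):
    # Hypothetical writer: one line per score, returns the file list.
    def write(self, sampleset, path=None):
        os.makedirs(self.path, exist_ok=True)
        filename = os.path.join(self.path, "scores" + self.extension)
        with open(filename, "w") as f:
            for score in sampleset:
                f.write(f"{score}\n")
        return [filename]


writer = PlainScoreWriter("/tmp/plain_scores")
print(writer.write([0.25, 0.75]))  # ['/tmp/plain_scores/scores.txt']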
@@ -16,7 +16,7 @@ from bob.io.base import HDF5File
 from bob.pipelines import DelayedSample, SampleSet, Sample
 import logging
 import copy
-from .score_writers import FourColumnsScoreWriter
+import pickle
 from bob.bio.base.algorithm import Algorithm
@@ -181,13 +181,7 @@ class BioAlgorithmLegacy(BioAlgorithm):
     """

     def __init__(
-        self,
-        instance,
-        base_dir,
-        force=False,
-        projector_file=None,
-        score_writer=FourColumnsScoreWriter(),
-        **kwargs,
+        self, instance, base_dir, force=False, projector_file=None, **kwargs,
     ):
         super().__init__(**kwargs)
@@ -207,7 +201,6 @@ class BioAlgorithmLegacy(BioAlgorithm):
         self.biometric_reference_dir = os.path.join(base_dir, "biometric_references")
         self._biometric_reference_extension = ".hdf5"
         self.score_dir = os.path.join(base_dir, "scores")
-        self.score_writer = score_writer
         self.force = force

     def load_legacy_background_model(self):
@@ -257,20 +250,36 @@ class BioAlgorithmLegacy(BioAlgorithm):
         return delayed_enrolled_sample

+    def write_scores(self, samples, path):
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        open(path, "wb").write(pickle.dumps(samples))
+
     def _score_sample_set(
         self,
         sampleset,
         biometric_references,
         allow_scoring_with_all_biometric_references=False,
     ):
-        path = os.path.join(self.score_dir, str(sampleset.key))
-        # Computing score
-        scored_sample_set = super()._score_sample_set(
-            sampleset,
-            biometric_references,
-            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-        )
-        scored_sample_set = self.score_writer.write(scored_sample_set, path)
+        def _load(path):
+            return pickle.loads(open(path, "rb").read())
+
+        path = os.path.join(self.score_dir, str(sampleset.key) + ".pkl")
+
+        if self.force or not os.path.exists(path):
+            # Computing score
+            scored_sample_set = super()._score_sample_set(
+                sampleset,
+                biometric_references,
+                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+            )
+            self.write_scores(scored_sample_set.samples, path)
+
+            scored_sample_set = SampleSet(
+                [DelayedSample(functools.partial(_load, path), parent=sampleset)],
+                parent=sampleset,
+            )
+        else:
+            scored_sample_set = SampleSet(_load(path), parent=sampleset)
+
         return scored_sample_set
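The first run takes the if branch: scores are computed, pickled, and replaced by a SampleSet holding a single DelayedSample whose loader is bound to the pickle path via functools.partial; repeated runs take the else branch and rebuild the set straight from disk. The stub below makes the deferred read observable without bob.pipelines installed; DelayedSampleStub is a stand-in, not the real class.

import functools
import os
import pickle


class DelayedSampleStub:
    # Stand-in for bob.pipelines.DelayedSample: holds a loader and
    # only touches disk when .data is first accessed.
    def __init__(self, load, parent=None):
        self._load = load
        self.parent = parent

    @property
    def data(self):
        return self._load()


def _load(path):
    with open(path, "rb") as f:
        return pickle.load(f)


path = "/tmp/score_cache/probe42.pkl"
if not os.path.exists(path):  # the "compute and checkpoint" branch
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump([("ref1", 0.8), ("ref2", 0.1)], f)

sample = DelayedSampleStub(functools.partial(_load, path))
print(sample.data)  # [('ref1', 0.8), ('ref2', 0.1)], loaded on access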
@@ -10,6 +10,8 @@ for bob.bio experiments
 import logging
 import numpy

+from .score_writers import FourColumnsScoreWriter
+
 logger = logging.getLogger(__name__)
@@ -61,12 +63,20 @@ class VanillaBiometricsPipeline(object):
     biometric_algorithm: :py:class:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.BioAlgorithm`
         Biometric algorithm object that implements the `enroll` and `score` methods

+    score_writer: :any:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classes.ScoreWriter`
+        Format to write scores. Defaults to :any:`FourColumnsScoreWriter`
+
     """

-    def __init__(self, transformer, biometric_algorithm):
+    def __init__(
+        self,
+        transformer,
+        biometric_algorithm,
+        score_writer=FourColumnsScoreWriter("./scores.txt"),
+    ):
         self.transformer = transformer
         self.biometric_algorithm = biometric_algorithm
+        self.score_writer = score_writer

     def __call__(
         self,
@@ -97,12 +107,17 @@ class VanillaBiometricsPipeline(object):
         )

         # Scores all probes
-        return self.compute_scores(
+        scores = self.compute_scores(
             probe_samples,
             biometric_references,
             allow_scoring_with_all_biometric_references,
         )

+        if self.score_writer is not None:
+            return self.write_scores(scores)
+
+        return scores
+
     def train_background_model(self, background_model_samples):
         # background_model_samples is a list of Samples
@@ -147,6 +162,9 @@ class VanillaBiometricsPipeline(object):
         # scores is a list of Samples
         return scores

+    def write_scores(self, scores):
+        return self.score_writer.write(scores)
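write_scores is a thin indirection, but it is what lets the dask wrapper further down rebind writing per partition. A runnable miniature of the pipeline/writer split, with stand-in class names:

class EchoWriter:
    # Stand-in for a ScoreWriter: reports what it would have written.
    def write(self, scores):
        return [f"wrote {len(scores)} scores"]


class MiniPipeline:
    # Stand-in for VanillaBiometricsPipeline: owns an optional writer
    # and only post-processes scores when one is configured.
    def __init__(self, score_writer=None):
        self.score_writer = score_writer

    def __call__(self, probes):
        scores = [p * 0.5 for p in probes]  # stand-in for compute_scores
        if self.score_writer is not None:
            return self.write_scores(scores)
        return scores

    def write_scores(self, scores):
        return self.score_writer.write(scores)


print(MiniPipeline()([1.0, 2.0]))             # raw scores: [0.5, 1.0]
print(MiniPipeline(EchoWriter())([1.0, 2.0])) # ['wrote 2 scores']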
 class ZNormVanillaBiometricsPipeline(VanillaBiometricsPipeline):
     def __init__(self, vanilla_biometrics_pipeline):
@@ -203,12 +221,9 @@ class ZNormVanillaBiometricsPipeline(VanillaBiometricsPipeline):
         )

     def compute_znorm_scores(self, zprobe_samples, probe_scores, biometric_references):
+        import ipdb; ipdb.set_trace()
         z_scores = self.vanilla_biometrics_pipeline.compute_scores(
             zprobe_samples, biometric_references
         )
         pass
@@ -7,7 +7,7 @@ from bob.pipelines import SampleSet, DelayedSample
 from .abstract_classes import ScoreWriter
 import functools
 import csv
+import uuid


 class FourColumnsScoreWriter(ScoreWriter):
     """
@@ -15,51 +15,34 @@ class FourColumnsScoreWriter(ScoreWriter):
     :any:`bob.bio.base.score.load.four_column`
     """

-    def write(self, probe_sampleset, path):
+    def write(self, probe_sampleset):
         """
         Write scores and return a :any:`bob.pipelines.DelayedSample` containing
         the instruction to open the score file
         """
-        os.makedirs(path, exist_ok=True)
-        checkpointed_scores = []
-
-        lines = [
-            "{0} {1} {2} {3}\n".format(
-                biometric_reference.subject,
-                probe_sampleset.subject,
-                probe_sampleset.key,
-                biometric_reference.data,
-            )
-            for biometric_reference in probe_sampleset
-        ]
-        filename = os.path.join(path, str(probe_sampleset.subject)) + ".txt"
-        open(filename, "w").writelines(lines)
-        return SampleSet(
-            [
-                DelayedSample(
-                    functools.partial(self.read, filename), parent=probe_sampleset
-                )
-            ],
-            parent=probe_sampleset,
-        )
-
-    def read(self, path):
-        """
-        Base Instruction to load a score file
-        """
-        return open(path).readlines()
-
-    def concatenate_write_scores(self, samplesets, filename):
-        """
-        Given a list of samplsets, write them all in a single file
-        """
-        os.makedirs(os.path.dirname(filename), exist_ok=True)
-        f = open(filename, "w")
-        for sset in samplesets:
-            for scores in sset:
-                f.writelines(scores.data)
+        os.makedirs(self.path, exist_ok=True)
+        n_lines = 0
+        filename = os.path.join(self.path, str(uuid.uuid4()) + ".txt")
+        with open(filename, "w") as f:
+            for probe in probe_sampleset:
+                # If it's delayed, load it
+                if isinstance(probe[0], DelayedSample):
+                    probe.samples = probe.samples[0].data
+
+                lines = [
+                    "{0} {1} {2} {3}\n".format(
+                        biometric_reference.subject,
+                        probe.subject,
+                        probe.key,
+                        biometric_reference.data,
+                    )
+                    for biometric_reference in probe
+                ]
+                n_lines += len(probe)
+                f.writelines(lines)
+        return [filename]
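Each output line is `bio_ref_subject probe_subject probe_key score`, the four-column layout that bob.bio.base.score.load.four_column expects. A dependency-free sketch of reading it back (the file name is illustrative):

def parse_four_column(filename):
    # Minimal reader for the four-column lines written above; the
    # project ships its own loader (bob.bio.base.score.load.four_column),
    # this is just a sketch of the format.
    records = []
    with open(filename) as f:
        for line in f:
            ref_subject, probe_subject, probe_key, score = line.split()
            records.append((ref_subject, probe_subject, probe_key, float(score)))
    return records


with open("/tmp/demo_scores.txt", "w") as f:
    f.write("model_1 probe_7 probe_7/sample_0 0.93\n")

print(parse_four_column("/tmp/demo_scores.txt"))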
class CSVScoreWriter(ScoreWriter):
@@ -69,35 +52,46 @@ class CSVScoreWriter(ScoreWriter):
     Parameters
     ----------

-    n_sample_sets:
-        Number of samplesets in one chunk
+    path: str
+        Directory to save the scores
+
+    n_sample_sets: int
+        Number of samplesets in one chunk of scores
+
+    exclude_list: list
+        List of metadata to exclude from the CSV file

     """

-    def __init__(self, n_sample_sets=1000):
+    def __init__(
+        self,
+        path,
+        n_sample_sets=1000,
+        exclude_list=["samples", "key", "data", "load", "_data", "references", "annotations"],
+    ):
+        super().__init__(path)
         self.n_sample_sets = n_sample_sets
+        self.exclude_list = exclude_list

-    def write(self, probe_sampleset, path):
+    def write(self, probe_sampleset):
         """
         Write scores and return a :any:`bob.pipelines.DelayedSample` containing
         the instruction to open the score file
         """
-        exclude_list = ["samples", "key", "data", "load", "_data", "references"]

         def create_csv_header(probe_sampleset):
             first_biometric_reference = probe_sampleset[0]

             probe_dict = dict(
                 (k, f"probe_{k}")
                 for k in probe_sampleset.__dict__.keys()
-                if k not in exclude_list
+                if k not in self.exclude_list
             )

             bioref_dict = dict(
                 (k, f"bio_ref_{k}")
                 for k in first_biometric_reference.__dict__.keys()
-                if k not in exclude_list
+                if k not in self.exclude_list
             )

             header = (
@@ -108,23 +102,35 @@ class CSVScoreWriter(ScoreWriter):
             )
             return header, probe_dict, bioref_dict

-        os.makedirs(path, exist_ok=True)
-        checkpointed_scores = []
+        os.makedirs(self.path, exist_ok=True)

-        header, probe_dict, bioref_dict = create_csv_header(probe_sampleset[0])
+        header, probe_dict, bioref_dict = create_csv_header(probe_sampleset)

-        filename = os.path.join(path, str(probe_sampleset.subject)) + ".csv"
-        with open(filename, "w") as f:
-            csv_write = csv.writer(f)
-            csv_write.writerow(header)
+        f = None
+        filename = os.path.join(self.path, str(uuid.uuid4()))
+        filenames = []
+        for i, probe in enumerate(probe_sampleset):
+            if i % self.n_sample_sets == 0:
+                filename = filename + "_" + f"chunk_{i}.csv"
+                filenames.append(filename)
+                if f is not None:
+                    f.close()
+                    del f
+
+                f = open(filename, "w")
+                csv_writer = csv.writer(f)
+                if i == 0:
+                    csv_writer.writerow(header)

             rows = []
-            probe_row = [str(probe_sampleset.key)] + [
-                str(probe_sampleset.__dict__[k]) for k in probe_dict.keys()
+            probe_row = [str(probe.key)] + [
+                str(probe.__dict__[k]) for k in probe_dict.keys()
             ]

-            for biometric_reference in probe_sampleset:
+            # If it's delayed, load it
+            if isinstance(probe[0], DelayedSample):
+                probe.samples = probe.samples[0].data
+
+            for biometric_reference in probe:
                 bio_ref_row = [
                     str(biometric_reference.__dict__[k])
                     for k in list(bioref_dict.keys()) + ["data"]
@@ -132,45 +138,6 @@ class CSVScoreWriter(ScoreWriter):
                 rows.append(probe_row + bio_ref_row)

-            csv_write.writerows(rows)
-
-        return SampleSet(
-            [
-                DelayedSample(
-                    functools.partial(self.read, filename), parent=probe_sampleset
-                )
-            ],
-            parent=probe_sampleset,
-        )
-
-    def read(self, path):
-        """
-        Base Instruction to load a score file
-        """
-        return open(path).readlines()
-
-    def concatenate_write_scores(self, samplesets, filename):
-        """
-        Given a list of samplsets, write them all in a single file
-        """
-        # CSV files tends to be very big
-        # here, here we write them in chunks
-        base_dir = os.path.splitext(filename)[0]
-        os.makedirs(base_dir, exist_ok=True)
-        f = None
-        for i, sset in enumerate(samplesets):
-            if i % self.n_sample_sets == 0:
-                if f is not None:
-                    f.close()
-                    del f
-
-                filename = os.path.join(base_dir, f"chunk_{i}.csv")
-                f = open(filename, "w")
-
-            for scores in sset:
-                if i == 0:
-                    f.writelines(scores.data)
-                else:
-                    f.writelines(scores.data[1:])
-            sset.samples = None
+            csv_writer.writerows(rows)
+
+        f.close()
+        return filenames
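Because the header row is only emitted when i == 0, the first chunk carries the column names for every later chunk, so anything reassembling the output has to reuse it. A sketch under that assumption (directory layout illustrative):

import csv
import glob
import os


def read_chunked_scores(directory):
    # chunk_0 holds the single header row; later chunks are data only.
    chunks = sorted(glob.glob(os.path.join(directory, "*chunk_*.csv")))
    header, rows = None, []
    for chunk in chunks:
        with open(chunk, newline="") as f:
            for row in csv.reader(f):
                if header is None:
                    header = row
                else:
                    rows.append(dict(zip(header, row)))
    return rows


print(read_chunked_scores("/tmp/csv_scores"))  # [] until chunks exist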
-from bob.pipelines import DelayedSample
+from bob.pipelines import DelayedSample, SampleSet
 import bob.io.base
 import os
 import dask
 import functools
-from .score_writers import FourColumnsScoreWriter
 from .abstract_classes import BioAlgorithm
+import pickle
 import bob.pipelines as mario
@@ -23,9 +23,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     extension: str
         File extension

-    score_writer: :any:`bob.bio.base.pipelines.vanilla_biometrics.abstract_classe.ScoreWriter`
-        Format to write scores. Default to :any:`FourColumnsScoreWriter`
-
     force: bool
         If True, will recompute scores and biometric references no matter if a file exists
@@ -42,7 +39,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self,
         biometric_algorithm,
         base_dir,
-        score_writer=FourColumnsScoreWriter(),
         force=False,
         **kwargs
     ):
@@ -53,7 +49,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self.biometric_algorithm = biometric_algorithm
         self.force = force
         self._biometric_reference_extension = ".hdf5"
-        self.score_writer = score_writer

     def enroll(self, enroll_features):
         return self.biometric_algorithm.enroll(enroll_features)
@@ -69,6 +64,10 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
     def write_biometric_reference(self, sample, path):
         return bob.io.base.save(sample.data, path, create_directories=True)

+    def write_scores(self, samples, path):
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        open(path, "wb").write(pickle.dumps(samples))
+
     def _enroll_sample_set(self, sampleset):
         """
         Enroll a sample set with checkpointing
@@ -102,19 +101,30 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         """Given a sampleset for probing, compute the scores and return a sample set with the scores
         """

-        # TODO: WE CAN'T REUSE THE ALREADY WRITTEN SCORE FILE FOR LOADING
-        # UNLESS WE SAVE THE PICKLED SAMPLESET WITH THE SCORES
+        def _load(path):
+            return pickle.loads(open(path, "rb").read())

-        path = os.path.join(self.score_dir, str(sampleset.key))
+        path = os.path.join(self.score_dir, str(sampleset.key) + ".pkl")

-        # Computing score
-        scored_sample_set = self.biometric_algorithm._score_sample_set(
-            sampleset,
-            biometric_references,
-            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-        )
-        scored_sample_set = self.score_writer.write(scored_sample_set, path)
+        if self.force or not os.path.exists(path):
+            # Computing score
+            scored_sample_set = self.biometric_algorithm._score_sample_set(
+                sampleset,
+                biometric_references,
+                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+            )
+            self.write_scores(scored_sample_set.samples, path)
+
+            scored_sample_set = SampleSet(
+                [
+                    DelayedSample(
+                        functools.partial(_load, path), parent=sampleset
+                    )
+                ],
+                parent=sampleset,
+            )
+        else:
+            scored_sample_set = SampleSet(_load(path), parent=sampleset)

         return scored_sample_set
@@ -122,9 +132,6 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):

 class BioAlgorithmDaskWrapper(BioAlgorithm):
     def __init__(self, biometric_algorithm, **kwargs):
         self.biometric_algorithm = biometric_algorithm
-
-        # Copying attribute
-        if hasattr(biometric_algorithm, "score_writer"):
-            self.score_writer = biometric_algorithm.score_writer

     def enroll_samples(self, biometric_reference_features):
@@ -190,4 +197,10 @@ def dask_vanilla_biometrics(vanila_biometrics_pipeline, npartitions=None):
         vanila_biometrics_pipeline.biometric_algorithm
     )

+    def _write_scores(scores):
+        return scores.map_partitions(vanila_biometrics_pipeline.write_scores_on_dask)
+
+    vanila_biometrics_pipeline.write_scores_on_dask = vanila_biometrics_pipeline.write_scores
+    vanila_biometrics_pipeline.write_scores = _write_scores
+
     return vanila_biometrics_pipeline
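On the dask path the original write_scores is stashed as write_scores_on_dask and the public name is rebound to run it on every partition of the score bag. The same trick in isolation, using the real dask.bag API with a stand-in write function:

import dask.bag


def write_scores(partition):
    # Stand-in for the pipeline's real write_scores; receives one
    # partition of the bag as a plain sequence.
    partition = list(partition)
    return [f"wrote {len(partition)} scores"]


bag = dask.bag.from_sequence(range(100), npartitions=4)
print(bag.map_partitions(write_scores).compute())
# ['wrote 25 scores', 'wrote 25 scores', 'wrote 25 scores', 'wrote 25 scores']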
@@ -157,7 +157,7 @@ def test_on_memory():
             biometric_algorithm = Distance()

             vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
-                transformer, biometric_algorithm
+                transformer, biometric_algorithm, None,
             )

             if with_dask:
@@ -193,17 +193,20 @@ def test_checkpoint_bioalg_as_transformer():
     with tempfile.TemporaryDirectory() as dir_name:

-        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
+        def run_pipeline(
+            with_dask,
+            score_writer=FourColumnsScoreWriter(os.path.join(dir_name, "final_scores")),
+        ):
             database = DummyDatabase()

             transformer = _make_transformer(dir_name)

             biometric_algorithm = BioAlgorithmCheckpointWrapper(
-                Distance(), base_dir=dir_name, score_writer=score_writer
+                Distance(), base_dir=dir_name
             )

             vanilla_biometrics_pipeline = VanillaBiometricsPipeline(
-                transformer, biometric_algorithm
+                transformer, biometric_algorithm, score_writer=score_writer
             )

             if with_dask:
@@ -220,17 +223,17 @@ def test_checkpoint_bioalg_as_transformer():
             if with_dask:
                 scores = scores.compute(scheduler="single-threaded")
+                total_scores = np.sum([len(open(f).readlines()) for f in scores])
+            else:
+                total_scores = len(open(scores[0]).readlines())

-            if isinstance(score_writer, FourColumnsScoreWriter):
-                assert total_scores == 100  # counting lines
-            elif isinstance(score_writer, CSVScoreWriter):
-                base_path = os.path.join(dir_name, "concatenated_scores")
-                score_writer.concatenate_write_scores(scores, base_path)
+            if isinstance(score_writer, CSVScoreWriter):
                 assert (
-                    len(open(os.path.join(base_path, "chunk_0.csv")).readlines()) == 101
-                )
-            else:
-                filename = os.path.join(dir_name, "concatenated_scores.txt")
-                score_writer.concatenate_write_scores(scores, filename)
-                assert len(open(filename).readlines()) == 100
+                    total_scores == 100 + 2 if with_dask else 100 + 1
+                )  # 100 plus 2 headers

         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointing works
@@ -244,21 +247,29 @@ def test_checkpoint_bioalg_as_transformer():
         os.makedirs(dir_name, exist_ok=True)

         # CSVWriter
-        run_pipeline(False, CSVScoreWriter())
-        run_pipeline(False, CSVScoreWriter())  # Checking if the checkpointng works
+        run_pipeline(
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
+        run_pipeline(
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )  # Checking if the checkpointing works

         shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
         os.makedirs(dir_name, exist_ok=True)

         # CSVWriter + Dask
-        run_pipeline(True, CSVScoreWriter())
-        run_pipeline(True, CSVScoreWriter())  # Checking if the checkpointng works
+        run_pipeline(
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
+        run_pipeline(
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )  # Checking if the checkpointing works
 def test_checkpoint_bioalg_as_bioalg():
     with tempfile.TemporaryDirectory() as dir_name:

-        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter()):
+        def run_pipeline(with_dask, score_writer=FourColumnsScoreWriter(dir_name)):
             database = DummyDatabase()

             transformer = _make_transformer_with_algorithm(dir_name)
@@ -287,13 +298,13 @@ def test_checkpoint_bioalg_as_bioalg():
                 allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
             )

-            filename = os.path.join(dir_name, "concatenated_scores.txt")
-            score_writer.concatenate_write_scores(scores, filename)
-
-            if isinstance(score_writer, CSVScoreWriter):
-                assert len(open(filename).readlines()) == 101
+            if with_dask:
+                scores = scores.compute(scheduler="single-threaded")
+                total_scores = np.sum([len(open(f).readlines()) for f in scores])
             else:
-                assert len(open(filename).readlines()) == 100
+                total_scores = len(open(scores[0]).readlines())
+
+            assert total_scores == 100  # counting lines

         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointing works