From 39621dcf6a849768190280652eb2ce3091f82626 Mon Sep 17 00:00:00 2001
From: Tiago Freitas Pereira <tiagofrepereira@gmail.com>
Date: Mon, 11 May 2020 19:05:32 +0200
Subject: [PATCH] Finished ZTNormalization

---
 .../vanilla_biometrics/abstract_classes.py    |  16 ++
 .../pipelines/vanilla_biometrics/legacy.py    |  11 +-
 .../pipelines/vanilla_biometrics/pipelines.py |  11 +-
 .../vanilla_biometrics/score_writers.py       |  75 ++++++---
 .../pipelines/vanilla_biometrics/wrappers.py  |   9 +-
 .../pipelines/vanilla_biometrics/zt_norm.py   |  13 +-
 bob/bio/base/script/vanilla_biometrics.py     | 157 +++++++++++++++---
 bob/bio/base/test/test_vanilla_biometrics.py  |  95 ++++++++---
 .../test_vanilla_biometrics_score_norm.py     | 104 ++++++++----
 9 files changed, 382 insertions(+), 109 deletions(-)

diff --git a/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py b/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
index 56c64bca..c62fa72a 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/abstract_classes.py
@@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod
 from bob.pipelines.sample import Sample, SampleSet, DelayedSample
 import functools
 import numpy as np
+import os
 
 
 def average_scores(scores):
@@ -298,3 +299,18 @@ class ScoreWriter(metaclass=ABCMeta):
     @abstractmethod
     def write(self, sampleset, path):
         pass
+
+    def post_process(self, score_paths, filename):
+        def _post_process(score_paths, filename):
+            os.makedirs(os.path.dirname(filename), exist_ok=True)
+            f = open(filename, "w")
+            for path in score_paths:
+                f.writelines(open(path).readlines())
+            return filename
+
+        import dask.bag
+        import dask
+        if isinstance(score_paths, dask.bag.Bag):
+            all_paths = dask.delayed(list)(score_paths)
+            return dask.delayed(_post_process)(all_paths, filename)
+        return _post_process(score_paths, filename)
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
index 9248845b..2a218b7a 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/legacy.py
@@ -263,7 +263,16 @@ class BioAlgorithmLegacy(BioAlgorithm):
         def _load(path):
             return pickle.loads(open(path, "rb").read())
 
-        path = os.path.join(self.score_dir, str(sampleset.key) + ".pkl")
+        def _make_name(sampleset, biometric_references):
+            # The score file name is composed by sampleset key and the
+            # first 3 biometric_references
+            name = str(sampleset.key)
+            suffix = "_".join([str(s.key) for s in biometric_references[0:3]])
+            return name + suffix
+
+        path = os.path.join(
+            self.score_dir, _make_name(sampleset, biometric_references) + ".pkl"
+        )
 
         if self.force or not os.path.exists(path):
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
index 0a139fe2..5bf01e09 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/pipelines.py
@@ -112,9 +112,6 @@ class VanillaBiometricsPipeline(object):
             allow_scoring_with_all_biometric_references,
         )
 
-        if self.score_writer is not None:
-            return self.write_scores(scores)
-
         return scores
 
     def train_background_model(self, background_model_samples):
@@ -162,4 +159,12 @@ class VanillaBiometricsPipeline(object):
         return scores, probe_features
 
     def write_scores(self, scores):
+        if self.score_writer is None:
+            raise ValueError("No score writer defined in the pipeline")
         return self.score_writer.write(scores)
+
+    def post_process(self, score_paths, filename):
+        if self.score_writer is None:
+            raise ValueError("No score writer defined in the pipeline")
+
+        return self.score_writer.post_process(score_paths, filename)
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py b/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
index fc927d9d..e70de423 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/score_writers.py
@@ -8,6 +8,7 @@ from .abstract_classes import ScoreWriter
 import functools
 import csv
 import uuid
+import shutil
 
 
 class FourColumnsScoreWriter(ScoreWriter):
     """
@@ -20,29 +21,35 @@ class FourColumnsScoreWriter(ScoreWriter):
         Write scores and returns a :any:`bob.pipelines.DelayedSample` containing
         the instruction to open the score file
         """
 
-        os.makedirs(self.path, exist_ok=True)
-        n_lines = 0
-        filename = os.path.join(self.path, str(uuid.uuid4()) + ".txt")
-        with open(filename, "w") as f:
-            for probe in probe_sampleset:
-
-                # If it's delayed, load it
-                if isinstance(probe[0], DelayedSample):
-                    probe.samples = probe.samples[0].data
-
-                lines = [
-                    "{0} {1} {2} {3}\n".format(
-                        biometric_reference.subject,
-                        probe.subject,
-                        probe.key,
-                        biometric_reference.data,
-                    )
-                    for biometric_reference in probe
-                ]
-                n_lines += len(probe)
-                f.writelines(lines)
-        return [filename]
+        def _write(probe_sampleset):
+            os.makedirs(self.path, exist_ok=True)
+            n_lines = 0
+            filename = os.path.join(self.path, str(uuid.uuid4()) + ".txt")
+            with open(filename, "w") as f:
+                for probe in probe_sampleset:
+
+                    # If it's delayed, load it
+                    if isinstance(probe[0], DelayedSample):
+                        probe.samples = probe.samples[0].data
+
+                    lines = [
+                        "{0} {1} {2} {3}\n".format(
+                            biometric_reference.subject,
+                            probe.subject,
+                            probe.key,
+                            biometric_reference.data,
+                        )
+                        for biometric_reference in probe
+                    ]
+                    n_lines += len(probe)
+                    f.writelines(lines)
+            return [filename]
+
+        import dask.bag
+        import dask
+        if isinstance(probe_sampleset, dask.bag.Bag):
+            return probe_sampleset.map_partitions(_write)
+        return _write(probe_sampleset)
 
 
 class CSVScoreWriter(ScoreWriter):
@@ -141,3 +148,25 @@ class CSVScoreWriter(ScoreWriter):
                 csv_writer.writerows(rows)
                 f.close()
         return filenames
+
+    def post_process(self, score_paths, path):
+        def _post_process(score_paths, path):
+            post_process_scores = []
+            os.makedirs(path, exist_ok=True)
+            for i, score in enumerate(score_paths):
+                fname = os.path.join(path, os.path.basename(score) + "_post_process.csv")
+                post_process_scores.append(fname)
+                if i == 0:
+                    shutil.move(score, fname)
+                    continue
+                open(fname, "w").writelines(open(score, "r").readlines()[1:])
+                os.remove(score)
+            return post_process_scores
+
+
+        import dask.bag
+        import dask
+        if isinstance(score_paths, dask.bag.Bag):
+            all_paths = dask.delayed(list)(score_paths)
+            return dask.delayed(_post_process)(all_paths, path)
+        return _post_process(score_paths, path)
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
index a98ac02f..115c5ac9 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/wrappers.py
@@ -45,6 +45,7 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
         self.biometric_algorithm = biometric_algorithm
         self.force = force
         self._biometric_reference_extension = ".hdf5"
+        self.base_dir = base_dir
 
     def enroll(self, enroll_features):
         return self.biometric_algorithm.enroll(enroll_features)
@@ -104,7 +105,7 @@ class BioAlgorithmCheckpointWrapper(BioAlgorithm):
             # The score file name is composed by sampleset key and the
             # first 3 biometric_references
             name = str(sampleset.key)
-            suffix = "_".join([s.key for s in biometric_references[0:3]])
+            suffix = "_".join([str(s.key) for s in biometric_references[0:3]])
             return name + suffix
 
         path = os.path.join(
@@ -197,9 +198,9 @@ def dask_vanilla_biometrics(pipeline, npartitions=None):
     """
 
     if isinstance(pipeline, ZTNormPipeline):
-        # Dasking the first part of the pipelines
-        pipeline = dask_vanilla_biometrics(
-            pipeline.vanila_biometrics_pipeline, npartitions
+        # Dasking the first part of the pipelines
+        pipeline.vanilla_biometrics_pipeline = dask_vanilla_biometrics(
+            pipeline.vanilla_biometrics_pipeline, npartitions
         )
         pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)
diff --git a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
index b39b8591..b8670ff2 100644
--- a/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
+++ b/bob/bio/base/pipelines/vanilla_biometrics/zt_norm.py
@@ -57,7 +57,6 @@ class ZTNormPipeline(object):
         score_writer=FourColumnsScoreWriter("./scores.txt"),
     ):
         self.vanilla_biometrics_pipeline = vanilla_biometrics_pipeline
-
         self.ztnorm_solver = ZTNorm()
 
         self.z_norm = z_norm
@@ -128,6 +127,11 @@ class ZTNormPipeline(object):
             t_scores,
             allow_scoring_with_all_biometric_references,
         )
+
+
+        # TODO: Do the score write
+        #if self.vanilla_biometrics_pipeline.score_writer is not None:
+        #    return self.write_scores(scores)
 
         return raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores
 
@@ -233,6 +237,11 @@ class ZTNormPipeline(object):
 
         return zt_normed_scores
 
+    def write_scores(self, scores):
+        return self.vanilla_biometrics_pipeline.write_scores(scores)
+
+    def post_process(self, score_paths, filename):
+        return self.vanilla_biometrics_pipeline.post_process(score_paths, filename)
 
 class ZTNorm(object):
     """
@@ -428,6 +437,8 @@ class ZTNormCheckpointWrapper(object):
         self.znorm_score_path = os.path.join(base_dir, "znorm_scores")
         self.tnorm_score_path = os.path.join(base_dir, "tnorm_scores")
         self.force = force
+        self.base_dir = base_dir
+
 
     def _write_scores(self, samples, path):
         os.makedirs(os.path.dirname(path), exist_ok=True)
diff --git a/bob/bio/base/script/vanilla_biometrics.py b/bob/bio/base/script/vanilla_biometrics.py
index f2f4e713..89516f79 100644
--- a/bob/bio/base/script/vanilla_biometrics.py
+++ b/bob/bio/base/script/vanilla_biometrics.py
@@ -20,8 +20,12 @@ import dask.bag
 from bob.bio.base.pipelines.vanilla_biometrics import (
     VanillaBiometricsPipeline,
     BioAlgorithmCheckpointWrapper,
+    BioAlgorithmDaskWrapper,
+    ZTNormPipeline,
+    ZTNormDaskWrapper,
+    ZTNormCheckpointWrapper,
 )
-
+from dask.delayed import Delayed
 
 logger = logging.getLogger(__name__)
@@ -103,8 +107,11 @@ TODO: Work out this help
     default="results",
     help="Name of output directory",
 )
+@click.option("--ztnorm", is_flag=True, help="If set, run an experiment with ZTNorm")
 @verbosity_option(cls=ResourceOption)
-def vanilla_biometrics(pipeline, database, dask_client, groups, output, **kwargs):
+def vanilla_biometrics(
+    pipeline, database, dask_client, groups, output, ztnorm, **kwargs
+):
     """Runs the simplest biometrics pipeline.
 
     Such pipeline consists into three sub-pipelines.
@@ -149,12 +156,77 @@ def vanilla_biometrics(pipeline, database, dask_client, groups, output, **kwargs
     """
 
+    def _compute_scores(result, dask_client):
+        if isinstance(result, Delayed):
+            if dask_client is not None:
+                result = result.compute(scheduler=dask_client)
+            else:
+                logger.warning("`dask_client` not set. Your pipeline will run locally")
+                result = result.compute(scheduler="single-threaded")
+        return result
+
+    def _post_process_scores(pipeline, scores, path):
+        writed_scores = pipeline.write_scores(scores)
+        return pipeline.post_process(writed_scores, path)
+
+    def _merge_references_ztnorm(biometric_references, probes, zprobes, treferences):
+        treferences_sub = [t.subject for t in treferences]
+        biometric_references_sub = [t.subject for t in biometric_references]
+
+        for i in range(len(probes)):
+            probes[i].references += treferences_sub
+
+        for i in range(len(zprobes)):
+            zprobes[i].references = biometric_references_sub + treferences_sub
+
+        return probes, zprobes
+
+    def _is_dask_checkpoint(pipeline):
+        """
+        Check if a VanillaPipeline has daskable and checkpointable algorithms
+        """
+        is_dask = False
+        is_checkpoint = False
+        algorithm = pipeline.biometric_algorithm
+        base_dir = ""
+
+        while True:
+            if isinstance(algorithm, BioAlgorithmDaskWrapper):
+                is_dask = True
+
+            if isinstance(algorithm, BioAlgorithmCheckpointWrapper):
+                is_checkpoint = True
+                base_dir = algorithm.base_dir
+
+            if hasattr(algorithm, "biometric_algorithm"):
+                algorithm = algorithm.biometric_algorithm
+            else:
+                break
+        return is_dask, is_checkpoint, base_dir
+
     if not os.path.exists(output):
         os.makedirs(output, exist_ok=True)
 
+    # Patching the pipeline in case of ZNorm
+    if ztnorm:
+        pipeline = ZTNormPipeline(pipeline)
+        is_dask, is_checkpoint, base_dir = _is_dask_checkpoint(
+            pipeline.vanilla_biometrics_pipeline
+        )
+
+        if is_checkpoint:
+            pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
+                pipeline.ztnorm_solver, os.path.join(base_dir, "normed-scores")
+            )
+
+        if is_dask:
+            pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)
+
     for group in groups:
 
-        score_file_name = os.path.join(output, f"scores-{group}.txt")
+        score_file_name = os.path.join(output, f"scores-{group}")
         biometric_references = database.references(group=group)
 
         logger.info(f"Running vanilla biometrics for group {group}")
@@ -164,32 +236,67 @@ def vanilla_biometrics(pipeline, database, dask_client, groups, output, **kwargs
             else False
         )
 
-        result = pipeline(
-            database.background_model_samples(),
-            biometric_references,
-            database.probes(group=group),
-            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
-        )
+        if ztnorm:
+            zprobes = database.zprobes()
+            probes = database.probes(group=group)
+            treferences = database.treferences()
+
+            probes, zprobes = _merge_references_ztnorm(biometric_references, probes, zprobes, treferences)
+            raw_scores, z_normed_scores, t_normed_scores, zt_normed_scores = pipeline(
+                database.background_model_samples(),
+                biometric_references,
+                probes,
+                zprobes,
+                treferences,
+                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
+            )
 
-        if isinstance(result, dask.bag.core.Bag):
-            if dask_client is not None:
-                result = result.compute(scheduler=dask_client)
-            else:
Your pipeline will run locally") - result = result.compute(scheduler="single-threaded") + def _build_filename(score_file_name, suffix): + return os.path.join(score_file_name, suffix) - # Check if there's a score writer hooked in - if hasattr(pipeline.biometric_algorithm, "score_writer"): - pipeline.biometric_algorithm.score_writer.concatenate_write_scores( - result, score_file_name + # Running RAW_SCORES + raw_scores = _post_process_scores( + pipeline, raw_scores, _build_filename(score_file_name, "raw_scores") ) + _compute_scores(raw_scores, dask_client) + + # Z-SCORES + z_normed_scores = _post_process_scores( + pipeline, + z_normed_scores, + _build_filename(score_file_name, "z_normed_scores"), + ) + _compute_scores(z_normed_scores, dask_client) + + # T-SCORES + t_normed_scores = _post_process_scores( + pipeline, + t_normed_scores, + _build_filename(score_file_name, "t_normed_scores"), + ) + _compute_scores(t_normed_scores, dask_client) + + # ZT-SCORES + zt_normed_scores = _post_process_scores( + pipeline, + zt_normed_scores, + _build_filename(score_file_name, "zt_normed_scores"), + ) + _compute_scores(zt_normed_scores, dask_client) + else: - with open(score_file_name, "w") as f: - # Flatting out the list - result = itertools.chain(*result) - for probe in result: - for sample in probe.samples: - f.writelines(sample.data) + + result = pipeline( + database.background_model_samples(), + biometric_references, + database.probes(group=group), + allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references, + ) + + post_processed_scores = _post_process_scores( + pipeline, result, score_file_name + ) + _compute_scores(post_processed_scores, dask_client) if dask_client is not None: dask_client.shutdown() diff --git a/bob/bio/base/test/test_vanilla_biometrics.py b/bob/bio/base/test/test_vanilla_biometrics.py index fbf329a0..16a8fb37 100644 --- a/bob/bio/base/test/test_vanilla_biometrics.py +++ b/bob/bio/base/test/test_vanilla_biometrics.py @@ -114,7 +114,6 @@ class DummyDatabase: return zprobes - def treferences(self): t_sset = self._create_random_sample_set(self.n_references, self.dim, seed=15) for t in t_sset: @@ -202,8 +201,7 @@ def test_checkpoint_bioalg_as_transformer(): with tempfile.TemporaryDirectory() as dir_name: def run_pipeline( - with_dask, - score_writer=FourColumnsScoreWriter(os.path.join(dir_name, "final_scores")), + with_dask, score_writer=None, ): database = DummyDatabase() @@ -214,7 +212,7 @@ def test_checkpoint_bioalg_as_transformer(): ) vanilla_biometrics_pipeline = VanillaBiometricsPipeline( - transformer, biometric_algorithm, score_writer=score_writer + transformer, biometric_algorithm, score_writer ) if with_dask: @@ -227,36 +225,70 @@ def test_checkpoint_bioalg_as_transformer(): database.references(), database.probes(), allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references, - ) + ) - if with_dask: - scores = scores.compute(scheduler="single-threaded") - total_scores = np.sum([len(open(f).readlines()) for f in scores]) + if vanilla_biometrics_pipeline.score_writer is None: + if with_dask: + scores = scores.compute(scheduler="single-threaded") + + assert len(scores) == 10 + for sset in scores: + if isinstance(sset[0], DelayedSample): + for s in sset: + assert len(s.data) == 10 + else: + assert len(sset) == 10 else: - total_scores = len(open(scores[0]).readlines()) + writed_scores = vanilla_biometrics_pipeline.write_scores(scores) + concatenated_scores = vanilla_biometrics_pipeline.post_process( + 
+                    writed_scores, os.path.join(dir_name, "scores-dev")
+                )
 
-            if isinstance(score_writer, FourColumnsScoreWriter):
-                assert total_scores == 100  # counting lines
-            elif isinstance(score_writer, CSVScoreWriter):
+                if with_dask:
+                    concatenated_scores = concatenated_scores.compute(
+                        scheduler="single-threaded"
+                    )
 
-                assert (
-                    total_scores == 100 + 2 if with_dask else 100 + 1
-                )  # 100 plus 2 headers
+                if isinstance(vanilla_biometrics_pipeline.score_writer, FourColumnsScoreWriter):
+                    assert (
+                        len(open(concatenated_scores).readlines()) == 100
+                    )
+                else:
+                    n_lines = 0
+                    for s in concatenated_scores:
+                        n_lines += len(open(s).readlines())
+
+                    assert n_lines == 101
+
         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointng works
         shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
         os.makedirs(dir_name, exist_ok=True)
 
+        # Writing scores
+        run_pipeline(
+            False, FourColumnsScoreWriter(os.path.join(dir_name, "final_scores"))
+        )
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
         # Dask
         run_pipeline(True)
         run_pipeline(True)  # Checking if the checkpointng works
         shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
         os.makedirs(dir_name, exist_ok=True)
+
+        # Writing scores
+        run_pipeline(
+            True, FourColumnsScoreWriter(os.path.join(dir_name, "final_scores"))
+        )
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
 
         # CSVWriter
         run_pipeline(
-            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
         )
         run_pipeline(
             False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
         )
@@ -269,7 +301,7 @@ def test_checkpoint_bioalg_as_transformer():
             True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
         )
         run_pipeline(
-            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
         )
 
         # Checking if the checkpointng works
@@ -306,13 +338,32 @@ def test_checkpoint_bioalg_as_bioalg():
                 allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
             )
 
-            if with_dask:
-                scores = scores.compute(scheduler="single-threaded")
-                total_scores = np.sum([len(open(f).readlines()) for f in scores])
+            if vanilla_biometrics_pipeline.score_writer is None:
+                if with_dask:
+                    scores = scores.compute(scheduler="single-threaded")
+
+                assert len(scores) == 10
+                for sset in scores:
+                    if isinstance(sset[0], DelayedSample):
+                        for s in sset:
+                            assert len(s.data) == 10
+                    else:
+                        assert len(sset) == 10
             else:
-                total_scores = len(open(scores[0]).readlines())
+                writed_scores = vanilla_biometrics_pipeline.write_scores(scores)
+                concatenated_scores = vanilla_biometrics_pipeline.post_process(
+                    writed_scores, os.path.join(dir_name, "scores-dev")
+                )
+
+                if with_dask:
+                    concatenated_scores = concatenated_scores.compute(
+                        scheduler="single-threaded"
+                    )
+
+                assert (
+                    len(open(concatenated_scores).readlines()) == 100
+                )
 
-            assert total_scores == 100  # counting lines
         run_pipeline(False)
         run_pipeline(False)  # Checking if the checkpointng works
diff --git a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
index 92834215..b8798d2a 100644
--- a/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
+++ b/bob/bio/base/test/test_vanilla_biometrics_score_norm.py
@@ -26,7 +26,7 @@ from bob.bio.base.pipelines.vanilla_biometrics import (
     BioAlgorithmCheckpointWrapper,
     dask_vanilla_biometrics,
     BioAlgorithmLegacy,
-    FourColumnsScoreWriter,
+    CSVScoreWriter,
 )
 
 import bob.pipelines as mario
@@ -111,21 +111,24 @@ def test_norm_mechanics():
         # We have to transpose because the tests are BIOMETRIC_REFERENCES vs PROBES
         # and bob.bio.base is PROBES vs BIOMETRIC_REFERENCES
         if isinstance(scores[0][0], DelayedSample):
-            return np.array([f.data for sset in scores for s in sset for f in s.data]).reshape(shape).T
+            return (
+                np.array([f.data for sset in scores for s in sset for f in s.data])
+                .reshape(shape)
+                .T
+            )
         else:
             return np.array([s.data for sset in scores for s in sset]).reshape(shape).T
-
     with tempfile.TemporaryDirectory() as dir_name:
 
         def run(with_dask, with_checkpoint=False):
             ############
             # Prepating stubs
             ############
-            n_references = 20
-            n_probes = 30
-            n_t_references = 40
-            n_z_probes = 50
+            n_references = 111
+            n_probes = 111
+            n_t_references = 80
+            n_z_probes = 80
             dim = 5
 
             references = np.arange(n_references * dim).reshape(
@@ -155,15 +158,17 @@ def test_norm_mechanics():
             # Creating enrollment samples
             biometric_reference_sample_sets = _create_sample_sets(references, offset=0)
-            t_reference_sample_sets = _create_sample_sets(t_references, offset=100)
+            t_reference_sample_sets = _create_sample_sets(t_references, offset=300)
 
             # Fetching ids
             reference_ids = [r.subject for r in biometric_reference_sample_sets]
             t_reference_ids = [r.subject for r in t_reference_sample_sets]
             ids = reference_ids + t_reference_ids
 
-            probe_sample_sets = _create_sample_sets(probes, offset=200, references=ids)
-            z_probe_sample_sets = _create_sample_sets(z_probes, offset=300, references=ids)
+            probe_sample_sets = _create_sample_sets(probes, offset=600, references=ids)
+            z_probe_sample_sets = _create_sample_sets(
+                z_probes, offset=900, references=ids
+            )
 
             ############
             # TESTING REGULAR SCORING
             ############
             biometric_algorithm = Distance(factor=1)
 
             if with_checkpoint:
-                biometric_algorithm = BioAlgorithmCheckpointWrapper(Distance(factor=1), dir_name)
+                biometric_algorithm = BioAlgorithmCheckpointWrapper(
+                    Distance(factor=1), dir_name
+                )
 
             vanilla_pipeline = VanillaBiometricsPipeline(
                 transformer, biometric_algorithm, score_writer=None
             )
                 probe_sample_sets,
                 allow_scoring_with_all_biometric_references=True,
             )
-
+
             if with_dask:
                 score_samples = score_samples.compute(scheduler="single-threaded")
@@ -276,7 +283,6 @@ def test_norm_mechanics():
             vanilla_pipeline, z_norm=True, t_norm=True,
         )
-
         if with_checkpoint:
             zt_vanilla_pipeline.ztnorm_solver = ZTNormCheckpointWrapper(
                 zt_vanilla_pipeline.ztnorm_solver, dir_name
             )
@@ -314,7 +320,6 @@ def test_norm_mechanics():
                     scheduler="single-threaded"
                 )
-
             raw_scores = _dump_scores_from_samples(
                 raw_score_samples, shape=(n_probes, n_references)
             )
@@ -334,19 +339,18 @@ def test_norm_mechanics():
                 zt_normed_score_samples, shape=(n_probes, n_references)
             )
             assert np.allclose(zt_normed_scores, zt_normed_scores_ref)
-
+
         # No dask
-        run(False) # On memory
+        run(False)  # On memory
 
         # With checkpoing
         run(False, with_checkpoint=True)
         run(False, with_checkpoint=True)
-        #shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
-        #os.makedirs(dir_name, exist_ok=True)
-
+        # shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
+        # os.makedirs(dir_name, exist_ok=True)
 
         # With dask
-        run(True) # On memory
+        run(True)  # On memory
         run(True, with_checkpoint=True)
        run(True, with_checkpoint=True)
@@ -355,7 +359,7 @@ def test_znorm_on_memory():
 
     with tempfile.TemporaryDirectory() as dir_name:
 
-        def run_pipeline(with_dask):
+        def run_pipeline(with_dask, score_writer=None):
 
             database = DummyDatabase(one_d=False)
 
@@ -364,7 +368,7 @@ def test_znorm_on_memory():
             biometric_algorithm = Distance()
 
             vanilla_biometrics_pipeline = ZTNormPipeline(
-                VanillaBiometricsPipeline(transformer, biometric_algorithm)
+                VanillaBiometricsPipeline(transformer, biometric_algorithm, score_writer)
             )
 
             if with_dask:
@@ -381,20 +385,60 @@ def test_znorm_on_memory():
                 allow_scoring_with_all_biometric_references=database.allow_scoring_with_all_biometric_references,
             )
 
+            # if vanilla_biometrics_pipeline.score_writer is not None:
+            #    concatenated_scores
+            #    pass
+
+            def _concatenate(pipeline, scores, path):
+                writed_scores = pipeline.write_scores(scores)
+                concatenated_scores = pipeline.post_process(
+                    writed_scores, os.path.join(dir_name, "scores-dev")
+                )
+                return concatenated_scores
+
+            if isinstance(score_writer, CSVScoreWriter):
+                raw_scores = _concatenate(vanilla_biometrics_pipeline, raw_scores, "scores-dev")
+                z_scores = _concatenate(vanilla_biometrics_pipeline, z_scores, "scores-dev_zscores")
+                t_scores = _concatenate(vanilla_biometrics_pipeline, t_scores, "scores-dev_tscores")
+                zt_scores = _concatenate(vanilla_biometrics_pipeline, zt_scores, "scores-dev_ztscores")
+
             if with_dask:
                 raw_scores = raw_scores.compute(scheduler="single-threaded")
                 z_scores = z_scores.compute(scheduler="single-threaded")
                 t_scores = t_scores.compute(scheduler="single-threaded")
                 zt_scores = zt_scores.compute(scheduler="single-threaded")
 
-            assert len(raw_scores) == 10
-            assert len(z_scores) == 10
-            assert len(t_scores) == 10
-            assert len(zt_scores) == 10
+            if isinstance(score_writer, CSVScoreWriter):
+                n_lines = 51 if with_dask else 101
+
+                assert len(open(raw_scores[0], "r").readlines()) == n_lines
+                assert len(open(z_scores[0], "r").readlines()) == n_lines
+                assert len(open(t_scores[0], "r").readlines()) == n_lines
+                assert len(open(zt_scores[0], "r").readlines()) == n_lines
+
+            else:
+                assert len(raw_scores) == 10
+                assert len(z_scores) == 10
+                assert len(t_scores) == 10
+                assert len(zt_scores) == 10
 
         run_pipeline(False)
         run_pipeline(False)  # Testing checkpoint
-        # shutil.rmtree(dir_name) # Deleting the cache so it runs again from scratch
-        # os.makedirs(dir_name, exist_ok=True)
-        # run_pipeline(True)
-        # run_pipeline(True) # Testing checkpoint
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        run_pipeline(
+            False, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        # With DASK
+        run_pipeline(True)
+        run_pipeline(True)  # Testing checkpoint
+        shutil.rmtree(dir_name)  # Deleting the cache so it runs again from scratch
+        os.makedirs(dir_name, exist_ok=True)
+
+        run_pipeline(
+            True, CSVScoreWriter(os.path.join(dir_name, "concatenated_scores"))
+        )
-- 
GitLab
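
Usage sketch (not part of the commit): the snippet below shows how the pieces added here are meant to fit together, mirroring the new `--ztnorm` branch of the `vanilla_biometrics` command and the tests above. The `database`, `transformer` and `biometric_algorithm` names are placeholders for objects you would supply (the tests use `DummyDatabase` and `Distance`); the group name "dev" is likewise an assumption. Everything else (`ZTNormPipeline`, `CSVScoreWriter`, `write_scores`, `post_process`) is the API introduced or touched by this patch.

# Placeholders: `database` must expose references()/probes()/zprobes()/treferences(),
# `transformer` is any scikit-learn pipeline, `biometric_algorithm` any BioAlgorithm.
import os

from bob.bio.base.pipelines.vanilla_biometrics import (
    VanillaBiometricsPipeline,
    ZTNormPipeline,
    CSVScoreWriter,
)

output = "results"

# Plain pipeline plus a score writer, then wrapped with the ZT-norm solver.
vanilla = VanillaBiometricsPipeline(
    transformer, biometric_algorithm, CSVScoreWriter(os.path.join(output, "tmp"))
)
pipeline = ZTNormPipeline(vanilla)

# The ZT-norm pipeline takes z-probes and t-references on top of the usual
# inputs and returns four score sets instead of one.
raw, z_normed, t_normed, zt_normed = pipeline(
    database.background_model_samples(),
    database.references(group="dev"),
    database.probes(group="dev"),
    database.zprobes(),
    database.treferences(),
    allow_scoring_with_all_biometric_references=True,
)

# Write each score set and concatenate the per-probe files, the same job the
# _post_process_scores helper performs inside the click command.
for name, scores in [
    ("raw_scores", raw),
    ("z_normed_scores", z_normed),
    ("t_normed_scores", t_normed),
    ("zt_normed_scores", zt_normed),
]:
    written = pipeline.write_scores(scores)
    pipeline.post_process(written, os.path.join(output, "scores-dev", name))

Note that the command additionally calls _merge_references_ztnorm to attach the t-reference and z-probe subjects to the probe sample sets, and when the pipeline has been wrapped with dask_vanilla_biometrics the returned score sets and the post_process result are delayed objects that still need a final .compute(scheduler=...), which is what _compute_scores does above.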