import functools
import os

import cloudpickle
import dask

import bob.io.base
import bob.pipelines as mario
from bob.pipelines import DelayedSample, SampleSet

from .abstract_classes import BioAlgorithm
from .zt_norm import ZTNormDaskWrapper, ZTNormPipeline


class BioAlgorithmCheckpointWrapper(BioAlgorithm):
    """Wrapper used to checkpoint enrolled and scored samples.

    Parameters
    ----------
        biometric_algorithm: :any:`BioAlgorithm`
           An implemented :any:`BioAlgorithm`

        base_dir: str
           Path to store biometric references and scores

        group: str
           If set, biometric references and scores are stored under
           ``base_dir/group``; otherwise directly under ``base_dir``

        force: bool
           If True, scores and biometric references are recomputed even if
           a checkpoint file already exists

    Examples
    --------

    >>> from bob.bio.base.pipelines.vanilla_biometrics import BioAlgorithmCheckpointWrapper, Distance
    >>> biometric_algorithm = BioAlgorithmCheckpointWrapper(Distance(), base_dir="./")
    >>> biometric_algorithm.enroll(sample)

    """

    def __init__(
        self, biometric_algorithm, base_dir, group=None, force=False, **kwargs
    ):
        super().__init__(**kwargs)

        self.base_dir = base_dir
        self.set_score_references_path(group)

        self.biometric_algorithm = biometric_algorithm
        self.force = force
        self._biometric_reference_extension = ".hdf5"
        self._score_extension = ".pkl"

    def set_score_references_path(self, group):
        if group is None:
            self.biometric_reference_dir = os.path.join(self.base_dir, "biometric_references")
            self.score_dir = os.path.join(self.base_dir, "scores")
        else:
            self.biometric_reference_dir = os.path.join(self.base_dir, group, "biometric_references")
            self.score_dir = os.path.join(self.base_dir, group, "scores")
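
    # For example, with ``base_dir="./checkpoints"`` and ``group="dev"``,
    # biometric references are written under
    # ``./checkpoints/dev/biometric_references`` and scores under
    # ``./checkpoints/dev/scores``.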

    def enroll(self, enroll_features):
        return self.biometric_algorithm.enroll(enroll_features)

    def score(self, biometric_reference, data):
        return self.biometric_algorithm.score(biometric_reference, data)

    def score_multiple_biometric_references(self, biometric_references, data):
        return self.biometric_algorithm.score_multiple_biometric_references(
            biometric_references, data
        )

    def write_biometric_reference(self, sample, path):
        return bob.io.base.save(sample.data, path, create_directories=True)

    def write_scores(self, samples, path):
        # Ensure the parent directory exists before serializing the scores
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(cloudpickle.dumps(samples))

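    # Since scores are serialized with cloudpickle, a checkpoint can be
    # inspected by hand if needed (the path below is illustrative):
    #
    #   with open("scores/subject/probe-key.pkl", "rb") as f:
    #       samples = cloudpickle.loads(f.read())
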
    def _enroll_sample_set(self, sampleset):
        """
        Enroll a sample set with checkpointing
        """

        # Build the checkpoint path for this biometric reference
        path = os.path.join(
            self.biometric_reference_dir,
            str(sampleset.key) + self._biometric_reference_extension,
        )
        if self.force or not os.path.exists(path):

            enrolled_sample = self.biometric_algorithm._enroll_sample_set(sampleset)

            # saving the new sample
            self.write_biometric_reference(enrolled_sample, path)

        # Always return a DelayedSample so the enrolled reference is loaded
        # from disk lazily; this keeps memory usage low on large datasets
        delayed_enrolled_sample = DelayedSample(
            functools.partial(bob.io.base.load, path), parent=sampleset
        )

        return delayed_enrolled_sample

    def _score_sample_set(
        self,
        sampleset,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):
        """Given a sampleset for probing, compute the scores and return a
        sample set with the scores attached.
        """

        def _load(path):
            # Load a previously checkpointed list of scored samples
            with open(path, "rb") as f:
                return cloudpickle.loads(f.read())

        def _make_name(sampleset, biometric_references):
            # The score file name is composed of the sampleset key and the
            # keys of the first 3 biometric references
            subject = str(sampleset.subject)
            name = str(sampleset.key)
            suffix = "_".join([str(s.key) for s in biometric_references[0:3]])
            return os.path.join(subject, name + suffix)

        path = os.path.join(
            self.score_dir,
            _make_name(sampleset, biometric_references) + self._score_extension,
        )

        if self.force or not os.path.exists(path):
            # Computing score
            scored_sample_set = self.biometric_algorithm._score_sample_set(
                sampleset,
                biometric_references,
                allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
            )
            self.write_scores(scored_sample_set.samples, path)

            scored_sample_set = SampleSet(
                DelayedSample(functools.partial(_load, path), parent=sampleset),
                parent=sampleset,
            )
        else:
            # Scores already checkpointed; load them from disk
            samples = _load(path)
            scored_sample_set = SampleSet(samples, parent=sampleset)

        return scored_sample_set

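
# A minimal usage sketch for the checkpoint wrapper (``Distance`` follows
# the docstring example above; ``probe`` and ``references`` are
# illustrative sample sets, not defined in this module):
#
#   wrapped = BioAlgorithmCheckpointWrapper(Distance(), base_dir="./checkpoints")
#   scored = wrapped._score_sample_set(probe, references)
#
# The first call computes the scores and checkpoints them as ``.pkl`` files
# under ``base_dir/scores``; subsequent calls with ``force=False`` reload
# the checkpointed scores instead of recomputing them.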

class BioAlgorithmDaskWrapper(BioAlgorithm):
    """
    Wrap :any:`BioAlgorithm` to work with DASK
    """

157
158
159
    def __init__(self, biometric_algorithm, **kwargs):
        self.biometric_algorithm = biometric_algorithm

    def enroll_samples(self, biometric_reference_features):
        # Enroll each partition of the dask bag independently
        biometric_references = biometric_reference_features.map_partitions(
            self.biometric_algorithm.enroll_samples
        )
        return biometric_references

    def score_samples(
        self,
        probe_features,
        biometric_references,
        allow_scoring_with_all_biometric_references=False,
    ):

        # TODO: Here, we are sending all computed biometric references to all
        # probes.  It would be more efficient if only the models related to
        # each probe were sent to the probing split.  One option would be to
        # use caching and let the ``score`` function above load the required
        # data from disk directly.  Another option would be to generate named
        # delays for each model and associate them here.

        all_references = dask.delayed(list)(biometric_references)

        scores = probe_features.map_partitions(
            self.biometric_algorithm.score_samples,
            all_references,
            allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
        )
        return scores

    def enroll(self, data):
        return self.biometric_algorithm.enroll(data)

    def score(self, biometric_reference, data):
        return self.biometric_algorithm.score(biometric_reference, data)

    def score_multiple_biometric_references(self, biometric_references, data):
        return self.biometric_algorithm.score_multiple_biometric_references(
            biometric_references, data
        )

    def set_score_references_path(self, group):
        self.biometric_algorithm.set_score_references_path(group)

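
# A minimal sketch of using the dask wrapper directly (the dask bags and
# the ``Distance`` algorithm are illustrative; ``dask_vanilla_biometrics``
# below does this wiring for a whole pipeline):
#
#   import dask.bag
#   wrapped = BioAlgorithmDaskWrapper(Distance())
#   refs = wrapped.enroll_samples(dask.bag.from_sequence(reference_features))
#   scores = wrapped.score_samples(dask.bag.from_sequence(probe_features), refs)
#   results = scores.compute()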
def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
    """
    Given a :any:`VanillaBiometrics` pipeline, wrap its
    :any:`VanillaBiometrics.transformer` and
    :any:`VanillaBiometrics.biometric_algorithm` so they execute with dask.

    Parameters
    ----------

    pipeline: :any:`VanillaBiometrics`
       Vanilla Biometrics based pipeline to be dasked

    npartitions: int
       Number of partitions for the initial :any:`dask.bag`

    partition_size: int
       Size of each partition for the initial :any:`dask.bag`; takes
       precedence over ``npartitions`` when set
    """

    if isinstance(pipeline, ZTNormPipeline):
        # Dasking the first part of the pipeline
        pipeline.vanilla_biometrics_pipeline = dask_vanilla_biometrics(
            pipeline.vanilla_biometrics_pipeline,
            npartitions=npartitions,
            partition_size=partition_size,
        )

        pipeline.ztnorm_solver = ZTNormDaskWrapper(pipeline.ztnorm_solver)

    else:
        # Wrap the transformer with dask, using either a fixed number of
        # partitions or a fixed partition size
        if partition_size is None:
            pipeline.transformer = mario.wrap(
                ["dask"], pipeline.transformer, npartitions=npartitions
            )
        else:
            pipeline.transformer = mario.wrap(
                ["dask"], pipeline.transformer, partition_size=partition_size
            )
        pipeline.biometric_algorithm = BioAlgorithmDaskWrapper(
            pipeline.biometric_algorithm
        )

        def _write_scores(scores):
            return scores.map_partitions(pipeline.write_scores_on_dask)

        # Keep a handle to the original writer and replace it with a version
        # that writes each dask partition
        pipeline.write_scores_on_dask = pipeline.write_scores
        pipeline.write_scores = _write_scores

    return pipeline
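
# A minimal end-to-end sketch (the pipeline construction and call signature
# here are assumptions for illustration, not defined in this module):
#
#   pipeline = dask_vanilla_biometrics(pipeline, partition_size=200)
#   scores = pipeline(background_samples, reference_samples, probe_samples)
#   results = scores.compute()  # triggers the dask graph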