"""Executes PAD pipeline"""

import click

from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
from bob.pipelines.distributed import VALID_DASK_CLIENT_STRINGS
from bob.pipelines.distributed import dask_get_partition_size

@click.command(
    entry_point_group="bob.pad.config",
    cls=ConfigCommand,
    epilog="""\b
 Command line examples\n
 -----------------------


 $ bob pad vanilla-pad my_experiment.py -vv
""",
)
@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="sklearn.pipeline",
    help="Feature extraction algorithm",
    cls=ResourceOption,
)
@click.option(
    "--decision_function",
    "-f",
    show_default=True,
    default="decision_function",
    help="Name of the Pipeline step to call for results, eg 'score' or 'predict'",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    required=True,
    entry_point_group="bob.pad.database",
    help="PAD Database connector (class that implements the methods: `fit_samples`, `predict_samples`)",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["train", "dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, this value will limit the experiments belonging to a particular group",
    cls=ResourceOption,
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
    cls=ResourceOption,
)
@click.option(
    "--checkpoint",
    "-c",
    is_flag=True,
    help="If set, it will checkpoint all steps of the pipeline. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition."
    "Use this option if the current heuristic that sets this value doesn't suit your experiment."
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers to start your experiment."
    "Dask automatically scales up/down the number of workers due to the current load of tasks to be solved."
    "Use this option if the current amount of workers set to start an experiment doesn't suit you.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption)
@click.pass_context
def vanilla_pad(
    ctx,
    pipeline,
    decision_function,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    **kwargs,
):
    """Runs the simplest PAD pipeline.

    Fits ``pipeline`` on the database's ``fit_samples`` and, for every
    requested group, calls ``decision_function`` on that group's
    ``predict_samples`` and writes one four-column score file
    (``scores-dev``, ``scores-eval``, ...) into ``--output``.
    """
    # Heavy imports are deferred into the command body so that ``--help``
    # stays fast.
    import gzip
    import logging
    import os
    import sys

    from glob import glob

    import bob.pipelines as mario
    import dask.bag

    from bob.extension.scripts.click_helper import log_parameters
    from bob.pipelines.distributed.sge import get_resource_requirements
    from bob.pipelines.utils import isinstance_nested
    from bob.pipelines.wrappers import DaskWrapper

    logger = logging.getLogger(__name__)
    log_parameters(logger)

    os.makedirs(output, exist_ok=True)

    if checkpoint:
        # Persist intermediate features and fitted estimators under
        # ``--output`` so an interrupted experiment can be resumed.
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetch all samples up front; the total count feeds the dask
    # partition-size heuristic below.
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])

    # Checking if the pipeline is dask-wrapped; only wrap it ourselves
    # when the user (or a config file) has not already done so.
    first_step = pipeline[0]
    if not isinstance_nested(first_step, "estimator", DaskWrapper):

        # Scaling up if necessary (only meaningful with a real client,
        # not with a string scheduler such as "single-threaded").
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size
        partition_size = None
        if not isinstance(dask_client, str):
            lower_bound = 25  # lower bound of 25 videos per chunk
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            # An explicit user choice always overrides the heuristic.
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)

    # create an experiment info file for reproducibility
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # train the pipeline
    pipeline.fit(fit_samples)

    for group in groups:

        # NOTE: the message previously said "vanilla biometrics"; this is
        # the PAD pipeline.
        logger.info(f"Running vanilla pad for group {group}")
        result = getattr(pipeline, decision_function)(predict_samples[group])

        scores_path = os.path.join(output, f"scores-{group}")

        if isinstance(result, dask.bag.core.Bag):

            # write each partition into a zipped txt file
            result = result.map(pad_predicted_sample_to_score_line)
            prefix, postfix = f"{output}/scores/scores-{group}-", ".txt.gz"
            pattern = f"{prefix}*{postfix}"
            os.makedirs(os.path.dirname(prefix), exist_ok=True)
            logger.info("Writing bag results into files ...")
            resources = get_resource_requirements(pipeline)
            result.to_textfiles(
                pattern, last_endline=True, scheduler=dask_client, resources=resources
            )

            with open(scores_path, "w") as f:
                # concatenate scores into one score file, in partition
                # order (the numeric index dask put between prefix and
                # postfix)
                for path in sorted(
                    glob(pattern),
                    key=lambda p: int(p.replace(prefix, "").replace(postfix, "")),
                ):
                    with gzip.open(path, "rt") as f2:
                        f.write(f2.read())

                    # delete intermediate score files
                    os.remove(path)

        else:
            # Non-dask result: plain iterable of predicted samples.
            with open(scores_path, "w") as f:
                for sample in result:
                    f.write(pad_predicted_sample_to_score_line(sample, endl="\n"))


def pad_predicted_sample_to_score_line(sample, endl=""):
    """Format one predicted PAD sample as a four-column score line.

    Columns are: claimed id, real id, test label, score. For a bona-fide
    sample the real id equals the claimed id; otherwise it is the
    sample's attack type. ``endl`` is appended verbatim (pass ``"\\n"``
    to terminate the line).
    """
    claimed_id = sample.subject
    if sample.is_bonafide:
        real_id = claimed_id
    else:
        real_id = sample.attack_type
    return f"{claimed_id} {real_id} {sample.key} {sample.data}{endl}"