"""Executes PAD pipeline"""

import click

from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
from bob.pipelines.distributed import VALID_DASK_CLIENT_STRINGS
from bob.pipelines.distributed import dask_get_partition_size

@click.command(
    entry_point_group="bob.pad.config",
    cls=ConfigCommand,
    epilog="""\b
 Command line examples\n
 -----------------------


 $ bob pad vanilla-pad my_experiment.py -vv
""",
)
@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="sklearn.pipeline",
    help="Feature extraction algorithm",
    cls=ResourceOption,
)
@click.option(
    "--decision_function",
    "-f",
    show_default=True,
    default="decision_function",
    help="Name of the Pipeline step to call for results, eg 'score' or 'predict'",
)
@click.option(
    "--database",
    "-d",
    required=True,
    cls=ResourceOption,
    entry_point_group="bob.pad.database",
    help="PAD Database connector (class that implements the methods: `fit_samples`, `predict_samples`)",
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, this value will limit the experiments belonging to a particular group",
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
)
@click.option(
    "--checkpoint",
    "-c",
    is_flag=True,
    help="If set, it will checkpoint all steps of the pipeline. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition."
    "Use this option if the current heuristic that sets this value doesn't suit your experiment."
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers to start your experiment."
    "Dask automatically scales up/down the number of workers due to the current load of tasks to be solved."
    "Use this option if the current amount of workers set to start an experiment doesn't suit you.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption)
@click.pass_context
def vanilla_pad(
    ctx,
    pipeline,
    decision_function,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    **kwargs,
):
    """Runs the simplest PAD pipeline.

    The sklearn ``pipeline`` is fitted on the database's ``fit_samples()``
    and the step named by ``--decision_function`` is then applied to the
    ``predict_samples()`` of each requested group.  Scores are written to
    ``<output>/scores-<group>`` as one 4-column text line per sample (see
    :func:`pad_predicted_sample_to_score_line`).

    If the pipeline is not already dask-wrapped, it gets wrapped here and
    the dask partition size is either taken from ``--dask-partition-size``
    or estimated from the total number of samples.
    """

    # Heavy imports are deferred into the command body so that `--help`
    # and CLI discovery stay fast.
    import gzip
    import logging
    import os
    import sys

    from glob import glob

    import bob.pipelines as mario
    import dask.bag

    from bob.extension.scripts.click_helper import log_parameters
    from bob.pipelines.distributed.sge import get_resource_requirements
    from bob.pipelines.utils import isinstance_nested
    from bob.pipelines.wrappers import DaskWrapper

    logger = logging.getLogger(__name__)
    log_parameters(logger)

    os.makedirs(output, exist_ok=True)

    # Optionally checkpoint every pipeline step; features and fitted
    # models are both stored under --output.
    if checkpoint:
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetching samples.  total_samples is only used to estimate a dask
    # partition size below.
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])

    # Checking if the pipeline is dask-wrapped; if not, wrap it ourselves.
    first_step = pipeline[0]
    if not isinstance_nested(first_step, "estimator", DaskWrapper):

        # Scaling up if necessary (only possible with a real client, not
        # with a string scheduler such as "single-threaded").
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size: an explicit --dask-partition-size
        # wins over the heuristic based on the cluster and sample count.
        partition_size = None
        if not isinstance(dask_client, str):
            lower_bound = 25  # lower bound of 25 videos per chunk
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)

    # create an experiment info file so runs are reproducible/debuggable
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # train the pipeline
    pipeline.fit(fit_samples)

    for group in groups:

        logger.info(f"Running vanilla pad for group {group}")
        result = getattr(pipeline, decision_function)(predict_samples[group])

        scores_path = os.path.join(output, f"scores-{group}")

        if isinstance(result, dask.bag.core.Bag):

            # write each partition into a zipped txt file, then
            # concatenate them into the final score file below
            result = result.map(pad_predicted_sample_to_score_line)
            prefix, postfix = f"{output}/scores/scores-{group}-", ".txt.gz"
            pattern = f"{prefix}*{postfix}"
            os.makedirs(os.path.dirname(prefix), exist_ok=True)
            logger.info("Writing bag results into files ...")
            # NOTE(review): dask_client may be a scheduler name string
            # (e.g. "single-threaded"); dask accepts both here.
            resources = get_resource_requirements(pipeline)
            result.to_textfiles(
                pattern, last_endline=True, scheduler=dask_client, resources=resources
            )

            with open(scores_path, "w") as f:
                # concatenate scores into one score file, sorted by the
                # numeric partition index embedded in each file name
                for path in sorted(
                    glob(pattern),
                    key=lambda p: int(p.replace(prefix, "").replace(postfix, "")),
                ):
                    with gzip.open(path, "rt") as f2:
                        f.write(f2.read())
                    # delete intermediate score files
                    os.remove(path)

        else:
            # non-dask pipeline: results are plain samples in memory
            with open(scores_path, "w") as f:
                for sample in result:
                    f.write(pad_predicted_sample_to_score_line(sample, endl="\n"))


def pad_predicted_sample_to_score_line(sample, endl=""):
    """Convert one predicted PAD sample into a 4-column score line.

    The format is ``<claimed_id> <real_id> <test_label> <score>`` where
    ``real_id`` equals the claimed id for bona-fide samples and the attack
    type for attack samples.

    Parameters
    ----------
    sample : object
        A predicted sample carrying ``subject`` (claimed id), ``key``
        (test label), ``data`` (score), ``is_bonafide`` and, for attacks,
        ``attack_type`` attributes.
    endl : str
        Suffix appended to the line, e.g. ``"\\n"``.  Defaults to the
        empty string (useful when the writer adds line endings itself).

    Returns
    -------
    str
        The formatted score line.
    """
    claimed_id, test_label, score = sample.subject, sample.key, sample.data

    # Bona-fide samples present their true identity; attacks report the
    # attack type in the real-id column instead.
    real_id = claimed_id if sample.is_bonafide else sample.attack_type

    # NOTE(review): a frame-number column (from ``sample.frame_id``) was
    # previously considered here; add a model_label column if per-frame
    # scores ever become necessary.
    return f"{claimed_id} {real_id} {test_label} {score}{endl}"