vanilla_pad.py 6.84 KB
Newer Older
1 2 3
"""Executes PAD pipeline"""


4
from bob.pipelines.distributed import VALID_DASK_CLIENT_STRINGS
5
import click
Amir MOHAMMADI's avatar
Amir MOHAMMADI committed
6 7 8
from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
9
from bob.pipelines.distributed import dask_get_partition_size
10 11


Amir MOHAMMADI's avatar
Amir MOHAMMADI committed
12 13 14 15
@click.command(
    entry_point_group="bob.pad.config",
    cls=ConfigCommand,
    epilog="""\b
 Command line examples\n
 -----------------------


 $ bob pad vanilla-pad my_experiment.py -vv
""",
)
@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="sklearn.pipeline",
    help="Feature extraction algorithm",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    required=True,
    cls=ResourceOption,
    entry_point_group="bob.pad.database",
    help="PAD Database connector (class that implements the methods: `fit_samples`, `predict_samples`)",
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, this value will limit the experiments belonging to a particular group",
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
)
@click.option(
    "--checkpoint",
    "-c",
    is_flag=True,
    help="If set, it will checkpoint all steps of the pipeline. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition."
    "Use this option if the current heuristic that sets this value doesn't suit your experiment."
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers to start your experiment."
    "Dask automatically scales up/down the number of workers due to the current load of tasks to be solved."
    "Use this option if the current amount of workers set to start an experiment doesn't suit you.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption)
@click.pass_context
def vanilla_pad(
    ctx,
    pipeline,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    **kwargs,
):
    """Runs the simplest PAD pipeline.

    Fits ``pipeline`` on the database's ``fit_samples`` and writes one
    4-column score file per requested group into ``output``.  If the
    pipeline is not already dask-wrapped it is wrapped here; dask results
    are first written as gzipped per-partition files and then concatenated
    into a single plain-text ``scores-<group>`` file.
    """

    # Imports are deferred to call time so that merely loading the CLI
    # entry point stays cheap (dask and bob.pipelines are heavy imports).
    import gzip
    import logging
    import os
    import sys
    from glob import glob

    import bob.pipelines as mario
    import dask.bag
    from bob.extension.scripts.click_helper import log_parameters
    from bob.pipelines.distributed.sge import get_resource_requirements
    from bob.pipelines.utils import isinstance_nested
    from bob.pipelines.wrappers import DaskWrapper

    logger = logging.getLogger(__name__)
    log_parameters(logger)

    os.makedirs(output, exist_ok=True)

    # Optionally checkpoint every pipeline step; features and fitted
    # models both land in the output folder.
    if checkpoint:
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetching samples.  total_samples (fit + predict) feeds the dask
    # partition-size heuristic below.
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])

    # Checking if the pipeline is already dask-wrapped; only wrap it here
    # if the user's config did not do so itself.
    first_step = pipeline[0]
    if not isinstance_nested(first_step, "estimator", DaskWrapper):

        # Scaling up if necessary.  A string dask_client (e.g.
        # "single-threaded") has no cluster to scale.
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size: heuristic first, then let an
        # explicit --dask-partition-size override it.
        partition_size = None
        if not isinstance(dask_client, str):
            lower_bound = 25  # lower bound of 25 videos per chunk
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)

    # Create an experiment info file recording the exact command line,
    # the database and every pipeline step, for reproducibility.
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        # NOTE(review): relies on sklearn's private Pipeline._iter — may
        # break across sklearn versions; confirm on upgrade.
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # Train the pipeline.
    pipeline.fit(fit_samples)

    for group in groups:

        # NOTE(review): message says "vanilla biometrics" but this is the
        # PAD pipeline — looks like a copy/paste slip; left unchanged here
        # since log text is runtime behavior.
        logger.info(f"Running vanilla biometrics for group {group}")
        result = pipeline.decision_function(predict_samples[group])

        scores_path = os.path.join(output, f"scores-{group}")

        # A dask-wrapped pipeline returns a lazy Bag; computation only
        # happens at to_textfiles below.
        if isinstance(result, dask.bag.core.Bag):

            # write each partition into a zipped txt file
            result = result.map(pad_predicted_sample_to_score_line)
            prefix, postfix = f"{output}/scores/scores-{group}-", ".txt.gz"
            pattern = f"{prefix}*{postfix}"
            os.makedirs(os.path.dirname(prefix), exist_ok=True)
            logger.info("Writing bag results into files ...")
            resources = get_resource_requirements(pipeline)
            result.to_textfiles(
                pattern, last_endline=True, scheduler=dask_client, resources=resources
            )

            with open(scores_path, "w") as f:
                # Concatenate scores into one score file, in numeric
                # partition order (the glob star matches the partition
                # index between prefix and postfix).
                for path in sorted(
                    glob(pattern),
                    key=lambda l: int(l.replace(prefix, "").replace(postfix, "")),
                ):
                    with gzip.open(path, "rt") as f2:
                        f.write(f2.read())
                    # delete intermediate score files
                    os.remove(path)

        else:
            # Non-dask path: result is an in-memory iterable of samples.
            with open(scores_path, "w") as f:
                for sample in result:
                    f.write(pad_predicted_sample_to_score_line(sample, endl="\n"))


def pad_predicted_sample_to_score_line(sample, endl=""):
    """Format one predicted PAD sample as a 4-column score line.

    Columns are ``claimed_id real_id test_label score``.  For bona-fide
    samples the real id equals the claimed id; for attacks it is the
    sample's attack type.

    Parameters
    ----------
    sample : object
        Carries ``subject``, ``key``, ``data``, ``is_bonafide`` and (for
        attacks) ``attack_type`` attributes.
    endl : str
        Suffix appended to the formatted line, e.g. ``"\\n"``.

    Returns
    -------
    str
        The formatted score line.
    """
    claimed = sample.subject
    if sample.is_bonafide:
        actual = claimed
    else:
        actual = sample.attack_type
    return f"{claimed} {actual} {sample.key} {sample.data}{endl}"