"""Executes PAD pipeline"""


4
from bob.pipelines.distributed import VALID_DASK_CLIENT_STRINGS
5
import click
Amir MOHAMMADI's avatar
Amir MOHAMMADI committed
6 7 8
from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
9
from bob.pipelines.distributed import dask_get_partition_size
10 11


Amir MOHAMMADI's avatar
Amir MOHAMMADI committed
12 13 14 15
@click.command(
    entry_point_group="bob.pad.config",
    cls=ConfigCommand,
    epilog="""\b
 Command line examples\n
 -----------------------


 $ bob pad vanilla-pad my_experiment.py -vv
""",
)
@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="sklearn.pipeline",
    help="Feature extraction algorithm",
    cls=ResourceOption,
)
@click.option(
    "--decision_function",
    "-f",
    show_default=True,
    default="decision_function",
    help="Name of the Pipeline step to call for results, eg 'score' or 'predict'",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    required=True,
    entry_point_group="bob.pad.database",
    help="PAD Database connector (class that implements the methods: `fit_samples`, `predict_samples`)",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, this value will limit the experiments belonging to a particular group",
    cls=ResourceOption,
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
    cls=ResourceOption,
)
@click.option(
    "--checkpoint",
    "-c",
    is_flag=True,
    help="If set, it will checkpoint all steps of the pipeline. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition."
    "Use this option if the current heuristic that sets this value doesn't suit your experiment."
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers to start your experiment."
    "Dask automatically scales up/down the number of workers due to the current load of tasks to be solved."
    "Use this option if the current amount of workers set to start an experiment doesn't suit you.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption)
@click.pass_context
def vanilla_pad(
    ctx,
    pipeline,
    decision_function,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    **kwargs,
):
    """Runs the simplest PAD pipeline.

    Fits ``pipeline`` on the database's ``fit_samples`` and then calls the
    step named by ``decision_function`` on the ``predict_samples`` of every
    requested group.  Scores for each group are written as 4-column text to
    ``<output>/scores-<group>``.
    """

    import gzip
    import logging
    import os
    import sys

    from glob import glob

    import bob.pipelines as mario
    import dask.bag

    from bob.extension.scripts.click_helper import log_parameters
    from bob.pipelines.distributed.sge import get_resource_requirements
    from bob.pipelines.utils import isinstance_nested
    from bob.pipelines.wrappers import DaskWrapper

    logger = logging.getLogger(__name__)
    log_parameters(logger)

    os.makedirs(output, exist_ok=True)

    if checkpoint:
        # Persist intermediate features and fitted models under `output` so
        # interrupted experiments can resume.
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetching samples
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])

    # Checking if the pipeline is already dask-wrapped; if not, wrap it here.
    first_step = pipeline[0]
    if not isinstance_nested(first_step, "estimator", DaskWrapper):

        # Scaling up if necessary
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size
        partition_size = None
        if not isinstance(dask_client, str):
            lower_bound = 25  # lower bound of 25 videos per chunk
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            # An explicit --dask-partition-size always overrides the heuristic.
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)

    # create an experiment info file
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # train the pipeline
    pipeline.fit(fit_samples)

    for group in groups:

        logger.info("Running vanilla pad for group %s", group)
        result = getattr(pipeline, decision_function)(predict_samples[group])

        scores_path = os.path.join(output, f"scores-{group}")

        if isinstance(result, dask.bag.core.Bag):

            # write each partition into a zipped txt file
            result = result.map(pad_predicted_sample_to_score_line)
            prefix, postfix = f"{output}/scores/scores-{group}-", ".txt.gz"
            pattern = f"{prefix}*{postfix}"
            os.makedirs(os.path.dirname(prefix), exist_ok=True)
            logger.info("Writing bag results into files ...")
            resources = get_resource_requirements(pipeline)
            result.to_textfiles(
                pattern, last_endline=True, scheduler=dask_client, resources=resources
            )

            with open(scores_path, "w") as f:
                # concatenate scores into one score file, in partition order
                # (sort numerically on the partition index embedded in the name)
                for path in sorted(
                    glob(pattern),
                    key=lambda p: int(p.replace(prefix, "").replace(postfix, "")),
                ):
                    with gzip.open(path, "rt") as f2:
                        f.write(f2.read())
                    # delete intermediate score files
                    os.remove(path)

        else:
            with open(scores_path, "w") as f:
                for sample in result:
                    f.write(pad_predicted_sample_to_score_line(sample, endl="\n"))


def pad_predicted_sample_to_score_line(sample, endl=""):
    """Convert one predicted PAD sample into a 4-column score line.

    Parameters
    ----------
    sample : object
        A predicted sample with ``subject``, ``key``, ``data``,
        ``is_bonafide``, and ``attack_type`` attributes.
    endl : str
        Line terminator appended to the score line (empty by default so
        ``dask.bag.to_textfiles`` can add its own newlines).

    Returns
    -------
    str
        ``"<claimed_id> <real_id> <test_label> <score><endl>"``.
    """
    claimed_id, test_label, score = sample.subject, sample.key, sample.data

    # Bona fide samples keep the claimed identity as the real identity;
    # attacks report their attack type instead.
    real_id = claimed_id if sample.is_bonafide else sample.attack_type

    return f"{claimed_id} {real_id} {test_label} {score}{endl}"