Commit 9284e2a7 authored by Tiago de Freitas Pereira

Merge branch 'dask-pipelines' into 'master'

[vanilla-pad] Improve the dask client option and delayed annotations

See merge request !80
parents b22ac980 4d84dfb5
Pipeline #46367 passed with stages in 5 minutes and 9 seconds
......@@ -15,7 +15,7 @@ def _padfile_to_delayed_sample(padfile, database):
subject=str(padfile.client_id),
attack_type=padfile.attack_type,
key=padfile.path,
annotations=padfile.annotations,
delayed_attributes=dict(annotations=lambda : padfile.annotations),
is_bonafide=padfile.attack_type is None,
)
......
"""Executes PAD pipeline"""
from bob.pipelines.distributed import VALID_DASK_CLIENT_STRINGS
import click
from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
......@@ -37,9 +38,11 @@ from bob.extension.scripts.click_helper import verbosity_option
@click.option(
"--dask-client",
"-l",
required=False,
cls=ResourceOption,
entry_point_group="dask.client",
string_exceptions=VALID_DASK_CLIENT_STRINGS,
default="single-threaded",
help="Dask client for the execution of the pipeline.",
cls=ResourceOption,
)
@click.option(
"--group",
......@@ -66,7 +69,9 @@ from bob.extension.scripts.click_helper import verbosity_option
)
@verbosity_option(cls=ResourceOption)
@click.pass_context
def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint, **kwargs):
def vanilla_pad(
ctx, pipeline, database, dask_client, groups, output, checkpoint, **kwargs
):
"""Runs the simplest PAD pipeline."""
import gzip
......@@ -78,6 +83,7 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint
import bob.pipelines as mario
import dask.bag
from bob.extension.scripts.click_helper import log_parameters
from bob.pipelines.distributed.sge import get_resource_requirements
logger = logging.getLogger(__name__)
log_parameters(logger)
......@@ -120,7 +126,10 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint
pattern = f"{prefix}*{postfix}"
os.makedirs(os.path.dirname(prefix), exist_ok=True)
logger.info("Writing bag results into files ...")
result.to_textfiles(pattern, last_endline=True, scheduler=dask_client)
resources = get_resource_requirements(pipeline)
result.to_textfiles(
pattern, last_endline=True, scheduler=dask_client, resources=resources
)
with open(scores_path, "w") as f:
# concatenate scores into one score file
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment