Commit 0ee60e58 authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

[vanilla-pad] Improve the dask client option

parent da4bdcd5
Pipeline #46422 passed with stage
in 8 minutes and 18 seconds
"""Executes PAD pipeline""" """Executes PAD pipeline"""
from bob.bio.base.script.vanilla_biometrics import VALID_DASK_CLIENT_STRINGS
import click import click
from bob.extension.scripts.click_helper import ConfigCommand from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption from bob.extension.scripts.click_helper import ResourceOption
...@@ -37,9 +38,11 @@ from bob.extension.scripts.click_helper import verbosity_option ...@@ -37,9 +38,11 @@ from bob.extension.scripts.click_helper import verbosity_option
@click.option( @click.option(
"--dask-client", "--dask-client",
"-l", "-l",
required=False, entry_point_group="dask.client",
cls=ResourceOption, string_exceptions=VALID_DASK_CLIENT_STRINGS,
default="single-threaded",
help="Dask client for the execution of the pipeline.", help="Dask client for the execution of the pipeline.",
cls=ResourceOption,
) )
@click.option( @click.option(
"--group", "--group",
...@@ -78,6 +81,7 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint ...@@ -78,6 +81,7 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint
import bob.pipelines as mario import bob.pipelines as mario
import dask.bag import dask.bag
from bob.extension.scripts.click_helper import log_parameters from bob.extension.scripts.click_helper import log_parameters
from bob.pipelines.distributed.sge import get_resource_requirements
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
log_parameters(logger) log_parameters(logger)
...@@ -120,7 +124,8 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint ...@@ -120,7 +124,8 @@ def vanilla_pad(ctx, pipeline, database, dask_client, groups, output, checkpoint
pattern = f"{prefix}*{postfix}" pattern = f"{prefix}*{postfix}"
os.makedirs(os.path.dirname(prefix), exist_ok=True) os.makedirs(os.path.dirname(prefix), exist_ok=True)
logger.info("Writing bag results into files ...") logger.info("Writing bag results into files ...")
result.to_textfiles(pattern, last_endline=True, scheduler=dask_client) resources = get_resource_requirements(pipeline)
result.to_textfiles(pattern, last_endline=True, scheduler=dask_client, resources=resources)
with open(scores_path, "w") as f: with open(scores_path, "w") as f:
# concatenate scores into one score file # concatenate scores into one score file
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment