Commit 60c135b1 authored by Amir MOHAMMADI

[scripts][pipelines] allow dask-clients as strings

Strings such as "single-threaded", "processes", etc. are now accepted as
values for the --dask-client click option. Depends on bob.extension!122
parent b1e1beb5
Pipeline #46067 passed in 7 minutes and 43 seconds
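For context, the sketch below is illustrative only and not part of this commit: it shows what a string "client" means downstream, namely that dask falls back to one of its built-in local schedulers when compute() receives a scheduler name instead of a distributed client.

```python
# Illustrative sketch, not part of this commit: with a string "client", dask uses
# one of its built-in local schedulers instead of a dask.distributed cluster.
import dask.bag

bag = dask.bag.from_sequence(range(8), partition_size=2)
doubled = bag.map(lambda x: 2 * x)

# "single-threaded" runs the graph in the current thread; "processes" uses a
# local process pool. No distributed client or cluster is involved.
print(doubled.compute(scheduler="single-threaded"))
print(doubled.compute(scheduler="processes"))
```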
import logging
import os
import dask.bag
from dask.delayed import Delayed
from bob.bio.base.pipelines.vanilla_biometrics import (
BioAlgorithmDaskWrapper,
checkpoint_vanilla_biometrics,
dask_vanilla_biometrics,
dask_get_partition_size,
FourColumnsScoreWriter,
CSVScoreWriter,
is_checkpointed,
)
from bob.bio.base.pipelines.vanilla_biometrics import BioAlgorithmDaskWrapper
from bob.bio.base.pipelines.vanilla_biometrics import CSVScoreWriter
from bob.bio.base.pipelines.vanilla_biometrics import FourColumnsScoreWriter
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormCheckpointWrapper
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormPipeline
from bob.bio.base.pipelines.vanilla_biometrics import checkpoint_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import dask_get_partition_size
from bob.bio.base.pipelines.vanilla_biometrics import dask_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import is_checkpointed
from bob.pipelines.utils import isinstance_nested
from bob.bio.base.pipelines.vanilla_biometrics import (
ZTNormPipeline,
ZTNormCheckpointWrapper,
)
from dask.delayed import Delayed
logger = logging.getLogger(__name__)
@@ -110,19 +105,22 @@ def execute_vanilla_biometrics(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
# Scaling up
if dask_n_workers is not None:
if dask_n_workers is not None and not isinstance(dask_client, str):
dask_client.cluster.scale(dask_n_workers)
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
partition_size = (
dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is None
else dask_partition_size
)
partition_size = None
if not isinstance(dask_client, str):
partition_size = dask_get_partition_size(dask_client.cluster, n_objects)
if dask_partition_size is not None:
partition_size = dask_partition_size
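# Note added for clarity (not in the original diff): when dask_client is one of the
# accepted scheduler strings there is no cluster to inspect, so the partition-size
# heuristic is skipped and partition_size stays None unless dask_partition_size is
# given explicitly.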
pipeline = dask_vanilla_biometrics(pipeline, partition_size=partition_size,)
pipeline = dask_vanilla_biometrics(
pipeline,
partition_size=partition_size,
)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
@@ -193,7 +191,7 @@ def execute_vanilla_biometrics_ztnorm(
dask_partition_size: int
If using Dask, this option defines the size of each dask.bag.partition. Use this option if the current heuristic that sets this value doesn't suit your experiment. (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).
dask_n_workers: int
If using Dask, this option defines the number of workers to start your experiment. Dask automatically scales up/down the number of workers due to the current load of tasks to be solved. Use this option if the current amount of workers set to start an experiment doesn't suit you.
ztnorm_cohort_proportion: float
......
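The dask_partition_size and dask_n_workers entries above are easier to read next to a concrete deployment. The following is a hedged illustration, not code from this repository, and it assumes a dask.distributed LocalCluster purely as an example:

```python
# Hedged illustration (not from this repository): what the two options map to
# when a real dask.distributed client is in play.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
client = Client(cluster)

# dask_n_workers: request a fixed number of workers up front instead of waiting
# for adaptive scaling to react to the task load.
client.cluster.scale(4)

# dask_partition_size: fixes the size of each dask.bag partition, overriding the
# heuristic that would otherwise derive it from the cluster and the sample count.
partition_size = 200

client.close()
cluster.close()
```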
@@ -5,17 +5,15 @@
"""Executes biometric pipeline"""
import click
from bob.extension.scripts.click_helper import (
verbosity_option,
ResourceOption,
ConfigCommand,
)
import logging
import os
import click
from bob.bio.base.pipelines.vanilla_biometrics import execute_vanilla_biometrics
from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
VALID_DASK_CLIENT_STRINGS = ("single-threaded", "sync", "threaded", "processes")
logger = logging.getLogger(__name__)
@@ -57,7 +55,9 @@ It is possible to do it via configuration file
@click.command(
entry_point_group="bob.bio.config", cls=ConfigCommand, epilog=EPILOG,
entry_point_group="bob.bio.config",
cls=ConfigCommand,
epilog=EPILOG,
)
@click.option(
"--pipeline",
@@ -79,6 +79,8 @@ It is possible to do it via configuration file
"--dask-client",
"-l",
entry_point_group="dask.client",
string_exceptions=VALID_DASK_CLIENT_STRINGS,
default="single-threaded",
help="Dask client for the execution of the pipeline.",
cls=ResourceOption,
)
......
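As a rough, hypothetical sketch of what string_exceptions buys (the helper below is invented for illustration; the real behaviour is implemented by ResourceOption in bob.extension!122): a value matching one of the listed strings is passed through untouched instead of being resolved from the dask.client entry-point group.

```python
# Hypothetical helper, invented for illustration only; see bob.extension!122 for
# the actual ResourceOption behaviour.
VALID_DASK_CLIENT_STRINGS = ("single-threaded", "sync", "threaded", "processes")


def resolve_dask_client(value, load_from_entry_point):
    """Return scheduler strings as-is; otherwise load the named dask.client resource."""
    if value in VALID_DASK_CLIENT_STRINGS:
        return value
    return load_from_entry_point(value, group="dask.client")
```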
@@ -5,17 +5,15 @@
"""Executes biometric pipeline"""
import click
from bob.extension.scripts.click_helper import (
verbosity_option,
ResourceOption,
ConfigCommand,
)
import logging
import click
from bob.bio.base.pipelines.vanilla_biometrics import execute_vanilla_biometrics_ztnorm
from bob.extension.scripts.click_helper import ConfigCommand
from bob.extension.scripts.click_helper import ResourceOption
from bob.extension.scripts.click_helper import verbosity_option
from .vanilla_biometrics import VALID_DASK_CLIENT_STRINGS
logger = logging.getLogger(__name__)
@@ -56,7 +54,9 @@ It is possible to do it via configuration file
@click.command(
entry_point_group="bob.bio.config", cls=ConfigCommand, epilog=EPILOG,
entry_point_group="bob.bio.config",
cls=ConfigCommand,
epilog=EPILOG,
)
@click.option(
"--pipeline",
@@ -78,6 +78,8 @@ It is possible to do it via configuration file
"--dask-client",
"-l",
entry_point_group="dask.client",
string_exceptions=VALID_DASK_CLIENT_STRINGS,
default="single-threaded",
help="Dask client for the execution of the pipeline.",
cls=ResourceOption,
)
......
@@ -24,7 +24,7 @@ requirements:
- bob.blitz
- bob.core
- bob.db.base >2.2.0
- bob.extension
- bob.extension >5.0.1
- bob.io.base
- bob.learn.activation
- bob.learn.em
......