Commit a0de0b05 authored by Amir MOHAMMADI

Merge branch 'dask-fix-cleanup' into 'dask-pipelines'

Fix Dask shutting down automatically when calling from a script

See merge request !203
parents 4d409e20 82ae4eef
Pipeline #44905 failed in 58 seconds
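This merge extracts the experiment logic of the `vanilla-biometrics` command into a reusable `execute_vanilla_biometrics` function that never shuts down the Dask client; the shutdown now happens only in the command-line entry point. A minimal sketch of the scripting pattern this enables, assuming `pipeline_a`, `pipeline_b`, and `database` are already-constructed pipeline and database instances (placeholder names, not part of this merge):

```python
from dask.distributed import Client

from bob.bio.base.pipelines.vanilla_biometrics import execute_vanilla_biometrics

dask_client = Client()  # workers stay alive across experiments

# Chain two experiments on the same client; the pipeline and database
# objects below are placeholders for real instances.
for name, pipeline in [("exp_a", pipeline_a), ("exp_b", pipeline_b)]:
    execute_vanilla_biometrics(
        pipeline,
        database,
        dask_client,
        groups=["dev"],
        output=f"results/{name}",
        write_metadata_scores=True,
        checkpoint=True,
    )

# The caller now owns the client's life cycle and shuts it down explicitly.
dask_client.shutdown()
```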
@@ -14,3 +14,5 @@ from .wrappers import (
from .zt_norm import ZTNormPipeline, ZTNormDaskWrapper, ZTNormCheckpointWrapper
from .legacy import BioAlgorithmLegacy, DatabaseConnector
from .vanilla_biometrics import execute_vanilla_biometrics
import logging
import os
import dask.bag
from dask.delayed import Delayed
from bob.bio.base.pipelines.vanilla_biometrics import (
BioAlgorithmDaskWrapper,
checkpoint_vanilla_biometrics,
dask_vanilla_biometrics,
dask_get_partition_size,
FourColumnsScoreWriter,
CSVScoreWriter,
is_checkpointed,
)
from bob.pipelines.utils import isinstance_nested
logger = logging.getLogger(__name__)
def compute_scores(result, dask_client):
    if isinstance(result, (Delayed, dask.bag.Bag)):
if dask_client is not None:
result = result.compute(scheduler=dask_client)
else:
logger.warning("`dask_client` not set. Your pipeline will run locally")
result = result.compute(scheduler="single-threaded")
return result
def post_process_scores(pipeline, scores, path):
written_scores = pipeline.write_scores(scores)
return pipeline.post_process(written_scores, path)
def execute_vanilla_biometrics(
pipeline,
database,
dask_client,
groups,
output,
write_metadata_scores,
checkpoint,
**kwargs,
):
"""
Function that executes the Vanilla Biometrics pipeline.
This is called when using the ``bob bio pipelines vanilla-biometrics``
command.
    This can also be called from a script without shutting down the running
    Dask instance, allowing multiple experiments to be chained while keeping
    the workers alive.
Parameters
----------
pipeline: Instance of :py:class:`~bob.bio.base.pipelines.vanilla_biometrics.VanillaBiometricsPipeline`
A constructed vanilla-biometrics pipeline.
database: Instance of :py:class:`~bob.bio.base.pipelines.vanilla_biometrics.abstract_class.Database`
        A database interface instance.
dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
A Dask client instance used to run the experiment in parallel on multiple
machines, or locally.
Basic configs can be found in ``bob.pipelines.config.distributed``.
groups: list of str
Groups of the dataset that will be requested from the database interface.
output: str
Path where the results and checkpoints will be saved to.
write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.
checkpoint: bool
Whether checkpoint files will be created for every step of the pipelines.
"""
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
if write_metadata_scores:
pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
else:
pipeline.score_writer = FourColumnsScoreWriter(os.path.join(output, "./tmp"))
# Check if it's already checkpointed
if checkpoint and not is_checkpointed(pipeline):
pipeline = checkpoint_vanilla_biometrics(pipeline, output)
background_model_samples = database.background_model_samples()
for group in groups:
score_file_name = os.path.join(output, f"scores-{group}")
biometric_references = database.references(group=group)
probes = database.probes(group=group)
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
pipeline = dask_vanilla_biometrics(
pipeline,
partition_size=dask_get_partition_size(dask_client.cluster, n_objects),
)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
database.allow_scoring_with_all_biometric_references
if hasattr(database, "allow_scoring_with_all_biometric_references")
else False
)
result = pipeline(
background_model_samples,
biometric_references,
probes,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
post_processed_scores = post_process_scores(pipeline, result, score_file_name)
_ = compute_scores(post_processed_scores, dask_client)
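The `compute_scores` helper above dispatches lazy Dask results either to the distributed client or to the local single-threaded scheduler. A minimal, self-contained sketch of that dispatch, using a throwaway `dask.bag` to stand in for the pipeline's lazy score results:

```python
import dask.bag

# A tiny lazy graph standing in for the pipeline's deferred results.
lazy = dask.bag.from_sequence([1, 2, 3]).map(lambda x: x * 2)

# With no client set, compute_scores falls back to the local scheduler,
# mirroring `result.compute(scheduler="single-threaded")` above.
print(lazy.compute(scheduler="single-threaded"))  # [2, 4, 6]

# With a distributed client, the same graph runs on the cluster instead:
#   from dask.distributed import Client
#   client = Client()
#   lazy.compute(scheduler=client)
```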
@@ -15,18 +15,7 @@ from bob.extension.scripts.click_helper import (
import logging
import os
import dask.bag
from bob.bio.base.pipelines.vanilla_biometrics import (
BioAlgorithmDaskWrapper,
checkpoint_vanilla_biometrics,
dask_vanilla_biometrics,
dask_get_partition_size,
FourColumnsScoreWriter,
CSVScoreWriter,
is_checkpointed,
)
from dask.delayed import Delayed
from bob.pipelines.utils import isinstance_nested
from bob.bio.base.pipelines.vanilla_biometrics import execute_vanilla_biometrics
logger = logging.getLogger(__name__)
@@ -62,24 +51,9 @@ It is possible to do it via configuration file
>>> database = .... # Biometric Database connector (class that implements the methods: `background_model_samples`, `references` and `probes`)
\b
"""
def compute_scores(result, dask_client):
if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
if dask_client is not None:
result = result.compute(scheduler=dask_client)
else:
logger.warning("`dask_client` not set. Your pipeline will run locally")
result = result.compute(scheduler="single-threaded")
return result
def post_process_scores(pipeline, scores, path):
writed_scores = pipeline.write_scores(scores)
return pipeline.post_process(writed_scores, path)
"""
@click.command(
@@ -200,60 +174,22 @@ def vanilla_biometrics(
This pipeline runs: `BioAlgorithm.score(Pipeline.transform(DATA_SCORE, biometric_references))` >> biometric_references
"""
if not os.path.exists(output):
os.makedirs(output, exist_ok=True)
if write_metadata_scores:
pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
else:
pipeline.score_writer = FourColumnsScoreWriter(os.path.join(output, "./tmp"))
.. Note::
Refrain from calling this function directly from a script. Prefer
:py:func:`~bob.bio.base.pipelines.vanilla_biometrics.execute_vanilla_biometrics`
instead.
# Check if it's already checkpointed
if checkpoint and not is_checkpointed(pipeline):
pipeline = checkpoint_vanilla_biometrics(pipeline, output)
background_model_samples = database.background_model_samples()
for group in groups:
score_file_name = os.path.join(output, f"scores-{group}")
biometric_references = database.references(group=group)
probes = database.probes(group=group)
if dask_client is not None and not isinstance_nested(
pipeline.biometric_algorithm, "biometric_algorithm", BioAlgorithmDaskWrapper
):
n_objects = max(
len(background_model_samples), len(biometric_references), len(probes)
)
pipeline = dask_vanilla_biometrics(
pipeline,
partition_size=dask_get_partition_size(dask_client.cluster, n_objects),
)
logger.info(f"Running vanilla biometrics for group {group}")
allow_scoring_with_all_biometric_references = (
database.allow_scoring_with_all_biometric_references
if hasattr(database, "allow_scoring_with_all_biometric_references")
else False
)
result = pipeline(
background_model_samples,
biometric_references,
probes,
allow_scoring_with_all_biometric_references=allow_scoring_with_all_biometric_references,
)
post_processed_scores = post_process_scores(pipeline, result, score_file_name)
_ = compute_scores(post_processed_scores, dask_client)
"""
logger.debug("Executing Vanilla-biometrics")
execute_vanilla_biometrics(
pipeline,
database,
dask_client,
groups,
output,
write_metadata_scores,
checkpoint,
**kwargs,
)
logger.info("Experiment finished !!!!!")
if dask_client is not None:
logger.info("Shutdown workers !!!!!")
dask_client.shutdown()
logger.info("Done !!!!!")
\ No newline at end of file
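After this refactor, only the command-line entry point owns the Dask client's life cycle; the reusable execute function leaves the client untouched. A short sketch of that ownership split (every name except `execute_vanilla_biometrics` is illustrative, and the `try/finally` guard is an assumption about robust usage, not code from this merge):

```python
def main(dask_client=None):
    try:
        # Hypothetical helper that calls execute_vanilla_biometrics one or
        # more times; each call leaves the client and its workers alive.
        run_experiments(dask_client)
    finally:
        # Only the outermost entry point shuts the client down, mirroring
        # the guard the vanilla_biometrics command uses above.
        if dask_client is not None:
            dask_client.shutdown()
```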
@@ -47,7 +47,7 @@ $ bob pipelines vanilla-biometrics DATABASE PIPELINE -vv
Check out all available PIPELINEs by running:
`resource.py --types pipeline`
\b
and all available databases by running:
`resource.py --types database`
@@ -66,7 +66,7 @@ It is possible to do it via configuration file
>>> database = .... # Biometric Database connector (class that implements the methods: `background_model_samples`, `references` and `probes`)
\b
"""
@@ -292,7 +292,7 @@ def vanilla_biometrics_ztnorm(
def _build_filename(score_file_name, suffix):
return os.path.join(score_file_name, suffix)
    # Running RAW_SCORES
raw_scores = post_process_scores(
pipeline, raw_scores, _build_filename(score_file_name, "raw_scores")
)
@@ -331,7 +331,3 @@ def vanilla_biometrics_ztnorm(
_ = compute_scores(zt_normed_scores, dask_client)
logger.info("Experiment finished !!!!!")
if dask_client is not None:
logger.info("Shutdown workers !!!!!")
dask_client.shutdown()
logger.info("Done !!!!!")
\ No newline at end of file