Commit 1adbb4a4 authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

[dask] Moved dask_get_partition_size from bob.bio.base to bob.pipelines

parent 4e34fdfd
Pipeline #46383 passed with stage
in 5 minutes and 47 seconds
......@@ -7,7 +7,6 @@ from .wrappers import (
BioAlgorithmDaskWrapper,
dask_vanilla_biometrics,
checkpoint_vanilla_biometrics,
dask_get_partition_size,
is_checkpointed,
)
......@@ -51,7 +50,6 @@ __appropriate__(
BioAlgorithmDaskWrapper,
dask_vanilla_biometrics,
checkpoint_vanilla_biometrics,
dask_get_partition_size,
is_checkpointed,
ZTNormPipeline,
ZTNormDaskWrapper,
......
......@@ -8,9 +8,9 @@ from bob.bio.base.pipelines.vanilla_biometrics import FourColumnsScoreWriter
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormCheckpointWrapper
from bob.bio.base.pipelines.vanilla_biometrics import ZTNormPipeline
from bob.bio.base.pipelines.vanilla_biometrics import checkpoint_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import dask_get_partition_size
from bob.bio.base.pipelines.vanilla_biometrics import dask_vanilla_biometrics
from bob.bio.base.pipelines.vanilla_biometrics import is_checkpointed
from bob.pipelines.distributed import dask_get_partition_size
from bob.pipelines.utils import isinstance_nested
from dask.delayed import Delayed
......
......@@ -300,39 +300,6 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
return pipeline
def dask_get_partition_size(cluster, n_objects, lower_bound=200):
    """
    Heuristic that picks a value for dask's ``partition_size``.

    The rule is simple: given the maximum number of workers that can ever
    run in a queue (not the number of workers currently running) and the
    total number of objects to process, use ``n_objects / max_workers``,
    bounded below by ``lower_bound``.

    See https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
    for the rationale behind bounding partition sizes.

    Parameters
    ----------
    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
        Cluster of the type :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
    n_objects: int
        Number of objects to be processed
    lower_bound: int
        Minimum partitions size.
    """
    # The heuristic only makes sense for SGE clusters; for anything else,
    # return None and let dask use its own defaults.
    if not isinstance(cluster, SGEMultipleQueuesCluster):
        return None

    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]

    # Fewer objects than possible workers: one object per partition.
    if n_objects <= max_jobs:
        return n_objects

    # Otherwise split evenly across the queue's maximum worker count,
    # but never let partitions drop below the configured lower bound.
    return max(n_objects // max_jobs, lower_bound)
def checkpoint_vanilla_biometrics(
pipeline, base_dir, biometric_algorithm_dir=None, hash_fn=None
):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment