[dask] Moved dask_get_partition_size from bob.bio.base to bob.pipelines

parent a3210f40
Pipeline #46379 passed with stage
in 3 minutes and 45 seconds
......@@ -16,3 +16,36 @@ __path__ = extend_path(__path__, __name__)
# )
# String identifiers treated as valid dask client choices. The values match
# dask's built-in local scheduler names.
# NOTE(review): confirm how callers consume this tuple.
VALID_DASK_CLIENT_STRINGS = (
    "single-threaded",
    "sync",
    "threaded",
    "processes",
)
def dask_get_partition_size(cluster, n_objects, lower_bound=200):
    """
    Suggest a value for ``dask.partition_size``.

    The heuristic divides the total number of objects by the maximum number
    of workers that may run in the default queue (not the number of workers
    currently running), i.e. ``n_objects // max_jobs``, and never goes below
    ``lower_bound`` when there are more objects than workers.

    See https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
    for best practices.

    Parameters
    ----------

    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
        Cluster of the type
        :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`

    n_objects: int
        Number of objects to be processed

    lower_bound: int
        Minimum partition size.

    Returns
    -------

    int or None
        ``None`` when ``cluster`` is not an ``SGEMultipleQueuesCluster``.
        ``n_objects`` itself when it does not exceed the ``max_jobs`` value
        of the cluster's ``"default"`` job spec. Otherwise the larger of
        ``n_objects // max_jobs`` and ``lower_bound``.
    """
    # The heuristic only applies to SGE clusters; other objects get no hint.
    if not isinstance(cluster, SGEMultipleQueuesCluster):
        return None

    worker_limit = cluster.sge_job_spec["default"]["max_jobs"]

    if n_objects > worker_limit:
        # Spread the objects evenly over the workers, but keep each
        # partition at least ``lower_bound`` objects in size.
        return max(n_objects // worker_limit, lower_bound)

    # No more objects than workers: the object count is returned as is.
    return n_objects
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment