Commit 842c5992 authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

Updated the partition_size heuristics

parent 3445bbe1
Pipeline #46265 passed with stage
in 6 minutes and 12 seconds
......@@ -300,12 +300,15 @@ def dask_vanilla_biometrics(pipeline, npartitions=None, partition_size=None):
return pipeline
def dask_get_partition_size(cluster, n_objects):
def dask_get_partition_size(cluster, n_objects, lower_bound=200):
"""
Heuristics that gives you a number for dask.partition_size.
The heuristics is pretty simple, given the max number of possible workers to be run
in a queue (not the number of current workers running) and a total number objects to be processed do n_objects/n_max_workers:
Check https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
for best practices
Parameters
----------
......@@ -315,12 +318,19 @@ def dask_get_partition_size(cluster, n_objects):
n_objects: int
Number of objects to be processed
lower_bound: int
Minimum partitions size.
"""
if not isinstance(cluster, SGEMultipleQueuesCluster):
return None
max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
return n_objects // (max_jobs * 2) if n_objects > max_jobs else 1
# Trying to set a lower bound for the
return (
max(n_objects // max_jobs, lower_bound) if n_objects > max_jobs else n_objects
)
def checkpoint_vanilla_biometrics(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment