From 4b4ec3fcce5a7eead9502a4afac46c03868f040d Mon Sep 17 00:00:00 2001
From: Tiago Freitas Pereira <tiagofrepereira@gmail.com>
Date: Mon, 7 Dec 2020 14:00:29 +0100
Subject: [PATCH] [dask] Moved dask_get_partition_size from bob.bio.base to
 bob.pipelines

---
 bob/pipelines/distributed/__init__.py | 35 +++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/bob/pipelines/distributed/__init__.py b/bob/pipelines/distributed/__init__.py
index 33a51fa..77c779e 100644
--- a/bob/pipelines/distributed/__init__.py
+++ b/bob/pipelines/distributed/__init__.py
@@ -16,3 +16,38 @@ __path__ = extend_path(__path__, __name__)
 # )
 
 VALID_DASK_CLIENT_STRINGS = ("single-threaded", "sync", "threaded", "processes")
+
+
+def dask_get_partition_size(cluster, n_objects, lower_bound=200):
+    """
+    Heuristic that suggests a value for the dask ``partition_size``.
+    The heuristic is simple: given the maximum number of workers that can run
+    in a queue (not the number of workers currently running) and the total number of objects to process, compute ``n_objects / max_jobs``.
+
+    See https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
+    for best practices.
+
+    Parameters
+    ----------
+
+    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
+        Cluster of the type :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
+
+    n_objects: int
+        Number of objects to be processed.
+
+    lower_bound: int
+        Minimum partition size.
+
+    """
+    from .sge import SGEMultipleQueuesCluster
+
+    if not isinstance(cluster, SGEMultipleQueuesCluster):
+        return None
+
+    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]
+
+    # Enforce a lower bound so partitions do not become too small
+    return (
+        max(n_objects // max_jobs, lower_bound) if n_objects > max_jobs else n_objects
+    )
-- 
GitLab
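
For context, a minimal usage sketch of the moved helper (not part of this patch; the `cluster` and `samples` objects below are assumptions, as is feeding the result to dask.bag):

    # Hypothetical usage sketch: assumes an SGEMultipleQueuesCluster named `cluster`
    # has already been created elsewhere and `samples` is a list of objects to process.
    import dask.bag

    from bob.pipelines.distributed import dask_get_partition_size

    partition_size = dask_get_partition_size(cluster, n_objects=len(samples))

    if partition_size is None:
        # Not an SGEMultipleQueuesCluster: let dask.bag pick its default partitioning.
        bag = dask.bag.from_sequence(samples)
    else:
        # Roughly n_objects / max_jobs items per partition, never below `lower_bound`.
        bag = dask.bag.from_sequence(samples, partition_size=partition_size)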