Commit 6469d19b authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Merge branch 'updates' into 'master'

Two new features

See merge request !56
parents 7b8e32c9 cf626943
Pipeline #46380 passed with stages
in 4 minutes and 46 seconds
......@@ -16,3 +16,36 @@ __path__ = extend_path(__path__, __name__)
# )
VALID_DASK_CLIENT_STRINGS = ("single-threaded", "sync", "threaded", "processes")
def dask_get_partition_size(cluster, n_objects, lower_bound=200):
    """Heuristic that gives you a number for ``dask.partition_size``.

    The heuristic is pretty simple: given the maximum number of possible
    workers that can run in a queue (not the number of workers currently
    running) and the total number of objects to be processed, use
    ``n_objects / max_jobs``, clamped below by ``lower_bound`` so that
    partitions never get too small.

    Parameters
    ----------
    cluster: :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`
        Cluster of the type :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`

    n_objects: int
        Number of objects to be processed

    lower_bound: int
        Minimum partition size.

    Returns
    -------
    int or None
        Suggested partition size, or ``None`` when ``cluster`` is not an
        ``SGEMultipleQueuesCluster`` (the heuristic only applies there).
    """
    # The heuristic only makes sense for SGE-backed clusters; for anything
    # else let the caller fall back to dask's default partitioning.
    if not isinstance(cluster, SGEMultipleQueuesCluster):
        return None

    max_jobs = cluster.sge_job_spec["default"]["max_jobs"]

    # Trying to set a lower bound for the partition size so partitions do
    # not become too small; when there are fewer objects than possible
    # jobs, one object per partition is fine.
    return (
        max(n_objects // max_jobs, lower_bound) if n_objects > max_jobs else n_objects
    )
......@@ -290,10 +290,14 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
# interval: Milliseconds between checks from the scheduler
# wait_count: Number of consecutive times that a worker should be suggested for
# removal before we remove it.
# Here the goal is to wait 2 minutes before scaling down since
# it is very expensive to get jobs on the SGE grid
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=10)
def _get_worker_spec_options(self, job_spec):
"""Craft a dask worker_spec to be used in the qsub command."""
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment