Handling tasks with heterogeneous workers
Hi guys,
This MR solves two issues: 1) automatically launching heterogeneous SGE workers and 2) assigning specific tasks to specific workers.
Solving 1: To launch heterogeneous jobs, I had to extend dask_jobqueue.core.JobQueueCluster in such a way that every time the pool of workers needs to be increased, a new "worker_spec" is defined (https://github.com/dask/distributed/blob/ddbec38ba1ec6de913ccbfcd090f1c85eea1b032/distributed/deploy/spec.py#L443). This is not the default behaviour of dask_jobqueue, where everything is assumed to be homogeneous.
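To make the idea concrete, here is a minimal, self-contained sketch of that mechanism (hypothetical names only; the real implementation extends JobQueueCluster, this toy class does not):

```python
# Sketch of the idea only -- the actual code extends
# dask_jobqueue.core.JobQueueCluster; all names here are hypothetical.
class HeterogeneousPool:
    """Keeps one spec per worker, so every worker can be different."""

    def __init__(self):
        self.worker_spec = {}  # name -> spec, mirroring SpecCluster.worker_spec
        self._counter = 0

    def new_worker_spec(self, **spec):
        # Unlike the homogeneous default, every call can carry a
        # different queue/memory/resource specification.
        name = f"worker-{self._counter}"
        self._counter += 1
        return {name: spec}

    def scale(self, n, **spec):
        # Grow the pool to `n` workers, each registered with its own spec.
        while len(self.worker_spec) < n:
            self.worker_spec.update(self.new_worker_spec(**spec))


pool = HeterogeneousPool()
pool.scale(2, queue="q_1day", memory="8GB")
pool.scale(3, queue="q_gpu", resources="GPU=1")
print(len(pool.worker_spec))  # 3 workers, the last one with a GPU spec
```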
Solving 2: Making the scheduler assign a dask.delayed task to one specific worker was supposed to be simple. When dask workers are launched manually, the dask-worker script has an argument called --resources that lets you attach a personal TAG to a worker (https://distributed.dask.org/en/latest/resources.html#example). Once this is done, a tuple containing the dask.delayed key and the personal TAG can be passed to the method dask.delayed.compute (https://distributed.dask.org/en/latest/resources.html#resources-with-collections). Then, everything works nicely.
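To illustrate the mapping itself (a toy sketch that needs no cluster; `annotate` is a hypothetical helper, not a dask API — in real code the keys come from `b.__dask_keys__()` and the TAG must match the one given to `dask-worker --resources`):

```python
# Toy illustration of the resources mapping described above.
def annotate(task_keys, tag, amount):
    """Return a resources entry restricting `task_keys` to workers
    advertising at least `amount` units of `tag`."""
    return {tuple(task_keys): {tag: amount}}


resources = {}
# In real code: resources.update(annotate(b.__dask_keys__(), "GPU", 1))
resources.update(annotate(["double-abc123"], "GPU", 1))
print(resources)  # {('double-abc123',): {'GPU': 1}}
```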
HOWEVER, our dask workers are launched by dask_jobqueue, and there's no way to amend the --resources command-line argument through it. I will propose a patch (https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/core.py#L77) to solve this issue.
In the meantime, I implemented a workaround here: https://gitlab.idiap.ch/bob/bob.pipelines/blob/20d1732da5ce341db48a49da49f14b21ddaa2f67/bob/pipelines/distributed/sge.py#L18. It works nicely.
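For context, the workaround boils down to amending the dask-worker invocation inside the generated job script; roughly like this (all names here are hypothetical, not the actual sge.py code):

```python
# Hypothetical sketch: inject a --resources TAG into a dask-worker
# command line, as the generated SGE job script would need.
def with_resources(worker_command, resources):
    """Append a --resources flag to a dask-worker command line.

    `resources` maps tag names to amounts, e.g. {"GPU": 1} becomes
    '--resources "GPU=1"'.
    """
    spec = ",".join(f"{tag}={amount}" for tag, amount in resources.items())
    return f'{worker_command} --resources "{spec}"'


cmd = with_resources("dask-worker tcp://sched:8786", {"GPU": 1})
print(cmd)  # dask-worker tcp://sched:8786 --resources "GPU=1"
```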
Here's a snippet showing how to use this feature (I will fold it into a test case somehow):
```python
from bob.pipelines.distributed.sge import SGEIdiapCluster, sge_submit_iobig_gpu
from dask.distributed import Client
import dask
from time import sleep
import os


# Defining dummy jobs
def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(1)
    print(os.system("nvidia-smi"))
    return x + 2


def add(x, y):
    sleep(1)
    return x + y


# Defining my graph
X = [1, 2, 3, 4, 5, 6, 7]
output = []
resources = dict()
for x in X:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)

    # LOOK HERE
    # Annotating certain jobs to run on the GPU
    resources[tuple(b.__dask_keys__())] = {'GPU': 1}

    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Submitting 2 jobs to the iobig queue and one to the GPU queue
client = sge_submit_iobig_gpu(n_jobs_iobig=2, n_jobs_gpu=1)

# Run the graph with a resource specification
print(total.compute(scheduler=client, resources=resources))
```
ping @andre.anjos