
Handling tasks with heterogeneous workers

Tiago de Freitas Pereira requested to merge heterogenous_queue into master

Hi guys,

This MR solves two issues: 1) automatically launching heterogeneous SGE workers and 2) assigning specific tasks to specific workers.

Solving 1: To launch heterogeneous jobs, I had to extend dask_jobqueue.core.JobQueueCluster so that every time the pool of workers needs to be increased, a new "worker_spec" is defined. (This is not the default behaviour of dask_jobqueue, where everything is assumed to be homogeneous.)
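The idea can be sketched as follows. This is an illustrative, self-contained mock (the class and function names here are hypothetical, not the actual bob.pipelines or dask_jobqueue API): each scale request registers a fresh worker spec, so workers for different queues can coexist in one cluster.

```python
# Illustrative sketch only: mimics building a *new* worker spec per
# scale request, instead of dask_jobqueue's single homogeneous spec.
# All names below are hypothetical.

def make_worker_spec(queue, resources):
    """Build one worker spec for a given SGE queue and resource tags."""
    return {"queue": queue, "resources": resources}

class HeterogeneousSpecs:
    """Collects a distinct spec every time workers are requested."""

    def __init__(self):
        self.worker_specs = []

    def scale(self, n_jobs, queue, resources=None):
        # Each scale call registers n_jobs copies of a new spec,
        # so iobig and GPU workers can live in the same cluster.
        spec = make_worker_spec(queue, resources or {})
        self.worker_specs.extend([spec] * n_jobs)

cluster = HeterogeneousSpecs()
cluster.scale(2, queue="iobig")
cluster.scale(1, queue="gpu", resources={"GPU": 1})
```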

Solving 2: Making the scheduler assign a dask.delayed task to one specific worker was supposed to be simple. When dask workers are launched manually, the dask-worker script has an argument called --resources that lets you attach a personal TAG to the worker. Once this is done, a mapping from the dask.delayed task's key to the personal TAG can be passed to the method dask.delayed.compute, and everything works nicely.
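Conceptually, the matching rule is simple: a task annotated with resource requirements may only be scheduled on a worker whose --resources declaration covers them. A minimal sketch of that rule (not dask's actual implementation, just the idea):

```python
# Sketch of the resource-matching rule: a worker launched with e.g.
#   dask-worker <scheduler> --resources "GPU=1"
# advertises {"GPU": 1}; a task demanding {"GPU": 1} can only land
# on workers that offer at least that amount.

def worker_can_run(worker_resources, task_resources):
    """True if the worker offers at least what the task demands."""
    return all(
        worker_resources.get(name, 0) >= amount
        for name, amount in task_resources.items()
    )

gpu_worker = {"GPU": 1}   # launched with --resources "GPU=1"
cpu_worker = {}           # launched without any resource TAG
task_needs = {"GPU": 1}   # annotation attached to the task's key
```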

HOWEVER, our dask workers are launched by dask_jobqueue, and there's no way to pass the --resources command-line argument through it. I will propose a patch upstream to solve this issue. In the meantime, I implemented a workaround here.
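The shape of such a workaround can be sketched as a small string rewrite of the worker invocation that dask_jobqueue generates (the function below is hypothetical and only illustrates the idea, not the actual patch):

```python
# Hypothetical sketch: since dask_jobqueue exposes no hook for
# dask-worker's --resources flag, append it to the generated worker
# command line before the job script is submitted.

def patch_worker_command(command_template, resources):
    """Append a --resources flag to a dask-worker invocation."""
    if not resources:
        return command_template
    spec = ",".join(f"{name}={amount}" for name, amount in resources.items())
    return f'{command_template} --resources "{spec}"'

cmd = patch_worker_command("dask-worker tcp://scheduler:8786", {"GPU": 1})
```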

It works nicely.

Here is a snippet showing how to use this feature (I will turn it into a test case):

from bob.pipelines.distributed.sge import sge_submit_iobig_gpu
import dask

# Defining dummy jobs
def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

# Defining my graph
X = [1, 2, 3, 4, 5, 6, 7]
output = []
resources = dict()

for x in X:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)

    # Annotating certain tasks to run on the GPU
    resources[tuple(b.__dask_keys__())] = {'GPU': 1}

    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Submitting 2 jobs to the iobig queue and 1 to the GPU queue
client = sge_submit_iobig_gpu(n_jobs_iobig=2, n_jobs_gpu=1)

# Run the graph with a resource specification
print(total.compute(scheduler=client, resources=resources))


ping @andre.anjos
