
Handling tasks with heterogeneous workers

Tiago de Freitas Pereira requested to merge heterogenous_queue into master

Hi guys,

This MR solves two issues: 1) automatically launching heterogeneous SGE workers and 2) assigning specific tasks to specific workers.

Solving 1: To launch heterogeneous jobs, I had to extend dask_jobqueue.core.JobQueueCluster so that every time the pool of workers needs to be increased, a new "worker_spec" is defined. (This is not the default behaviour of dask_jobqueue, where everything is assumed to be homogeneous.)
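The idea can be sketched as follows. This is an illustrative, self-contained mock (the class and function names here are hypothetical, not the actual bob.pipelines or dask_jobqueue API): each scale request registers a fresh worker spec, so workers for different queues can coexist in one cluster.

```python
# Illustrative sketch only: mimics building a *new* worker spec per
# scale request, instead of dask_jobqueue's single homogeneous spec.
# All names below are hypothetical.

def make_worker_spec(queue, resources):
    """Build one worker spec for a given SGE queue and resource tags."""
    return {"queue": queue, "resources": resources}

class HeterogeneousSpecs:
    """Collects a distinct spec every time workers are requested."""

    def __init__(self):
        self.worker_specs = []

    def scale(self, n_jobs, queue, resources=None):
        # Each scale call registers n_jobs copies of a new spec,
        # so iobig and GPU workers can live in the same cluster.
        spec = make_worker_spec(queue, resources or {})
        self.worker_specs.extend([spec] * n_jobs)

cluster = HeterogeneousSpecs()
cluster.scale(2, queue="iobig")
cluster.scale(1, queue="gpu", resources={"GPU": 1})
```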

Solving 2: Making the scheduler assign a dask.delayed task to one specific worker was supposed to be simple. When dask workers are launched manually, the dask-worker script has an argument called --resources that lets you attach a personal TAG to the worker. Once this is done, a mapping from the dask.delayed task's key to the personal TAG can be passed to the method dask.delayed.compute, and everything works nicely.
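Conceptually, the matching rule is simple: a task annotated with resource requirements may only be scheduled on a worker whose --resources declaration covers them. A minimal sketch of that rule (not dask's actual implementation, just the idea):

```python
# Sketch of the resource-matching rule: a worker launched with e.g.
#   dask-worker <scheduler> --resources "GPU=1"
# advertises {"GPU": 1}; a task demanding {"GPU": 1} can only land
# on workers that offer at least that amount.

def worker_can_run(worker_resources, task_resources):
    """True if the worker offers at least what the task demands."""
    return all(
        worker_resources.get(name, 0) >= amount
        for name, amount in task_resources.items()
    )

gpu_worker = {"GPU": 1}   # launched with --resources "GPU=1"
cpu_worker = {}           # launched without any resource TAG
task_needs = {"GPU": 1}   # annotation attached to the task's key
```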

HOWEVER, our dask workers are launched by dask_jobqueue, and there's no way to pass the --resources command-line argument through it. I will propose a patch upstream to solve this issue. In the meantime, I implemented a workaround here.
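The shape of such a workaround can be sketched as a small string rewrite of the worker invocation that dask_jobqueue generates (the function below is hypothetical and only illustrates the idea, not the actual patch):

```python
# Hypothetical sketch: since dask_jobqueue exposes no hook for
# dask-worker's --resources flag, append it to the generated worker
# command line before the job script is submitted.

def patch_worker_command(command_template, resources):
    """Append a --resources flag to a dask-worker invocation."""
    if not resources:
        return command_template
    spec = ",".join(f"{name}={amount}" for name, amount in resources.items())
    return f'{command_template} --resources "{spec}"'

cmd = patch_worker_command("dask-worker tcp://scheduler:8786", {"GPU": 1})
```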

It works nicely.

Here is a snippet showing how to use this feature (I will turn it into a test case):

from bob.pipelines.distributed.sge import sge_submit_iobig_gpu
import dask

# Defining dummy jobs
def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

# Defining my graph
X = [1, 2, 3, 4, 5, 6, 7]
output = []
resources = dict()

for x in X:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)

    # Annotating certain tasks to run on the GPU
    resources[tuple(b.__dask_keys__())] = {'GPU': 1}

    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Submitting 2 jobs to the iobig queue and 1 to the GPU queue
client = sge_submit_iobig_gpu(n_jobs_iobig=2, n_jobs_gpu=1)

# Run the graph with a resource specification
print(total.compute(scheduler=client, resources=resources))


ping @andre.anjos
