Skip to content

Adaptive and heterogenous resource allocation

Tiago de Freitas Pereira requested to merge adaptive-qsub into master

Hi people

With this MR, we can submit jobs via dask SpecCluster with different SGE specifications and I allowed the scheduler to dynamically request for some SGE job specs.

The Dask scheduler has a heuristics to create and destroy workers based on CPU and MEMORY load. The scheduler that I extended does the same (it's the same method), but also request for SGE requirements if necessary.

Follow below a use case.

The code below executes the following graph:

Screen_Shot_2020-04-06_at_12.06.51

All the jobs can be executed in all.q hosts, except the bottleneck one (pointed in the image). For this one, only q_1day hosts can take it.

Our scheduler (IdiapScheduler) recognizes that a task requests an specific type of worker and just ask it. Our SGEIdiapCluster recognizes that kind of requests and does qsub with the proper requirements

# from bob.pipelines.distributed.sge import SGEIdiapCluster
from dask.distributed import Client
from bob.pipelines.distributed.sge import SGEIdiapCluster, AdaptiveIdiap
from dask.distributed import Client

import time
import os


import logging
logger = logging.getLogger("distributed")
logger.setLevel(0)


def dependency_example(client):
    import dask

    # Defining dummy jobs
    def inc(x, value):
        time.sleep(0.25)
        return x + value

    def bottleneck(x):
        time.sleep(20)
        return sum(x)

    # Defining my graph    
    X = list(range(100))
    #X = list(range(2))
    output = []
    resources = dict()

    paralell_data = []
    for x in X:
        a = dask.delayed(inc)(x, 1)
        b = dask.delayed(inc)(a, 2)
        paralell_data.append(b)

    
    # Bottleneck
    c = dask.delayed(bottleneck)(paralell_data)
    # LOOK HERE
    # Annotating certain jobs to run in the GPU
    resources[tuple(c.__dask_keys__())] = {'q_1day':1}    

    final_parelell_data = []
    for x in paralell_data:
        final_parelell_data.append(dask.delayed(inc)(x, c))

    total = dask.delayed(sum)(final_parelell_data)
    print(total.compute(scheduler=client, resources=resources))    
    #print(total.compute(scheduler="single-threaded"))


Q_1DAY_ALL = {
    "q_1day": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": {"q_1day":1},
    },
    "default": {
        "queue": "all.q",
        "memory": "4GB",
        "io_big": False,
        "resource_spec": "",
        "resources": "",
    }
}


cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_ALL)
cluster.scale(1, sge_job_spec_key="default")
cluster.adapt(minimum=0, maximum=48)

client = Client(cluster)
dependency_example(client)
#dependency_example(None)
Edited by Tiago de Freitas Pereira

Merge request reports