Adaptive and heterogenous resource allocation
Hi people
With this MR, we can submit jobs via dask SpecCluster with different SGE specifications and I allowed the scheduler to dynamically request for some SGE job specs.
The Dask scheduler has a heuristics to create and destroy workers based on CPU and MEMORY load. The scheduler that I extended does the same (it's the same method), but also request for SGE requirements if necessary.
Follow below a use case.
The code below executes the following graph:
All the jobs can be executed in all.q
hosts, except the bottleneck one (pointed in the image).
For this one, only q_1day
hosts can take it.
Our scheduler (IdiapScheduler
) recognizes that a task requests an specific type of worker and just ask it.
Our SGEIdiapCluster
recognizes that kind of requests and does qsub
with the proper requirements
# from bob.pipelines.distributed.sge import SGEIdiapCluster
from dask.distributed import Client
from bob.pipelines.distributed.sge import SGEIdiapCluster, AdaptiveIdiap
from dask.distributed import Client
import time
import os
import logging
logger = logging.getLogger("distributed")
logger.setLevel(0)
def dependency_example(client):
import dask
# Defining dummy jobs
def inc(x, value):
time.sleep(0.25)
return x + value
def bottleneck(x):
time.sleep(20)
return sum(x)
# Defining my graph
X = list(range(100))
#X = list(range(2))
output = []
resources = dict()
paralell_data = []
for x in X:
a = dask.delayed(inc)(x, 1)
b = dask.delayed(inc)(a, 2)
paralell_data.append(b)
# Bottleneck
c = dask.delayed(bottleneck)(paralell_data)
# LOOK HERE
# Annotating certain jobs to run in the GPU
resources[tuple(c.__dask_keys__())] = {'q_1day':1}
final_parelell_data = []
for x in paralell_data:
final_parelell_data.append(dask.delayed(inc)(x, c))
total = dask.delayed(sum)(final_parelell_data)
print(total.compute(scheduler=client, resources=resources))
#print(total.compute(scheduler="single-threaded"))
Q_1DAY_ALL = {
"q_1day": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": {"q_1day":1},
},
"default": {
"queue": "all.q",
"memory": "4GB",
"io_big": False,
"resource_spec": "",
"resources": "",
}
}
cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_ALL)
cluster.scale(1, sge_job_spec_key="default")
cluster.adapt(minimum=0, maximum=48)
client = Client(cluster)
dependency_example(client)
#dependency_example(None)