Skip to content
Snippets Groups Projects
Commit 119f5780 authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira
Browse files

First building blocks for adaptive cluster

Organized cluster_specs

Fixed job counting

Make the scale method request a specific grid spec

Created the basic building blocks for heterogeneous adaptation

Finished implementation

Finished implementation

Removed unnecessary lines

Fixed scale_down issue

Improved recommendation mechanism

Removed unused variable

Removed unused variable
parent de866405
No related branches found
No related tags found
1 merge request!21Adaptive and heterogenous resource allocation
Pipeline #38763 failed
...@@ -17,4 +17,5 @@ build ...@@ -17,4 +17,5 @@ build
.coverage .coverage
record.txt record.txt
miniconda.sh miniconda.sh
miniconda/ miniconda/
\ No newline at end of file *.DS_Store
...@@ -14,6 +14,8 @@ from dask_jobqueue.sge import SGEJob ...@@ -14,6 +14,8 @@ from dask_jobqueue.sge import SGEJob
from distributed.scheduler import Scheduler from distributed.scheduler import Scheduler
from distributed import SpecCluster from distributed import SpecCluster
import dask import dask
from distributed.scheduler import Scheduler
from distributed.deploy import Adaptive
class SGEIdiapJob(Job): class SGEIdiapJob(Job):
...@@ -55,7 +57,17 @@ class SGEIdiapJob(Job): ...@@ -55,7 +57,17 @@ class SGEIdiapJob(Job):
# Amending the --resources in the `distributed.cli.dask_worker` CLI command # Amending the --resources in the `distributed.cli.dask_worker` CLI command
if "resources" in kwargs and kwargs["resources"]: if "resources" in kwargs and kwargs["resources"]:
resources = kwargs["resources"] resources = kwargs["resources"]
self._command_template += f" --resources {resources}"
# Preparing the string to be sent to `dask-worker` command
def _resource_to_str(resources):
resources_str = ""
for k in resources:
resources_str += f"{k}={resources[k]}"
return resources_str
resources_str = _resource_to_str(resources)
self._command_template += f" --resources {resources_str}"
header_lines = [] header_lines = []
if self.job_name is not None: if self.job_name is not None:
...@@ -86,25 +98,144 @@ class SGEIdiapJob(Job): ...@@ -86,25 +98,144 @@ class SGEIdiapJob(Job):
logger.debug("Job script: \n %s" % self.job_script()) logger.debug("Job script: \n %s" % self.job_script())
# Ready-made `sge_job_spec` dictionaries for `SGEIdiapCluster`.
# Every entry maps a spec key (e.g. "default", "gpu") to the fields:
#   queue: SGE queue
#   memory: memory requirement (e.g. "4GB")
#   io_big: whether the io_big flag is set
#   resource_spec: extra argument sent to qsub (qsub -l)
#   resources: worker resources; "" for none, otherwise a mapping from
#       resource name to amount, which is what `SGEIdiapJob` iterates when
#       it builds the `--resources` argument

# Plain jobs on all.q
Q_ALL_SPEC = {
    "default": {
        "queue": "all.q",
        "memory": "4GB",
        "io_big": False,
        "resource_spec": "",
        "resources": "",
    }
}

# Jobs on q_1day with io_big enabled
Q_1DAY_IO_BIG_SPEC = {
    "default": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": "",
    }
}

# q_1day jobs by default, plus a "gpu" spec for workers on q_gpu
Q_1DAY_GPU_SPEC = {
    "default": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": "",
    },
    "gpu": {
        "queue": "q_gpu",
        "memory": "12GB",
        "io_big": False,
        "resource_spec": "",
        # Must be a mapping: a bare string such as "GPU" cannot be rendered
        # as `name=amount` pairs.
        # NOTE(review): the adaptive code appears to use the resource name of
        # a waiting task as the spec key; confirm whether this name and the
        # "gpu" key are meant to match.
        "resources": {"GPU": 1},
    },
}
class SGEIdiapCluster(JobQueueCluster): class SGEIdiapCluster(JobQueueCluster):
""" Launch Dask jobs in the IDIAP SGE cluster """ Launch Dask jobs in the IDIAP SGE cluster
Parameters
----------
log_directory: str
Default directory for the SGE logs
protocol: str
Scheduler communication protocol
dashboard_address: str
Default port for the dask dashboard,
env_extra: str,
Extra environment variables to send to the workers
sge_job_spec: dict
Dictionary containing a minimum specification for the qsub command.
It consists of:
queue: SGE queue
memory: Memory requirement in GB (e.g. 4GB)
io_big: set the io_big flag
resource_spec: Whatever extra argument to be sent to qsub (qsub -l)
resources: Mark this worker with a specific tag so the dask scheduler can place specific tasks on it (https://distributed.dask.org/en/latest/resources.html)
Below follows a vanilla example that creates a set of jobs on all.q:
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster()
>>> cluster.scale_up(10)
>>> client = Client(cluster)
It's possible to demand a resource specification yourself:
>>> Q_1DAY_IO_BIG_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
}
}
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
>>> cluster.scale_up(10)
>>> client = Client(cluster)
More than one job spec can be set:
>>> Q_1DAY_GPU_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
},
"gpu": {
"queue": "q_gpu",
"memory": "12GB",
"io_big": False,
"resource_spec": "",
"resources": {"GPU":1},
},
}
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.scale_up(10)
>>> cluster.scale_up(1, sge_job_spec_key="gpu")
>>> client = Client(cluster)
Adaptive job allocation can also be used via `AdaptiveIdiap` extension
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.adapt(minimum=2, maximum=10)
>>> client = Client(cluster)
""" """
def __init__(self, env_extra=None, **kwargs): def __init__(
self,
log_directory="./logs",
protocol="tcp://",
dashboard_address=":8787",
env_extra=None,
sge_job_spec=Q_ALL_SPEC,
**kwargs,
):
# Defining the job launcher # Defining the job launcher
self.job_cls = SGEIdiapJob self.job_cls = SGEIdiapJob
self.sge_job_spec = sge_job_spec
# we could use self.workers to could the workers self.protocol = protocol
# However, this variable works as async, hence we can't bootstrap self.log_directory = log_directory
# several cluster.scale at once
self.n_workers_sync = 0
# Hard-coding some scheduler info from the time being
self.protocol = "tcp://"
silence_logs = "error" silence_logs = "error"
dashboard_address = ":8787"
secutity = None secutity = None
interface = None interface = None
host = None host = None
...@@ -117,7 +248,7 @@ class SGEIdiapCluster(JobQueueCluster): ...@@ -117,7 +248,7 @@ class SGEIdiapCluster(JobQueueCluster):
self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)] self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)]
scheduler = { scheduler = {
"cls": Scheduler, # Use local scheduler for now "cls": SchedulerIdiap, # Use local scheduler for now
"options": { "options": {
"protocol": self.protocol, "protocol": self.protocol,
"interface": interface, "interface": interface,
...@@ -142,90 +273,204 @@ class SGEIdiapCluster(JobQueueCluster): ...@@ -142,90 +273,204 @@ class SGEIdiapCluster(JobQueueCluster):
name=name, name=name,
) )
def scale(self, n_jobs, queue=None, memory="4GB", io_big=True, resources=None): def _get_worker_spec_options(self, job_spec):
"""
Craft a dask worker_spec to be used in the qsub command
""" """
Launch an SGE job in the Idiap SGE cluster
def _get_key_from_spec(spec, key):
return spec[key] if key in spec else ""
Parameters new_resource_spec = _get_key_from_spec(job_spec, "resource_spec")
----------
n_jobs: int # IO_BIG
Number of jobs to be launched new_resource_spec += (
"io_big=TRUE," if "io_big" in job_spec and job_spec["io_big"] else ""
)
queue = _get_key_from_spec(job_spec, "queue")
if queue != "all.q":
new_resource_spec += f"{queue}=TRUE"
new_resource_spec = None if new_resource_spec == "" else new_resource_spec
return {
"queue": queue,
"memory": _get_key_from_spec(job_spec, "memory"),
"cores": 1,
"processes": 1,
"log_directory": self.log_directory,
"local_directory": self.log_directory,
"resource_spec": new_resource_spec,
"interface": None,
"protocol": self.protocol,
"security": None,
"resources": _get_key_from_spec(job_spec, "resources"),
"env_extra": self.env_extra,
}
queue: str def scale(self, n_jobs, sge_job_spec_key="default"):
Name of the SGE queue """
Launch an SGE job in the Idiap SGE cluster
io_big: bool Parameters
Use the io_big? Note that this is only true for q_1day, q1week, q_1day_mth, q_1week_mth ----------
resources: str n_jobs: int
Tag your workers with meaningful name (e.g GPU=1). In this way, it's possible to redirect certain tasks to certain workers. Quantity of jobs to scale
sge_job_spec_key: str
One of the specs :py:attr:`SGEIdiapCluster.sge_job_spec`
""" """
if n_jobs == 0: if n_jobs == 0:
# Shutting down all workers # Shutting down all workers
return super(JobQueueCluster, self).scale(0, memory=None, cores=0) return super(JobQueueCluster, self).scale(0, memory=None, cores=0)
resource_spec = f"{queue}=TRUE" # We have to set this at Idiap for some reason job_spec = self.sge_job_spec[sge_job_spec_key]
resource_spec += ",io_big=TRUE" if io_big else "" worker_spec_options = self._get_worker_spec_options(job_spec)
log_directory = "./logs"
n_cores = 1 n_cores = 1
worker_spec = { worker_spec = {"cls": self.job_cls, "options": worker_spec_options}
"cls": self.job_cls,
"options": {
"queue": queue,
"memory": memory,
"cores": n_cores,
"processes": n_cores,
"log_directory": log_directory,
"local_directory": log_directory,
"resource_spec": resource_spec,
"interface": None,
"protocol": self.protocol,
"security": None,
"resources": resources,
"env_extra": self.env_extra,
},
}
# Defining a new worker_spec with some SGE characteristics # Defining a new worker_spec with some SGE characteristics
self.new_spec = worker_spec self.new_spec = worker_spec
# Launching the jobs according to the new worker_spec return super(JobQueueCluster, self).scale(n_jobs, memory=None, cores=n_cores)
n_workers = self.n_workers_sync
self.n_workers_sync += n_jobs
return super(JobQueueCluster, self).scale(
n_workers + n_jobs, memory=None, cores=n_cores
)
def scale_up(self, n_jobs, sge_job_spec_key=None):
    """
    Scale cluster up. This is supposed to be used by the scheduler while dynamically allocating resources

    Parameters
    ----------

    n_jobs: int
        Quantity of jobs to scale

    sge_job_spec_key: str
        One of the keys of `self.sge_job_spec`. `None` selects the
        "default" spec.
    """
    # `scale` looks the key up in `self.sge_job_spec`, so forwarding `None`
    # would fail; fall back to the same default that `scale` uses.
    if sge_job_spec_key is None:
        sge_job_spec_key = "default"
    return self.scale(n_jobs, sge_job_spec_key)
def sge_iobig_client( async def scale_down(self, workers, sge_job_spec_key=None):
n_jobs, """
queue="q_1day", Scale cluster down. This is supposed to be used by the scheduler while dynamically allocating resources
queue_resource_spec="q_1day=TRUE,io_big=TRUE", """
memory="8GB", await super().scale_down(workers)
sge_log="./logs",
): def adapt(self, *args, **kwargs):
super().adapt(*args, Adaptive=AdaptiveIdiap, **kwargs)
from dask_jobqueue import SGECluster
from dask.distributed import Client
env_extra = ["export PYTHONPATH=" + ":".join(sys.path)]
cluster = SGECluster( class AdaptiveIdiap(Adaptive):
queue=queue, """
memory=memory, Custom mechanism to adaptively allocate workers based on scheduler load
cores=1,
processes=1, This custom implementation extends the :py:meth:`Adaptive.recommendations` by looking
log_directory=sge_log, at the :py:meth:`distributed.scheduler.TaskState.resource_restrictions`.
local_directory=sge_log,
resource_spec=queue_resource_spec, The heuristic is:
env_extra=env_extra,
) .. note ::
If a certain task has the status `no-worker` and it has resource_restrictions, the scheduler should
request a job matching those resource restrictions
"""
async def recommendations(self, target: int) -> dict:
    """
    Make scale up/down recommendations based on current state and target

    Parameters
    ----------

    target: int
        Value compared against the size of `self.plan` to decide the direction

    Returns
    -------

    dict
        One of:

        - `{"status": "up", "n": ...}`, with an extra `"sge_job_spec_key"`
          entry when a task without worker has resource restrictions
        - `{"status": "down", "workers": [...]}`
        - `{"status": "same"}`
    """
    plan = self.plan

    # Resource restrictions of the tasks that have no worker associated
    resource_restrictions = (
        await self.scheduler.get_no_worker_tasks_resource_restrictions()
    )

    # Name of the first restricted resource (None when there is none).
    # Empty restriction dicts are skipped instead of raising IndexError.
    spec_key = next(
        (
            name
            for restrictions in (resource_restrictions or ())
            for name in restrictions
        ),
        None,
    )

    # More is requested than what is planned: scale up, asking for a job
    # matching the restricted resource when there is one
    if target > len(plan):
        self.close_counts.clear()
        recommendation = {"status": "up", "n": target}
        if spec_key is not None:
            recommendation["sge_job_spec_key"] = spec_key
        return recommendation

    # Less is requested than what is planned: it is time to downscale
    if target < len(plan):
        # Get the workers that can be closed
        to_close = set(await self.workers_to_close(target=target))

        # A worker is only destroyed once it has been a candidate for
        # `self.wait_count` calls in a row
        firmly_close = set()
        for w in to_close:
            self.close_counts[w] += 1
            if self.close_counts[w] >= self.wait_count:
                firmly_close.add(w)

        for k in list(self.close_counts):  # clear out unseen keys
            if k in firmly_close or k not in to_close:
                del self.close_counts[k]

        # Send message to destroy workers
        if firmly_close:
            return {"status": "down", "workers": list(firmly_close)}
        return {"status": "same"}

    # The amount of workers is fine for the current demand, BUT there are
    # tasks that need some special worker: request one more job for them
    if spec_key is not None:
        return {"status": "up", "n": target + 1, "sge_job_spec_key": spec_key}
    return {"status": "same"}
async def scale_up(self, n, sge_job_spec_key="default"):
await self.cluster.scale(n, sge_job_spec_key=sge_job_spec_key)
async def scale_down(self, workers, sge_job_spec_key="default"):
await super().scale_down(workers)
class SchedulerIdiap(Scheduler):
"""
Idiap extended distributed scheduler
This scheduler extends :py:class:`Scheduler` by just adding a handler
that fetches, at every scheduler cycle, the resource restrictions of
a task that has status `no-worker`
"""
def __init__(self, *args, **kwargs):
super(SchedulerIdiap, self).__init__(*args, **kwargs)
self.handlers[
"get_no_worker_tasks_resource_restrictions"
] = self.get_no_worker_tasks_resource_restrictions
def get_no_worker_tasks_resource_restrictions(self, comm=None):
"""
Get the resource restrictions of the tasks that have the status 'no-worker'
"""
cluster.scale_up(n_jobs) resource_restrictions = []
client = Client(cluster) # start local workers as threads for k in self.tasks:
if (
self.tasks[k].state == "no-worker"
and self.tasks[k].resource_restrictions is not None
):
resource_restrictions.append(self.tasks[k].resource_restrictions)
return client return resource_restrictions
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment