Renamed SGEIdiapCluster to SGEMultipleQueuesCluster

parent c8416be2
Pipeline #39147 canceled with stage
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from dask.distributed import Client
cluster = SGEMultipleQueuesCluster()
dask_client = Client(cluster)
from bob.pipelines.distributed.sge import SGEIdiapCluster
from dask.distributed import Client
Q_1DAY_IO_BIG_SPEC = {
"default": {
"queue": "q_1day",
"memory": "4GB",
"io_big": True,
"resource_spec": "",
"resources": "",
}
}
n_jobs = 16
cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
cluster.scale(1)
# Adapting to minimim 1 job to maximum 48 jobs
# interval: Milliseconds between checks from the scheduler
# wait_count: Number of consecutive times that a worker should be suggested for
# removal before we remove it.
# Here the goal is to wait 2 minutes before scaling down since
# it is very expensive to get jobs on the SGE grid
cluster.adapt(minimum=1, maximum=n_jobs, wait_count=120, interval=1000)
dask_client = Client(cluster)
from bob.pipelines.distributed.sge import SGEIdiapCluster, Q_1DAY_GPU_SPEC
from dask.distributed import Client
n_jobs = 16
cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
cluster.scale(1, sge_job_spec_key="gpu")
cluster.scale(2, sge_job_spec_key="default")
# Adapting to minimim 1 job to maximum 48 jobs
# interval: Milliseconds between checks from the scheduler
# wait_count: Number of consecutive times that a worker should be suggested for
# removal before we remove it.
# Here the goal is to wait 2 minutes before scaling down since
# it is very expensive to get jobs on the SGE grid
cluster.adapt(minimum=1, maximum=n_jobs, wait_count=120, interval=1000)
dask_client = Client(cluster)
from bob.pipelines.distributed.sge import SGEIdiapCluster, Q_1DAY_IO_BIG_SPEC
from dask.distributed import Client
n_jobs = 48
cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
cluster.scale(10)
# Adapting to minimim 1 job to maximum 48 jobs
# interval: Milliseconds between checks from the scheduler
# wait_count: Number of consecutive times that a worker should be suggested for
# removal before we remove it.
# Here the goal is to wait 2 minutes before scaling down since
# it is very expensive to get jobs on the SGE grid
cluster.adapt(minimum=10, maximum=n_jobs, wait_count=120, interval=1000)
dask_client = Client(cluster)
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
from bob.pipelines.distributed.sge_queues import QUEUE_LIGHT
from dask.distributed import Client
cluster = SGEMultipleQueuesCluster(sge_job_spec=QUEUE_LIGHT)
dask_client = Client(cluster)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
def debug_client(n_nodes):
"""
This executor will run everything in **ONE SINGLE PROCESS**
This will return an instance of Dask Distributed Client
https://distributed.dask.org/en/latest/client.html
**Parameters**
n_nodes:
Number of process
"""
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
nanny=False, processes=False, n_workers=1, threads_per_worker=1
)
cluster.scale_up(n_nodes)
client = Client(cluster) # start local workers as threads
return client
......@@ -17,6 +17,8 @@ import dask
from distributed.scheduler import Scheduler
from distributed.deploy import Adaptive
from .sge_queues import QUEUE_DEFAULT
class SGEIdiapJob(Job):
"""
......@@ -98,46 +100,18 @@ class SGEIdiapJob(Job):
logger.debug("Job script: \n %s" % self.job_script())
Q_ALL_SPEC = {
"default": {
"queue": "all.q",
"memory": "4GB",
"io_big": False,
"resource_spec": "",
"resources": "",
}
}
Q_1DAY_IO_BIG_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
}
}
Q_1DAY_GPU_SPEC = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"resources": "",
},
"gpu": {
"queue": "q_gpu",
"memory": "12GB",
"io_big": False,
"resource_spec": "",
"resources": {"gpu":1},
},
}
class SGEIdiapCluster(JobQueueCluster):
""" Launch Dask jobs in the IDIAP SGE cluster
def get_max_jobs(queue_dict):
"""
Given a queue list, get the max number of possible jobs
"""
return max(
[queue_dict[r]["max_jobs"] for r in queue_dict if "max_jobs" in queue_dict[r]]
)
class SGEMultipleQueuesCluster(JobQueueCluster):
""" Launch Dask jobs in the SGE cluster allowing the request of multiple queus
Parameters
----------
......@@ -162,6 +136,10 @@ class SGEIdiapCluster(JobQueueCluster):
io_bio: set the io_big flag
resource_spec: Whatever extra argument to be sent to qsub (qsub -l)
tag: Mark this worker with an specific tag so dask scheduler can place specific tasks to it (https://distributed.dask.org/en/latest/resources.html)
max_jobs: Maximum number of jobs in the queue
min_jobs: int
Lower bound for the number of jobs for `self.adapt`
Example
......@@ -231,7 +209,8 @@ class SGEIdiapCluster(JobQueueCluster):
protocol="tcp://",
dashboard_address=":8787",
env_extra=None,
sge_job_spec=Q_ALL_SPEC,
sge_job_spec=QUEUE_DEFAULT,
min_jobs=10,
**kwargs,
):
......@@ -255,7 +234,7 @@ class SGEIdiapCluster(JobQueueCluster):
self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)]
scheduler = {
"cls": SchedulerIdiap, # Use local scheduler for now
"cls": SchedulerResourceRestriction, # Use local scheduler for now
"options": {
"protocol": self.protocol,
"interface": interface,
......@@ -280,6 +259,16 @@ class SGEIdiapCluster(JobQueueCluster):
name=name,
)
max_jobs = get_max_jobs(sge_job_spec)
self.scale(min_jobs)
# Adapting to minimim 1 job to maximum 48 jobs
# interval: Milliseconds between checks from the scheduler
# wait_count: Number of consecutive times that a worker should be suggested for
# removal before we remove it.
# Here the goal is to wait 2 minutes before scaling down since
# it is very expensive to get jobs on the SGE grid
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000)
def _get_worker_spec_options(self, job_spec):
"""
Craft a dask worker_spec to be used in the qsub command
......@@ -358,11 +347,10 @@ class SGEIdiapCluster(JobQueueCluster):
await super().scale_down(workers)
def adapt(self, *args, **kwargs):
super().adapt(*args, Adaptive=AdaptiveIdiap, **kwargs)
super().adapt(*args, Adaptive=AdaptiveMultipleQueue, **kwargs)
class AdaptiveIdiap(Adaptive):
class AdaptiveMultipleQueue(Adaptive):
"""
Custom mechanism to adaptively allocate workers based on scheduler load
......@@ -451,8 +439,7 @@ class AdaptiveIdiap(Adaptive):
await super().scale_down(workers)
class SchedulerIdiap(Scheduler):
class SchedulerResourceRestriction(Scheduler):
"""
Idiap extended distributed scheduler
......@@ -462,7 +449,7 @@ class SchedulerIdiap(Scheduler):
"""
def __init__(self, *args, **kwargs):
super(SchedulerIdiap, self).__init__(*args, **kwargs)
super(SchedulerResourceRestriction, self).__init__(*args, **kwargs)
self.handlers[
"get_no_worker_tasks_resource_restrictions"
] = self.get_no_worker_tasks_resource_restrictions
......@@ -480,4 +467,4 @@ class SchedulerIdiap(Scheduler):
):
resource_restrictions.append(self.tasks[k].resource_restrictions)
return resource_restrictions
return resource_restrictions
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
"""
This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big
This HAS to be the default
"""
QUEUE_DEFAULT = {
"default": {
"queue": "q_1day",
"memory": "8GB",
"io_big": True,
"resource_spec": "",
"max_jobs": 48,
"resources": "",
},
"q_1week": {
"queue": "q_1week",
"memory": "4GB",
"io_big": True,
"resource_spec": "",
"max_jobs": 24,
"resources": {"q_1week": 1},
},
"q_short_gpu": {
"queue": "q_short_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
},
}
"""
This queue setup has a light arrangement.
For CPU jobs, it prioritizes all.q and not io_big
"""
QUEUE_LIGHT = {
"default": {
"queue": "all.q",
"memory": "4GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 96,
"resources": "",
},
"q_1day": {
"queue": "q_1day",
"memory": "4GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 48,
"resources": {"q_1day": 1},
},
"q_1week": {
"queue": "q_1week",
"memory": "4GB",
"io_big": True,
"resource_spec": "",
"resources": {"q_1week": 1},
},
"q_short_gpu": {
"queue": "q_short_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"max_jobs": 45,
"resources": {"q_short_gpu": 1},
},
"q_gpu": {
"queue": "q_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_gpu": 1},
},
"q_long_gpu": {
"queue": "q_long_gpu",
"memory": "30GB",
"io_big": False,
"resource_spec": "",
"resources": {"q_long_gpu": 1},
},
}
......@@ -76,10 +76,9 @@ The snippet below shows how to deploy the exact same pipeline from the previous
.. code:: python
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster() # Creates the SGE launcher that launches jobs in the q_all
>>> cluster.scale(10) # Submite 10 jobs in q_all
>>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day
>>> client = Client(cluster) # Creates the scheduler and attaching it to the SGE job queue system
>>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE
......@@ -92,7 +91,7 @@ Dask provides generic :doc:`deployment <dask-jobqueue:examples>` mechanism for S
2. As a result of 1., the mechanism of :doc:`adaptive deployment <dask:setup/adaptive>` is not able to handle job submissions of two or more queues.
For this reason the generic SGE laucher was extended to this one :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster`. Next subsections presents some code samples using this launcher in the most common cases you will probably find in your daily job.
For this reason the generic SGE laucher was extended to this one :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. Next subsections presents some code samples using this launcher in the most common cases you will probably find in your daily job.
Launching jobs in different SGE queues
......@@ -114,6 +113,7 @@ SGE queue specs are defined in python dictionary as in the example below, where,
... "memory": "8GB",
... "io_big": True,
... "resource_spec": "",
... "max_jobs": 48,
... "resources": "",
... },
... "gpu": {
......@@ -121,6 +121,7 @@ SGE queue specs are defined in python dictionary as in the example below, where,
... "memory": "12GB",
... "io_big": False,
... "resource_spec": "",
... "max_jobs": 48,
... "resources": {"GPU":1},
... },
... }
......@@ -130,11 +131,9 @@ Now that the queue specifications are set, let's trigger some jobs.
.. code:: python
>>> from bob.pipelines.distributed.sge import SGEIdiapCluster
>>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
>>> from dask.distributed import Client
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> cluster.scale(1, sge_job_spec_key="gpu") # Submitting 1 job in the q_gpu queue
>>> cluster.scale(10) # Submitting 9 jobs in the q_1day queue
>>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
>>> client = Client(cluster) # Creating the scheduler
.. note::
......@@ -167,7 +166,7 @@ Every time` cluster.scale` is executed to increase the amount of available SGE j
Note that in `MyBoostedFitTransformer.fit` a delay of `120s`was introduced to fake "processing" in the GPU queue.
During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, other resources are idle, which is a waste of resources (imagined a CNN training of 2 days instead of the 2 minutes from our example).
For this reason there's the method adapt in :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster` that will adjust the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
For this reason there's the method adapt in :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` that will adjust the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
Its usage is pretty simple.
The code below determines that to run a :doc:`dask graph <graphs>`, the :py:class`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
......
......@@ -19,7 +19,6 @@ Idiap SGE Support
-----------------
.. automodule:: bob.pipelines.distributed.sge
.. automodule:: bob.pipelines.distributed.local
Transformers
------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment