diff --git a/bob/pipelines/config/distributed/sge_default.py b/bob/pipelines/config/distributed/sge_default.py
new file mode 100644
index 0000000000000000000000000000000000000000..e1f287121dbb68baee07b14bb79b0461ca3b2121
--- /dev/null
+++ b/bob/pipelines/config/distributed/sge_default.py
@@ -0,0 +1,5 @@
+from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
+from dask.distributed import Client
+
+cluster = SGEMultipleQueuesCluster()
+dask_client = Client(cluster)
diff --git a/bob/pipelines/config/distributed/sge_iobig_16cores.py b/bob/pipelines/config/distributed/sge_iobig_16cores.py
deleted file mode 100644
index 99ab839df816266ee60289458e00aed3b0889ea1..0000000000000000000000000000000000000000
--- a/bob/pipelines/config/distributed/sge_iobig_16cores.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from bob.pipelines.distributed.sge import SGEIdiapCluster
-from dask.distributed import Client
-
-
-Q_1DAY_IO_BIG_SPEC = {
-    "default": {
-        "queue": "q_1day",
-        "memory": "4GB",
-        "io_big": True,
-        "resource_spec": "",
-        "resources": "",
-    }
-}
-
-n_jobs = 16
-cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
-cluster.scale(1)
-# Adapting to minimim 1 job to maximum 48 jobs
-# interval: Milliseconds between checks from the scheduler
-# wait_count: Number of consecutive times that a worker should be suggested for
-# removal before we remove it.
-# Here the goal is to wait 2 minutes before scaling down since
-# it is very expensive to get jobs on the SGE grid
-cluster.adapt(minimum=1, maximum=n_jobs, wait_count=120, interval=1000)
-
-
-dask_client = Client(cluster)
diff --git a/bob/pipelines/config/distributed/sge_iobig_16cores_1gpu.py b/bob/pipelines/config/distributed/sge_iobig_16cores_1gpu.py
deleted file mode 100644
index b2082919c9247886b33fed17f3fadfe30d497e85..0000000000000000000000000000000000000000
--- a/bob/pipelines/config/distributed/sge_iobig_16cores_1gpu.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from bob.pipelines.distributed.sge import SGEIdiapCluster, Q_1DAY_GPU_SPEC
-from dask.distributed import Client
-
-n_jobs = 16
-cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
-cluster.scale(1, sge_job_spec_key="gpu")
-cluster.scale(2, sge_job_spec_key="default")
-# Adapting to minimim 1 job to maximum 48 jobs
-# interval: Milliseconds between checks from the scheduler
-# wait_count: Number of consecutive times that a worker should be suggested for
-# removal before we remove it.
-# Here the goal is to wait 2 minutes before scaling down since
-# it is very expensive to get jobs on the SGE grid
-cluster.adapt(minimum=1, maximum=n_jobs, wait_count=120, interval=1000)
-
-dask_client = Client(cluster)
diff --git a/bob/pipelines/config/distributed/sge_iobig_48cores.py b/bob/pipelines/config/distributed/sge_iobig_48cores.py
deleted file mode 100644
index 9959ac5a1acf9299697b4f823379229915937db1..0000000000000000000000000000000000000000
--- a/bob/pipelines/config/distributed/sge_iobig_48cores.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from bob.pipelines.distributed.sge import SGEIdiapCluster, Q_1DAY_IO_BIG_SPEC
-from dask.distributed import Client
-
-n_jobs = 48
-cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)
-cluster.scale(10)
-
-# Adapting to minimim 1 job to maximum 48 jobs
-# interval: Milliseconds between checks from the scheduler
-# wait_count: Number of consecutive times that a worker should be suggested for
-# removal before we remove it.
-# Here the goal is to wait 2 minutes before scaling down since
-# it is very expensive to get jobs on the SGE grid
-cluster.adapt(minimum=10, maximum=n_jobs, wait_count=120, interval=1000)
-
-
-dask_client = Client(cluster)
diff --git a/bob/pipelines/config/distributed/sge_light.py b/bob/pipelines/config/distributed/sge_light.py
new file mode 100644
index 0000000000000000000000000000000000000000..7efd3014acab06c1adb21f03619eb5f060d50738
--- /dev/null
+++ b/bob/pipelines/config/distributed/sge_light.py
@@ -0,0 +1,6 @@
+from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
+from bob.pipelines.distributed.sge_queues import QUEUE_LIGHT
+from dask.distributed import Client
+
+cluster = SGEMultipleQueuesCluster(sge_job_spec=QUEUE_LIGHT)
+dask_client = Client(cluster)
diff --git a/bob/pipelines/distributed/local.py b/bob/pipelines/distributed/local.py
deleted file mode 100644
index 7e0922143d656e1faccb55d637b331015c55fdbd..0000000000000000000000000000000000000000
--- a/bob/pipelines/distributed/local.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python
-# vim: set fileencoding=utf-8 :
-# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
-
-
-def debug_client(n_nodes):
-    """
-    This executor will run everything in **ONE SINGLE PROCESS**
-
-    This will return an instance of Dask Distributed Client
-
-    https://distributed.dask.org/en/latest/client.html
-
-    **Parameters**
-
-    n_nodes:
-        Number of process
-    """
-
-    from dask.distributed import Client, LocalCluster
-
-    cluster = LocalCluster(
-        nanny=False, processes=False, n_workers=1, threads_per_worker=1
-    )
-    cluster.scale_up(n_nodes)
-    client = Client(cluster) # start local workers as threads
-
-    return client
diff --git a/bob/pipelines/distributed/sge.py b/bob/pipelines/distributed/sge.py
index 1e3eb89bcade7351fca3d6d95789db9d719c70b1..8277bd9032f78e251e5ada6cd1daad10b7efb97a 100644
--- a/bob/pipelines/distributed/sge.py
+++ b/bob/pipelines/distributed/sge.py
@@ -17,6 +17,8 @@ import dask
 from distributed.scheduler import Scheduler
 from distributed.deploy import Adaptive
 
+from .sge_queues import QUEUE_DEFAULT
+
 
 class SGEIdiapJob(Job):
     """
@@ -98,46 +100,18 @@ class SGEIdiapJob(Job):
         logger.debug("Job script: \n %s" % self.job_script())
 
 
-Q_ALL_SPEC = {
-    "default": {
-        "queue": "all.q",
-        "memory": "4GB",
-        "io_big": False,
-        "resource_spec": "",
-        "resources": "",
-    }
-}
-
-Q_1DAY_IO_BIG_SPEC = {
-    "default": {
-        "queue": "q_1day",
-        "memory": "8GB",
-        "io_big": True,
-        "resource_spec": "",
-        "resources": "",
-    }
-}
-
-Q_1DAY_GPU_SPEC = {
-    "default": {
-        "queue": "q_1day",
-        "memory": "8GB",
-        "io_big": True,
-        "resource_spec": "",
-        "resources": "",
-    },
-    "gpu": {
-        "queue": "q_gpu",
-        "memory": "12GB",
-        "io_big": False,
-        "resource_spec": "",
-        "resources": {"gpu":1},
-    },
-}
-
-
-class SGEIdiapCluster(JobQueueCluster):
-    """ Launch Dask jobs in the IDIAP SGE cluster
+def get_max_jobs(queue_dict):
+    """
+    Given a queue dictionary, get the maximum number of possible jobs
+    """
+
+    return max(
+        [queue_dict[r]["max_jobs"] for r in queue_dict if "max_jobs" in queue_dict[r]]
+    )
+
+
+class SGEMultipleQueuesCluster(JobQueueCluster):
+    """ Launch Dask jobs in the SGE cluster allowing the request of multiple queues
 
     Parameters
     ----------
@@ -162,6 +136,10 @@
      io_bio: set the io_big flag
      resource_spec: Whatever extra argument to be sent to qsub (qsub -l)
      tag: Mark this worker with an specific tag so dask scheduler can place specific tasks to it (https://distributed.dask.org/en/latest/resources.html)
+      max_jobs: Maximum number of jobs in the queue
+
+    min_jobs: int
+       Lower bound for the number of jobs for `self.adapt`
 
 
     Example
@@ -231,7 +209,8 @@
         protocol="tcp://",
         dashboard_address=":8787",
         env_extra=None,
-        sge_job_spec=Q_ALL_SPEC,
+        sge_job_spec=QUEUE_DEFAULT,
+        min_jobs=10,
         **kwargs,
     ):
 
@@ -255,7 +234,7 @@
         self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)]
 
         scheduler = {
-            "cls": SchedulerIdiap, # Use local scheduler for now
+            "cls": SchedulerResourceRestriction, # Use local scheduler for now
             "options": {
                 "protocol": self.protocol,
                 "interface": interface,
@@ -280,6 +259,16 @@
             name=name,
         )
 
+        max_jobs = get_max_jobs(sge_job_spec)
+        self.scale(min_jobs)
+        # Adapt from a minimum of `min_jobs` jobs to a maximum of `max_jobs` jobs
+        # interval: Milliseconds between checks from the scheduler
+        # wait_count: Number of consecutive times that a worker should be suggested for
+        #             removal before we remove it.
+        # Here the goal is to wait about one minute before scaling down, since
+        # it is very expensive to get jobs on the SGE grid
+        self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000)
+
     def _get_worker_spec_options(self, job_spec):
         """ Craft a dask worker_spec to be used in the qsub command
 
@@ -358,11 +347,10 @@
         await super().scale_down(workers)
 
     def adapt(self, *args, **kwargs):
-        super().adapt(*args, Adaptive=AdaptiveIdiap, **kwargs)
+        super().adapt(*args, Adaptive=AdaptiveMultipleQueue, **kwargs)
 
-
-class AdaptiveIdiap(Adaptive):
+class AdaptiveMultipleQueue(Adaptive):
     """
     Custom mechanism to adaptively allocate workers based on scheduler load
 
@@ -451,8 +439,7 @@
         await super().scale_down(workers)
 
-
-class SchedulerIdiap(Scheduler):
+class SchedulerResourceRestriction(Scheduler):
     """
     Idiap extended distributed scheduler
 
@@ -462,7 +449,7 @@
     """
 
     def __init__(self, *args, **kwargs):
-        super(SchedulerIdiap, self).__init__(*args, **kwargs)
+        super(SchedulerResourceRestriction, self).__init__(*args, **kwargs)
         self.handlers[
             "get_no_worker_tasks_resource_restrictions"
         ] = self.get_no_worker_tasks_resource_restrictions
@@ -480,4 +467,4 @@
         ):
             resource_restrictions.append(self.tasks[k].resource_restrictions)
 
-        return resource_restrictions
+        return resource_restrictions
diff --git a/bob/pipelines/distributed/sge_queues.py b/bob/pipelines/distributed/sge_queues.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ff2a505a7e9194a41ac740f9daa5d6a6866add6
--- /dev/null
+++ b/bob/pipelines/distributed/sge_queues.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
+
+
+"""
+This queue setup has a DEMANDING arrangement.
+For CPU jobs, it prioritizes q_1day and io_big.
+This HAS to be the default.
+"""
+QUEUE_DEFAULT = {
+    "default": {
+        "queue": "q_1day",
+        "memory": "8GB",
+        "io_big": True,
+        "resource_spec": "",
+        "max_jobs": 48,
+        "resources": "",
+    },
+    "q_1week": {
+        "queue": "q_1week",
+        "memory": "4GB",
+        "io_big": True,
+        "resource_spec": "",
+        "max_jobs": 24,
+        "resources": {"q_1week": 1},
+    },
+    "q_short_gpu": {
+        "queue": "q_short_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "max_jobs": 45,
+        "resources": {"q_short_gpu": 1},
+    },
+    "q_gpu": {
+        "queue": "q_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_gpu": 1},
+    },
+    "q_long_gpu": {
+        "queue": "q_long_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_long_gpu": 1},
+    },
+}
+
+
+"""
+This queue setup has a light arrangement.
+For CPU jobs, it prioritizes all.q and not io_big.
+"""
+QUEUE_LIGHT = {
+    "default": {
+        "queue": "all.q",
+        "memory": "4GB",
+        "io_big": False,
+        "resource_spec": "",
+        "max_jobs": 96,
+        "resources": "",
+    },
+    "q_1day": {
+        "queue": "q_1day",
+        "memory": "4GB",
+        "io_big": False,
+        "resource_spec": "",
+        "max_jobs": 48,
+        "resources": {"q_1day": 1},
+    },
+    "q_1week": {
+        "queue": "q_1week",
+        "memory": "4GB",
+        "io_big": True,
+        "resource_spec": "",
+        "resources": {"q_1week": 1},
+    },
+    "q_short_gpu": {
+        "queue": "q_short_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "max_jobs": 45,
+        "resources": {"q_short_gpu": 1},
+    },
+    "q_gpu": {
+        "queue": "q_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_gpu": 1},
+    },
+    "q_long_gpu": {
+        "queue": "q_long_gpu",
+        "memory": "30GB",
+        "io_big": False,
+        "resource_spec": "",
+        "resources": {"q_long_gpu": 1},
+    },
+}
diff --git a/doc/dask.rst b/doc/dask.rst
index 8b0b89b2deb578741ed6e16974525c605531b65d..1034c4c46b9860aa39bb119914f273819b9d66aa 100644
--- a/doc/dask.rst
+++ b/doc/dask.rst
@@ -76,10 +76,9 @@ The snippet below shows how to deploy the exact same pipeline from the previous
 
 .. code:: python
 
-   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
+   >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
    >>> from dask.distributed import Client
-   >>> cluster = SGEIdiapCluster() # Creates the SGE launcher that launches jobs in the q_all
-   >>> cluster.scale(10) # Submite 10 jobs in q_all
+   >>> cluster = SGEMultipleQueuesCluster() # Creates the SGE launcher that launches jobs in the q_1day queue
    >>> client = Client(cluster) # Creates the scheduler and attaching it to the SGE job queue system
    >>> dask_pipeline.fit_transform(....).compute(scheduler=client) # Runs my graph in the Idiap SGE
 
@@ -92,7 +91,7 @@ Dask provides generic :doc:`deployment <dask-jobqueue:examples>` mechanism for S
 2. As a result of 1., the mechanism of :doc:`adaptive deployment <dask:setup/adaptive>` is not able to handle job submissions of two or more queues.
 
-For this reason the generic SGE laucher was extended to this one :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster`. Next subsections presents some code samples using this launcher in the most common cases you will probably find in your daily job.
+For this reason, the generic SGE launcher was extended into :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. The next subsections present some code samples using this launcher in the most common cases you will probably find in your daily work.
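+
+.. note::
+   A minimal sketch of how these pieces fit together, using the ``QUEUE_DEFAULT``
+   spec and the ``get_max_jobs`` helper introduced here; the constructor already
+   calls ``scale(min_jobs)`` and ``adapt(...)``, so no explicit scaling call is
+   required:
+
+   .. code:: python
+
+      >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster, get_max_jobs
+      >>> from bob.pipelines.distributed.sge_queues import QUEUE_DEFAULT
+      >>> get_max_jobs(QUEUE_DEFAULT)  # largest "max_jobs" entry in the spec
+      48
+      >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=QUEUE_DEFAULT, min_jobs=10)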
 
 
 Launching jobs in different SGE queues
@@ -114,6 +113,7 @@ SGE queue specs are defined in python dictionary as in the example below, where,
     ...         "memory": "8GB",
     ...         "io_big": True,
     ...         "resource_spec": "",
+    ...         "max_jobs": 48,
     ...         "resources": "",
     ...     },
     ...     "gpu": {
@@ -121,6 +121,7 @@ SGE queue specs are defined in python dictionary as in the example below, where,
     ...         "memory": "12GB",
     ...         "io_big": False,
     ...         "resource_spec": "",
+    ...         "max_jobs": 48,
    ...         "resources": {"GPU":1},
    ...     },
    ... }
@@ -130,11 +131,9 @@ Now that the queue specifications are set, let's trigger some jobs.
 
 .. code:: python
 
-   >>> from bob.pipelines.distributed.sge import SGEIdiapCluster
+   >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
    >>> from dask.distributed import Client
-   >>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
-   >>> cluster.scale(1, sge_job_spec_key="gpu") # Submitting 1 job in the q_gpu queue
-   >>> cluster.scale(10) # Submitting 9 jobs in the q_1day queue
+   >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
    >>> client = Client(cluster) # Creating the scheduler
 
 .. note::
@@ -167,7 +166,7 @@ Every time` cluster.scale` is executed to increase the amount of available SGE j
 Note that in `MyBoostedFitTransformer.fit` a delay of `120s`was introduced to fake "processing" in the GPU queue.
 During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, other resources are idle, which is a waste of resources (imagined a CNN training of 2 days instead of the 2 minutes from our example).
 
-For this reason there's the method adapt in :py:class:`bob.pipelines.distributed.sge.SGEIdiapCluster` that will adjust the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
+For this reason, :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` provides the method `adapt`, which adjusts the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
 Its usage is pretty simple.
 The code below determines that to run a :doc:`dask graph <graphs>`, the :py:class`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
diff --git a/doc/py_api.rst b/doc/py_api.rst
index 6156eaa9c9fd073c7c048885a2ac462ec97bbc44..59ff58f64aeeb7bbaac6299b5fcb4d01d46933e2 100644
--- a/doc/py_api.rst
+++ b/doc/py_api.rst
@@ -19,7 +19,6 @@ Idiap SGE Support
 -----------------
 
 .. automodule:: bob.pipelines.distributed.sge
-.. automodule:: bob.pipelines.distributed.local
 
 Transformers
 ------------
diff --git a/doc/python/pipeline_example_dask_sge.py b/doc/python/pipeline_example_dask_sge.py
index 5497ffd4aeb9b1cb1cf4fbfa24e4cce4d8c7b2bb..1113694e5083ffae8192b23fbd2823590db8101b 100644
--- a/doc/python/pipeline_example_dask_sge.py
+++ b/doc/python/pipeline_example_dask_sge.py
@@ -75,11 +75,11 @@ Q_1DAY_GPU_SPEC = {
     },
 }
 
-from bob.pipelines.distributed.sge import SGEIdiapCluster
+from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
 from dask.distributed import Client
 
-cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
-cluster.scale(1, sge_job_spec_key="gpu") # Submitting 1 job in the q_gpu queue
+cluster = SGEMultipleQueuesCluster()
+cluster.scale(1, sge_job_spec_key="q_gpu") # Submitting 1 job in the q_gpu queue
 cluster.scale(10) # Submitting 9 jobs in the q_1day queue
 client = Client(cluster) # Creating the scheduler
 
diff --git a/doc/python/pipeline_example_dask_sge_adaptive.py b/doc/python/pipeline_example_dask_sge_adaptive.py
index 594f5d8f5285d898cf940d35ac8d92072c075ac8..c0bcb87cdd4295b0cc5bd34d4cc51eff3a027042 100644
--- a/doc/python/pipeline_example_dask_sge_adaptive.py
+++ b/doc/python/pipeline_example_dask_sge_adaptive.py
@@ -75,10 +75,10 @@ Q_1DAY_GPU_SPEC = {
     },
 }
 
-from bob.pipelines.distributed.sge import SGEIdiapCluster
+from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
 from dask.distributed import Client
 
-cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
+cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)
 cluster.scale(1) # Submitting 1 job in the q_gpu queue
 cluster.adapt(minimum=1, maximum=10)
 client = Client(cluster) # Creating the scheduler
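
A minimal usage sketch of the entry points added above (it mirrors the new
``sge_default.py``/``sge_light.py`` configurations, assumes an SGE environment
where the listed queues exist, and the graph at the end is only a placeholder):

.. code:: python

   from dask.distributed import Client

   from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
   from bob.pipelines.distributed.sge_queues import QUEUE_LIGHT

   # QUEUE_LIGHT targets all.q without io_big; omit sge_job_spec to get the
   # demanding QUEUE_DEFAULT setup (q_1day with io_big) instead.
   cluster = SGEMultipleQueuesCluster(sge_job_spec=QUEUE_LIGHT, min_jobs=2)
   dask_client = Client(cluster)

   # The cluster scales itself between `min_jobs` and the largest "max_jobs"
   # declared in the spec, so graphs can simply be computed through the client:
   # result = my_dask_graph.compute(scheduler=dask_client)  # placeholder graph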