
[dask][sge] Multiqueue updates

Merged Tiago de Freitas Pereira requested to merge multiqueue into master
4 unresolved threads

In this merge request I:

  • Simplified the way multi-queue support is configured in our scripts
  • Updated our Dask documentation

Example

Setting the fit method to run on q_short_gpu:

pipeline = mario.wrap(
    ["sample", "checkpoint", "dask"],
    pipeline,
    model_path=model_path,
    fit_tag="q_short_gpu",
)

When calling compute, you have to explicitly pass the list of available resource tags.

pipeline.fit_transform(...).compute(
    scheduler=dask_client, resources=cluster.get_sge_resources()
)

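For context, here is a minimal runnable sketch of the underlying Dask resource mechanism using a plain in-process LocalCluster (no SGE involved); the q_short_gpu tag and worker count are illustrative assumptions, not the cluster's real configuration:

```python
# Sketch of Dask's resource-restriction mechanism: workers advertise
# abstract resources, and .compute(resources=...) restricts tasks to
# workers that provide them. "q_short_gpu" is just an illustrative label.
import dask
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1, processes=False, resources={"q_short_gpu": 1}
)
client = Client(cluster)

task = dask.delayed(lambda x: x + 1)(41)
# Only workers advertising "q_short_gpu" may run this task.
result = task.compute(scheduler=client, resources={"q_short_gpu": 1})

client.close()
cluster.close()
```

If no worker advertises the requested tag, the task simply never gets scheduled, which is why the available tags must be passed explicitly.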

Activity

The new method added in this MR (diff line numbers stripped):

        self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=5, interval=120)

    def get_sge_resources(self):
        """
        Get the available resources from `SGEMultipleQueuesCluster.sge_job_spec`.
        This is useful when it's necessary to set the resources available for
        the `.compute` method.
        Check https://distributed.dask.org/en/latest/resources.html#resources-with-collections for more information.

        Example
        -------
        >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
        >>> client = Client(cluster)  # doctest: +SKIP
        >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
        """
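To illustrate the idea behind get_sge_resources, here is a hypothetical, self-contained sketch: merge the per-queue resource tags of a job spec into one mapping suitable for .compute(resources=...). The spec layout and helper name below are assumptions for illustration, not SGEMultipleQueuesCluster's actual internals.

```python
# Hypothetical job spec: queue name -> spec with an optional `resources`
# dict of abstract resource tags (layout assumed for illustration only).
EXAMPLE_SGE_JOB_SPEC = {
    "default": {"queue": "q_1day", "resources": {}},
    "q_short_gpu": {"queue": "q_short_gpu", "resources": {"q_short_gpu": 1}},
}


def collect_sge_resources(sge_job_spec):
    """Union of every queue's resource tags, usable as compute(resources=...)."""
    resources = {}
    for spec in sge_job_spec.values():
        resources.update(spec.get("resources", {}))
    return resources


print(collect_sge_resources(EXAMPLE_SGE_JOB_SPEC))
```

With such a helper, the caller never has to hand-write the tag dictionary; it stays consistent with whatever queues the cluster was configured with.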
  • The multi-line `__init__` signature was collapsed onto a single line:

        def __init__(
            self, estimator, fit_tag=None, transform_tag=None, **kwargs,
        ):
  • In the documentation example, the MyBoostedTransformer/MyBoostedFitTransformer pipeline (with its checkpoint_1/checkpoint_2 directories) was replaced with a simpler one. With the duplicated hunk collapsed and diff line numbers stripped, the new example reads (the hunk ends mid-call):

        # Creating X
        X = numpy.zeros((2, 2))
        # Wrapping X with Samples
        X_as_sample = [Sample(X, key=str(i), metadata=1) for i in range(10)]

        # Building an arbitrary pipeline
        model_path = "~/dask_tmp"
        os.makedirs(model_path, exist_ok=True)
        pipeline = make_pipeline(MyTransformer(), MyFitTranformer())

        # Wrapping with sample, checkpoint and dask
        pipeline = bob.pipelines.wrap(
            ["sample", "checkpoint", "dask"],
            pipeline,
            model_path=model_path,
            transform_extra_arguments=(("metadata", "metadata"),),
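To clarify what the "sample" layer of the wrap does conceptually, here is a toy sketch (not bob.pipelines' real implementation; all class and attribute names are illustrative): the wrapper calls the underlying estimator on each sample's data and forwards the metadata fields named in transform_extra_arguments as keyword arguments.

```python
# Toy model of the "sample" wrapping idea (illustrative only).
class Sample:
    def __init__(self, data, **metadata):
        self.data = data
        self.metadata = metadata


class SampleWrapper:
    def __init__(self, func, transform_extra_arguments=()):
        self.func = func
        # pairs of (keyword argument name, sample metadata key)
        self.extra = transform_extra_arguments

    def transform(self, samples):
        out = []
        for s in samples:
            # Pull the requested metadata out of the sample and pass it
            # to the wrapped function as keyword arguments.
            kwargs = {kw: s.metadata[key] for kw, key in self.extra}
            out.append(Sample(self.func(s.data, **kwargs), **s.metadata))
        return out


wrapped = SampleWrapper(
    lambda x, metadata: x * metadata,
    transform_extra_arguments=(("metadata", "metadata"),),
)
result = wrapped.transform([Sample(2, metadata=3)])
print(result[0].data)
```

This is why the example above can pass transform_extra_arguments=(("metadata", "metadata"),): the wrapper, not the estimator, knows how to dig the metadata out of each Sample.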
  • added 1 commit

    • e1b35e5b - [sphinx] Fixed SGEMultipleQueuesCluster.get_sge_resources docs


  • Is this good to go?

  • Amir MOHAMMADI mentioned in commit 99fa97c0

  • Thanks, LGTM. I will test this locally some more and get back to you.

  • There's a delay for the scheduler to ask for a specific resource. I couldn't figure out how to decrease this delay.
