Commit 3aef698b authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

Ran black and docformatter

parent e6330253
Pipeline #39311 passed with stage
in 10 minutes and 45 seconds
......@@ -21,15 +21,13 @@ from .sge_queues import QUEUE_DEFAULT
class SGEIdiapJob(Job):
"""
Launches an SGE Job in the IDIAP cluster.
This class basically encodes the CLI command that bootstraps the worker
in an SGE job. Check here `https://distributed.dask.org/en/latest/resources.html#worker-resources` for more information
"""Launches an SGE Job in the IDIAP cluster. This class basically encodes
the CLI command that bootstraps the worker in an SGE job. Check here
`https://distributed.dask.org/en/latest/resources.html#worker-resources`
for more information.
.. note:: This class is temporary. It's basically a copy of SGEJob from dask_jobqueue.
The difference is that here I'm also handling the dask job resources tag (which is not handled anywhere). This has to be patched in the Job class. Please follow `https://github.com/dask/dask-jobqueue/issues/378` for news about this patch.
"""
submit_command = "qsub"
......@@ -101,9 +99,7 @@ class SGEIdiapJob(Job):
def get_max_jobs(queue_dict):
"""
Given a queue list, get the max number of possible jobs
"""
"""Given a queue list, get the max number of possible jobs."""
return max(
[queue_dict[r]["max_jobs"] for r in queue_dict if "max_jobs" in queue_dict[r]]
......@@ -111,7 +107,8 @@ def get_max_jobs(queue_dict):
class SGEMultipleQueuesCluster(JobQueueCluster):
""" Launch Dask jobs in the SGE cluster allowing the request of multiple queues
"""Launch Dask jobs in the SGE cluster allowing the request of multiple
queues.
Parameters
----------
......@@ -199,8 +196,6 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
"""
def __init__(
......@@ -270,10 +265,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000)
def _get_worker_spec_options(self, job_spec):
"""
Craft a dask worker_spec to be used in the qsub command
"""
"""Craft a dask worker_spec to be used in the qsub command."""
def _get_key_from_spec(spec, key):
return spec[key] if key in spec else ""
......@@ -307,8 +299,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
}
def scale(self, n_jobs, sge_job_spec_key="default"):
"""
Launch an SGE job in the Idiap SGE cluster
"""Launch an SGE job in the Idiap SGE cluster.
Parameters
----------
......@@ -335,14 +326,18 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
return super(JobQueueCluster, self).scale(n_jobs, memory=None, cores=n_cores)
def scale_up(self, n_jobs, sge_job_spec_key=None):
"""
Scale cluster up. This is supposed to be used by the scheduler while dynamically allocating resources
"""Scale cluster up.
This is supposed to be used by the scheduler while dynamically
allocating resources
"""
return self.scale(n_jobs, sge_job_spec_key)
async def scale_down(self, workers, sge_job_spec_key=None):
"""
Scale cluster down. This is supposed to be used by the scheduler while dynamically allocating resources
"""Scale cluster down.
This is supposed to be used by the scheduler while dynamically
allocating resources
"""
await super().scale_down(workers)
......@@ -351,8 +346,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
class AdaptiveMultipleQueue(Adaptive):
"""
Custom mechanism to adaptively allocate workers based on scheduler load
"""Custom mechanism to adaptively allocate workers based on scheduler load.
This custom implementation extends the `Adaptive.recommendations` by looking
at the `distributed.scheduler.TaskState.resource_restrictions`.
......@@ -362,13 +356,11 @@ class AdaptiveMultipleQueue(Adaptive):
.. note ::
If a certain task has the status `no-worker` and it has resource_restrictions, the scheduler should
request a job matching those resource restrictions
"""
async def recommendations(self, target: int) -> dict:
"""
Make scale up/down recommendations based on current state and target
"""
"""Make scale up/down recommendations based on current state and
target."""
plan = self.plan
requested = self.requested
......@@ -440,12 +432,11 @@ class AdaptiveMultipleQueue(Adaptive):
class SchedulerResourceRestriction(Scheduler):
"""
Idiap extended distributed scheduler
"""Idiap extended distributed scheduler.
This scheduler extends `Scheduler` by just adding a handler
that fetches, at every scheduler cycle, the resource restrictions of
a task that has status `no-worker`
This scheduler extends `Scheduler` by just adding a handler that
fetches, at every scheduler cycle, the resource restrictions of a
task that has status `no-worker`
"""
def __init__(self, *args, **kwargs):
......@@ -455,9 +446,8 @@ class SchedulerResourceRestriction(Scheduler):
] = self.get_no_worker_tasks_resource_restrictions
def get_no_worker_tasks_resource_restrictions(self, comm=None):
"""
Get the resource restrictions of tasks that have the status 'no-worker'
"""
"""Get the resource restrictions of tasks that have the status
'no-worker'."""
resource_restrictions = []
for k in self.tasks:
......
......@@ -3,10 +3,10 @@
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
"""
This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big
This HAS to be the default
"""This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big. This HAS to be the
default.
"""
QUEUE_DEFAULT = {
"default": {
......
"""Base definition of sample"""
"""Base definition of sample."""
from collections.abc import MutableSequence
def _copy_attributes(s, d):
"""Copies attributes from a dictionary to self
"""
"""Copies attributes from a dictionary to self."""
s.__dict__.update(
dict(
(k, v)
......@@ -25,7 +24,8 @@ class _ReprMixin:
class Sample(_ReprMixin):
"""Representation of sample. A Sample is a simple container that wraps a data-point (see :ref:`bob.pipelines.sample`)
"""Representation of sample. A Sample is a simple container that wraps a
data-point (see :ref:`bob.pipelines.sample`)
Each sample must have the following attributes:
......@@ -41,7 +41,6 @@ class Sample(_ReprMixin):
parent : object
A parent object from which to inherit all other attributes (except
``data``)
"""
def __init__(self, data, parent=None, **kwargs):
......@@ -52,7 +51,7 @@ class Sample(_ReprMixin):
class DelayedSample(_ReprMixin):
"""Representation of sample that can be loaded via a callable
"""Representation of sample that can be loaded via a callable.
The optional ``**kwargs`` argument allows you to attach more attributes to
this sample instance.
......@@ -72,7 +71,6 @@ class DelayedSample(_ReprMixin):
kwargs : dict
Further attributes of this sample, to be stored and eventually
transmitted to transformed versions of the sample
"""
def __init__(self, load, parent=None, **kwargs):
......@@ -84,7 +82,7 @@ class DelayedSample(_ReprMixin):
@property
def data(self):
"""Loads the data from the disk file"""
"""Loads the data from the disk file."""
if self._data is None:
self._data = self.load()
return self._data
......@@ -92,8 +90,8 @@ class DelayedSample(_ReprMixin):
class SampleSet(MutableSequence, _ReprMixin):
"""A set of samples with extra attributes
https://docs.python.org/3/library/collections.abc.html#collections-abstract-base-classes
"""
https://docs.python.org/3/library/collections.abc.html#collections-abstract-base-classes
"""
def __init__(self, samples, parent=None, **kwargs):
self.samples = samples
......
......@@ -18,7 +18,8 @@ def _offset_add_func(X, offset=1):
class DummyWithFit(TransformerMixin, BaseEstimator):
"""See https://scikit-learn.org/stable/developers/develop.html and
https://github.com/scikit-learn-contrib/project-template/blob/master/skltemplate/_template.py"""
https://github.com/scikit-learn-contrib/project-template/blob/master/skltemplate/_template.py
"""
def fit(self, X, y=None):
X = check_array(X)
......@@ -45,7 +46,8 @@ class DummyWithFit(TransformerMixin, BaseEstimator):
class DummyTransformer(TransformerMixin, BaseEstimator):
"""See https://scikit-learn.org/stable/developers/develop.html and
https://github.com/scikit-learn-contrib/project-template/blob/master/skltemplate/_template.py"""
https://github.com/scikit-learn-contrib/project-template/blob/master/skltemplate/_template.py
"""
def __init__(self, picklable=True, i=None, **kwargs):
super().__init__(**kwargs)
......
......@@ -4,17 +4,16 @@ from ..wrappers import wrap
def SampleFunctionTransformer(**kwargs):
"""Class that transforms Scikit learn FunctionTransformer
(https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
work with :any:`Sample`-based pipelines.
"""
"""Class that transforms Scikit learn FunctionTransformer
(https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
to work with :any:`Sample`-based pipelines."""
return wrap([FunctionTransformer, "sample"], **kwargs)
def CheckpointSampleFunctionTransformer(**kwargs):
"""Class that transforms Scikit learn FunctionTransformer
(https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
work with :any:`Sample`-based pipelines.
"""Class that transforms Scikit learn FunctionTransformer
(https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)
to work with :any:`Sample`-based pipelines.
Furthermore, it makes it checkpointable
"""
......
......@@ -9,8 +9,8 @@ def linearize(X):
class Linearize(FunctionTransformer):
"""Extracts features by simply concatenating all elements of the data into one long vector.
"""
"""Extracts features by simply concatenating all elements of the data into
one long vector."""
def __init__(self, **kwargs):
super().__init__(func=linearize, **kwargs)
......
......@@ -3,14 +3,11 @@ from ..wrappers import wrap
def SamplePCA(**kwargs):
"""
Enables SAMPLE handling for :any:`sklearn.decomposition.PCA`
"""
"""Enables SAMPLE handling for :any:`sklearn.decomposition.PCA`"""
return wrap([PCA, "sample"], **kwargs)
def CheckpointSamplePCA(**kwargs):
"""
Enables SAMPLE and CHECKPOINTING handling for :any:`sklearn.decomposition.PCA`
"""
"""Enables SAMPLE and CHECKPOINTING handling for
:any:`sklearn.decomposition.PCA`"""
return wrap([PCA, "sample", "checkpoint"], **kwargs)
......@@ -3,11 +3,8 @@ import nose
import numpy as np
def is_picklable(obj):
"""
Test if an object is picklable or not
"""
"""Test if an object is picklable or not."""
try:
pickle.dumps(obj)
except TypeError:
......@@ -19,9 +16,7 @@ def is_picklable(obj):
def assert_picklable(obj):
"""
Test if an object is picklable or not
"""
"""Test if an object is picklable or not."""
string = pickle.dumps(obj)
new_obj = pickle.loads(string)
......@@ -50,8 +45,8 @@ def is_estimator_stateless(estimator):
def _generate_features(reader, paths, same_size=False):
"""Load and stack features in a memory efficient way. This function is meant
to be used inside :py:func:`vstack_features`.
"""Load and stack features in a memory efficient way. This function is
meant to be used inside :py:func:`vstack_features`.
Parameters
----------
......
"""Scikit-learn Estimator Wrappers"""
"""Scikit-learn Estimator Wrappers."""
from .sample import DelayedSample, SampleSet
from .utils import is_estimator_stateless, samples_to_np_array
import dask.bag
......@@ -28,7 +28,7 @@ def _frmt(estimator, limit=40):
class BaseWrapper(BaseEstimator):
"""The base class for all wrappers"""
"""The base class for all wrappers."""
def _more_tags(self):
return self.estimator._more_tags()
......@@ -66,7 +66,8 @@ class DelayedSamplesCall:
class SampleWrapper(BaseWrapper, TransformerMixin):
"""Wraps scikit-learn estimators to work with :any:`Sample`-based pipelines.
"""Wraps scikit-learn estimators to work with :any:`Sample`-based
pipelines.
Do not use this class except for scikit-learn estimators.
......@@ -164,7 +165,8 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
class CheckpointWrapper(BaseWrapper, TransformerMixin):
"""Wraps :any:`Sample`-based estimators so the results are saved to disk."""
"""Wraps :any:`Sample`-based estimators so the results are saved to
disk."""
def __init__(
self,
......@@ -303,7 +305,7 @@ class CheckpointWrapper(BaseWrapper, TransformerMixin):
class DaskWrapper(BaseWrapper, TransformerMixin):
"""Wraps Scikit estimators to handle Dask Bags as input
"""Wraps Scikit estimators to handle Dask Bags as input.
Parameters
----------
......@@ -319,7 +321,6 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
a future delayed(self.transform).compute(resources=resource_tape) so
dask scheduler can place this task in a particular resource
(e.g GPU)
"""
def __init__(
......@@ -414,7 +415,6 @@ class ToDaskBag(TransformerMixin, BaseEstimator):
def wrap(bases, estimator=None, **kwargs):
"""Wraps several estimators inside each other.
Parameters
----------
bases : list
......@@ -480,9 +480,7 @@ def wrap(bases, estimator=None, **kwargs):
def dask_tags(estimator):
"""
Recursively collects resource_tags in dasked estimators
"""
"""Recursively collects resource_tags in dasked estimators."""
tags = {}
if hasattr(estimator, "estimator"):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment