Commit dd0b6df2 authored by Amir MOHAMMADI

Merge branch 'refactor' into 'master'

Refactor Mixins to Aggregators

Closes #11

See merge request !26
parents c917ba39 3aef698b
Pipeline #39312 passed with stages in 10 minutes and 54 seconds
from . import utils
from .sample import Sample, DelayedSample, SampleSet
from .wrappers import (
BaseWrapper,
DelayedSamplesCall,
SampleWrapper,
CheckpointWrapper,
DaskWrapper,
ToDaskBag,
wrap,
dask_tags,
)
from . import distributed
from . import transformers
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is
shortened.
Parameters
----------
*args
The objects that you want sphinx to believe are defined here.
Resolves `Sphinx referencing issues
<https://github.com/sphinx-doc/sphinx/issues/3048>`
"""
for obj in args:
obj.__module__ = __name__
__appropriate__(
Sample,
DelayedSample,
SampleSet,
BaseWrapper,
DelayedSamplesCall,
SampleWrapper,
CheckpointWrapper,
DaskWrapper,
ToDaskBag,
)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
__all__ = [_ for _ in dir() if not _.startswith("_")]
import bob.pipelines as mario
pipeline = mario.wrap(["dask"], pipeline)
from dask.distributed import Client, LocalCluster
from multiprocessing import cpu_count
n_nodes = 4
n_nodes = cpu_count()
threads_per_worker = 1
cluster = LocalCluster(
......
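# --- Illustrative completion (not part of this merge request) ---
# A plausible way to finish the snippet above, assuming the standard
# dask.distributed API; the parameter values are only an example:
#
#     cluster = LocalCluster(n_workers=n_nodes, threads_per_worker=threads_per_worker)
#     client = Client(cluster)
#
# Once the pipeline is wrapped with "dask", its fit/transform results are dask
# graphs, so the final output is typically materialized with
# ``.compute(scheduler=client)``.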
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
\ No newline at end of file
__path__ = extend_path(__path__, __name__)
......@@ -21,15 +21,13 @@ from .sge_queues import QUEUE_DEFAULT
class SGEIdiapJob(Job):
"""
Launches an SGE Job in the IDIAP cluster.
This class basically encodes the CLI command that bootstraps the worker
in an SGE job. Check `https://distributed.dask.org/en/latest/resources.html#worker-resources` for more information
"""Launches an SGE Job in the IDIAP cluster. This class basically encodes
the CLI command that bootstraps the worker in an SGE job. Check
`https://distributed.dask.org/en/latest/resources.html#worker-resources`
for more information.
.. note:: This class is temporary. It is basically a copy of SGEJob from dask_jobqueue.
The difference is that here the dask job resources tag is also handled (it is not handled anywhere else). This has to be patched in the Job class. Please follow `https://github.com/dask/dask-jobqueue/issues/378` for news about this patch.
"""
submit_command = "qsub"
......@@ -101,9 +99,7 @@ class SGEIdiapJob(Job):
def get_max_jobs(queue_dict):
"""
Given a queue list, get the max number of possible jobs
"""
"""Given a queue list, get the max number of possible jobs."""
return max(
[queue_dict[r]["max_jobs"] for r in queue_dict if "max_jobs" in queue_dict[r]]
......@@ -111,7 +107,8 @@ def get_max_jobs(queue_dict):
class SGEMultipleQueuesCluster(JobQueueCluster):
""" Launch Dask jobs in the SGE cluster allowing the request of multiple queus
"""Launch Dask jobs in the SGE cluster allowing the request of multiple
queus.
Parameters
----------
......@@ -199,8 +196,6 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
>>> cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_GPU_SPEC) # doctest: +SKIP
>>> cluster.adapt(Adaptive=AdaptiveIdiap,minimum=2, maximum=10) # doctest: +SKIP
>>> client = Client(cluster) # doctest: +SKIP
"""
def __init__(
......@@ -270,10 +265,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
self.adapt(minimum=min_jobs, maximum=max_jobs, wait_count=60, interval=1000)
def _get_worker_spec_options(self, job_spec):
"""
Craft a dask worker_spec to be used in the qsub command
"""
"""Craft a dask worker_spec to be used in the qsub command."""
def _get_key_from_spec(spec, key):
return spec[key] if key in spec else ""
......@@ -307,8 +299,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
}
def scale(self, n_jobs, sge_job_spec_key="default"):
"""
Launch an SGE job in the Idiap SGE cluster
"""Launch an SGE job in the Idiap SGE cluster.
Parameters
----------
......@@ -317,7 +308,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
Quantity of jobs to scale
sge_job_spec_key: str
One of the specs `SGEIdiapCluster.sge_job_spec`
One of the specs `SGEIdiapCluster.sge_job_spec`
"""
if n_jobs == 0:
......@@ -335,14 +326,18 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
return super(JobQueueCluster, self).scale(n_jobs, memory=None, cores=n_cores)
def scale_up(self, n_jobs, sge_job_spec_key=None):
"""
Scale cluster up. This is supposed to be used by the scheduler while dynamically allocating resources
"""Scale cluster up.
This is supposed to be used by the scheduler while dynamically
allocating resources
"""
return self.scale(n_jobs, sge_job_spec_key)
async def scale_down(self, workers, sge_job_spec_key=None):
"""
Scale cluster down. This is supposed to be used by the scheduler while dynamically allocating resources
"""Scale cluster down.
This is supposed to be used by the scheduler while dynamically
allocating resources
"""
await super().scale_down(workers)
......@@ -351,9 +346,8 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
class AdaptiveMultipleQueue(Adaptive):
"""
Custom mechanism to adaptively allocate workers based on scheduler load
"""Custom mechanism to adaptively allocate workers based on scheduler load.
This custom implementation extends the `Adaptive.recommendations` by looking
at the `distributed.scheduler.TaskState.resource_restrictions`.
......@@ -362,13 +356,11 @@ class AdaptiveMultipleQueue(Adaptive):
.. note::
If a certain task has the status `no-worker` and it has resource_restrictions, the scheduler should
request a job matching those resource restrictions
"""
async def recommendations(self, target: int) -> dict:
"""
Make scale up/down recommendations based on current state and target
"""
"""Make scale up/down recommendations based on current state and
target."""
plan = self.plan
requested = self.requested
......@@ -440,12 +432,11 @@ class AdaptiveMultipleQueue(Adaptive):
class SchedulerResourceRestriction(Scheduler):
"""
Idiap extended distributed scheduler
"""Idiap extended distributed scheduler.
This scheduler extends `Scheduler` by just adding a handler
that fetches, at every scheduler cycle, the resource restrictions of
a task that has status `no-worker`
This scheduler extends `Scheduler` by just adding a handler that
fetches, at every scheduler cycle, the resource restrictions of a
task that has status `no-worker`
"""
def __init__(self, *args, **kwargs):
......@@ -455,9 +446,8 @@ class SchedulerResourceRestriction(Scheduler):
] = self.get_no_worker_tasks_resource_restrictions
def get_no_worker_tasks_resource_restrictions(self, comm=None):
"""
Get the task resource restrictions for jobs that have the status 'no-worker'
"""
"""Get the task resource restrictions for jobs that have the status
'no-worker'."""
resource_restrictions = []
for k in self.tasks:
......
......@@ -3,10 +3,10 @@
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
"""
This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big
This HAS to be the default
"""This queue setup has a DEMANDING arrangement.
For CPU jobs, it prioritizes q_1day and io_big. This HAS to be the
default.
"""
QUEUE_DEFAULT = {
"default": {
......
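# Hypothetical illustration (not part of this merge request, and not the
# actual contents of QUEUE_DEFAULT): the general shape of one entry in such a
# queue specification. Only ``max_jobs`` is grounded in the code above
# (see ``get_max_jobs``); the other keys shown here are assumptions.
#
#     "q_example": {
#         "queue": "q_example",           # SGE queue to submit to
#         "max_jobs": 48,                 # cap read by get_max_jobs()
#         "resources": {"q_example": 1},  # dask resource tag tied to this queue
#     },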
from collections.abc import MutableSequence
"""Base definition of sample."""
"""Base definition of sample"""
from collections.abc import MutableSequence
def _copy_attributes(s, d):
"""Copies attributes from a dictionary to self
"""
"""Copies attributes from a dictionary to self."""
s.__dict__.update(
dict([k, v] for k, v in d.items() if k not in ("data", "load", "samples"))
dict(
(k, v)
for k, v in d.items()
if k not in ("data", "load", "samples", "_data")
)
)
class DelayedSample:
"""Representation of sample that can be loaded via a callable
class _ReprMixin:
def __repr__(self):
return (
f"{self.__class__.__name__}("
+ ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
+ ")"
)
The optional ``**kwargs`` argument allows you to attach more attributes to
this sample instance.
class Sample(_ReprMixin):
"""Representation of sample. A Sample is a simple container that wraps a
data-point (see :ref:`bob.pipelines.sample`)
Parameters
----------
Each sample must have the following attributes:
load:
A python function that can be called parameterlessly, to load the
sample in question from whatever medium
* attribute ``data``: Contains the data for this sample
parent : :py:class:`bob.pipelines.sample.DelayedSample`, :py:class:`bob.pipelines.sample.Sample`, None
If passed, consider this as a parent of this sample, to copy
information
kwargs : dict
Further attributes of this sample, to be stored and eventually
transmitted to transformed versions of the sample
Parameters
----------
data : object
Object representing the data to initialize this sample with.
parent : object
A parent object from which to inherit all other attributes (except
``data``)
"""
def __init__(self, load, parent=None, **kwargs):
self.load = load
def __init__(self, data, parent=None, **kwargs):
self.data = data
if parent is not None:
_copy_attributes(self, parent.__dict__)
_copy_attributes(self, kwargs)
@property
def data(self):
"""Loads the data from the disk file"""
return self.load()
class DelayedSample(_ReprMixin):
"""Representation of sample that can be loaded via a callable.
class Sample:
"""Representation of sample that is sufficient for the blocks in this module
Each sample must have the following attributes:
* attribute ``data``: Contains the data for this sample
The optional ``**kwargs`` argument allows you to attach more attributes to
this sample instance.
Parameters
----------
data : object
Object representing the data to initialize this sample with.
load:
A python function that can be called parameterlessly, to load the
sample in question from whatever medium
parent : object
A parent object from which to inherit all other attributes (except
``data``)
parent : :any:`DelayedSample`, :any:`Sample`, None
If passed, consider this as a parent of this sample, to copy
information
kwargs : dict
Further attributes of this sample, to be stored and eventually
transmitted to transformed versions of the sample
"""
def __init__(self, data, parent=None, **kwargs):
self.data = data
def __init__(self, load, parent=None, **kwargs):
self.load = load
if parent is not None:
_copy_attributes(self, parent.__dict__)
_copy_attributes(self, kwargs)
self._data = None
@property
def data(self):
"""Loads the data from the disk file."""
if self._data is None:
self._data = self.load()
return self._data
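# Illustrative usage sketch (not part of this merge request) of the two
# sample types above:
#
#     import numpy as np
#     import bob.pipelines as mario
#
#     sample = mario.Sample(np.arange(4), key="s1")       # data kept in memory
#
#     def load():
#         return np.arange(4)                             # e.g. read from disk
#
#     delayed = mario.DelayedSample(load, parent=sample)  # copies ``key`` from ``sample``
#     delayed.data    # ``load()`` runs here; the result is cached in ``_data``
#     print(delayed)  # thanks to _ReprMixin: DelayedSample(load=..., key='s1', ...)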
class SampleSet(MutableSequence):
class SampleSet(MutableSequence, _ReprMixin):
"""A set of samples with extra attributes
https://docs.python.org/3/library/collections.abc.html#collections-abstract-base-classes
"""
https://docs.python.org/3/library/collections.abc.html#collections-
abstract-base-classes."""
def __init__(self, samples, parent=None, **kwargs):
self.samples = samples
......
......@@ -4,4 +4,4 @@ from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
__all__ = [_ for _ in dir() if not _.startswith("_")]
from bob.pipelines.sample import Sample, SampleSet, DelayedSample
import bob.pipelines as mario
import numpy
import copy
......@@ -8,13 +8,13 @@ def test_sampleset_collection():
n_samples = 10
X = numpy.ones(shape=(n_samples, 2), dtype=int)
sampleset = SampleSet(
[Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
sampleset = mario.SampleSet(
[mario.Sample(data, key=str(i)) for i, data in enumerate(X)], key="1"
)
assert len(sampleset) == n_samples
# Testing insert
sample = Sample(X, key=100)
sample = mario.Sample(X, key=100)
sampleset.insert(1, sample)
assert len(sampleset) == n_samples + 1
......@@ -27,4 +27,4 @@ def test_sampleset_collection():
# Testing iterator
for i in sampleset:
assert isinstance(i, Sample)
assert isinstance(i, mario.Sample)
import nose
import numpy as np
import os
import bob.pipelines as mario
from tempfile import NamedTemporaryFile
def test_io_vstack():
paths = [1, 2, 3, 4, 5]
def oracle(reader, paths):
return np.vstack([reader(p) for p in paths])
def reader_same_size_C(path):
return np.arange(10).reshape(5, 2)
def reader_different_size_C(path):
return np.arange(2 * path).reshape(path, 2)
def reader_same_size_F(path):
return np.asfortranarray(np.arange(10).reshape(5, 2))
def reader_different_size_F(path):
return np.asfortranarray(np.arange(2 * path).reshape(path, 2))
def reader_same_size_C2(path):
return np.arange(30).reshape(5, 2, 3)
def reader_different_size_C2(path):
return np.arange(6 * path).reshape(path, 2, 3)
def reader_same_size_F2(path):
return np.asfortranarray(np.arange(30).reshape(5, 2, 3))
def reader_different_size_F2(path):
return np.asfortranarray(np.arange(6 * path).reshape(path, 2, 3))
def reader_wrong_size(path):
return np.arange(2 * path).reshape(2, path)
# when same_size is False
for reader in [
reader_different_size_C,
reader_different_size_F,
reader_same_size_C,
reader_same_size_F,
reader_different_size_C2,
reader_different_size_F2,
reader_same_size_C2,
reader_same_size_F2,
]:
np.all(mario.utils.vstack_features(reader, paths) == oracle(reader, paths))
# when same_size is True
for reader in [
reader_same_size_C,
reader_same_size_F,
reader_same_size_C2,
reader_same_size_F2,
]:
np.all(
mario.utils.vstack_features(reader, paths, True) == oracle(reader, paths)
)
with nose.tools.assert_raises(AssertionError):
mario.utils.vstack_features(reader_wrong_size, paths)
# test actual files
suffix = ".npy"
with NamedTemporaryFile(suffix=suffix) as f1, NamedTemporaryFile(
suffix=suffix
) as f2, NamedTemporaryFile(suffix=suffix) as f3:
paths = [f1.name, f2.name, f3.name]
# try different readers:
for reader in [
reader_different_size_C,
reader_different_size_F,
reader_same_size_C,
reader_same_size_F,
reader_different_size_C2,
reader_different_size_F2,
reader_same_size_C2,
reader_same_size_F2,
]:
# save some data in files
for i, path in enumerate(paths):
np.save(path, reader(i + 1), allow_pickle=False)
# test when all data is present
reference = oracle(np.load, paths)
np.all(mario.utils.vstack_features(np.load, paths) == reference)
try:
os.remove(paths[0])
# Check if RuntimeError is raised when one of the files is missing
with nose.tools.assert_raises(FileNotFoundError):
mario.utils.vstack_features(np.load, paths)
finally:
# create the file back so NamedTemporaryFile does not complain
np.save(paths[0], reader(i + 1))
from .linearize import Linearize, SampleLinearize, CheckpointSampleLinearize
from .pca import CheckpointSamplePCA, SamplePCA
from .function import (
SampleFunctionTransformer,
CheckpointSampleFunctionTransformer,
StatelessPipeline,
)
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from ..wrappers import wrap
def SampleFunctionTransformer(**kwargs):
"""Class that transforms Scikit learn FunctionTransformer (https://scikit-l
earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer
.html) work with :any:`Sample`-based pipelines."""
return wrap([FunctionTransformer, "sample"], **kwargs)
def CheckpointSampleFunctionTransformer(**kwargs):
"""Class that transforms Scikit learn FunctionTransformer (https://scikit-l
earn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer
.html) work with :any:`Sample`-based pipelines.
Furthermore, it makes it checkpointable
"""
return wrap([FunctionTransformer, "sample", "checkpoint"], **kwargs)
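# Illustrative usage sketch (not part of this merge request), assuming the
# keyword arguments are forwarded to the underlying FunctionTransformer and
# that the "sample"-wrapped transformer consumes and returns lists of Sample
# objects:
#
#     import numpy as np
#     import bob.pipelines as mario
#     from bob.pipelines.transformers import SampleFunctionTransformer
#
#     samples = [mario.Sample(np.arange(4, dtype=float), key=str(i)) for i in range(3)]
#     transformer = SampleFunctionTransformer(func=lambda X: np.asarray(X) * 2)
#     transformed = transformer.transform(samples)  # still Sample objects, data doubled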
class StatelessPipeline(Pipeline):
def _more_tags(self):
return {"stateless": True, "requires_fit": False}
def fit(self, X, y=None, **fit_params):
return self
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
from bob.pipelines.mixins import CheckpointMixin, SampleMixin
from sklearn.preprocessing import FunctionTransformer
import numpy as np
from ..wrappers import wrap
def linearize(X):
......@@ -14,16 +9,16 @@ def linearize(X):
class Linearize(FunctionTransformer):
"""Extracts features by simply concatenating all elements of the data into one long vector.
"""
"""Extracts features by simply concatenating all elements of the data into
one long vector."""
def __init__(self, **kwargs):
super().__init__(func=linearize, **kwargs)
class SampleLinearize(SampleMixin, Linearize):
pass
def SampleLinearize(**kwargs):
return wrap([Linearize, "sample"], **kwargs)
class CheckpointSampleLinearize(CheckpointMixin, SampleMixin, Linearize):
pass
def CheckpointSampleLinearize(**kwargs):
return wrap([Linearize, "sample", "checkpoint"], **kwargs)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Tiago de Freitas Pereira <tiago.pereira@idiap.ch>
from bob.pipelines.mixins import CheckpointMixin, SampleMixin
from sklearn.decomposition import PCA
from ..wrappers import wrap
class SamplePCA(SampleMixin, PCA):
"""
Enables SAMPLE handling for https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
"""
pass
def SamplePCA(**kwargs):
"""Enables SAMPLE handling for :any:`sklearn.decomposition.PCA`"""
return wrap([PCA, "sample"], **kwargs)
class CheckpointSamplePCA(CheckpointMixin, SampleMixin, PCA):
"""
Enables SAMPLE and CHECKPOINTING handling for https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
"""
pass
def CheckpointSamplePCA(**kwargs):
"""Enables SAMPLE and CHECKPOINTIN handling for
:any:`sklearn.decomposition.PCA`"""
return wrap([PCA, "sample", "checkpoint"], **kwargs)
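# Illustrative usage sketch (not part of this merge request), assuming ``wrap``
# routes ``features_dir``/``model_path`` to the checkpoint wrapper and the
# remaining keyword arguments (here ``n_components``) to the PCA estimator;
# the paths and sample lists below are hypothetical:
#
#     from bob.pipelines.transformers import CheckpointSamplePCA
#
#     pca = CheckpointSamplePCA(
#         n_components=2,
#         features_dir="/tmp/pca_features",  # checkpointed projections go here
#         model_path="/tmp/pca.pkl",         # fitted PCA model goes here
#     )
#     pca = pca.fit(training_samples)        # ``training_samples``: a list of Sample objects
#     projected = pca.transform(test_samples)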
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import pickle
import nose
import numpy as np
def is_picklable(obj):
"""
Test if an object is picklable or not
"""
import pickle
def is_picklable(obj):
"""Test if an object is picklable or not."""
try:
pickle.dumps(obj)
except TypeError:
......@@ -15,3 +13,169 @@ def is_picklable(obj):
return False
return True
def assert_picklable(obj):
"""Test if an object is picklable or not."""
string = pickle.dumps(obj)
new_obj = pickle.loads(string)
obj = obj.__dict__
new_obj = new_obj.__dict__
assert len(obj) == len(new_obj)
nose.tools.assert_equal(list(obj.keys()), list(new_obj.keys()))
for k, v in obj.items():
if isinstance(v, np.ndarray):
np.testing.assert_equal(v, new_obj[k])
else:
nose.tools.assert_equal(v, new_obj[k])
def is_estimator_stateless(estimator):
if not hasattr(estimator, "_get_tags"):
raise ValueError(
f"Passed estimator: {estimator} does not have the _get_tags method."
)
# See: https://scikit-learn.org/stable/developers/develop.html
# if the estimator does not require fit or is stateless don't call fit
tags = estimator._get_tags()
if tags["stateless"] or not tags["requires_fit"]:
return True
return False
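# Illustrative sketch (not part of this merge request), assuming a
# scikit-learn version where FunctionTransformer advertises the ``stateless``
# tag and PCA keeps the default ``requires_fit`` tag:
#
#     from sklearn.preprocessing import FunctionTransformer
#     from sklearn.decomposition import PCA
#
#     is_estimator_stateless(FunctionTransformer(func=str))  # True: no fit needed
#     is_estimator_stateless(PCA())                          # False: PCA must be fitted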
def _generate_features(reader, paths, same_size=False):
"""Load and stack features in a memory efficient way. This function is
meant to be used inside :py:func:`vstack_features`.
Parameters
----------
reader : ``collections.Callable``
See the documentation of :py:func:`vstack_features`.
paths : ``collections.Iterable``
See the documentation of :py:func:`vstack_features`.
same_size : :obj:`bool`, optional
See the documentation of :py:func:`vstack_features`.
Yields
------
object
The first object returned is a tuple of :py:class:`numpy.dtype` of
features and the shape of the first feature. The rest of objects are
the actual values in features. The features are returned in C order.
"""
shape_determined = False
for i, path in enumerate(paths):
feature = np.atleast_2d(reader(path))
feature = np.ascontiguousarray(feature)
if not shape_determined:
shape_determined = True
dtype = feature