Commit d5bc2a02 authored by Amir MOHAMMADI

Fixed sphinx and ran black

parent 2c3e754c
Pipeline #39282 passed with stage in 3 minutes and 39 seconds
from . import utils
from .sample import Sample, DelayedSample, SampleSet
from .wrappers import BaseWrapper, DelayedSamplesCall, SampleWrapper, CheckpointWrapper, DaskWrapper, ToDaskBag, wrap, dask_tags
from .wrappers import (
BaseWrapper,
DelayedSamplesCall,
SampleWrapper,
CheckpointWrapper,
DaskWrapper,
ToDaskBag,
wrap,
dask_tags,
)
from . import distributed
from . import transformers
def __appropriate__(*args):
"""Says object was actually declared here, and not in the import module.
Fixing sphinx warnings of not being able to find classes, when path is
shortened.
Parameters
----------
*args
The objects that you want sphinx to believe are defined here.
Resolves `Sphinx referencing issues <https://github.com/sphinx-doc/sphinx/issues/3048>`_
"""
for obj in args:
obj.__module__ = __name__
__appropriate__(
Sample,
DelayedSample,
SampleSet,
BaseWrapper,
DelayedSamplesCall,
SampleWrapper,
CheckpointWrapper,
DaskWrapper,
ToDaskBag,
)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
__all__ = [_ for _ in dir() if not _.startswith("_")]
from bob.pipelines.mixins import estimator_dask_it
pipeline = estimator_dask_it(pipeline)
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
\ No newline at end of file
__path__ = extend_path(__path__, __name__)
......@@ -65,7 +65,7 @@ class DelayedSample(_ReprMixin):
A python function that can be called without parameters, to load the
sample in question from whatever medium
parent : :py:class:`bob.pipelines.sample.DelayedSample`, :py:class:`bob.pipelines.sample.Sample`, None
parent : :any:`DelayedSample`, :any:`Sample`, None
If passed, consider this as a parent of this sample, to copy
information
......
......@@ -4,4 +4,4 @@ from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# gets sphinx autodoc done right - don't remove it
__all__ = [_ for _ in dir() if not _.startswith('_')]
__all__ = [_ for _ in dir() if not _.startswith("_")]
......@@ -28,24 +28,3 @@ def test_sampleset_collection():
# Testing iterator
for i in sampleset:
assert isinstance(i, mario.Sample)
import tempfile
import os
import numpy as np
with tempfile.TemporaryDirectory() as dir_name:
checkpointing_transformer = mario.CheckpointWrapper(features_dir=dir_name)
# now let's create some samples with ``key`` metadata
# Creating X: 3 samples, 2 features
X = np.zeros((3, 2))
# 3 offsets: one for each sample
offsets = np.arange(3).reshape((3, 1))
# key values must be string because they will be used to create file names.
samples = [mario.Sample(x, offset=o, key=str(i)) for i, (x, o) in enumerate(zip(X, offsets))]
samples[0]
# let's transform them
transformed_samples = checkpointing_transformer.transform(samples)
# Let's check the features directory
os.listdir(dir_name)
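# A hypothetical follow-up check (not part of the original snippet): with the
# default settings, CheckpointWrapper writes one file per sample, named after
# each sample's ``key`` attribute.
assert len(os.listdir(dir_name)) == len(samples)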
......@@ -232,7 +232,9 @@ def test_checkpoint_fittable_pipeline():
X = np.ones(shape=(10, 2), dtype=int)
samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
samples_transform = [mario.Sample(data, key=str(i + 10)) for i, data in enumerate(X)]
samples_transform = [
mario.Sample(data, key=str(i + 10)) for i, data in enumerate(X)
]
oracle = X + 3
with tempfile.TemporaryDirectory() as d:
......
from .linearize import Linearize, SampleLinearize, CheckpointSampleLinearize
from .pca import CheckpointSamplePCA, SamplePCA
from .function import SampleFunctionTransformer, CheckpointSampleFunctionTransformer, StatelessPipeline
from .function import (
SampleFunctionTransformer,
CheckpointSampleFunctionTransformer,
StatelessPipeline,
)
......@@ -11,6 +11,7 @@ def linearize(X):
class Linearize(FunctionTransformer):
"""Extracts features by simply concatenating all elements of the data into one long vector.
"""
def __init__(self, **kwargs):
super().__init__(func=linearize, **kwargs)
......
......@@ -28,6 +28,8 @@ def _frmt(estimator, limit=40):
class BaseWrapper(BaseEstimator):
"""The base class for all wrappers"""
def _more_tags(self):
return self.estimator._more_tags()
......@@ -75,7 +77,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
Attributes
----------
fit_extra_arguments : [tuple], optional
fit_extra_arguments : [tuple]
Use this option if you want to pass extra arguments to the fit method of the
mixed instance. The format is a list of two value tuples. The first value in
tuples is the name of the argument that fit accepts, like ``y``, and the second
......@@ -83,7 +85,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
passing samples to the fit method and want to pass ``subject`` attributes of
samples as the ``y`` argument to the fit method, you can provide ``[("y",
"subject")]`` as the value for this attribute.
transform_extra_arguments : [tuple], optional
transform_extra_arguments : [tuple]
Similar to ``fit_extra_arguments`` but for the transform and other similar methods.
"""
......@@ -379,22 +381,19 @@ class DaskWrapper(BaseWrapper, TransformerMixin):
class ToDaskBag(TransformerMixin, BaseEstimator):
"""Transform an arbitrary iterator into a :py:class:`dask.bag`
Paramters
---------
npartitions: int
Number of partitions used in :py:meth:`dask.bag.from_sequence`
"""Transform an arbitrary iterator into a :any:`dask.bag.Bag`
Example
-------
>>> transformer = DaskBagMixin()
>>> import bob.pipelines as mario
>>> transformer = mario.ToDaskBag()
>>> dask_bag = transformer.transform([1,2,3])
>>> dask_bag.map_partitions.....
>>> # dask_bag.map_partitions(...)
Attributes
----------
npartitions : int
Number of partitions used in :any:`dask.bag.from_sequence`
"""
def __init__(self, npartitions=None, **kwargs):
......@@ -422,7 +421,7 @@ def wrap(bases, estimator=None, **kwargs):
----------
bases : list
A list of classes to be used
estimator : None, optional
estimator : :any:`object`, optional
An initial estimator to be wrapped inside other wrappers. If None, the first class will be used to initialize the estimator.
**kwargs
Extra parameters passed to the init of classes.
......
......@@ -136,3 +136,86 @@ checkpoint exists.
... # call .fit again. This time it should not print anything
... __ = checkpointing_transformer.fit(samples)
Fit was called!
.. _bob.pipelines.wrap:
Convenience wrapper function
----------------------------
We provide a :any:`wrap` function to wrap estimators in several layers easily. So far we
learned that we need to wrap our estimators with :any:`SampleWrapper` and
:any:`CheckpointWrapper`. There is also a Dask wrapper: :any:`DaskWrapper` which you'll
learn about in :ref:`bob.pipelines.dask`. Below is an example of how to use it.
Instead of:
.. doctest::
>>> transformer = MyTransformer()
>>> transform_extra_arguments=[("sample_specific_offsets", "offset")]
>>> transformer = mario.SampleWrapper(transformer, transform_extra_arguments)
>>> transformer = mario.CheckpointWrapper(
... transformer, features_dir="features", model_path="model.pkl")
>>> transformer = mario.DaskWrapper(transformer)
You can write:
.. doctest::
>>> transformer = mario.wrap(
... [MyTransformer, "sample", "checkpoint", "dask"],
... transform_extra_arguments=transform_extra_arguments,
... features_dir="features",
... model_path="model.pkl",
... )
>>> # or if your estimator is already created.
>>> transformer = mario.wrap(
... ["sample", "checkpoint", "dask"],
... MyTransformer(),
... transform_extra_arguments=transform_extra_arguments,
... features_dir="features",
... model_path="model.pkl",
... )
Much simpler, no? Internally, the ``"sample"`` string will be replaced by
:any:`SampleWrapper`. You provide a list of classes to wrap as the first argument
and, optionally, an estimator to be wrapped as the second argument. If the second
argument is missing, the first class will be used to create the estimator. Then, you
provide the ``__init__`` parameters of all classes as kwargs.
Internally, :any:`wrap` will pass each kwarg only to the classes that accept it.
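As a rough illustration only (this is not the actual implementation of :any:`wrap`;
the helper below is hypothetical), you can picture the kwarg routing as inspecting
each wrapper's ``__init__`` signature and handing over only the arguments it accepts:

.. code:: python

    import inspect

    import bob.pipelines as mario

    def kwargs_for(klass, kwargs):
        # hypothetical helper: keep only the kwargs that ``klass.__init__`` accepts
        accepted = inspect.signature(klass.__init__).parameters
        return {k: v for k, v in kwargs.items() if k in accepted}

    # ``features_dir`` is an ``__init__`` parameter of CheckpointWrapper, so it is
    # the only entry kept here; ``npartitions`` would go to ToDaskBag instead.
    kwargs_for(mario.CheckpointWrapper, {"features_dir": "features", "npartitions": 2})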
.. note::
:any:`wrap` is a convenience function but it might be limited in what it can do. You
can always use the wrapper classes directly.
:any:`wrap` recognizes :any:`sklearn.pipeline.Pipeline` objects and when pipelines are
passed, it wraps the steps inside them instead. For example, instead of:
.. doctest::
>>> transformer1 = mario.wrap(
... [MyTransformer, "sample"],
... transform_extra_arguments=transform_extra_arguments,
... )
>>> transformer2 = mario.wrap(
... [MyTransformer, "sample"],
... transform_extra_arguments=transform_extra_arguments,
... )
>>> from sklearn.pipeline import make_pipeline
>>> pipeline = make_pipeline(transformer1, transformer2)
you can write:
.. doctest::
>>> pipeline = make_pipeline(MyTransformer(), MyTransformer())
>>> pipeline = mario.wrap(["sample"], pipeline, transform_extra_arguments=transform_extra_arguments)
It will pass ``transform_extra_arguments`` to all steps when wrapping them with the
:any:`SampleWrapper`. You cannot pass specific arguments to one of the steps. Wrapping
pipelines with :any:`wrap`, while limited, becomes useful when we are wrapping them
with Dask as we will see in :ref:`bob.pipelines.dask`.
......@@ -220,8 +220,11 @@ rst_epilog = """
# Default processing flags for sphinx
autoclass_content = "class"
autodoc_member_order = "bysource"
autodoc_default_flags = ["members", "undoc-members", "show-inheritance"]
autodoc_default_options = {
"members": True,
"undoc-members": True,
"show-inheritance": True,
}
# For inter-documentation mapping:
from bob.extension.utils import link_documentation, load_requirements
......
......@@ -5,7 +5,7 @@
========================================
"`Dask is <dask:index>`_ a flexible library for parallel computing in Python.".
`Dask <https://dask.org/>`_ is a flexible library for parallel computing in Python.
The purpose of this guide is not to describe how dask works.
For that, go to its documentation.
Moreover, there are plenty of tutorials online.
......@@ -24,12 +24,12 @@ The purpose of :doc:`scikit learn pipelines <modules/generated/sklearn.pipeline.
Then, it is possible to use the methods `fit` and `transform` to create models and transform your data respectively.
Any :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>` can be transformed into a :doc:`Dask Graph <graphs>` to be further executed by any :doc:`Dask Client <client>`.
This is carried out via the :py:func:`bob.pipelines.mixins.estimator_dask_it` function.
Such fuction does two things:
This is carried out via the :any:`wrap` function when used like ``wrap(["dask"], estimator)`` (see :ref:`bob.pipelines.wrap`).
Such function does two things:
1. Edit the current :py:class:`sklearn.pipeline.Pipeline` by adding a new first step, where input samples are transformed in :doc:`Dask Bag <bag>`. This allows the usage of :py:func:`dask.bag.map` for further transformations.
1. Edit the current :any:`sklearn.pipeline.Pipeline` by adding a new first step, where input samples are transformed in :doc:`Dask Bag <bag>`. This allows the usage of :any:`dask.bag.map` for further transformations.
2. Mix all :doc:`estimators <modules/generated/sklearn.base.BaseEstimator>` in the pipeline with the :py:class:`bob.pipelines.mixins.DaskEstimatorMixin`. Such mixin is reponsible for the creation of the task graph for the methods `.fit` and `.transform`.
2. Wrap all :doc:`estimators <modules/generated/sklearn.base.BaseEstimator>` in the pipeline with :any:`DaskWrapper`. This wrapper is responsible for the creation of the task graph for the methods `.fit` and `.transform`.
The code snippet below enables this feature for an arbitrary :doc:`pipeline <modules/generated/sklearn.pipeline.Pipeline>`.
......@@ -37,13 +37,15 @@ The code snippet below enables such feature for an arbitrary :doc:`pipeline <mod
.. code:: python
>>> from bob.pipelines.mixins import estimator_dask_it
>>> dask_pipeline = estimator_dask_it(make_pipeline(...)) # Create a dask graph
>>> import bob.pipelines as mario
>>> from sklearn.pipeline import make_pipeline
>>> pipeline = make_pipeline(...)
>>> dask_pipeline = mario.wrap(["dask"], pipeline) # Create a dask graph
>>> dask_pipeline.fit_transform(....).compute() # Run the task graph using the default client
The code below is the same as the one presented in :ref:`checkpoint example <checkpoint_statefull>`.
However, lines 59-63 convert such pipeline in a :doc:`Dask Graph <graphs>` and runs it in a local computer.
The code below is an example. Note especially lines 59-63, where we convert such a
pipeline into a :doc:`Dask Graph <graphs>` and run it on a local computer.
.. literalinclude:: ./python/pipeline_example_dask.py
......@@ -91,7 +93,7 @@ Dask provides generic :doc:`deployment <dask-jobqueue:examples>` mechanism for S
2. As a result of 1., the mechanism of :doc:`adaptive deployment <dask:setup/adaptive>` is not able to handle job submissions of two or more queues.
For this reason the generic SGE laucher was extended to this one :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. Next subsections presents some code samples using this launcher in the most common cases you will probably find in your daily job.
For this reason, the generic SGE launcher was extended into :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster`. The next subsections present some code samples using this launcher in the most common cases you will probably find in your daily job.
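A minimal sketch of connecting such a cluster to a dask client (constructor arguments
are omitted here and depend on your SGE setup):

.. code:: python

    from dask.distributed import Client
    from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

    cluster = SGEMultipleQueuesCluster()  # default queue specification
    client = Client(cluster)  # this client will execute the dask graphs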
Launching jobs in different SGE queues
......@@ -166,10 +168,10 @@ Every time` cluster.scale` is executed to increase the amount of available SGE j
Note that in `MyBoostedFitTransformer.fit` a delay of `120s` was introduced to fake "processing" in the GPU queue.
During the execution of `MyBoostedFitTransformer.fit` in `q_gpu`, other resources are idle, which is a waste of resources (imagine a CNN training of 2 days instead of the 2 minutes from our example).
For this reason there's the method adapt in :py:class:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` that will adjust the SGE jobs available according to the needs of a :doc:`dask graph <graphs>`.
For this reason, there is the ``adapt`` method in :any:`bob.pipelines.distributed.sge.SGEMultipleQueuesCluster` that will adjust the number of available SGE jobs according to the needs of a :doc:`dask graph <graphs>`.
Its usage is pretty simple.
The code below determines that to run a :doc:`dask graph <graphs>`, the :py:class`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
The code below determines that to run a :doc:`dask graph <graphs>`, the :any:`distributed.scheduler.Scheduler` can demand a maximum of 10 SGE jobs. A lower bound was also set, in this case, two SGE jobs.
.. code:: python
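    # Hypothetical sketch of the adaptive setup described above (the original
    # snippet is collapsed in this diff and may differ). We let the scheduler
    # scale between a lower bound of 2 and a maximum of 10 SGE jobs, assuming
    # one dask worker per SGE job.
    from dask.distributed import Client
    from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

    cluster = SGEMultipleQueuesCluster()
    cluster.adapt(minimum=2, maximum=10)
    client = Client(cluster)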
......
......@@ -2,4 +2,5 @@ py:meth dask.distributed.Adaptive
py:class dask_jobqueue.core.JobQueueCluster
py:class distributed.deploy.adaptive.Adaptive
py:class dask_jobqueue.core.Job
py:class sklearn.preprocessing._function_transformer.FunctionTransformer
\ No newline at end of file
py:class sklearn.preprocessing._function_transformer.FunctionTransformer
py:class bob.pipelines.sample._ReprMixin
......@@ -21,7 +21,6 @@ class MyTransformer(TransformerMixin, BaseEstimator):
class MyFitTranformer(TransformerMixin, BaseEstimator):
def __init__(self, *args, **kwargs):
self._fit_model = None
super().__init__(*args, **kwargs)
......@@ -31,7 +30,7 @@ class MyFitTranformer(TransformerMixin, BaseEstimator):
return [x @ self._fit_model for x in X]
def fit(self, X):
self._fit_model = numpy.array([[1,2],[3,4]])
self._fit_model = numpy.array([[1, 2], [3, 4]])
return self
......@@ -51,8 +50,7 @@ pipeline = make_pipeline(
features_dir="./checkpoint_1",
),
MyBoostedFitTransformer(
features_dir="./checkpoint_2",
model_path="./checkpoint_2/model.pkl",
features_dir="./checkpoint_2", model_path="./checkpoint_2/model.pkl",
),
)
......@@ -60,4 +58,6 @@ pipeline = make_pipeline(
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5)
# Run the task graph in the local computer in a single thread
X_transformed = dasked_pipeline.fit_transform(X_as_sample).compute(scheduler="single-threaded")
\ No newline at end of file
X_transformed = dasked_pipeline.fit_transform(X_as_sample).compute(
scheduler="single-threaded"
)
......@@ -87,8 +87,10 @@ client = Client(cluster) # Creating the scheduler
# and tagging the fit method of the second estimator to run in the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(X_as_sample) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(
X_as_sample
) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
client.shutdown()
......@@ -87,8 +87,10 @@ client = Client(cluster) # Creating the scheduler
# and tagging the fit method of the second estimator to run in the GPU
dasked_pipeline = estimator_dask_it(pipeline, npartitions=5, fit_tag=[(1, "gpu")])
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(X_as_sample) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
dasked_pipeline.fit(X_as_sample) # Create the dask-graph for fitting
X_transformed = dasked_pipeline.transform(
X_as_sample
) # Create the dask graph for transform and returns a dask bag
X_transformed = X_transformed.compute(scheduler=client) # RUN THE GRAPH
client.shutdown()