Commit 0750c4df authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

When vanilla_pad was working

parent c917ba39
Pipeline #39183 failed with stage
in 3 minutes and 59 seconds
# Wrap every estimator of the pipeline so it executes through dask.
from bob.pipelines.mixins import estimator_dask_it
# NOTE(review): `pipeline` must be defined earlier in the original file —
# this is only a fragment; confirm against the full source.
pipeline = estimator_dask_it(pipeline)
from dask.distributed import Client, LocalCluster
from multiprocessing import cpu_count
# NOTE(review): n_nodes is assigned twice — the `4` is immediately
# overwritten by cpu_count(); likely old/new diff lines shown together.
n_nodes = 4
n_nodes = cpu_count()
# One thread per dask worker process.
threads_per_worker = 1
# NOTE(review): call truncated here — the remaining LocalCluster(...)
# arguments are not visible in this fragment.
cluster = LocalCluster(
This diff is collapsed.
......@@ -7,7 +7,11 @@ def _copy_attributes(s, d):
"""Copies attributes from a dictionary to self
dict([k, v] for k, v in d.items() if k not in ("data", "load", "samples"))
(k, v)
for k, v in d.items()
if k not in ("data", "load", "samples", "_data")
......@@ -40,11 +44,14 @@ class DelayedSample:
if parent is not None:
_copy_attributes(self, parent.__dict__)
_copy_attributes(self, kwargs)
self._data = None
def data(self):
    """Load the sample's data from disk, caching the result.

    The first call invokes ``self.load()`` and stores the result in
    ``self._data``; later calls return the cached value without touching
    the disk again.

    NOTE(review): in the enclosing class this is presumably exposed as a
    ``@property`` — confirm against the full source file.

    Returns
    -------
    object
        Whatever ``self.load()`` produces.
    """
    # The fragment showed an unconditional ``return self.load()`` before
    # the caching branch, which made the cache dead code; keep only the
    # memoizing version.
    if self._data is None:
        self._data = self.load()
    return self._data
class Sample:
......@@ -74,7 +81,6 @@ class Sample:
_copy_attributes(self, kwargs)
class SampleSet(MutableSequence):
"""A set of samples with extra attributes
......@@ -11,7 +11,7 @@ from bob.pipelines.mixins import (
from .linearize import Linearize, SampleLinearize, CheckpointSampleLinearize
from .pca import CheckpointSamplePCA, SamplePCA
from .function import SampleFunctionTransformer, CheckpointSampleFunctionTransformer, StatelessPipeline
from .dask import ToDaskBag
import dask.bag
from sklearn.base import TransformerMixin, BaseEstimator
class ToDaskBag(TransformerMixin, BaseEstimator):
    """Transform an arbitrary iterable into a :py:class:`dask.bag.Bag`.

    Parameters
    ----------
    npartitions : int
        Number of partitions used in :py:meth:`dask.bag.from_sequence`.

    Example
    -------
    >>> transformer = ToDaskBag()
    >>> dask_bag = transformer.transform([1, 2, 3])
    >>> dask_bag.map_partitions(...)
    """

    def __init__(self, npartitions=None, **kwargs):
        # Extra keyword arguments are accepted for mixin/scikit-learn
        # compatibility but are intentionally not stored here.
        self.npartitions = npartitions

    def fit(self, X, y=None):
        """No-op: this transformer is stateless."""
        return self

    def transform(self, X):
        """Wrap ``X`` into a dask bag with ``self.npartitions`` partitions."""
        return dask.bag.from_sequence(X, npartitions=self.npartitions)

    def _more_tags(self):
        # Advertise to downstream tooling that ``fit`` does nothing.
        return {"stateless": True, "requires_fit": False}
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
from ..mixins import SampleMixin, CheckpointMixin
class SampleFunctionTransformer(SampleMixin, FunctionTransformer):
    """Makes the scikit-learn :any:`FunctionTransformer`
    work with :any:`Sample`-based pipelines.
    """

    pass
class CheckpointSampleFunctionTransformer(
    CheckpointMixin, SampleMixin, FunctionTransformer
):
    """Makes the scikit-learn :any:`FunctionTransformer`
    work with :any:`Sample`-based pipelines.

    Furthermore, it makes it checkpointable.
    """

    pass
class StatelessPipeline(Pipeline):
    """A scikit-learn pipeline whose ``fit`` is a deliberate no-op.

    Useful when every step is a stateless transformer and nothing needs
    to be estimated from the data.
    """

    def _more_tags(self):
        """Advertise that this estimator never requires fitting."""
        return dict(stateless=True, requires_fit=False)

    def fit(self, X, y=None, **fit_params):
        """Do nothing and return the pipeline unchanged."""
        return self
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
import pickle
import nose
import numpy as np
def is_picklable(obj):
    """Test whether an object can be serialized with :mod:`pickle`.

    Parameters
    ----------
    obj : object
        Any object.

    Returns
    -------
    bool
        ``True`` if ``pickle.dumps(obj)`` succeeds, ``False`` otherwise.
    """
    try:
        pickle.dumps(obj)
    # pickle signals "not picklable" with several exception types:
    # TypeError (e.g. generators), PicklingError and AttributeError
    # (e.g. lambdas and other local objects).
    except (TypeError, pickle.PicklingError, AttributeError):
        return False
    return True
def assert_picklable(obj):
    """Assert that an object round-trips through pickle unchanged.

    The object is serialized and deserialized, then its ``__dict__`` is
    compared attribute by attribute with the copy's; numpy arrays are
    compared element-wise.

    Parameters
    ----------
    obj : object
        Any object with a ``__dict__``.

    Raises
    ------
    AssertionError
        If the copy's ``__dict__`` differs from the original's.
    """
    string = pickle.dumps(obj)
    new_obj = pickle.loads(string)
    obj = obj.__dict__
    new_obj = new_obj.__dict__
    # Same set of attributes before comparing values; show both key lists
    # on failure so the mismatch is easy to spot.
    assert len(obj) == len(new_obj), (list(obj.keys()), list(new_obj.keys()))
    for k, v in obj.items():
        if isinstance(v, np.ndarray):
            # ``==`` on arrays is element-wise; use numpy's comparison.
            np.testing.assert_equal(v, new_obj[k])
        else:
            assert v == new_obj[k], (v, new_obj[k])
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment