Commit 4edc65de authored by Amir MOHAMMADI

introduce SampleBatch

parent 473afdf5
Pipeline #39602 passed with stage in 3 minutes and 40 seconds
"""Base definition of sample."""
from collections.abc import MutableSequence
from collections.abc import MutableSequence, Sequence
from .utils import vstack_features
import numpy as np
def _copy_attributes(s, d):
......@@ -89,9 +91,7 @@ class DelayedSample(_ReprMixin):
 class SampleSet(MutableSequence, _ReprMixin):
-    """A set of samples with extra attributes
-    https://docs.python.org/3/library/collections.abc.html#collections-
-    abstract-base-classes."""
+    """A set of samples with extra attributes"""

     def __init__(self, samples, parent=None, **kwargs):
         self.samples = samples
@@ -114,3 +114,27 @@ class SampleSet(MutableSequence, _ReprMixin):
     def insert(self, index, item):
         # if not item in self.samples:
         self.samples.insert(index, item)
+
+class SampleBatch(Sequence, _ReprMixin):
+    """A batch of samples that looks like [s.data for s in samples]
+
+    However, when you call np.array(SampleBatch), it will construct a numpy array from
+    sample.data attributes in a memory efficient way.
+    """
+
+    def __init__(self, samples):
+        self.samples = samples
+
+    def __len__(self):
+        return len(self.samples)
+
+    def __getitem__(self, item):
+        return self.samples[item].data
+
+    def __array__(self, dtype=None, *args, **kwargs):
+        def _reader(s):
+            # adding one more dimension to data so they get stacked sample-wise
+            return s.data[None, ...]
+
+        arr = vstack_features(_reader, self.samples, dtype=dtype)
+        return np.asarray(arr, dtype, *args, **kwargs)
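Editorial note: a short usage sketch of the new class may help. Indexing a SampleBatch returns the underlying `data`, while `np.array(batch)` goes through `__array__` and stacks all samples via `vstack_features` without first materializing a Python list. The sketch assumes `Sample` is importable from the package top level (as it is today) and `SampleBatch` from the `sample` module this diff touches:

    import numpy as np
    from bob.pipelines import Sample
    from bob.pipelines.sample import SampleBatch

    samples = [Sample(np.full((2, 3), i)) for i in range(5)]
    batch = SampleBatch(samples)
    assert len(batch) == 5
    assert batch[0].shape == (2, 3)            # first sample's data
    assert np.array(batch).shape == (5, 2, 3)  # stacked sample-wise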
@@ -5,7 +5,7 @@ import bob.pipelines as mario
 import numpy as np

-def test_linearize_processor():
+def test_linearize():
     def _assert(Xt, oracle):
         assert np.allclose(Xt, oracle), (Xt, oracle)
@@ -32,7 +32,7 @@ def test_linearize_processor():
     assert os.path.exists(os.path.join(d, "1.h5"))

-def test_pca_processor():
+def test_pca():
     # Test wrapped in to a Sample
     X = np.random.rand(100, 10)
@@ -9,36 +9,42 @@ def test_io_vstack():
     paths = [1, 2, 3, 4, 5]

+    def asser_(actual, desired, dtype=None):
+        np.testing.assert_allclose(actual, desired)
+        if dtype is not None:
+            assert actual.dtype == dtype, (actual.dtype, dtype)

     def oracle(reader, paths):
         return np.vstack([reader(p) for p in paths])

     def reader_same_size_C(path):
-        return np.arange(10).reshape(5, 2)
+        return np.arange(10).reshape(5, 2) + path

     def reader_different_size_C(path):
-        return np.arange(2 * path).reshape(path, 2)
+        return np.arange(2 * path).reshape(path, 2) + path

     def reader_same_size_F(path):
-        return np.asfortranarray(np.arange(10).reshape(5, 2))
+        return np.asfortranarray(np.arange(10).reshape(5, 2)) + path

     def reader_different_size_F(path):
-        return np.asfortranarray(np.arange(2 * path).reshape(path, 2))
+        return np.asfortranarray(np.arange(2 * path).reshape(path, 2)) + path

     def reader_same_size_C2(path):
-        return np.arange(30).reshape(5, 2, 3)
+        return np.arange(30).reshape(5, 2, 3) + path

     def reader_different_size_C2(path):
-        return np.arange(6 * path).reshape(path, 2, 3)
+        return np.arange(6 * path).reshape(path, 2, 3) + path

     def reader_same_size_F2(path):
-        return np.asfortranarray(np.arange(30).reshape(5, 2, 3))
+        return np.asfortranarray(np.arange(30).reshape(5, 2, 3)) + path

     def reader_different_size_F2(path):
-        return np.asfortranarray(np.arange(6 * path).reshape(path, 2, 3))
+        return np.asfortranarray(np.arange(6 * path).reshape(path, 2, 3)) + path

     def reader_wrong_size(path):
-        return np.arange(2 * path).reshape(2, path)
+        return np.arange(2 * path).reshape(2, path) + path

+    dtype = "float32"

     # when same_size is False
     for reader in [
         reader_different_size_C,
@@ -50,7 +56,12 @@ def test_io_vstack():
         reader_same_size_C2,
         reader_same_size_F2,
     ]:
-        np.all(mario.utils.vstack_features(reader, paths) == oracle(reader, paths))
+        asser_(mario.utils.vstack_features(reader, paths), oracle(reader, paths))
+        asser_(
+            mario.utils.vstack_features(reader, paths, dtype=dtype),
+            oracle(reader, paths),
+            dtype,
+        )
     # when same_size is True
     for reader in [
@@ -59,8 +70,11 @@ def test_io_vstack():
         reader_same_size_C2,
         reader_same_size_F2,
     ]:
-        np.all(
-            mario.utils.vstack_features(reader, paths, True) == oracle(reader, paths)
-        )
+        asser_(mario.utils.vstack_features(reader, paths, True), oracle(reader, paths))
+        asser_(
+            mario.utils.vstack_features(reader, paths, True, dtype=dtype),
+            oracle(reader, paths),
+            dtype,
+        )
     with nose.tools.assert_raises(AssertionError):
@@ -88,7 +102,12 @@ def test_io_vstack():
         np.save(path, reader(i + 1), allow_pickle=False)

     # test when all data is present
     reference = oracle(np.load, paths)
-    np.all(mario.utils.vstack_features(np.load, paths) == reference)
+    asser_(mario.utils.vstack_features(np.load, paths), reference)
+    asser_(
+        mario.utils.vstack_features(np.load, paths, dtype=dtype),
+        reference,
+        dtype,
+    )
     try:
         os.remove(paths[0])
         # Check if RuntimeError is raised when one of the files is missing
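One more editorial note on this test: the replaced `np.all(a == b)` calls computed a boolean but never asserted it, so comparison failures went unnoticed; the new `asser_` helper actually raises. A minimal sketch of the behavior the updated test pins down (integer "paths" stand in for file paths, as in the test itself):

    import numpy as np
    import bob.pipelines as mario

    def reader(path):
        # each "path" deterministically maps to a (5, 2) array
        return np.arange(10).reshape(5, 2) + path

    stacked = mario.utils.vstack_features(reader, [1, 2, 3], dtype="float32")
    assert stacked.shape == (15, 2)
    assert stacked.dtype == np.float32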
@@ -288,7 +288,7 @@ def test_checkpoint_fit_transform_pipeline():
     transformer = ("1", _build_transformer(d, 1))
     pipeline = Pipeline([fitter, transformer])
     if dask_enabled:
-        pipeline = mario.wrap(["dask"], pipeline, fit_tag=[(1, "GPU")])
+        pipeline = mario.wrap(["dask"], pipeline, fit_tag=[(1, "GPU")], npartitions=1)
         pipeline = pipeline.fit(samples)
         tags = mario.dask_tags(pipeline)
@@ -83,11 +83,14 @@ def _generate_features(reader, paths, same_size=False):
         assert shape[1:] == list(feature.shape[1:])
         assert dtype == feature.dtype
-        for value in feature.flat:
-            yield value
+        if same_size:
+            yield (feature.ravel(),)
+        else:
+            for feat in feature:
+                yield (feat.ravel(),)

-def vstack_features(reader, paths, same_size=False):
+def vstack_features(reader, paths, same_size=False, dtype=None):
     """Stacks all features in a memory efficient way.

     Parameters
@@ -107,6 +110,8 @@ def vstack_features(reader, paths, same_size=False):
         If ``True``, it assumes that arrays inside all the paths are the same
         shape. If you know the features are the same size in all paths, set this
         to ``True`` to improve the performance.
+    dtype : :py:class:`numpy.dtype`, optional
+        If provided, the data will be cast to this format.

     Returns
     -------
@@ -164,18 +169,21 @@ def vstack_features(reader, paths, same_size=False):
     This function runs very slowly. Only use it when RAM is precious.
     """
     iterable = _generate_features(reader, paths, same_size)
-    dtype, shape = next(iterable)
+    data_dtype, shape = next(iterable)
+    if dtype is None:
+        dtype = data_dtype
     if same_size:
-        total_size = int(len(paths) * np.prod(shape))
-        all_features = np.fromiter(iterable, dtype, total_size)
+        # numpy black magic: https://stackoverflow.com/a/12473478/1286165
+        field_dtype = [("", (dtype, (np.prod(shape),)))]
+        total_size = len(paths)
+        all_features = np.fromiter(iterable, field_dtype, total_size)
     else:
-        all_features = np.fromiter(iterable, dtype)
+        field_dtype = [("", (dtype, (np.prod(shape[1:]),)))]
+        all_features = np.fromiter(iterable, field_dtype)

+    # go from a field array to a normal array
+    all_features = all_features.view(dtype)
     # the shape is assumed to be (n_samples, ...) it can be (5, 2) or (5, 3, 4).
     shape = list(shape)
     shape[0] = -1
     return np.reshape(all_features, shape, order="C")

-def samples_to_np_array(samples, same_size=True):
-    return vstack_features(lambda s: s.data, samples, same_size=same_size)
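The `field_dtype` construction above is the heart of the change: `np.fromiter` only accepts scalar-like items, so each raveled feature is wrapped in a one-element tuple matching a single-field structured dtype with a subarray shape, and the structured result is then re-viewed as a plain array (the linked Stack Overflow answer describes the trick). A self-contained sketch of the same idea, separate from the library code:

    import numpy as np

    # four (2, 3) blocks, stacked row-wise without building a Python list first
    blocks = [np.arange(i, i + 6, dtype="int64").reshape(2, 3) for i in range(4)]
    items = ((b.ravel(),) for b in blocks)  # 1-tuples -> one record per block
    field_dtype = [("", ("int64", (6,)))]   # one anonymous field of six ints
    records = np.fromiter(items, field_dtype, 4)
    arr = records.view("int64").reshape(-1, 3)  # back to a plain (8, 3) array
    assert arr.shape == (8, 3)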
"""Scikit-learn Estimator Wrappers."""
from .sample import DelayedSample, SampleSet
from .utils import is_estimator_stateless, samples_to_np_array
from .sample import DelayedSample, SampleSet, SampleBatch
from .utils import is_estimator_stateless
import dask.bag
from sklearn.base import TransformerMixin, BaseEstimator, MetaEstimatorMixin
import os
......@@ -14,7 +14,8 @@ import logging
logger = logging.getLogger(__name__)
def _frmt(estimator, limit=40):
def _frmt(estimator, limit=30):
# default value of limit is chosen so the log can be seen in dask graphs
def _n(e):
return e.__class__.__name__.replace("Wrapper", "")
@@ -48,24 +49,20 @@ def _make_kwargs_from_samples(samples, arg_attr_list):

 def _check_n_input_output(samples, output, func_name):
     ls, lo = len(samples), len(output)
     if ls != lo:
-        raise RuntimeError(f"{func_name} got {ls} samples but returned {lo} features!")
+        raise RuntimeError(f"{func_name} got {ls} samples but returned {lo} samples!")

 class DelayedSamplesCall:
-    def __init__(self, func, func_name, samples, input_is_np_array=False, **kwargs):
+    def __init__(self, func, func_name, samples, **kwargs):
         super().__init__(**kwargs)
         self.func = func
         self.func_name = func_name
         self.samples = samples
         self.output = None
-        self.input_is_np_array = input_is_np_array

     def __call__(self, index):
         if self.output is None:
-            if self.input_is_np_array:
-                X = samples_to_np_array(self.samples)
-            else:
-                X = [s.data for s in self.samples]
+            X = SampleBatch(self.samples)
             self.output = self.func(X)
             _check_n_input_output(self.samples, self.output, self.func_name)
         return self.output[index]
@@ -101,14 +98,12 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
         estimator,
         transform_extra_arguments=None,
         fit_extra_arguments=None,
-        input_is_np_array=False,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.estimator = estimator
         self.transform_extra_arguments = transform_extra_arguments or tuple()
         self.fit_extra_arguments = fit_extra_arguments or tuple()
-        self.input_is_np_array = input_is_np_array

     def _samples_transform(self, samples, method_name):
         # Transform either samples or samplesets
@@ -129,7 +124,6 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
                 partial(method, **kwargs),
                 func_name,
                 samples,
-                input_is_np_array=self.input_is_np_array,
             )
             new_samples = [
                 DelayedSample(partial(delayed, index=i), parent=s)
@@ -161,10 +155,7 @@ class SampleWrapper(BaseWrapper, TransformerMixin):
         logger.debug(f"{_frmt(self)}.fit")
         kwargs = _make_kwargs_from_samples(samples, self.fit_extra_arguments)

-        if self.input_is_np_array:
-            X = samples_to_np_array(samples)
-        else:
-            X = [s.data for s in samples]
+        X = SampleBatch(samples)

         self.estimator = self.estimator.fit(X, **kwargs)
         copy_learned_attributes(self.estimator, self)
@@ -476,10 +467,9 @@ def wrap(bases, estimator=None, **kwargs):
         # if being wrapped with DaskWrapper, add ToDaskBag to the steps
         if DaskWrapper in bases:
             valid_params = ToDaskBag._get_param_names()
-            params = {k: kwargs.pop(k) for k in valid_params if k in kwargs}
+            params = {k: leftover.pop(k) for k in valid_params if k in leftover}
             dask_bag = ToDaskBag(**params)
             estimator.steps.insert(0, ("ToDaskBag", dask_bag))
     else:
         estimator, leftover = _wrap(estimator, **kwargs)
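The one-word fix in `wrap` above is worth spelling out: `_wrap` consumes the kwargs it recognizes and returns the remainder as `leftover`, so `ToDaskBag` parameters such as `npartitions` must be popped from `leftover`, not from the already-consumed `kwargs`. A hedged sketch of the call this enables, mirroring the updated test (the pipeline itself is hypothetical):

    import bob.pipelines as mario
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    # hypothetical one-step pipeline; "sample" wrapping as used in the tests
    pipeline = Pipeline([("scale", mario.wrap(["sample"], StandardScaler()))])
    # fit_tag goes to the dask wrapper, npartitions to the prepended ToDaskBag
    pipeline = mario.wrap(["dask"], pipeline, fit_tag=[(1, "GPU")], npartitions=1)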
@@ -31,7 +31,7 @@ setup(
     zip_safe=False,
     install_requires=install_requires,
     entry_points={
-        "bob.cli": ["pipelines = bob.pipelines.script.pipelines:pipelines",],
+        "bob.cli": ["pipelines = bob.pipelines.scripts.pipelines:pipelines"],
     },
     # check classifiers, add and remove as you see fit
     # full list here: https://pypi.org/classifiers/