Commit 13c6d8d2 authored by Tiago de Freitas Pereira's avatar Tiago de Freitas Pereira

Merge branch 'multiple-changes' into 'master'

Multiple changes

Closes #24 and #26

See merge request !47
parents b399779e a61d9797
Pipeline #45956 passed with stages
in 9 minutes and 8 seconds
......@@ -8,19 +8,34 @@ import numpy as np
from bob.io.base import vstack_features
# Attribute names that carry the sample payload / loading machinery and must
# therefore never be copied between samples as ordinary metadata.
# (The duplicate pre-merge assignment mentioning "_data" was dead code that
# was immediately overwritten; only the post-merge value is kept.)
SAMPLE_DATA_ATTRS = ("data", "load", "samples", "delayed_attributes")
def _copy_attributes(sample, parent, kwargs):
    """Copy metadata attributes onto ``sample``.

    Attributes are taken first from ``parent`` (if given) and then from
    ``kwargs``, so ``kwargs`` wins on name conflicts.  Names listed in
    ``SAMPLE_DATA_ATTRS`` belong to the data/loading machinery and are
    never copied.

    Parameters
    ----------
    sample : object
        The sample receiving the attributes.
    parent : object or None
        An optional sample whose instance attributes are copied over.
    kwargs : dict
        Additional ``name -> value`` attributes to set.
    """
    if parent is not None:
        for key in parent.__dict__:
            if key in SAMPLE_DATA_ATTRS:
                continue
            # getattr (not parent.__dict__[key]) so that delayed attributes
            # of a parent DelayedSample are materialized while copying
            setattr(sample, key, getattr(parent, key))
    for key, value in kwargs.items():
        if key in SAMPLE_DATA_ATTRS:
            continue
        setattr(sample, key, value)
class _ReprMixin:
def __repr__(self):
return (
f"{self.__class__.__name__}("
+ ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items())
+ ", ".join(
f"{k}={v!r}"
for k, v in self.__dict__.items()
if k != "delayed_attributes"
)
+ ")"
)
......@@ -72,9 +87,7 @@ class Sample(_ReprMixin):
def __init__(self, data, parent=None, **kwargs):
    """Store ``data`` eagerly and copy metadata from ``parent``/``kwargs``.

    Parameters
    ----------
    data : object
        The sample payload, kept in memory as-is.
    parent : object or None
        Optional sample whose metadata attributes are inherited.
    kwargs : dict
        Further metadata attributes (override the parent's on conflict).
    """
    self.data = data
    _copy_attributes(self, parent, kwargs)
class DelayedSample(_ReprMixin):
......@@ -87,7 +100,7 @@ class DelayedSample(_ReprMixin):
Parameters
----------
load:
load
A python function that can be called parameterlessly, to load the
sample in question from whatever medium
......@@ -95,24 +108,34 @@ class DelayedSample(_ReprMixin):
If passed, consider this as a parent of this sample, to copy
information
delayed_attributes : dict or None
A dictionary of name : load_fn pairs that will be used to create
attributes of name : load_fn() in this class. Use this option
to create more delayed attributes than just ``sample.data``.
kwargs : dict
Further attributes of this sample, to be stored and eventually
transmitted to transformed versions of the sample
"""
def __init__(self, load, parent=None, delayed_attributes=None, **kwargs):
    """Store the ``load`` callable and register delayed attributes.

    Parameters
    ----------
    load : callable
        Parameterless function returning the sample's data.
    parent : object or None
        Optional sample whose metadata attributes are inherited.
    delayed_attributes : dict or None
        ``name -> load_fn`` pairs resolved lazily on attribute access.
    kwargs : dict
        Further metadata attributes.
    """
    self.delayed_attributes = delayed_attributes
    # create the delayed attributes but leave their values as None for
    # now; __getattribute__ resolves them by calling the load functions.
    if delayed_attributes is not None:
        kwargs.update({k: None for k in delayed_attributes})
    _copy_attributes(self, parent, kwargs)
    self.load = load
def __getattribute__(self, name: str):
    """Resolve delayed attributes on access; defer everything else.

    If ``name`` is registered in ``delayed_attributes``, its load
    function is invoked and the loaded value returned (on every access);
    otherwise normal attribute lookup is performed.
    """
    # Guard with AttributeError: instances created without running
    # __init__ (e.g. via __new__ during copy/unpickling) have no
    # ``delayed_attributes`` yet, and the unguarded lookup would mask
    # every attribute access with a misleading error.
    try:
        delayed_attributes = super().__getattribute__("delayed_attributes")
    except AttributeError:
        delayed_attributes = None
    if delayed_attributes is None or name not in delayed_attributes:
        return super().__getattribute__(name)
    return delayed_attributes[name]()
@property
def data(self):
    """Load and return the sample data by calling ``self.load()``.

    Note: the result is NOT cached — each access triggers a fresh load.
    """
    return self.load()
class SampleSet(MutableSequence, _ReprMixin):
......@@ -120,9 +143,7 @@ class SampleSet(MutableSequence, _ReprMixin):
def __init__(self, samples, parent=None, **kwargs):
    """Store the list of samples and copy metadata from ``parent``/``kwargs``.

    Parameters
    ----------
    samples : list
        The samples contained in this set.
    parent : object or None
        Optional sample/set whose metadata attributes are inherited.
    kwargs : dict
        Further metadata attributes.
    """
    self.samples = samples
    _copy_attributes(self, parent, kwargs)
def __len__(self):
    """Return the number of samples in this set."""
    return len(self.samples)
......@@ -147,9 +168,7 @@ class DelayedSampleSet(SampleSet):
def __init__(self, load, parent=None, **kwargs):
    """Store the ``load`` callable; ``_data`` is a cache slot, empty for now.

    Parameters
    ----------
    load : callable
        Parameterless function producing this set's samples.
    parent : object or None
        Optional sample/set whose metadata attributes are inherited.
    kwargs : dict
        Further metadata attributes.
    """
    self._data = None
    self.load = load
    _copy_attributes(self, parent, kwargs)
@property
def samples(self):
......@@ -165,19 +184,20 @@ class SampleBatch(Sequence, _ReprMixin):
sample.data attributes in a memory efficient way.
"""
def __init__(self, samples, sample_attribute="data"):
    """Wrap ``samples``, exposing one attribute of each as the batch items.

    Parameters
    ----------
    samples : list
        The samples to batch.
    sample_attribute : str
        Name of the attribute read from each sample (default ``"data"``);
        lets the batch stack e.g. delayed annotations instead of data.
    """
    self.samples = samples
    self.sample_attribute = sample_attribute
def __len__(self):
    """Return the number of samples in the batch."""
    return len(self.samples)
def __getitem__(self, item):
    """Return the configured attribute of the ``item``-th sample."""
    return getattr(self.samples[item], self.sample_attribute)
def __array__(self, dtype=None, *args, **kwargs):
    """Stack the configured attribute of every sample into one numpy array."""

    def _reader(s):
        # adding one more dimension to data so they get stacked sample-wise
        return getattr(s, self.sample_attribute)[None, ...]

    arr = vstack_features(_reader, self.samples, dtype=dtype)
    return np.asarray(arr, dtype, *args, **kwargs)
......
......@@ -7,6 +7,7 @@ import tempfile
import h5py
import numpy as np
from bob.pipelines import DelayedSample
from bob.pipelines import DelayedSampleSet
from bob.pipelines import Sample
from bob.pipelines import SampleSet
......@@ -84,3 +85,19 @@ def test_sample_hdf5():
compare = [a == b for a, b in zip(samples_deserialized, samples)]
assert np.sum(compare) == 10
def test_delayed_samples():
    """DelayedSample resolves delayed attributes and hands them to children."""

    def load_data():
        return 0

    def load_annot():
        return "annotation"

    sample = DelayedSample(load_data, delayed_attributes={"annot": load_annot})
    # both data and the delayed ``annot`` attribute are loaded on access
    assert sample.data == 0, sample.data
    assert sample.annot == "annotation", sample.annot

    # a child sample inherits the (materialized) attributes of its parent
    child = Sample(1, parent=sample)
    assert child.data == 1, child.data
    assert child.annot == "annotation", child.annot
import os
import tempfile
from functools import partial
import dask
import dask_ml.decomposition
import dask_ml.preprocessing
......@@ -17,9 +19,25 @@ from sklearn.preprocessing import StandardScaler
import bob.pipelines as mario
def _build_toy_samples():
def _build_toy_samples(delayed=False):
    """Create a small toy dataset of 10 constant integer feature rows.

    Parameters
    ----------
    delayed : bool
        If True, wrap each row in a ``mario.DelayedSample`` whose ``data``
        and ``key`` attributes are loaded lazily; otherwise build plain
        ``mario.Sample`` objects eagerly.

    Returns
    -------
    tuple
        ``(X, samples)`` where ``X`` is the raw ``(10, 5)`` integer array.
    """
    X = np.ones(shape=(10, 5), dtype=int)
    if delayed:

        def _load(index, attr):
            # lazily resolve either the feature row or its string key
            if attr == "data":
                return X[index]
            if attr == "key":
                return str(index)

        samples = [
            mario.DelayedSample(
                partial(_load, i, "data"),
                delayed_attributes=dict(key=partial(_load, i, "key")),
            )
            for i in range(len(X))
        ]
    else:
        samples = [mario.Sample(data, key=str(i)) for i, data in enumerate(X)]
    return X, samples
......@@ -31,17 +49,46 @@ def test_samples_to_dataset():
np.testing.assert_array_equal(dataset["key"], [str(i) for i in range(10)])
def _build_iris_dataset(shuffle=False):
def test_delayed_samples_to_dataset():
    """Delayed samples convert to an xarray dataset just like eager ones."""
    X, samples = _build_toy_samples(delayed=True)
    dataset = mario.xr.samples_to_dataset(samples)

    expected_dims = {"sample": X.shape[0], "dim_0": X.shape[1]}
    assert dataset.dims == expected_dims, dataset.dims

    np.testing.assert_array_equal(dataset["data"], X)
    expected_keys = [str(i) for i in range(10)]
    np.testing.assert_array_equal(dataset["key"], expected_keys)
def _build_iris_dataset(shuffle=False, delayed=False):
iris = datasets.load_iris()
X = iris.data
keys = [str(k) for k in range(len(X))]
samples = [
mario.Sample(x, target=y, key=k)
for x, y, k in zip(iris.data, iris.target, keys)
]
if delayed:
def _load(index, attr):
if attr == "data":
return X[index]
if attr == "key":
return str(index)
if attr == "target":
return iris.target[index]
samples = [
mario.DelayedSample(
partial(_load, i, "data"),
delayed_attributes=dict(
key=partial(_load, i, "key"),
target=partial(_load, i, "target"),
),
)
for i in range(len(X))
]
else:
samples = [
mario.Sample(x, target=y, key=k)
for x, y, k in zip(iris.data, iris.target, keys)
]
meta = xr.DataArray(X[0], dims=("feature",))
dataset = mario.xr.samples_to_dataset(
samples, meta=meta, npartitions=3, shuffle=shuffle
......@@ -50,20 +97,21 @@ def _build_iris_dataset(shuffle=False):
def test_dataset_pipeline():
    """Fit and score a DatasetPipeline on both eager and delayed iris data."""
    for delayed in (True, False):
        ds = _build_iris_dataset(delayed=delayed)
        estimator = mario.xr.DatasetPipeline(
            [
                PCA(n_components=0.99),
                {
                    "estimator": LinearDiscriminantAnalysis(),
                    "fit_input": ["data", "target"],
                },
            ]
        )

        estimator = estimator.fit(ds)
        ds = estimator.decision_function(ds)
        ds.compute()
def test_dataset_pipeline_with_shapes():
......
......@@ -21,20 +21,41 @@ from .utils import is_estimator_stateless
logger = logging.getLogger(__name__)
def _one_sample_to_dataset(sample, meta=None):
dataset = {k: v for k, v in sample.__dict__.items() if k not in SAMPLE_DATA_ATTRS}
def _load_fn_to_xarray(load_fn, meta=None):
    """Wrap a parameterless load function into a lazy ``xarray.DataArray``.

    Parameters
    ----------
    load_fn : callable
        Called with no arguments; returns the array-like value.
    meta : array-like or None
        Template carrying ``.shape``/``.dtype`` (and optionally ``.dims``)
        of the loaded value.  If None, ``load_fn`` is called once eagerly
        to build the template.

    Returns
    -------
    tuple
        ``(data_array, meta)`` — the DataArray is backed by a delayed dask
        array, so the actual load is deferred until computation.
    """
    if meta is None:
        # no template given: load once to discover shape and dtype
        meta = np.array(load_fn())

    da = dask.array.from_delayed(
        dask.delayed(load_fn)(), meta.shape, dtype=meta.dtype, name=False
    )
    try:
        dims = meta.dims
    except Exception:
        # plain numpy templates have no named dims; let xarray pick defaults
        dims = None
    xa = xr.DataArray(da, dims=dims)
    return xa, meta
def _one_sample_to_dataset(sample, meta=None):
    """Turn one sample into a chunked ``xarray.Dataset``, collecting metadata.

    Eager metadata attributes become plain dataset variables; ``data`` and
    every delayed attribute are wrapped lazily.  The ``meta`` dict (created
    if not given) accumulates one template per lazy variable and is
    returned alongside the dataset so later samples can reuse it.
    """
    delayed_attributes = getattr(sample, "delayed_attributes", None) or {}

    # eager attributes: everything except the data/load machinery and the
    # delayed attributes (those are wrapped lazily below)
    dataset = {
        k: getattr(sample, k)
        for k in sample.__dict__
        if not (k in SAMPLE_DATA_ATTRS or k in delayed_attributes)
    }

    meta = meta or {}
    lazy_keys = ["data"] + list(delayed_attributes.keys())
    for key in lazy_keys:
        array, key_meta = _load_fn_to_xarray(
            partial(getattr, sample, key), meta=meta.get(key)
        )
        # remember the template so later samples skip the eager load
        meta[key] = key_meta
        dataset[key] = array

    return xr.Dataset(dataset).chunk(), meta
def samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False):
......@@ -58,13 +79,20 @@ def samples_to_dataset(samples, meta=None, npartitions=48, shuffle=False):
``xarray.Dataset``
The constructed dataset with at least a ``data`` variable.
"""
if meta is None:
dataset = _one_sample_to_dataset(samples[0])
meta = dataset["data"]
if meta is not None and not isinstance(meta, dict):
meta = dict(data=meta)
delayed_attributes = getattr(samples[0], "delayed_attributes", None) or {}
if meta is None or not all(
k in meta for k in ["data"] + list(delayed_attributes.keys())
):
dataset, meta = _one_sample_to_dataset(samples[0])
if shuffle:
random.shuffle(samples)
dataset = xr.concat(
[_one_sample_to_dataset(s, meta=meta) for s in samples], dim="sample"
[_one_sample_to_dataset(s, meta=meta)[0] for s in samples], dim="sample"
)
if npartitions is not None:
dataset = dataset.chunk({"sample": max(1, len(samples) // npartitions)})
......@@ -431,7 +459,9 @@ class DatasetPipeline(_BaseComposition):
try:
ds = block.dataset_map(ds)
except Exception as e:
raise RuntimeError(f"Could not map ds {ds}\n with {block.dataset_map}") from e
raise RuntimeError(
f"Could not map ds {ds}\n with {block.dataset_map}"
) from e
continue
if do_fit:
......
......@@ -243,4 +243,4 @@ def setup(app):
# Add `>>>` button to toggle visibility of prompts in code blocks.
# see https://github.com/readthedocs/sphinx_rtd_theme/issues/167 and
# https://raw.githubusercontent.com/python/python-docs-theme/master/python_docs_theme/static/copybutton.js
app.add_javascript("copybutton.js")
app.add_js_file("copybutton.js")
......@@ -157,7 +157,7 @@ Below, follow an example on how to use :any:`DelayedSample`.
... return np.zeros((2,))
>>> delayed_sample = mario.DelayedSample(load, metadata=1)
>>> delayed_sample
DelayedSample(load=<function load at ...>, metadata=1, _data=None)
DelayedSample(metadata=1, load=<function load at ...)
As soon as you access the ``.data`` attribute, the data is loaded and kept in memory:
......@@ -213,6 +213,6 @@ transform each sample inside and returns the same SampleSets with new data.
>>> transformed_sample_sets = sample_pipeline.transform(sample_sets)
>>> transformed_sample_sets[0].samples[1]
DelayedSample(load=..., offset=array([1]), _data=None)
DelayedSample(offset=array([1]), load=...)
>>> transformed_sample_sets[0].samples[1].data
array([1., 1.])
......@@ -76,7 +76,7 @@ to convert our dataset to a list of samples first:
... for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(load=functools.partial(<function load at ...>, 0), target=0, _data=None)
DelayedSample(target=0, load=...)
You may be already familiar with our sample concept. If not, please read more on
:ref:`bob.pipelines.sample`. Now, to optimize our process, we will represent our
......@@ -265,7 +265,7 @@ features. Let's add the ``key`` metadata to our dataset first:
... for i, y in enumerate(iris.target)
... ]
>>> samples[0]
DelayedSample(load=functools.partial(<function load at ...>, 0), target=0, key=0, _data=None)
DelayedSample(target=0, key=0, load=...)
>>> # construct the meta from one sample
>>> meta = xr.DataArray(samples[0].data, dims=("feature"))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment