Commit 25617068 authored by Tiago de Freitas Pereira

Merge branch 'update2' into 'master'

Use the same path for the delayed sample as the one used in the file check

See merge request !17
parents 06fa85bb 91d31a1d
Pipeline #38692 failed in 8 minutes and 6 seconds
...@@ -14,10 +14,7 @@ import dask.bag ...@@ -14,10 +14,7 @@ import dask.bag
def estimator_dask_it( def estimator_dask_it(
o, o, fit_tag=None, transform_tag=None, npartitions=None,
fit_tag=None,
transform_tag=None,
npartitions=None,
): ):
""" """
Mix up any :py:class:`sklearn.pipeline.Pipeline` or :py:class:`sklearn.estimator.Base` with Mix up any :py:class:`sklearn.pipeline.Pipeline` or :py:class:`sklearn.estimator.Base` with
...@@ -85,35 +82,31 @@ def estimator_dask_it( ...@@ -85,35 +82,31 @@ def estimator_dask_it(
o.steps.insert(0, ("0", DaskBagMixin(npartitions=npartitions))) o.steps.insert(0, ("0", DaskBagMixin(npartitions=npartitions)))
# Patching dask_resources # Patching dask_resources
dasked = mix_me_up( dasked = mix_me_up([DaskEstimatorMixin], o,)
[DaskEstimatorMixin],
o,
)
# Tagging each element in a pipeline # Tagging each element in a pipeline
if isinstance(o, Pipeline): if isinstance(o, Pipeline):
# Tagging each element for fitting and transforming # Tagging each element for fitting and transforming
for estimator in o.steps:
estimator[1].fit_tag = None
if fit_tag is not None: if fit_tag is not None:
for index, tag in fit_tag: for index, tag in fit_tag:
o.steps[index][1].fit_tag = tag o.steps[index][1].fit_tag = tag
else:
for estimator in o.steps:
estimator[1].fit_tag = fit_tag
for estimator in o.steps:
estimator[1].transform_tag = None
if transform_tag is not None: if transform_tag is not None:
for index, tag in transform_tag: for index, tag in transform_tag:
o.steps[index][1].transform_tag = tag o.steps[index][1].transform_tag = tag
else:
for estimator in o.steps:
estimator[1].transform_tag = transform_tag
for estimator in o.steps: for estimator in o.steps:
estimator[1].resource_tags = dict() estimator[1].resource_tags = dict()
estimator[1]._dask_state = estimator[1]
else: else:
dasked.fit_tag = fit_tag dasked.fit_tag = fit_tag
dasked.transform_tag = transform_tag dasked.transform_tag = transform_tag
dasked.resource_tags = dict() dasked.resource_tags = dict()
dasked._dask_state = dasked
# Bounding the method # Bounding the method
dasked.dask_tags = types.MethodType(_fetch_resource_tape, dasked) dasked.dask_tags = types.MethodType(_fetch_resource_tape, dasked)
...@@ -288,7 +281,7 @@ class CheckpointMixin: ...@@ -288,7 +281,7 @@ class CheckpointMixin:
# save the new sample # save the new sample
self.save(new_sample) self.save(new_sample)
else: else:
new_sample = self.load(sample) new_sample = self.load(sample, path)
return new_sample return new_sample
...@@ -338,13 +331,14 @@ class CheckpointMixin: ...@@ -338,13 +331,14 @@ class CheckpointMixin:
else: else:
raise ValueError("Type for sample not supported %s" % type(sample)) raise ValueError("Type for sample not supported %s" % type(sample))
def load(self, sample): def load(self, sample, path):
path = self.make_path(sample)
# because we are checkpointing, we return a DelayedSample # because we are checkpointing, we return a DelayedSample
# instead of a normal (preloaded) sample. This allows the next # instead of a normal (preloaded) sample. This allows the next
# phase to avoid loading it would it be unnecessary (e.g. next # phase to avoid loading it would it be unnecessary (e.g. next
# phase is already check-pointed) # phase is already check-pointed)
return DelayedSample(functools.partial(self.load_func, path), parent=sample) return DelayedSample(
functools.partial(self.load_func, path), parent=sample
)
def load_model(self): def load_model(self):
if _is_estimator_stateless(self): if _is_estimator_stateless(self):
......
...@@ -113,7 +113,7 @@ def test_fittable_sample_transformer(): ...@@ -113,7 +113,7 @@ def test_fittable_sample_transformer():
samples = [Sample(data) for data in X] samples = [Sample(data) for data in X]
# Mixing up with an object # Mixing up with an object
transformer = mix_me_up(SampleMixin, DummyWithFit()) transformer = mix_me_up([SampleMixin], DummyWithFit)()
features = transformer.fit(samples).transform(samples) features = transformer.fit(samples).transform(samples)
_assert_all_close_numpy_array(X + 1, [s.data for s in features]) _assert_all_close_numpy_array(X + 1, [s.data for s in features])
...@@ -194,12 +194,9 @@ def test_checkpoint_fittable_sample_transformer(): ...@@ -194,12 +194,9 @@ def test_checkpoint_fittable_sample_transformer():
_assert_checkpoints(features, oracle, model_path, features_dir, False) _assert_checkpoints(features, oracle, model_path, features_dir, False)
from bob.io.base import create_directories_safe
def _build_estimator(path, i): def _build_estimator(path, i):
base_dir = os.path.join(path, f"transformer{i}") base_dir = os.path.join(path, f"transformer{i}")
create_directories_safe(base_dir) os.makedirs(base_dir, exist_ok=True)
model_path = os.path.join(base_dir, "model.pkl") model_path = os.path.join(base_dir, "model.pkl")
features_dir = os.path.join(base_dir, "features") features_dir = os.path.join(base_dir, "features")
......
...@@ -31,7 +31,7 @@ requirements: ...@@ -31,7 +31,7 @@ requirements:
- dask-jobqueue - dask-jobqueue
- numpy {{ numpy }} - numpy {{ numpy }}
- h5py - h5py
- scikit-learn >-= 0.22 - scikit-learn >=0.22
run: run:
- python - python
- setuptools - setuptools
...@@ -51,9 +51,7 @@ test: ...@@ -51,9 +51,7 @@ test:
- coverage - coverage
- sphinx - sphinx
- sphinx_rtd_theme - sphinx_rtd_theme
- dask
- dask-jobqueue
- scikit-learn
about: about:
summary: bob.pipelines summary: bob.pipelines
home: https://www.idiap.ch/software/bob/ home: https://www.idiap.ch/software/bob/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment