Commit 6788926e authored by Amir MOHAMMADI's avatar Amir MOHAMMADI
Browse files

[wrappers] When checkpointing, checkpoing all steps in a pipeline

parent ac8afd2a
......@@ -312,7 +312,10 @@ class CheckpointWrapper(BaseWrapper, TransformerMixin):
os.makedirs(os.path.dirname(path), exist_ok=True)
# Gets or sample.<sample_attribute> if specified
to_save = getattr(sample, self.sample_attribute)
return self.save_func(to_save, path)
self.save_func(to_save, path)
except Exception as e:
raise RuntimeError(f"Could not save {to_save} duing {self}.save") from e
def load(self, sample, path):
# because we are checkpointing, we return a DelayedSample
......@@ -505,7 +508,16 @@ def wrap(bases, estimator=None, **kwargs):
if isinstance(estimator, Pipeline):
# wrap inner steps
for idx, name, trans in estimator._iter():
trans, leftover = _wrap(trans, **kwargs)
# when checkpointing a pipeline, checkpoint each transformer in its own folder
new_kwargs = dict(kwargs)
features_dir, model_path = kwargs.get("features_dir"), kwargs.get("model_path")
if features_dir is not None:
new_kwargs["features_dir"] = os.path.join(features_dir, name)
if model_path is not None:
new_kwargs["model_path"] = os.path.join(model_path, name)
trans, leftover = _wrap(trans, **new_kwargs)
estimator.steps[idx] = (name, trans)
# if being wrapped with DaskWrapper, add ToDaskBag to the steps
