Using scikit-learn pipelines with Dask
Opening this issue just as a note for posterity.
Today I did an exercise using scikit-learn pipelines and Dask: https://ml.dask.org/compose.html
We could leverage the scikit-learn API and benefit from its caching mechanism too. You can check in the small snippet below how I use it (I made an adaptor to turn our algorithms into scikit-learn estimators).
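For reference, the caching mechanism in question is scikit-learn's `Pipeline(memory=...)`, which pickles fitted transformers to disk so they are reused instead of refit. A minimal sketch with picklable, stock scikit-learn transformers (the step names here are just illustrative):

```python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import numpy

# With `memory` set, each fitted transformer is cached on disk (via joblib)
# and a second fit on the same data loads it from the cache instead of refitting.
pipeline = Pipeline(
    [("scale", StandardScaler()), ("pca", PCA(n_components=2))],
    memory="./cache",
)

X = numpy.random.rand(50, 10)
Xt = pipeline.fit_transform(X)
print(Xt.shape)  # (50, 2)
```

This only works when every step is picklable, which is exactly what fails for our C++-backed objects.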
Two things should be noted. First, I couldn't use the cache, since most of our code is C++-based (not picklable). Second, because things are not picklable, the adaptor I wrote to integrate with Dask is quite hacky, as you can see in the code. To have the Bob objects instantiated inside the worker (like @andre.anjos is doing with the SampleLoader, probably for the same reason (there's another reason for that too)), the `fit` method creates the Bob object and returns `self`.
I think the current design is cleaner, so I will give up on this one.
ping @amohammadi
```python
import functools

import dask.bag
import numpy
from dask.distributed import Client, LocalCluster
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline

import bob.bio.base
import bob.bio.face

cache_dir = "./cache"


class Scikit2BobEstimator(BaseEstimator):
    """
    Base class to adapt Bob algorithms to scikit-learn estimators.
    Check here for more info:
    https://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
    """

    def __init__(self, bob_object):
        self.bob_class = bob_object

    def fit(self, X, y=None, **kwargs):
        # The Bob object is created here, inside the worker, because it is
        # not picklable and therefore cannot be shipped to workers directly.
        self.bob_object = self.bob_class(**kwargs)
        return self

    def transform(self, X, **kwargs):
        """
        Here `X` can be our samples where the annotations can be shipped.
        """
        annotations = {"leye": (10, 10), "reye": (20, 10)}
        return [self.bob_object(x, annotations=annotations) for x in X]


### Starting the client
cluster = LocalCluster(nanny=False, processes=False, n_workers=1, threads_per_worker=1)
cluster.scale_up(1)
client = Client(cluster)

####### PREPROCESSOR #########
# Using face crop
CROPPED_IMAGE_HEIGHT = 80
CROPPED_IMAGE_WIDTH = CROPPED_IMAGE_HEIGHT * 4 // 5

## eye positions for frontal images
RIGHT_EYE_POS = (CROPPED_IMAGE_HEIGHT // 5, CROPPED_IMAGE_WIDTH // 4 - 1)
LEFT_EYE_POS = (CROPPED_IMAGE_HEIGHT // 5, CROPPED_IMAGE_WIDTH // 4 * 3)

preprocessor = Scikit2BobEstimator(
    functools.partial(
        bob.bio.face.preprocessor.FaceCrop,
        cropped_image_size=(CROPPED_IMAGE_HEIGHT, CROPPED_IMAGE_WIDTH),
        cropped_positions={"leye": LEFT_EYE_POS, "reye": RIGHT_EYE_POS},
    )
)

### EXTRACTOR #######
extractor = Scikit2BobEstimator(bob.bio.base.extractor.Linearize)

estimators = [("preprocess", preprocessor), ("extractor", extractor)]

#### HERE I COULD CACHE IT #####
# pipeline = Pipeline(estimators, memory=cache_dir)
pipeline = Pipeline(estimators)

X = [numpy.random.rand(3, 100, 100) for _ in range(100)]
db = dask.bag.from_sequence(X)
db = db.map_partitions(pipeline.fit_transform)
print(db.compute(scheduler=client))

client.shutdown()
```
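As an aside, a quick way to check whether an estimator (or anything else) could use the cache, or be shipped to a Dask worker at all, is to try a pickle round-trip. A small sketch (the `is_picklable` helper is hypothetical, not part of any library):

```python
import pickle


def is_picklable(obj):
    """Return True if `obj` survives a pickle round-trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


# Plain Python data pickles fine...
print(is_picklable({"leye": (10, 10), "reye": (20, 10)}))  # True

# ...but objects such as lambdas (or C++-backed handles) do not,
# which is what rules out Pipeline's `memory` caching for us.
print(is_picklable(lambda x: x))  # False
```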