Skip to content
GitLab
Projects Groups Snippets
  • /
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in
  • bob.pipelines bob.pipelines
  • Project information
    • Project information
    • Activity
    • Labels
    • Members
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 5
    • Issues 5
    • List
    • Boards
    • Service Desk
    • Milestones
  • Merge requests 2
    • Merge requests 2
  • CI/CD
    • CI/CD
    • Pipelines
    • Jobs
    • Schedules
  • Deployments
    • Deployments
    • Environments
    • Releases
  • Packages and registries
    • Packages and registries
    • Package Registry
    • Infrastructure Registry
  • Monitor
    • Monitor
    • Incidents
  • Analytics
    • Analytics
    • Value stream
    • CI/CD
    • Repository
  • Activity
  • Graph
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
Collapse sidebar
  • bobbob
  • bob.pipelinesbob.pipelines
  • Issues
  • #2
Closed
Open
Issue created Jan 22, 2020 by Tiago de Freitas Pereira@tiago.pereiraOwner

Using scikit learn pipelines with Dask

Opening this issue just as a note for posterity.

Today I've done an exercise using scikit-learn pipelines and dask https://ml.dask.org/compose.html

We could leverage from the scikit-api and benefit from its caching mechanism too. You can check the small snipped below how I use it (I made an adaptor to transform our algorithms in scikit-estimators).

Two things to be observed. I couldn't use the cache since most of our stuff is C++ based (not picklable). And since things are not picklable, I made a very shitty job with the adaptor in order to integrate it with Dask as you can see in code. In order to have the instance creation of Bob objects in the Worker (like @andre.anjos is doing with the SampleLoader (probably for the same reason (there's another reason for that too))), the method fit creates the Bob object and returns itself.

I think the current design is cleaner. I will give up this one.

ping @amohammadi

from sklearn.pipeline import Pipeline

# Local client
import dask.bag
from dask.distributed import Client, LocalCluster

import bob.bio.base
import bob.bio.face
import numpy

cache_dir = "./cache"


from sklearn.base import BaseEstimator


class Scikit2BobEstimator(BaseEstimator):
    """
    Base class to adapt from bob algorithms to scikit estimators

    Check here for more info:

    https://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
    """

    def __init__(self, bob_object):
        self.bob_class = bob_object

    def fit(self, X, y, **kwargs):
        self.bob_object = self.bob_class(**kwargs)
        return self

    def transform(self, X, **kwargs):
        """
        Here `X` can be our samples where the annotations can be shipped.
        """
        annotations = {"leye": (10, 10), "reye": (20, 10)}
        return [self.bob_object(x, annotations=annotations) for x in X]


### Starting the client
cluster = LocalCluster(nanny=False, processes=False, n_workers=1, threads_per_worker=1)
cluster.scale_up(1)
client = Client(cluster)


####### PREPROCESSOR #########
# Using face crop
CROPPED_IMAGE_HEIGHT = 80
CROPPED_IMAGE_WIDTH = CROPPED_IMAGE_HEIGHT * 4 // 5

## eye positions for frontal images
RIGHT_EYE_POS = (CROPPED_IMAGE_HEIGHT // 5, CROPPED_IMAGE_WIDTH // 4 - 1)
LEFT_EYE_POS = (CROPPED_IMAGE_HEIGHT // 5, CROPPED_IMAGE_WIDTH // 4 * 3)

import functools

preprocessor = Scikit2BobEstimator(
    functools.partial(
        bob.bio.face.preprocessor.FaceCrop,
        cropped_image_size=(CROPPED_IMAGE_HEIGHT, CROPPED_IMAGE_WIDTH),
        cropped_positions={"leye": LEFT_EYE_POS, "reye": RIGHT_EYE_POS},
    )
)


### EXTRACTOR #######
extractor = Scikit2BobEstimator(bob.bio.base.extractor.Linearize)
estimators = [("preprocess", preprocessor), ("extractor", extractor)]


#### HERE I COULD CACHE IT #####
# pipeline = Pipeline(estimators, memory=cache_dir)
pipeline = Pipeline(estimators)


X = [numpy.random.rand(3, 100, 100) for _ in range(100)]

db = dask.bag.from_sequence(X)
db = db.map_partitions(pipeline.fit_transform)


print(db.compute(scheduler=client))

client.shutdown()
Edited Jan 23, 2020 by Tiago de Freitas Pereira
Assignee
Assign to
Time tracking