bob.pipelines merge requests
https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/1
Handling tasks in heterogeneous workers (2020-02-11T15:50:27Z, Tiago de Freitas Pereira)

Hi guys,
This MR solves two issues: 1) automatically launching heterogeneous SGE workers, and 2) assigning specific tasks to specific workers.
**Solving 1**: To launch heterogeneous jobs, I had to extend `dask_jobqueue.core.JobQueueCluster` in such a way that every time the pool of workers needs to be increased,
a new "worker_spec" is defined (https://github.com/dask/distributed/blob/ddbec38ba1ec6de913ccbfcd090f1c85eea1b032/distributed/deploy/spec.py#L443).
This is not the default behaviour of `dask_jobqueue`, which assumes everything is homogeneous.
**Solving 2**: Making the scheduler assign a `dask.delayed` task to one specific worker was supposed to be simple.
When dask workers are launched manually, the `dask-worker` script has an argument called `--resources` where you can add a personal TAG to the worker (https://distributed.dask.org/en/latest/resources.html#example).
Once this is done, a dict mapping the `dask.delayed` task keys to the personal TAG can be passed to the method `dask.delayed.compute` (https://distributed.dask.org/en/latest/resources.html#resources-with-collections).
Then, everything works nicely.
**HOWEVER**, our dask workers are launched by `dask_jobqueue`, and there's no way to pass the `--resources` command-line argument to it.
I will propose a patch (https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/core.py#L77) to solve this issue.
In the meantime, I implemented a workaround here: https://gitlab.idiap.ch/bob/bob.pipelines/blob/20d1732da5ce341db48a49da49f14b21ddaa2f67/bob/pipelines/distributed/sge.py#L18
It works nicely.
Below is a snippet using this feature (I will amend it into a test case somehow):
```python
from bob.pipelines.distributed.sge import SGEIdiapCluster, sge_submit_iobig_gpu
from dask.distributed import Client
import dask
from time import sleep
import os


# Defining dummy jobs
def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(1)
    print(os.system("nvidia-smi"))
    return x + 2


def add(x, y):
    sleep(1)
    return x + y


# Defining my graph
X = [1, 2, 3, 4, 5, 6, 7]
output = []
resources = dict()
for x in X:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)

    # LOOK HERE
    # Annotating certain jobs to run on the GPU
    resources[tuple(b.__dask_keys__())] = {"GPU": 1}

    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Submitting 2 jobs in the iobig queue and one in the GPU queue
client = sge_submit_iobig_gpu(n_jobs_iobig=2, n_jobs_gpu=1)

# Run the graph with a resource specification
print(total.compute(scheduler=client, resources=resources))
```
ping @andre.anjos

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/2
Added distance prediction and scoring (2020-02-17T09:04:48Z, Yannick DAYER)

Added a test in AlgorithmAdaptator enroll and score to not apply `project()` in a distance algorithm (when `requires_projector_training` is False).

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/3
Moving some elements to bob.bio.base (2020-02-20T09:10:44Z, Tiago de Freitas Pereira)

Jointly working with https://gitlab.idiap.ch/bob/bob.bio.base/merge_requests/180

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/4
Processor api (2020-03-09T15:49:22Z, Tiago de Freitas Pereira)

Sweetheart S2,
Please find in this MR the checkpointable processors. It contains:
- a base Processor class that defines methods to `fit` a processor given some samples (optionally) and to `transform` samples
- a ProcessorPipeline that:
  - stacks those Processors **sequentially**
  - checkpoints them if necessary
  - fits processors if they are fittable
  - transforms samples with them
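The intended behaviour can be sketched with a toy, self-contained version. The `Processor`/`ProcessorPipeline` names come from this MR, but this implementation, the `MeanOffset` processor, and the pickle-based checkpointing are illustrative assumptions, not the actual bob.pipelines code:

```python
import os
import pickle
import tempfile


class Processor:
    """Base class: fit is optional (stateless processors skip it)."""

    def transform(self, samples):
        raise NotImplementedError


class MeanOffset(Processor):
    """Toy fittable processor: fit learns a mean, transform subtracts it."""

    def fit(self, samples):
        self.mean_ = sum(samples) / len(samples)
        return self

    def transform(self, samples):
        return [s - self.mean_ for s in samples]


class ProcessorPipeline:
    """Stacks processors sequentially, fitting and checkpointing each step."""

    def __init__(self, processors, checkpoint_dir=None):
        self.processors = processors
        self.checkpoint_dir = checkpoint_dir

    def fit_transform(self, samples):
        for i, p in enumerate(self.processors):
            if hasattr(p, "fit"):  # fit only the fittable processors
                p.fit(samples)
            samples = p.transform(samples)  # then transform sequentially
            if self.checkpoint_dir:  # checkpoint intermediate results
                path = os.path.join(self.checkpoint_dir, f"step_{i}.pkl")
                with open(path, "wb") as f:
                    pickle.dump(samples, f)
        return samples


with tempfile.TemporaryDirectory() as d:
    pipeline = ProcessorPipeline([MeanOffset()], checkpoint_dir=d)
    result = pipeline.fit_transform([1.0, 2.0, 3.0])
print(result)  # [-1.0, 0.0, 1.0]
```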
Closes https://gitlab.idiap.ch/bob/bob.pipelines/issues/4
ping @andre.anjos @amohammadi

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/5
Make scikit operations daskable (2020-03-14T19:35:05Z, Tiago de Freitas Pereira)

Hi guys, this is the result of our discussion today. I just copy/pasted the print from @amohammadi into this MR so we can iterate over it.
Well, I don't know if we can solve this with ONLY Mixins.
I will try to: 1) formalize the problem; 2) present all possible use cases; and 3) propose directions with dask operations.
# Problem Statements:
1. Can we make stateless (only `estimator.transform`) and stateful (both `estimator.fit` and `estimator.transform` enabled) scikit estimators automatically daskable just by wrapping them with some magic Mixin?
2. Can we make WHOLE pipelines (a stack of these estimators) daskable with some magic Mixin?
# Formalization of the problem:
## Boundaries
1. Here I will consider ONLY cases where estimators are stacked objects (a.k.a. a pipeline), because this is what real life looks like. Hence, a pipeline can be defined as: `pipeline=[estimator_[1], estimator_[2], ..., estimator_[n]]`
2. Whole pipelines can be either fittable/transformable (stateful) or only transformable (stateless) (look at https://gitlab.idiap.ch/bob/bob.pipelines/blob/master/bob/pipelines/test/test_processor.py#L230).
3. `pipeline.transform` HAS to be called as a result of `dask.bag.map` so we can enjoy parallelization.
## Cardinality of the operations
### Case A: `pipeline.transform`
1. `pipeline.transform` is a 1:1 operation. Hence, 2 or more `estimator_n.transform` calls can be dasked in one shot with: `dask.bag.from_sequence([sample_set]).map_partitions(pipeline.transform)`. Easy. We already enjoy that in the vanilla pipeline.
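Assuming `pipeline.transform` is just a function over a list of samples, the 1:1 case can be sketched with a plain stand-in and dask only (no bob.pipelines; the `transform` function below is illustrative):

```python
import dask.bag


def transform(samples):
    # stand-in for pipeline.transform: maps each sample independently (1:1)
    return [s + 1 for s in samples]


# each partition is handed to `transform` as a plain list, in parallel
bag = dask.bag.from_sequence([1, 2, 3, 4], npartitions=2)
result = bag.map_partitions(transform).compute()
print(result)  # [2, 3, 4, 5]
```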
### Case B: `pipeline.fit`
Here we have 2 situations:
1. `estimator_[n].fit` followed by `estimator_[n].transform` is a 1:N operation. Once `estimator_[n].fit` is done, we need to be able to take all the samples used in that operation and `map` them again into `estimator_[n].transform`, so we can enjoy parallelization. The only way I see this working is by making samples an input of the Mixin class (in the `__init__`). Then we could do `dask.delayed(estimator_[n].fit)([sample_set])` and `dask.bag.from_sequence([sample_set]).map_partitions(estimator_[n].transform)`. This would basically break the scikit API :-(
2. `estimator_[n].transform` followed by `estimator_[n+1].fit` is an N:1 operation. We need to be able to concatenate the samples from `estimator_[n].transform` to pass them to the following `estimator_[n+1].fit`. I don't see how this can work ONLY WITH MIXINS. We need some higher-level entity (possibly an extension of the scikit pipelines that I wrote in the last MR) to orchestrate this.
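The N:1-then-1:N pattern can be sketched with plain `dask.delayed` and a toy stateful estimator. `MeanCenter` and the list-of-partitions layout are assumptions for illustration; this is the kind of wiring a higher-level orchestrating entity would have to do:

```python
import dask


class MeanCenter:
    """Toy stateful estimator: fit computes a mean, transform subtracts it."""

    def fit(self, samples):
        self.mean_ = sum(samples) / len(samples)
        return self

    def transform(self, samples):
        return [s - self.mean_ for s in samples]


partitions = [[1.0, 2.0], [3.0, 4.0]]

# N:1 - concatenate the partitioned output of stage n before fitting stage n+1
concatenated = dask.delayed(lambda parts: sum(parts, []))(partitions)
fitted = dask.delayed(MeanCenter().fit)(concatenated)

# 1:N - map the fitted estimator's transform back over every partition
transformed = [fitted.transform(p) for p in partitions]
print(dask.compute(*transformed))  # ([-1.5, -0.5], [0.5, 1.5])
```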
Well, that's it.
I hope I provided enough details for the discussion.
ping @andre.anjos @amohammadi
thanks

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/6
small fixes for bob.bio.base integration (2020-03-16T16:47:27Z, Tiago de Freitas Pereira)

Some small fixes for bob.bio.base integration: https://gitlab.idiap.ch/bob/bob.bio.base/merge_requests/182

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/7
Two updates (2020-03-23T12:48:17Z, Tiago de Freitas Pereira)

- Made Sample and Checkpoint Mixins transform samplesets
- Removed the transform sampleset function (no purpose for having it)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/8
Checking if the super class has the method fit to avoid unnecessary stack from the base class (2020-03-24T18:17:37Z, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/9
SampleMixin now accepts extra arguments (2020-03-26T10:04:59Z, Amir MOHAMMADI, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/11
SampleMixin doesn't call fit on stateless estimators (2020-03-26T12:18:04Z, Amir MOHAMMADI, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/12
Not all functions implement the keyword argument create_directories (2020-03-26T14:38:01Z, Tiago de Freitas Pereira, Amir MOHAMMADI)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/14
Fix dask_it, mix_me_up, and CheckpointMixin.load (2020-03-26T18:40:16Z, Amir MOHAMMADI, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/15
Fix dask_it, mix_me_up, and CheckpointMixin.load (2020-03-27T06:54:08Z, Amir MOHAMMADI, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/16
Updates (2020-03-27T08:25:34Z, Tiago de Freitas Pereira)

- Fixed the dask_it function
- Set a minimum version for scikit-learn

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/17
Using the same path used in the file checking in the delayed sample (2020-03-29T17:22:36Z, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/19
Two updates (2020-04-03T07:02:37Z, Tiago de Freitas Pereira)

1. Created SampleSet.__len__ to count the samples inside the sampleset
2. Created some logging in our main mixins

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/20
Get SampleSet to accept DelayedSample (2020-04-06T06:25:28Z, Yannick DAYER, Tiago de Freitas Pereira)

Quick fix to accept both Sample and DelayedSample objects in SampleSet.insert.
Fixes #9

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/21
Adaptive and heterogeneous resource allocation (2020-04-08T16:09:19Z, Tiago de Freitas Pereira)

Hi people,
With this MR, we can submit jobs via the dask SpecCluster with different SGE specifications, and I allowed the scheduler to **dynamically** request some SGE job specs.
The Dask scheduler has heuristics to create and destroy workers based on CPU and MEMORY load.
The scheduler that I extended does the same (it's the same method), but it also requests specific SGE requirements when necessary.
Below is a use case.
The code below executes the following graph:
![Screen_Shot_2020-04-06_at_12.06.51](/uploads/fb4488da28c43dce35e0d659ff5d766c/Screen_Shot_2020-04-06_at_12.06.51.png)
All the jobs can be executed on `all.q` hosts, except the bottleneck one (pointed out in the image).
For that one, only `q_1day` hosts can take it.
Our scheduler (`IdiapScheduler`) recognizes that a task requests a specific type of worker and asks for exactly that.
Our `SGEIdiapCluster` recognizes that kind of request and does `qsub` with the proper requirements.
```python
from bob.pipelines.distributed.sge import SGEIdiapCluster, AdaptiveIdiap
from dask.distributed import Client
import time
import os
import logging

logger = logging.getLogger("distributed")
logger.setLevel(0)


def dependency_example(client):
    import dask

    # Defining dummy jobs
    def inc(x, value):
        time.sleep(0.25)
        return x + value

    def bottleneck(x):
        time.sleep(20)
        return sum(x)

    # Defining my graph
    X = list(range(100))
    resources = dict()
    parallel_data = []
    for x in X:
        a = dask.delayed(inc)(x, 1)
        b = dask.delayed(inc)(a, 2)
        parallel_data.append(b)

    # Bottleneck
    c = dask.delayed(bottleneck)(parallel_data)

    # LOOK HERE
    # Annotating the bottleneck job to run on a `q_1day` worker
    resources[tuple(c.__dask_keys__())] = {"q_1day": 1}

    final_parallel_data = []
    for x in parallel_data:
        final_parallel_data.append(dask.delayed(inc)(x, c))

    total = dask.delayed(sum)(final_parallel_data)
    print(total.compute(scheduler=client, resources=resources))


Q_1DAY_ALL = {
    "q_1day": {
        "queue": "q_1day",
        "memory": "8GB",
        "io_big": True,
        "resource_spec": "",
        "resources": {"q_1day": 1},
    },
    "default": {
        "queue": "all.q",
        "memory": "4GB",
        "io_big": False,
        "resource_spec": "",
        "resources": "",
    },
}

cluster = SGEIdiapCluster(sge_job_spec=Q_1DAY_ALL)
cluster.scale(1, sge_job_spec_key="default")
cluster.adapt(minimum=0, maximum=48)

client = Client(cluster)
dependency_example(client)
```
https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/22
New User Guide (2020-04-22T15:59:33Z, Tiago de Freitas Pereira)

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/23
Better defaults for .adapt method (2020-04-23T10:43:15Z, Tiago de Freitas Pereira)