# bob.pipelines merge requests

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/113
**meta [entry-points]: Revert dask.client group name** (Yannick DAYER, 2023-03-28)

Switch back to `dask.client` instead of `bob.pipelines.dask.client` for the
dask Client entry-points group name in `pyproject.toml`.
Fixes #46.

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/115
**meta(deps): add bob as dependency in new structure** (Yannick DAYER, 2023-06-14)

Adapt to the new structure of bob, with `bob/bob` on top.

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/91
**Many API changes** (Amir MOHAMMADI, 2022-05-24)

Expose utils API in the root API.
Fix the docs API.
Remove unused transformers.
Fix SGE GPU submissions.
Milestone: The Great Deprecation

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/5
**Make scikit operations daskable** (Tiago de Freitas Pereira, 2020-03-14)

Hi guys, this is the result of our discussion today. I just copy/pasted the print from @amohammadi in this MR so we can iterate over it.
Well, I don't know if we can solve this with ONLY Mixins.
I will try to: 1) formalize the problem; 2) present all possible use cases; and 3) propose directions with dask operations.
# Problem Statements:
1. Can we make stateless (only `estimator.transform`) and stateful (`estimator.fit`/`estimator.transform` enabled) scikit estimators automatically daskable just by wrapping them with some magic Mixin?
2. Can we make WHOLE pipelines (a stack of these estimators) daskable with some magic Mixin?
# Formalization of the problem:
## Boundaries
1. Here I will consider ONLY cases where estimators are stacked objects (a.k.a. a pipeline), because that is what real life looks like. Hence, a pipeline can be defined as: `pipeline = [estimator_[1], estimator_[2], ..., estimator_[n]]`
2. Whole pipelines can be either fittable/transformable (stateful) or transformable only (stateless) (see https://gitlab.idiap.ch/bob/bob.pipelines/blob/master/bob/pipelines/test/test_processor.py#L230).
3. `pipeline.transform` HAS to be called as a result of `dask.bag.map` so we can enjoy parallelization.
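As a minimal sketch of boundary 3 (illustrative only — `transform` here is a plain function, not the bob.pipelines API):

```python
import dask.bag


def transform(samples):
    # Stand-in for pipeline.transform: receives a whole partition (a list)
    # and returns the transformed list.
    return [s * 2 for s in samples]


# Each partition is handed to `transform` in a single call, so the
# parallelization granularity is per partition.
bag = dask.bag.from_sequence([1, 2, 3, 4], npartitions=2)
result = bag.map_partitions(transform).compute(scheduler="synchronous")
# result == [2, 4, 6, 8]
```

With a distributed scheduler, each partition would be processed by a different worker.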
## Cardinality of the operations
### Case A: `pipeline.transform`
1. `pipeline.transform` is a 1:1 operation. Hence, 2 or more `estimator_n.transform` calls can be dasked in one shot with: `dask.bag.from_sequence([sample_set]).map_partitions(pipeline.transform)`. Easy. We already enjoy that in the vanilla pipeline.
### Case B: `pipeline.fit`
Here we have 2 situations:
1. `estimator_[n].fit` followed by `estimator_[n].transform` is a 1:N operation. Once `estimator_[n].fit` is done, we need to be able to take all the samples used in that operation and `map` them again into `estimator_[n].transform` so we can enjoy parallelization. The only way I see this working is by making samples an input of the Mixin class (in `__init__`). In that case we could do `dask.delayed(estimator_[n].fit)([sample_set])` and `dask.bag.from_sequence([sample_set]).map_partitions(estimator_[n].transform)`. This would basically break the scikit API :-(
2. `estimator_[n].transform` followed by `estimator_[n+1].fit` is an N:1 operation. We need to be able to concatenate the samples from `estimator_[n].transform` to pass them to the following `estimator_[n+1].fit`. I don't see how this can work ONLY WITH MIXINS. We need some higher-level entity (possibly an extension of the scikit pipelines that I wrote in the last MR) to orchestrate this.
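To make the two cardinalities concrete, here is a minimal pure-Python sketch (no dask; `Scaler`, `Offset`, and `fit_pipeline` are hypothetical names) of the orchestration a pipeline wrapper has to perform:

```python
class Scaler:
    """Tiny stateful estimator: fit learns the max, transform divides by it."""

    def fit(self, samples):
        # N:1 — fit consumes ALL samples at once.
        self.max_ = max(samples)
        return self

    def transform(self, samples):
        # 1:1 — transform could be mapped partition-wise.
        return [s / self.max_ for s in samples]


class Offset:
    """Second stateful estimator: fit learns the mean, transform subtracts it."""

    def fit(self, samples):
        self.mean_ = sum(samples) / len(samples)
        return self

    def transform(self, samples):
        return [s - self.mean_ for s in samples]


def fit_pipeline(estimators, samples):
    # The orchestration point: after estimator_[n].fit, the SAME samples are
    # pushed through estimator_[n].transform (1:N), and the transformed output
    # is gathered before estimator_[n+1].fit sees it (N:1).
    for est in estimators:
        est.fit(samples)
        samples = est.transform(samples)
    return samples


out = fit_pipeline([Scaler(), Offset()], [1.0, 2.0, 4.0])
```

A Mixin on a single estimator cannot see the step boundary where the gather has to happen; only the `fit_pipeline` loop (i.e., a higher-level entity) can.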
Well, that's it.
I hope I provided enough details for discussion.
ping @andre.anjos @amohammadi
thanks

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/31
**Make a sampleset work transparently with list of DelayedSamples** (Tiago de Freitas Pereira, 2020-05-25)

This makes `SampleSet([Sample(1), Sample(2)])` equivalent to `SampleSet(DelayedSample(load_function))`,
where `load_function` loads `[Sample(1), Sample(2)]`.

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/89
**Load checkpointed estimators inside the scheduler** (Amir MOHAMMADI, 2022-05-09)

Also adds resilience to loading checkpointed samples.
Milestone: The Great Deprecation

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/48
**Improvements on CheckpointWrapper** (Tiago de Freitas Pereira, 2020-11-22)

Added the optional argument `hash_fn` in the `CheckpointWrapper` class.
Once this is set, `sample.key` generates a hash code, and this hash code is used to compose the final path where `sample` will be checkpointed.
This is optional and generic enough for our purposes.
This hash function can be shipped in the database interface.
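A rough sketch of the idea (hypothetical names, not the actual `CheckpointWrapper` code): the hash of `sample.key` becomes part of the checkpoint path, spreading files over sub-directories instead of one flat folder.

```python
import hashlib
import os


def hash_fn(key: str) -> str:
    # Illustrative hash function: first 8 hex characters of a SHA-256 digest.
    # In practice this callable could be shipped by the database interface.
    return hashlib.sha256(key.encode()).hexdigest()[:8]


def checkpoint_path(features_dir: str, key: str) -> str:
    # Compose the final path: <features_dir>/<hash>/<key>.h5
    return os.path.join(features_dir, hash_fn(key), key + ".h5")


path = checkpoint_path("features", "subject1-sample1")
```

Being a pure function of the key, the same sample always maps to the same path, so checkpoints can be found again on re-runs.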
Closes https://gitlab.idiap.ch/bob/bob.pipelines/-/issues/25
Milestone: Bob 9.0.0 · Assignee: Amir MOHAMMADI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/63
**Implemented a mechanism in the Checkpoint wrapper that asserts if data was properly written in the disk** (Tiago de Freitas Pereira, 2021-10-29)

Implemented a mechanism in the Checkpoint wrapper that asserts that data was properly written to disk.
Closes #31
Assignee: Amir MOHAMMADI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/80
**Implemented a force mechanism** (Tiago de Freitas Pereira, 2022-01-13)

Created a `force` option for the CheckpointWrapper.
Related to: https://gitlab.idiap.ch/bob/bob.bio.base/-/issues/173
Assignee: Amir MOHAMMADI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/62
**Implement a new simple generic csv-based database interface** (Amir MOHAMMADI, 2021-03-22)

Depends on https://gitlab.idiap.ch/bob/bob.extension/-/merge_requests/126
Milestone: Bob 9.0.0 · Assignee: Tiago de Freitas Pereira

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/94
**[hotfix] [Dask] Add configuration to access multithreaded queue** (Laurent COLBOIS, 2022-06-14)
This adds an additional queue configuration that asks for multithreaded jobs, thus doubling the virtual memory limit to 16 GB.
With this fix I can run heavy Tensorflow baselines without OOM crash.
Long term it would be better to:
* Expose the exact thread multiplier (now 2 by default)
* Use the resource tagging of transformers to have the queue dynamically selected based on the transformer

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/1
**Handling tasks within heterogeneous workers** (Tiago de Freitas Pereira, 2020-02-11)

Hi guys,
This MR solves the issue of 1) automatically launching heterogeneous SGE workers, and 2) assigning specific tasks to specific workers.
**Solving 1**: To launch heterogeneous jobs, I had to extend `dask_jobqueue.core.JobQueueCluster` in such a way that every time the pool of workers needs to be increased,
a new "worker_spec" is defined (https://github.com/dask/distributed/blob/ddbec38ba1ec6de913ccbfcd090f1c85eea1b032/distributed/deploy/spec.py#L443).
This is not the default behaviour of `dask_jobqueue`, which assumes that everything is homogeneous.
**Solving 2**: Making the scheduler assign a `dask.delayed` task to one specific worker was supposed to be simple.
In case `dask.workers` are manually launched, the `dask-worker` script has an argument called `--resources` where you can add a personal TAG to the worker (https://distributed.dask.org/en/latest/resources.html#example).
Once this is done, a tuple containing the `dask.delayed` hash and the personal TAG can be passed to the method `dask.delayed.compute` (https://distributed.dask.org/en/latest/resources.html#resources-with-collections).
Then, everything works nicely.
**HOWEVER**, our `dask.workers` are launched by `dask_jobqueue`, and there's no way to amend the `--resources` command-line argument to it.
I will propose a patch (https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/core.py#L77) to solve this issue.
In the meantime, I implemented a workaround here: https://gitlab.idiap.ch/bob/bob.pipelines/blob/20d1732da5ce341db48a49da49f14b21ddaa2f67/bob/pipelines/distributed/sge.py#L18
It works nicely.
Here follows a snippet using this feature (I will add it to a test case somehow):
```python
from bob.pipelines.distributed.sge import SGEIdiapCluster, sge_submit_iobig_gpu
from dask.distributed import Client
import dask
from time import sleep
import os


# Defining dummy jobs
def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(1)
    print(os.system("nvidia-smi"))
    return x + 2


def add(x, y):
    sleep(1)
    return x + y


# Defining my graph
X = [1, 2, 3, 4, 5, 6, 7]
output = []
resources = dict()
for x in X:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)

    # LOOK HERE
    # Annotating certain jobs to run on the GPU
    resources[tuple(b.__dask_keys__())] = {"GPU": 1}

    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Submitting 2 jobs in the iobig queue and one in the GPU queue
client = sge_submit_iobig_gpu(n_jobs_iobig=2, n_jobs_gpu=1)

# Run the graph with a resource specification
print(total.compute(scheduler=client, resources=resources))
```
ping @andre.anjos

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/70
**Handle estimator tags in wrapper classes** (Yannick DAYER, 2022-03-18)

Allows setting some parameters of the `SampleWrapper` and `CheckpointWrapper` via estimator tags.
bob.bio.base#143

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/66
**Handled failed processing (Failure to Acquire) in the wrappers** (Yannick DAYER, 2021-10-29)

Fixes #32
Assignee: Amir MOHAMMADI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/20
**Get SampleSet to accept DelayedSample** (Yannick DAYER, 2020-04-06)

Quick fix to accept both Sample and DelayedSample objects in `SampleSet.insert`.
Fixes #9
Assignee: Tiago de Freitas Pereira

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/36
**For some reason, the class information is not passed in the sample wrapper** (Tiago de Freitas Pereira, 2020-08-31)

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/100
**Fix the doctest of xarray failing on python 3.8** (Yannick DAYER, 2022-07-22)

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/44
**Fix sphinx warnings** (Amir MOHAMMADI, 2020-11-09)
Milestone: Conda-based CI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/68
**Fix parent's delayed_attributes modified by child** (Yannick DAYER, 2021-10-29)

A `DelayedSample` child's `delayed_attributes` is no longer referencing the parent's `delayed_attributes`.
Assignee: Amir MOHAMMADI

---

https://gitlab.idiap.ch/bob/bob.pipelines/-/merge_requests/79
**Fixing compatibility issues with dask_jobqueue=0.7.2** (Tiago de Freitas Pereira, 2021-11-30)

Closes #37
Unfortunately, we can't test this on the CI (there's no SGE there).