bob.bio.base, commit a2bc11ba
Authored 3 years ago by Yannick DAYER

[refactor] isort

Parent: 95ee08d1
No related branches or tags found.
1 merge request: !290 PipelineSimple partitioning fixes
Pipeline #60905 passed 3 years ago (stage: build)
Showing 1 changed file: bob/bio/base/pipelines/entry_points.py (+13, −5)
@@ -18,8 +18,8 @@ from bob.bio.base.pipelines import (
     is_checkpointed,
 )
 from bob.pipelines.distributed import dask_get_partition_size
-from bob.pipelines.utils import is_estimator_stateless, isinstance_nested
 from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster
+from bob.pipelines.utils import is_estimator_stateless, isinstance_nested
 
 logger = logging.getLogger(__name__)
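The reorder above matches isort's alphabetical ordering within an import section: module paths are compared as strings, so bob.pipelines.distributed.sge sorts before bob.pipelines.utils. A minimal check of that ordering (illustrative only; the project's actual isort configuration is not part of this commit):

# For these all-lowercase dotted paths, isort's ordering coincides with a plain lexicographic sort.
modules = [
    "bob.pipelines.utils",
    "bob.pipelines.distributed",
    "bob.pipelines.distributed.sge",
]
print(sorted(modules))
# ['bob.pipelines.distributed', 'bob.pipelines.distributed.sge', 'bob.pipelines.utils']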
@@ -185,11 +185,15 @@ def execute_pipeline_simple(
     if dask_partition_size is not None:
         # Create partitions of the same defined size for each Set
         n_objects = max(
-            len(background_model_samples), len(biometric_references), len(probes)
+            len(background_model_samples),
+            len(biometric_references),
+            len(probes),
         )
         partition_size = None
         if not isinstance(dask_client, str):
-            partition_size = dask_get_partition_size(dask_client.cluster, n_objects, dask_partition_size)
+            partition_size = dask_get_partition_size(
+                dask_client.cluster, n_objects, dask_partition_size
+            )
         logger.debug("Splitting data with fixed size partitions.")
         pipeline = dask_pipeline_simple(
             pipeline,
@@ -206,11 +210,15 @@ def execute_pipeline_simple(
         # Split in max_jobs partitions or revert to the default behavior of
         # dask.Bag from_sequence: partition_size = 100
         n_jobs = None
-        if not isinstance(dask_client, str) and isinstance(dask_client.cluster, SGEMultipleQueuesCluster):
-            logger.debug("Splitting data according to the number of available workers.")
+        if not isinstance(dask_client, str) and isinstance(
+            dask_client.cluster, SGEMultipleQueuesCluster
+        ):
+            logger.debug(
+                "Splitting data according to the number of available workers."
+            )
             n_jobs = dask_client.cluster.sge_job_spec["default"]["max_jobs"]
             logger.debug(f"{n_jobs} partitions will be created.")
             pipeline = dask_pipeline_simple(pipeline, npartitions=n_jobs)
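Taken together, the last two hunks touch the two partitioning strategies in execute_pipeline_simple: a fixed partition size derived via dask_get_partition_size when dask_partition_size is given, and one partition per available SGE worker otherwise. The sketch below is assembled from the diff context only and is not the verbatim file; the helper function, the if/else linkage between the two hunks, and the keyword used to forward partition_size to dask_pipeline_simple are assumptions.

# Sketch only: this helper does not exist in the repository; it restates, in one place,
# the branching that the two hunks above modify. The keyword forwarded for
# partition_size and the linkage between the two branches are assumptions.
from bob.pipelines.distributed import dask_get_partition_size
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster


def choose_partitioning(
    pipeline,
    dask_client,
    dask_partition_size,
    background_model_samples,
    biometric_references,
    probes,
    dask_pipeline_simple,  # passed in here; entry_points.py imports it instead
):
    if dask_partition_size is not None:
        # Fixed-size partitions: bounded by the largest of the three sample sets.
        n_objects = max(
            len(background_model_samples),
            len(biometric_references),
            len(probes),
        )
        partition_size = None
        # A string dask_client is only a configuration name, so there is no live
        # cluster to query for a partition size.
        if not isinstance(dask_client, str):
            partition_size = dask_get_partition_size(
                dask_client.cluster, n_objects, dask_partition_size
            )
        return dask_pipeline_simple(pipeline, partition_size=partition_size)

    # Otherwise (ASSUMPTION: the second hunk is on this path): one partition per
    # available SGE worker, or fall back to dask.Bag.from_sequence's default
    # behaviour (partition_size = 100).
    n_jobs = None
    if not isinstance(dask_client, str) and isinstance(
        dask_client.cluster, SGEMultipleQueuesCluster
    ):
        n_jobs = dask_client.cluster.sge_job_spec["default"]["max_jobs"]
    return dask_pipeline_simple(pipeline, npartitions=n_jobs)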