Skip to content
Snippets Groups Projects
Commit cbefdaf3 authored by Amir MOHAMMADI's avatar Amir MOHAMMADI
Browse files

Merge branch '38-local-parallel-queue-is-not-setup-well' into 'master'

Resolve "local-parallel queue is not setup well"

Closes #38

See merge request !77
parents 3ef187c3 eab1b1b7
No related branches found
No related tags found
1 merge request: !77 Resolve "local-parallel queue is not setup well"
Pipeline #56786 failed
# Dask client configuration: a local client requested with ``parallel=16``.
from bob.pipelines.distributed import get_local_parallel_client

dask_client = get_local_parallel_client(parallel=16)
# Dask client configuration: a local client requested with ``parallel=32``.
from bob.pipelines.distributed import get_local_parallel_client

dask_client = get_local_parallel_client(parallel=32)
# Dask client configuration: a local client requested with ``parallel=4``.
from bob.pipelines.distributed import get_local_parallel_client

dask_client = get_local_parallel_client(parallel=4)
# Dask client configuration: a local client requested with ``parallel=8``.
from bob.pipelines.distributed import get_local_parallel_client

dask_client = get_local_parallel_client(parallel=8)
# Configuration module exposing ``dask_client`` for local parallel execution.
# NOTE(review): this listing looks like a diff view with removed and added
# lines interleaved (the manual LocalCluster setup vs. the helper call at the
# end) -- confirm against the actual file before relying on it.
from multiprocessing import cpu_count
from bob.pipelines.distributed import get_local_parallel_client
from dask.distributed import Client, LocalCluster
# Target worker count: the CPU count reported by ``multiprocessing``.
n_nodes = cpu_count()
threads_per_worker = 1
# The cluster is created with a single worker (``processes=False``,
# ``nanny=False``) and is then asked to scale to ``n_nodes``.
cluster = LocalCluster(
nanny=False,
processes=False,
n_workers=1,
threads_per_worker=threads_per_worker,
)
cluster.scale_up(n_nodes)
dask_client = Client(cluster) # start local workers as threads
# Rebinds ``dask_client``: as listed, this assignment replaces the client
# created above with the one returned by the helper called with its defaults.
dask_client = get_local_parallel_client()
......@@ -66,3 +66,30 @@ def dask_get_partition_size(cluster, n_objects, lower_bound=200):
if n_objects > max_jobs
else n_objects
)
def get_local_parallel_client(parallel=None, processes=True):
    """Create a Dask client attached to a newly created local cluster.

    See the Dask documentation on ``LocalCluster`` for details:
    https://docs.dask.org/en/latest/how-to/deploy-dask/single-distributed.html?highlight=localcluster#localcluster

    Parameters
    ----------
    parallel : int or None
        The number of workers (processes or threads) to use. A falsy value
        (``None`` or ``0``) selects ``multiprocessing.cpu_count()``.
    processes : bool
        If true (recommended), the cluster gets ``parallel`` workers with one
        thread each; otherwise it gets a single worker with ``parallel``
        threads. Note that threads in pure Python do not run in parallel,
        see:
        https://www.quantstart.com/articles/Parallelising-Python-with-Threading-and-Multiprocessing/

    Returns
    -------
    dask.distributed.Client
        A client connected to the local cluster.
    """
    from multiprocessing import cpu_count

    from dask.distributed import Client, LocalCluster

    # Fall back to the machine's CPU count when no explicit size was given.
    degree = parallel or cpu_count()

    # The requested parallelism goes either to the worker count (process
    # mode) or to the thread count of one single worker (thread mode).
    if processes:
        worker_count, thread_count = degree, 1
    else:
        worker_count, thread_count = 1, degree

    local_cluster = LocalCluster(
        processes=processes,
        n_workers=worker_count,
        threads_per_worker=thread_count,
    )
    return Client(local_cluster)
......@@ -39,6 +39,10 @@ setup(
entry_points={
"dask.client": [
"local-parallel = bob.pipelines.config.distributed.local_parallel:dask_client",
"local-p4 = bob.pipelines.config.distributed.local_p4:dask_client",
"local-p8 = bob.pipelines.config.distributed.local_p8:dask_client",
"local-p16 = bob.pipelines.config.distributed.local_p16:dask_client",
"local-p32 = bob.pipelines.config.distributed.local_p32:dask_client",
"sge = bob.pipelines.config.distributed.sge_default:dask_client",
"sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client",
],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment