Skip to content
Snippets Groups Projects
Commit ddcb1ddb authored by Manuel Guenther's avatar Manuel Guenther
Browse files

Added local clients and modified local setup

parent 4125db62
No related branches found
No related tags found
No related merge requests found
Pipeline #56705 failed
from bob.pipelines.distributed import get_local_parallel_client
dask_client = get_local_parallel_client(16)
from bob.pipelines.distributed import get_local_parallel_client
dask_client = get_local_parallel_client(32)
from bob.pipelines.distributed import get_local_parallel_client
dask_client = get_local_parallel_client(4)
from bob.pipelines.distributed import get_local_parallel_client
dask_client = get_local_parallel_client(8)
from multiprocessing import cpu_count
from bob.pipelines.distributed import get_local_parallel_client
from dask.distributed import Client, LocalCluster
n_nodes = cpu_count()
threads_per_worker = 1
cluster = LocalCluster(
nanny=False,
processes=True,
n_workers=1,
threads_per_worker=threads_per_worker,
)
cluster.scale_up(n_nodes)
dask_client = Client(cluster) # start local workers as threads
dask_client = get_local_parallel_client()
......@@ -66,3 +66,32 @@ def dask_get_partition_size(cluster, n_objects, lower_bound=200):
if n_objects > max_jobs
else n_objects
)
def get_local_parallel_client(n_workers=None, processes=True, threads_per_worker=1):
"""Returns a local Dask client with the given parameters, see the dask documentation for details: https://docs.dask.org/en/latest/how-to/deploy-dask/single-distributed.html?highlight=localcluster#localcluster
Parameters
----------
n_workers: int or None
The number of workers (processes) to use; if `None`, take as many processors as we have on the system
processes: boolean
Shall the dask client start processes (True, recommended) or threads (False). Note that threads in pyton do not run in parallel
threads_per_worker: int
How many threads (not processes) per worker to you allow?
"""
from multiprocessing import cpu_count
from dask.distributed import Client, LocalCluster
n_workers = n_workers or cpu_count()
cluster = LocalCluster(
processes=processes,
n_workers=n_workers,
threads_per_worker=threads_per_worker,
)
return Client(cluster)
......@@ -39,6 +39,10 @@ setup(
entry_points={
"dask.client": [
"local-parallel = bob.pipelines.config.distributed.local_parallel:dask_client",
"local-p4 = bob.pipelines.config.distributed.local_p4:dask_client",
"local-p8 = bob.pipelines.config.distributed.local_p8:dask_client",
"local-p16 = bob.pipelines.config.distributed.local_p16:dask_client",
"local-p32 = bob.pipelines.config.distributed.local_p32:dask_client",
"sge = bob.pipelines.config.distributed.sge_default:dask_client",
"sge-gpu = bob.pipelines.config.distributed.sge_gpu:dask_client",
],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment