Skip to content
Snippets Groups Projects
Commit f471c204 authored by André MAYORAZ's avatar André MAYORAZ
Browse files

Merge branch 'fix-jobqueue' into 'master'

Update deprecated dask-jobqueue names

See merge request !106
parents 8c9c8e52 4d2bc361
No related branches found
No related tags found
1 merge request!106Update deprecated dask-jobqueue names
Pipeline #69067 failed
......@@ -38,7 +38,7 @@ class SGEIdiapJob(Job):
queue=None,
project=rc.get("sge.project"),
resource_spec=None,
job_extra=None,
job_extra_directives=None,
config_name="sge",
**kwargs,
):
......@@ -51,8 +51,10 @@ class SGEIdiapJob(Job):
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % config_name
)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)
if job_extra_directives is None:
job_extra_directives = dask.config.get(
"jobqueue.%s.job-extra-directives" % config_name
)
# Resources
resources = kwargs.pop("resources", None)
......@@ -84,7 +86,7 @@ class SGEIdiapJob(Job):
header_lines.append("#$ -e %(log_directory)s/")
header_lines.append("#$ -o %(log_directory)s/")
header_lines.extend(["#$ -cwd", "#$ -j y"])
header_lines.extend(["#$ %s" % arg for arg in job_extra])
header_lines.extend(["#$ %s" % arg for arg in job_extra_directives])
header_template = "\n".join(header_lines)
config = {
......@@ -154,7 +156,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
dashboard_address: str
Default port for the dask dashboard,
env_extra: str,
job_script_prologue: str,
Extra environment variables to send to the workers
sge_job_spec: dict
......@@ -236,7 +238,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
log_directory="./logs",
protocol="tcp://",
dashboard_address=":8787",
env_extra=None,
job_script_prologue=None,
sge_job_spec=QUEUE_DEFAULT,
min_jobs=1,
project=rc.get("sge.project"),
......@@ -256,11 +258,13 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
host = None
security = None
if env_extra is None:
env_extra = []
elif not isinstance(env_extra, list):
env_extra = [env_extra]
self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)]
if job_script_prologue is None:
job_script_prologue = []
elif not isinstance(job_script_prologue, list):
job_script_prologue = [job_script_prologue]
self.job_script_prologue = job_script_prologue + [
"export PYTHONPATH=" + ":".join(sys.path)
]
scheduler = {
"cls": SchedulerResourceRestriction, # Use local scheduler for now
......@@ -315,7 +319,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
"queue": queue,
"project": self.project,
"memory": job_spec.get("memory", ""),
"job_extra": job_spec.get("job_extra", None),
"job_extra_directives": job_spec.get("job_extra_directives", None),
"cores": 1,
"processes": 1,
"log_directory": self.log_directory,
......@@ -325,7 +329,7 @@ class SGEMultipleQueuesCluster(JobQueueCluster):
"protocol": self.protocol,
"security": None,
"resources": job_spec.get("resources", ""),
"env_extra": self.env_extra,
"job_script_prologue": self.job_script_prologue,
}
def scale(self, n_jobs, sge_job_spec_key="default"):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment