Commit b777993c authored by Manuel Günther

Corrected the job submission (and re-submission) to handle unique IDs (instead of grid IDs).

parent 2635bf20
......@@ -54,13 +54,14 @@ class JobManagerSGE(JobManager):
jman = self.wrapper_script
python = sys.executable
# remove duplicate dependencies
dependencies = sorted(list(set(dependencies)))
# get the grid id's for the dependencies and remove duplicates
dependent_jobs = self.get_jobs(dependencies)
deps = sorted(list(set([j.id for j in dependent_jobs])))
# generate call to the wrapper script
command = make_shell(python, [jman, '-d', self._database, 'run-job'])
q_array = "%d-%d:%d" % array if array else None
grid_id = qsub(command, context=self.context, name=name, deps=dependencies, array=q_array, stdout=log_dir, stderr=log_dir, **kwargs)
grid_id = qsub(command, context=self.context, name=name, deps=deps, array=q_array, stdout=log_dir, stderr=log_dir, **kwargs)
# get the result of qstat
status = qstat(grid_id, context=self.context)
......@@ -76,7 +77,7 @@ class JobManagerSGE(JobManager):
logger.warn("This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead." % kwargs['queue'] if 'queue' in kwargs else 'all.q')
assert job.id == grid_id
return grid_id
return job.unique
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
......@@ -109,7 +110,7 @@ class JobManagerSGE(JobManager):
jobs = self.get_jobs(job_ids)
for job in jobs:
job.refresh()
if job.status in ('queued', 'executing', 'waiting'):
if job.status in ('queued', 'executing', 'waiting') and job.queue_name != 'local':
status = qstat(job.id, context=self.context)
if len(status) == 0:
job.status = 'failure'
......@@ -147,7 +148,7 @@ class JobManagerSGE(JobManager):
if job.queue_name == 'local' and 'queue' not in arguments:
logger.warn("Re-submitting job '%s' locally (since no queue name is specified)." % job)
else:
deps = [dep.id for dep in job.get_jobs_we_wait_for()]
deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
logger.debug("Re-submitting job '%s' with dependencies '%s' to the grid." % (job, deps))
self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, **arguments)
......@@ -159,8 +160,11 @@ class JobManagerSGE(JobManager):
"""Overwrites the run-job command from the manager to extract the correct job id before calling base class implementation."""
# get the unique job id from the given grid id
self.lock()
job = self.session.query(Job).filter(Job.id == job_id)
job_id = list(job)[0].unique
jobs = list(self.session.query(Job).filter(Job.id == job_id))
if len(jobs) != 1:
self.unlock()
raise ValueError("Could not find job id '%d' in the database'" % job_id)
job_id = jobs[0].unique
self.unlock()
# call base class implementation with the corrected job id
return JobManager.run_job(self, job_id, array_id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment