Commit 3082f4ed authored by Manuel Günther's avatar Manuel Günther

Fixed possible dead lock when scheduler is interrupted.

parent 45e0868f
......@@ -123,13 +123,13 @@ class JobManagerLocal(JobManager):
command = [self.wrapper_script, '-ld', self._database, 'run-job']
job = self.get_jobs((job_id,))[0]
logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id), job.name))
logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id, len(job.array)), job.name))
# return the subprocess pipe to the process
try:
return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id), e, job.get_command_line()))
logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id, len(job.array)), e, job.get_command_line()))
job.finish(117, array_id) # ASCII 'O'
return None
......@@ -153,6 +153,7 @@ class JobManagerLocal(JobManager):
log_dir = job.log_dir if not no_log and job is not None else None
job_id = job.id
array_id = array_job.id if array_job else None
array_count = len(job.array) if job is not None else 0
self.unlock()
if log_dir:
......@@ -164,13 +165,13 @@ class JobManagerLocal(JobManager):
write(err, sys.stderr, process.stderr)
if log_dir:
j = self._format_log(job_id, array_id)
j = self._format_log(job_id, array_id, array_count)
logger.debug("Wrote output of job '%s' to file '%s'" % (j,out))
logger.debug("Wrote errors of job '%s' to file '%s'" % (j,err))
def _format_log(self, job_id, array_id = None):
return ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
def _format_log(self, job_id, array_id = None, array_count = 0):
return ("%d (%d/%d)" % (job_id, array_id, array_count)) if array_id is not None and array_count else ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False):
"""Starts the scheduler, which is constantly checking for jobs that should be ran."""
......@@ -260,4 +261,6 @@ class JobManagerLocal(JobManager):
for task in running_tasks:
logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
task[0].kill()
if hasattr(self, 'session'):
self.unlock()
self.stop_job(task[1], task[2] if len(task) > 2 else None)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment