Commit 5d175660 authored by Manuel Günther

Fixed a bug that froze jobs when they produced large output/error messages during local submission.

parent 07be4bab
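
The freeze fixed here is the well-known `subprocess.PIPE` deadlock: the old code only read the child's stdout/stderr after `poll()` reported completion, so a child writing more than the OS pipe buffer holds (commonly 64 KB on Linux) blocked in `write()` and never terminated. Below is a minimal sketch of the failure mode and of the file-redirection fix this commit applies; it is independent of gridtk, and the helper names are illustrative only:

```python
import os
import subprocess
import sys
import tempfile

def run_with_pipe(command):
  """Deadlock-prone pattern (old behavior): output is read only after
  the process ends.  A child writing more than the pipe buffer holds
  blocks in write(), so poll() never sees it terminate."""
  process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  while process.poll() is None:  # spins forever for large outputs
    pass
  return process.stdout.read(), process.stderr.read()

def run_with_files(command, out_name, err_name):
  """Fixed pattern (new behavior): stdout/stderr go directly to files,
  so the child can always write and the logs grow while it runs."""
  with open(out_name, 'w') as out, open(err_name, 'w') as err:
    process = subprocess.Popen(command, stdout=out, stderr=err)
    return process.wait()

if __name__ == '__main__':
  # a child that prints far more than a typical 64 KB pipe buffer
  chatty = [sys.executable, '-c', "print('x' * (10 * 1024 * 1024))"]
  log_dir = tempfile.mkdtemp()
  status = run_with_files(chatty, os.path.join(log_dir, 'job.out'),
                          os.path.join(log_dir, 'job.err'))
  print('exit status:', status)
```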
@@ -115,7 +115,7 @@ class JobManagerLocal(JobManager):
   #####################################################################
   ###### Methods to run the jobs in parallel on the local machine #####
-  def _run_parallel_job(self, job_id, array_id = None):
+  def _run_parallel_job(self, job_id, array_id = None, no_log = False):
     """Executes the code for this job on the local machine."""
     environ = copy.deepcopy(os.environ)
     environ['JOB_ID'] = str(job_id)
@@ -127,54 +127,28 @@ class JobManagerLocal(JobManager):
     # generate call to the wrapper script
     command = [self.wrapper_script, '-ld', self._database, 'run-job']
-    job = self.get_jobs((job_id,))[0]
+    job, array_job = self._job_and_array(job_id, array_id)
     logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id, len(job.array)), job.name))
+    # create log files
+    if no_log or job.log_dir is None:
+      out, err = sys.stdout, sys.stderr
+    else:
+      makedirs_safe(job.log_dir)
+      # create unbuffered files for writing output and error status
+      if array_job is not None:
+        out, err = open(array_job.std_out_file(), 'w', 0), open(array_job.std_err_file(), 'w', 0)
+      else:
+        out, err = open(job.std_out_file(), 'w', 0), open(job.std_err_file(), 'w', 0)
     # return the subprocess pipe to the process
     try:
-      return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      return subprocess.Popen(command, env=environ, stdout=out, stderr=err)
     except OSError as e:
       logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id, len(job.array)), e, job.get_command_line()))
       job.finish(117, array_id) # ASCII 'O'
       return None
 
-  def _result_files(self, process, job_id, array_id = None, no_log = False):
-    """Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
-    def write(file, std, process):
-      f = std if file is None else open(str(file), 'w')
-      f.write(str_(process.read()))
-
-    self.lock()
-    # get the files to write to
-    job, array_job = self._job_and_array(job_id, array_id)
-    if job is None or no_log:
-      out, err = None, None
-    elif array_job is not None:
-      out, err = array_job.std_out_file(), array_job.std_err_file()
-    else:
-      out, err = job.std_out_file(), job.std_err_file()
-    log_dir = job.log_dir if not no_log and job is not None else None
-    job_id = job.id
-    array_id = array_job.id if array_job else None
-    array_count = len(job.array) if job is not None else 0
-    self.unlock()
-
-    if log_dir:
-      makedirs_safe(log_dir)
-    # write stdout
-    write(out, sys.stdout, process.stdout)
-    # write stderr
-    write(err, sys.stderr, process.stderr)
-    if log_dir:
-      j = self._format_log(job_id, array_id, array_count)
-      logger.debug("Wrote output of job '%s' to file '%s'" % (j,out))
-      logger.debug("Wrote errors of job '%s' to file '%s'" % (j,err))
 
   def _format_log(self, job_id, array_id = None, array_count = 0):
     return ("%d (%d/%d)" % (job_id, array_id, array_count)) if array_id is not None and array_count else ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
@@ -183,7 +157,9 @@ class JobManagerLocal(JobManager):
     running_tasks = []
     try:
       # keep the scheduler alive until every job is finished or the KeyboardInterrupt is caught
       while True:
+        # Flag that might be set in some rare cases and that prevents the scheduler from dying
+        repeat_execution = False
         # FIRST, try if there are finished processes; this does not need a lock
@@ -195,10 +171,8 @@ class JobManagerLocal(JobManager):
           # process ended
           job_id = task[1]
           array_id = task[2] if len(task) > 2 else None
-          # report the result
-          self._result_files(process, job_id, array_id, no_log)
-          logger.info("Job '%s' finished execution" % self._format_log(job_id, array_id))
+          logger.info("Job '%s' finished execution" % self._format_log(job_id, array_id))
           # in any case, remove the job from the list
           del running_tasks[task_index]
@@ -226,7 +200,7 @@ class JobManagerLocal(JobManager):
           for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
             array_job = queued_array_jobs[i]
             # start a new job from the array
-            process = self._run_parallel_job(job.id, array_job.id)
+            process = self._run_parallel_job(job.id, array_job.id, no_log=no_log)
             if process is None:
               continue
             running_tasks.append((process, job.id, array_job.id))
@@ -239,7 +213,7 @@ class JobManagerLocal(JobManager):
         else:
           if job.status == 'queued':
             # start a new job
-            process = self._run_parallel_job(job.id)
+            process = self._run_parallel_job(job.id, no_log=no_log)
             if process is None:
               continue
             running_tasks.append((process, job.id))
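
A side note on the `open(..., 'w', 0)` calls added above: a buffering argument of `0` makes the log files unbuffered, so output reaches disk immediately while the job is still running. That form only works on Python 2 (current at the time of this commit); Python 3 restricts unbuffered mode to binary files. A sketch of the equivalents, for reference:

```python
# Python 2, as used in the commit: unbuffered text-mode log file
out = open('job.out', 'w', 0)

# Python 3: buffering=0 raises ValueError in text mode; use an
# unbuffered binary file or a line-buffered text file instead
out = open('job.out', 'wb', 0)           # unbuffered, expects bytes
out = open('job.out', 'w', buffering=1)  # line-buffered text
```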
@@ -95,9 +95,9 @@ class GridTKTest(unittest.TestCase):
     self.assertEqual(len(jobs), 2)
     self.assertEqual(jobs[0].status, 'failure')
     self.assertEqual(jobs[1].status, 'queued')
-    # the result files should not be there yet
-    self.assertFalse(os.path.exists(jobs[0].std_out_file()))
-    self.assertFalse(os.path.exists(jobs[0].std_err_file()))
+    # the result files should already be there
+    self.assertTrue(os.path.exists(jobs[0].std_out_file()))
+    self.assertTrue(os.path.exists(jobs[0].std_err_file()))
     job_manager.unlock()
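
For completeness: had the in-memory result collection been kept, the standard library's sanctioned way to avoid the same deadlock is `Popen.communicate()`, which drains both pipes while waiting for the process; the Python documentation explicitly warns against reading `process.stdout`/`process.stderr` directly for this reason. A minimal sketch:

```python
import subprocess
import sys

command = [sys.executable, '-c', "print('x' * (10 * 1024 * 1024))"]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() reads both streams while waiting, so the child can
# never block on a full pipe buffer, regardless of output size
stdout, stderr = process.communicate()
print(len(stdout), 'bytes of output collected')
```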