Commit dc2ed0d5 authored by Manuel Günther's avatar Manuel Günther

Trial to stop dependent jobs on failure.

parent 15cfb503
from __future__ import print_function
import os
import os, sys
import subprocess
import socket # to get the host name
from .models import Base, Job, ArrayJob, Status
......@@ -127,26 +127,6 @@ class JobManager:
# set the 'executing' status to the job
job.execute(array_id, machine_name)
""" This is currently not working, as it seems...
if job.status == 'failure':
# there has been a dependent job that has failed before
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs[0]
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.id for dep in new])
self.unlock()
try:
self.stop_jobs(list(dependent_job_ids))
logger.warn("Stopped dependent jobs '%s' since this job failed." % str(list(dependent_job_ids)))
except:
pass
return
"""
self.session.commit()
except Exception:
pass
......@@ -163,16 +143,17 @@ class JobManager:
try:
result = subprocess.call(command_line)
except Exception as e:
logger.error("The job with id '%d' could not be executed: %s" % (job_id, e))
print("ERROR: The job with id '%d' could not be executed: %s" % (job_id, e), file=sys.stderr)
result = 69 # ASCII: 'E'
# set a new status and the results of the job
try:
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
logger.error("The job with id '%d' could not be found in the database!" % job_id)
print("ERROR: The job with id '%d' could not be found in the database!" % job_id, file=sys.stderr)
self.unlock()
return
......@@ -180,10 +161,32 @@ class JobManager:
job.finish(result, array_id)
self.session.commit()
except Exception:
# This might not be working properly, so use with care!
if job.stop_on_failure and job.status == 'failure':
# the job has failed
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.unique for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs.pop(0)
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.unique for dep in new])
self.unlock()
deps = sorted(list(dependent_job_ids))
self.stop_jobs(deps)
print ("WARNING: Stopped dependent jobs '%s' since this job failed." % str(deps), file=sys.stderr)
except Exception as e:
print ("ERROR: Caught exception '%s'" % e)
pass
finally:
self.unlock()
if hasattr(self, 'session'):
self.unlock()
def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, status=Status, names=None, ids_only=False):
......
......@@ -189,11 +189,7 @@ class JobManagerSGE(JobManager):
if job.status in ('executing', 'queued', 'waiting'):
qdel(job.id, context=self.context)
logger.info("Stopped job '%s' in the SGE grid." % job)
job.status = 'submitted'
for array_job in job.array:
if array_job.status in ('executing', 'queued', 'waiting'):
array_job.status = 'submitted'
job.submit()
self.session.commit()
self.unlock()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment