Commit 2ac741d8 authored by Manuel Günther

Added support for --stop-on-failure option; bug fixes in the local scheduler; updated documentation; small improvements.
parent 5bbf0cd9
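
The new switches surface on the jman command line. A minimal usage sketch, mirroring the calls in the updated test suite below (database path, log directory and script names are placeholders; the import path of the jman script module is an assumption):

from gridtk.script import jman  # import path is an assumption

# submit a job whose failure should also stop all jobs depending on it
jman.main(['./bin/jman', '--local', '--database', 'submitted.sql3',
           'submit', '--log-dir', './logs', '--name', 'test_1',
           '--stop-on-failure', './my_script.sh'])

# run the local scheduler until the queue is drained, without log files
jman.main(['./bin/jman', '--local', '--database', 'submitted.sql3',
           'run-scheduler', '--parallel', '2', '--sleep-time', '0.1',
           '--die-when-finished', '--no-log-files'])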
@@ -13,7 +13,7 @@ import copy, os, sys
import gdbm, anydbm
from cPickle import dumps, loads
from .tools import makedirs_safe, logger, try_get_contents, try_remove_files
from .tools import makedirs_safe, logger
from .manager import JobManager
@@ -34,15 +34,23 @@ class JobManagerLocal(JobManager):
JobManager.__init__(self, **kwargs)
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, **kwargs):
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed on the local machine during a call to "run".
All kwargs will simply be ignored."""
# add job to database
self.lock()
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir)
logger.debug("Added job '%s' to the database" % job)
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir, stop_on_failure=stop_on_failure)
logger.info("Added job '%s' to the database" % job)
if dry_run:
print "Would have added the Job", job, "to the database to be executed locally."
self.session.delete(job)
logger.info("Deleted job '%s' from the database due to dry-run option" % job)
job_id = None
else:
job_id = job.id
# return the new job id
job_id = job.id
self.unlock()
return job_id
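
For illustration, the extended submit() signature might be driven like this (a minimal sketch; the import path and constructor arguments are assumptions based on this diff):

from gridtk.local import JobManagerLocal  # import path is an assumption

manager = JobManagerLocal(database='submitted.sql3')

# dry_run only reports what would be submitted and returns None
assert manager.submit(['./my_script.sh'], name='test_1',
                      log_dir='./logs', dry_run=True) is None

# a real submission returns the id of the new database entry;
# stop_on_failure marks the job so that jobs depending on it are
# stopped as soon as it fails
job_id = manager.submit(['./my_script.sh'], name='test_1',
                        log_dir='./logs', stop_on_failure=True)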
@@ -57,8 +65,8 @@ class JobManagerLocal(JobManager):
# check if this job needs re-submission
if running_jobs or job.status in accepted_old_status:
# re-submit job to the grid
logger.debug("Re-submitted job '%s' to the database" % job)
job.submit()
logger.info("Re-submitted job '%s' to the database" % job)
job.submit('local')
self.session.commit()
self.unlock()
@@ -71,7 +79,7 @@ class JobManagerLocal(JobManager):
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
logger.debug("Reset job '%s' in the database" % job)
logger.info("Reset job '%s' in the database" % job)
job.status = 'submitted'
self.session.commit()
@@ -83,7 +91,7 @@ class JobManagerLocal(JobManager):
job, array_job = self._job_and_array(job_id, array_id)
if job.status == 'executing':
logger.debug("Reset job '%s' in the database" % job)
logger.info("Reset job '%s' in the database" % job)
job.status = 'submitted'
if array_job is not None and array_job.status == 'executing':
@@ -119,21 +127,23 @@ class JobManagerLocal(JobManager):
return None
def _result_files(self, process, job_id, array_id = None):
def _result_files(self, process, job_id, array_id = None, no_log = False):
"""Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
def write(file, process, std):
def write(file, std, process):
f = std if file is None else open(str(file), 'w')
f.write(process.read())
self.lock()
# get the files to write to
job, array_job = self._job_and_array(job_id, array_id)
if array_job:
if no_log:
out, err = None, None
elif array_job:
out, err = array_job.std_out_file(), array_job.std_err_file()
else:
out, err = job.std_out_file(), job.std_err_file()
log_dir = job.log_dir
log_dir = job.log_dir if not no_log else None
job_id = job.id
array_id = array_job.id if array_job else None
self.unlock()
@@ -142,9 +152,9 @@ class JobManagerLocal(JobManager):
makedirs_safe(log_dir)
# write stdout
write(out, process.stdout, sys.stdout)
write(out, sys.stdout, process.stdout)
# write stderr
write(err, process.stderr, sys.stderr)
write(err, sys.stderr, process.stderr)
if log_dir:
j = self._format_log(job_id, array_id)
@@ -155,12 +165,14 @@ class JobManagerLocal(JobManager):
def _format_log(self, job_id, array_id = None):
return ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
def run_scheduler(self, parallel_jobs = 1, sleep_time = 0.1):
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False):
"""Starts the scheduler, which is constantly checking for jobs that should be ran."""
running_tasks = []
try:
while True:
# Flag that might be set in some rare cases and that prevents the scheduler from dying
repeat_execution = False
# FIRST, check whether any running processes have finished; this does not need a lock
for task_index in range(len(running_tasks)-1, -1, -1):
task = running_tasks[task_index]
@@ -170,7 +182,7 @@ class JobManagerLocal(JobManager):
job_id = task[1]
array_id = task[2] if len(task) > 2 else None
# report the result
self._result_files(process, job_id, array_id)
self._result_files(process, job_id, array_id, no_log)
logger.info("Job '%s' finished execution" % self._format_log(job_id, array_id))
# in any case, remove the job from the list
@@ -180,18 +192,25 @@ class JobManagerLocal(JobManager):
if len(running_tasks) < parallel_jobs:
# get all unfinished jobs:
self.lock()
jobs = self.get_jobs()
jobs = self.get_jobs(job_ids)
# put all new jobs into the queue
for job in jobs:
if job.status == 'submitted':
job.queue()
# get all unfinished jobs
unfinished_jobs = [job for job in jobs if job.status in ('queued', 'executing')]
# get all unfinished jobs that are submitted to the local queue
unfinished_jobs = [job for job in jobs if job.status in ('queued', 'executing') and job.queue_name == 'local']
for job in unfinished_jobs:
if job.array:
# find array jobs that can run
for array_job in job.array:
if array_job.status == 'queued':
queued_array_jobs = [array_job for array_job in job.array if array_job.status == 'queued']
if not len(queued_array_jobs):
job.finish(0, -1)
repeat_execution = True
else:
# there are new array jobs to run
for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
array_job = queued_array_jobs[i]
# start a new job from the array
process = self._run_parallel_job(job.id, array_job.id)
running_tasks.append((process, job.id, array_job.id))
@@ -215,6 +234,11 @@ class JobManagerLocal(JobManager):
self.session.commit()
self.unlock()
# if no jobs are running after the submission step, the whole queue should be finished.
if die_when_finished and not repeat_execution and len(running_tasks) == 0:
logger.info("Stopping task scheduler since there are no more jobs running.")
break
# THIRD: sleep the desired amount of time before re-checking
time.sleep(sleep_time)
......
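
The matching API call for the new scheduler switches (sketch, same assumptions as the submit() example above):

from gridtk.local import JobManagerLocal  # import path is an assumption

manager = JobManagerLocal(database='submitted.sql3')
# drain the local queue with two parallel processes, write no log
# files, and return once nothing is queued or running any more
manager.run_scheduler(parallel_jobs=2, sleep_time=0.1,
                      die_when_finished=True, no_log=True)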
@@ -13,6 +13,7 @@ class JobManager:
def __init__(self, database, wrapper_script = './bin/jman', debug = False):
self._database = os.path.realpath(database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=debug)
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
# store the command that this job manager was called with
self.wrapper_script = wrapper_script
@@ -35,10 +36,10 @@ class JobManager:
"""Generates (and returns) a blocking session object to the database."""
if hasattr(self, 'session'):
raise RuntimeError('Deadlock detected. Please do not try to lock the session when it is already locked!')
Session = sqlalchemy.orm.sessionmaker()
if not os.path.exists(self._database):
self._create()
self.session = Session(bind=self._engine)
# now, create a session
self.session = self._session_maker()
logger.debug("Created new database session to '%s'" % self._database)
return self.session
@@ -46,8 +47,8 @@ class JobManager:
"""Closes the session to the database."""
if not hasattr(self, 'session'):
raise RuntimeError('Error detected! The session that you want to close does not exist any more!')
self.session.close()
logger.debug("Closed database session of '%s'" % self._database)
self.session.close()
del self.session
@@ -63,6 +64,7 @@ class JobManager:
logger.debug("Created new empty database '%s'" % self._database)
def get_jobs(self, job_ids = None):
"""Returns a list of jobs that are stored in the database."""
q = self.session.query(Job)
@@ -100,6 +102,25 @@
# set the 'executing' status to the job
job.execute(array_id)
if job.status == 'failure':
# a job that this one depends on has failed before;
# stop this job and all jobs that depend on it from executing
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.id])
while len(dependent_jobs):
dep = dependent_jobs.pop(0)
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.id for dep in new])
self.unlock()
try:
self.stop_jobs(list(dependent_job_ids))
logger.warn("Deleted dependent jobs '%s' since this job failed.")
except:
pass
return
# get the command line of the job
command_line = job.get_command_line()
self.session.commit()
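
The failure branch above collects every job that transitively waits on the failed one with a simple work list. A self-contained sketch of the same traversal over plain ids (the dependency map is hypothetical test data; an explicit visited check is added so cyclic data cannot loop):

# hypothetical map: job id -> ids of the jobs waiting for it
waiting_for_us = {1: [2, 3], 2: [4], 3: [], 4: []}

def dependent_closure(job_id):
    closure = set([job_id])
    work = list(waiting_for_us[job_id])
    while work:
        dep = work.pop(0)
        if dep not in closure:      # visited check; see note above
            closure.add(dep)
            work.extend(waiting_for_us[dep])
    return closure

assert dependent_closure(1) == set([1, 2, 3, 4])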
@@ -116,6 +137,7 @@ class JobManager:
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meantime
logger.error("The job with id '%d' could not be found in the database!" % job_id)
return
job = jobs[0]
@@ -128,9 +150,17 @@ class JobManager:
def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False):
"""Lists the jobs currently added to the database."""
# configuration for jobs
fields = ("job-id", "queue", "status", "job-name", "arguments")
lengths = (20, 9, 14, 20, 43)
format = "{:^%d} {:^%d} {:^%d} {:^%d} {:^%d}" % lengths
if print_dependencies:
fields = ("job-id", "queue", "status", "job-name", "dependencies", "submitted command line")
lengths = (20, 9, 14, 20, 30, 43)
format = "{:^%d} {:^%d} {:^%d} {:^%d} {:^%d} {:<%d}" % lengths
dependency_length = lengths[4]
else:
fields = ("job-id", "queue", "status", "job-name", "submitted command line")
lengths = (20, 9, 14, 20, 43)
format = "{:^%d} {:^%d} {:^%d} {:^%d} {:<%d}" % lengths
dependency_length = 0
array_format = "{:>%d} {:^%d} {:^%d}" % lengths[:3]
delimiter = format.format(*['='*k for k in lengths])
array_delimiter = array_format.format(*["-"*k for k in lengths[:3]])
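
The table layout above is produced in two stages; a condensed sketch of the mechanism:

# '%' interpolation first bakes the column widths into a str.format()
# template, which is then reused for the header, the delimiter and
# every job row
lengths = (20, 9, 14, 20, 43)
template = "{:^%d} {:^%d} {:^%d} {:^%d} {:<%d}" % lengths

header = template.format("job-id", "queue", "status", "job-name",
                         "submitted command line")
delimiter = template.format(*['=' * k for k in lengths])
print(header)
print(delimiter)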
@@ -143,7 +173,7 @@ class JobManager:
self.lock()
for job in self.get_jobs(job_ids):
print job.format(format, print_dependencies, None if long else 43)
print job.format(format, dependency_length, None if long else 43)
if print_array_jobs and job.array:
print array_delimiter
for array_job in job.array:
@@ -232,10 +262,10 @@ class JobManager:
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
logger.debug("Deleting array job '%d' of job '%d'" % array_job.id, job.id)
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
_delete(array_job)
if not job.array:
logger.info("Deleting job '%d'" % job.id)
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
else:
@@ -245,10 +275,10 @@ class JobManager:
# delete all array jobs
if job.array:
for array_job in job.array:
logger.debug("Deleting array job '%d' of job '%d'" % (array_job.id, job.id))
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
_delete(array_job)
# delete this job
logger.info("Deleting job '%d'" % job.id)
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
self.session.commit()
......
import sqlalchemy
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey
from bob.db.sqlalchemy_migration import Enum, relationship
from sqlalchemy.orm import backref
from sqlalchemy.ext.declarative import declarative_base
@@ -64,25 +64,29 @@ class Job(Base):
id = Column(Integer, unique = True) # The ID of the job as given from the grid
log_dir = Column(String(255)) # The directory where the log files will be put to
array_string = Column(String(255)) # The array string (only needed for re-submission)
stop_on_failure = Column(Boolean) # An indicator whether to stop dependent jobs when this job finishes with an error
status = Column(Enum(*Status))
result = Column(Integer)
def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', **kwargs):
def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', stop_on_failure = False, **kwargs):
"""Constructs a Job object without an ID (needs to be set later)."""
self.command_line = dumps(command_line)
self.name = name
self.queue_name = queue_name # will be set during the queue command later
self.grid_arguments = dumps(kwargs)
self.log_dir = log_dir
self.stop_on_failure = stop_on_failure
self.array_string = dumps(array_string)
self.submit()
def submit(self):
def submit(self, new_queue = None):
"""Sets the status of this job to 'submitted'."""
self.status = 'submitted'
self.result = None
if new_queue is not None:
self.queue_name = new_queue
for array_job in self.array:
array_job.status = 'submitted'
array_job.result = None
@@ -103,13 +107,15 @@ class Job(Base):
self.result = None
# check if we have to wait for another job to finish
for job in self.get_jobs_we_wait_for():
if job is not None and job.status not in Status[-2:]:
if job.status not in ('success', 'failure'):
new_status = 'waiting'
elif self.stop_on_failure and job.status == 'failure':
new_status = 'failure'
# reset the queued jobs that depend on us to 'waiting' (or 'failure') status
for job in self.get_jobs_waiting_for_us():
if job is not None and job.status == 'queued':
job.status = 'waiting'
if job.status == 'queued':
job.status = 'failure' if new_status == 'failure' else 'waiting'
self.status = new_status
for array_job in self.array:
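
Condensed, the new status decision in queue() reads as follows (a sketch over plain status strings; as in the loop it mirrors, the order of the prerequisites matters):

def queue_status(prerequisite_statuses, stop_on_failure):
    status = 'queued'
    for s in prerequisite_statuses:
        if s not in ('success', 'failure'):
            status = 'waiting'              # still waiting for this one
        elif stop_on_failure and s == 'failure':
            status = 'failure'              # propagate the failure
    return status

assert queue_status(['success', 'success'], False) == 'queued'
assert queue_status(['executing'], True) == 'waiting'
assert queue_status(['failure'], True) == 'failure'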
@@ -124,6 +130,13 @@ class Job(Base):
if array_job.id == array_id:
array_job.status = 'executing'
# sometimes, the 'finish' command did not work for array jobs,
# so check if any old job still has the 'executing' flag set
for job in self.get_jobs_we_wait_for():
if job.array and job.status == 'executing':
job.finish(0, -1)
def finish(self, result, array_id = None):
"""Sets the status of this job to 'success' or 'failure'."""
@@ -136,7 +149,7 @@ class Job(Base):
if array_job.id == array_id:
array_job.status = new_status
array_job.result = result
if array_job.status not in Status[-2:]:
if array_job.status not in ('success', 'failure'):
finished = False
elif new_result == 0:
new_result = array_job.result
@@ -185,18 +198,22 @@ class Job(Base):
else: r = "%s" % self.status
return "%s : %s -- %s" % (n, r, " ".join(self.get_command_line()))
def format(self, format, add_dependencies = False, limit_command_line = None):
def format(self, format, dependencies = 0, limit_command_line = None):
"""Formats the current job into a nicer string to fit into a table."""
command_line = " ".join(self.get_command_line())
if add_dependencies:
command_line = str([dep.id for dep in self.get_jobs_we_wait_for()]) + " -- " + command_line
if limit_command_line is not None:
if limit_command_line is not None and len(command_line) > limit_command_line:
command_line = command_line[:limit_command_line-3] + '...'
job_id = "%d" % self.id + (" [%d-%d:%d]" % self.get_array() if self.array else "")
status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "" )
return format.format(job_id, self.queue_name, status, self.name, command_line)
if dependencies:
deps = str([dep.id for dep in self.get_jobs_we_wait_for()])
if dependencies < len(deps):
deps = deps[:dependencies-3] + '...'
return format.format(job_id, self.queue_name, status, self.name, deps, command_line)
else:
return format.format(job_id, self.queue_name, status, self.name, command_line)
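
Both the command line and the new dependency column are clipped with the same rule; a tiny sketch:

def clip(text, width):
    # keep short strings, otherwise truncate and append an ellipsis
    return text if len(text) <= width else text[:width - 3] + '...'

assert clip('[1, 2, 3]', 30) == '[1, 2, 3]'
assert clip(str(list(range(10))), 12) == '[0, 1, 2,...'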
@@ -218,9 +235,9 @@ class JobDependence(Base):
def add_job(session, command_line, name = 'job', dependencies = [], array = None, log_dir = None, **kwargs):
def add_job(session, command_line, name = 'job', dependencies = [], array = None, log_dir = None, stop_on_failure = False, **kwargs):
"""Helper function to create a job, add the dependencies and the array jobs."""
job = Job(command_line=command_line, name=name, log_dir=log_dir, array_string=array, kwargs=kwargs)
job = Job(command_line=command_line, name=name, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)
session.add(job)
session.flush()
......
@@ -69,14 +69,22 @@ class JobManagerSGE(JobManager):
return grid_id
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", **kwargs):
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed in the grid."""
# add job to database
self.lock()
job = add_job(self.session, command_line, name, dependencies, array, log_dir=log_dir, context=self.context, **kwargs)
logger.debug("Added job '%s' to the database." % job)
job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, **kwargs)
job = add_job(self.session, command_line, name, dependencies, array, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
logger.info("Added job '%s' to the database." % job)
if dry_run:
print "Would have added the Job"
print job
print "to the database to be executed in the grid with options:", str(kwargs)
self.session.delete(job)
logger.info("Deleted job '%s' from the database due to dry-run option" % job)
job_id = None
else:
job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, **kwargs)
self.session.commit()
self.unlock()
@@ -84,6 +92,22 @@ class JobManagerSGE(JobManager):
return job_id
def communicate(self, job_ids = None):
"""Communicates with the SGE grid (using qstat) to see if jobs are still running."""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
status = qstat(job.id, context=self.context)
if len(status) == 0:
job.status = 'failure'
job.result = 70 # ASCII: 'F'
logger.warn("The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files." % job)
self.session.commit()
self.unlock()
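
A usage sketch for the new communicate() method (import path and constructor arguments are assumptions):

from gridtk.sge import JobManagerSGE  # import path is an assumption

manager = JobManagerSGE(database='submitted.sql3')
# ask qstat about every job recorded as 'executing'; jobs the grid no
# longer reports are marked as 'failure' with result 70 (ASCII 'F')
manager.communicate()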
def resubmit(self, job_ids = None, failed_only = False, running_jobs = False):
"""Re-submit jobs automatically"""
self.lock()
@@ -111,7 +135,7 @@ class JobManagerSGE(JobManager):
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
if job.status in ('executing', 'queued', 'waiting'):
qdel(job.id, context=self.context)
logger.info("Stopped job '%s' in the SGE grid." % job)
job.status = 'submitted'
......
@@ -90,6 +90,7 @@ class DatabaseTest(unittest.TestCase):
assert not os.path.exists(jobs[0].std_err_file())
job_manager.unlock()
# reset job 1
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs'])
@@ -132,10 +133,10 @@ class DatabaseTest(unittest.TestCase):
assert os.path.exists(self.log_dir)
# now, let the scheduler run all jobs
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2'])
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished'])
# ... and kill the scheduler
time.sleep(3)
self.scheduler_job.send_signal(signal.SIGINT)
assert self.scheduler_job.poll() is not None
self.scheduler_job = None
# check that all output files are generated again
@@ -177,6 +178,35 @@ class DatabaseTest(unittest.TestCase):
# check that the database and the log files are gone
assert len(os.listdir(self.temp_dir)) == 0
# add the scripts again, but this time with the --stop-on-failure option
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', script_2])
# and execute them, but without writing the log files
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files'])
# ... and kill the scheduler
time.sleep(3)
assert self.scheduler_job.poll() is not None
self.scheduler_job = None
# assert that the log files are not there
assert not os.path.isfile(out_file)
assert not os.path.isfile(err_file)
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[0].result == 255
assert jobs[1].status == 'failure'
assert jobs[1].result is None
job_manager.unlock()
# and clean up again
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
def test02_grid(self):
# Tests the functionality of the grid toolkit in the grid
......
@@ -31,12 +31,6 @@ if sys.version_info < (2,7):
# Constant regular expressions
QSTAT_FIELD_SEPARATOR = re.compile(':\s+')
def random_logdir():
"""Generates a random log directory for placing the command output"""
x = hashlib.md5(str(random.randint(100000,999999))).hexdigest()
return os.path.join(x[:2], x[2:4], x[4:6])
def makedirs_safe(fulldir):
"""Creates a directory if it does not exists. Takes into consideration
concurrent access support. Works like the shell's 'mkdir -p'.
@@ -49,37 +43,6 @@ def makedirs_safe(fulldir):
if exc.errno == errno.EEXIST: pass
else: raise
def try_get_contents(filename):
"""Loads contents out of a certain filename"""
try:
return open(filename, 'rt').read()
except OSError, e:
logger.warn("Could not find file '%s'" % filename)
return ''
def try_remove_files(filename, recurse, verbose):
"""Safely removes files from the filesystem"""
if isinstance(filename, (tuple, list)):
for k in filename:
if os.path.exists(k):
os.unlink(k)
if verbose: print verbose + ("removed `%s'" % k)
d = os.path.dirname(k)
if recurse and os.path.exists(d) and not os.listdir(d):
os.removedirs(d)
if verbose: print verbose + ("recursively removed `%s'" % d)
else:
if os.path.exists(filename):
os.unlink(filename)
if verbose: print verbose + ("removed `%s'" % filename)
d = os.path.dirname(filename)
if recurse and os.path.exists(d) and not os.listdir(d):
os.removedirs(d)
if verbose: print verbose + ("recursively removed `%s'" % d)
def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
stderr='', env=[], array=None, context='grid', hostname=None,
......