Commit eac5be4c authored by Manuel Günther's avatar Manuel Günther

Improved behavior when local scheduler is stopped / restarted; corrected documentation.

parent 3082f4ed
......@@ -76,32 +76,37 @@ class JobManagerLocal(JobManager):
self.unlock()
def stop_jobs(self, job_ids):
"""Stops the jobs in the grid."""
def stop_jobs(self, job_ids=None):
"""Resets the status of the job to 'submitted' when they are labeled as 'executing'."""
self.lock()
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
logger.info("Reset job '%s' in the database" % job)
job.status = 'submitted'
if job.status in ('executing', 'queued', 'waiting'):
logger.info("Reset job '%s' in the database" % job.name)
job.submit()
self.session.commit()
self.unlock()
def stop_job(self, job_id, array_id = None):
"""Stops the jobs in the grid."""
"""Resets the status of the given to 'submitted' when they are labeled as 'executing'."""
self.lock()
job, array_job = self._job_and_array(job_id, array_id)
if job is not None:
if job.status == 'executing':
logger.info("Reset job '%s' in the database" % job)
if job.status in ('executing', 'queued', 'waiting'):
logger.info("Reset job '%s' in the database" % job.name)
job.status = 'submitted'
if array_job is not None and array_job.status == 'executing':
if array_job is not None and array_job.status in ('executing', 'queued', 'waiting'):
logger.debug("Reset array job '%s' in the database" % array_job)
array_job.status = 'submitted'
if array_job is None:
for array_job in job.array:
if array_job.status in ('executing', 'queued', 'waiting'):
logger.debug("Reset array job '%s' in the database" % array_job)
array_job.status = 'submitted'
self.session.commit()
self.unlock()
......@@ -257,10 +262,12 @@ class JobManagerLocal(JobManager):
# This is the only way to stop: you have to interrupt the scheduler
except KeyboardInterrupt:
if hasattr(self, 'session'):
self.unlock()
logger.info("Stopping task scheduler due to user interrupt.")
for task in running_tasks:
logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
task[0].kill()
if hasattr(self, 'session'):
self.unlock()
self.stop_job(task[1], task[2] if len(task) > 2 else None)
self.stop_job(task[1])
# stopp all jobs that are currently running or queued
self.stop_jobs()
......@@ -123,7 +123,7 @@ class JobManager:
self.unlock()
try:
self.stop_jobs(list(dependent_job_ids))
logger.warn("Deleted dependent jobs '%s' since this job failed.")
logger.warn("Deleted dependent jobs '%s' since this job failed." % str(list(dependent_job_ids)))
except:
pass
return
......@@ -243,7 +243,7 @@ class JobManager:
def _delete_dir_if_empty(log_dir):
if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
os.rmdir(log_dir)
logger.info("Removed log directory '%s' since it was empty" % log_dir)
logger.info("Removed empty log directory '%s'" % log_dir)
def _delete(job, try_to_delete_dir=False):
# delete the job from the database
......@@ -271,11 +271,13 @@ class JobManager:
job = array_jobs[0].job
for array_job in array_jobs:
if array_job.status in status:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
_delete(array_job)
if not job.array:
if job.status in status:
logger.info("Deleting job '%d' from the database." % job.id)
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
else:
......@@ -286,11 +288,13 @@ class JobManager:
if job.array:
for array_job in job.array:
if array_job.status in status:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
_delete(array_job)
# delete this job
if job.status in status:
logger.info("Deleting job '%d' from the database." % job.id)
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
self.session.commit()
......
......@@ -124,7 +124,8 @@ class Job(Base):
self.status = new_status
for array_job in self.array:
array_job.status = new_status
if array_job.status not in ('success', 'failure'):
array_job.status = new_status
def execute(self, array_id = None):
......
......@@ -106,7 +106,7 @@ def resubmit(args):
"""Re-submits the jobs with the given ids."""
jm = setup(args)
if not args.keep_logs:
jm.delete(job_ids=args.job_ids, delete_jobs = False)
jm.delete(job_ids=args.job_ids, delete_jobs=False)
jm.resubmit(args.job_ids, args.failed_only, args.running_jobs)
......@@ -224,7 +224,7 @@ def main(command_line_options = None):
submit_parser = cmdparser.add_parser('submit', aliases=['sub'], help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.')
submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', help='the name of the SGE queue to submit the job to')
submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting the job to the specified value, e.g. 8G to set the memory requirements to 8 gigabytes')
submit_parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.')
......@@ -265,7 +265,7 @@ def main(command_line_options = None):
report_parser = cmdparser.add_parser('report', aliases=['rep', 'r'], help='Iterates through the result and error log files and prints out the logs.')
report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs (by default, both logs are reported).')
report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs; use this option also to check error files for jobs with success status.')
report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
report_parser.set_defaults(func=report)
......@@ -276,7 +276,7 @@ def main(command_line_options = None):
delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.')
delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given stati; by default all jobs are deleted.')
delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given statuses; by default all jobs are deleted.')
delete_parser.set_defaults(func=delete)
# subcommand 'run_scheduler'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment