Commit 8a831aa0 authored by Manuel Günther's avatar Manuel Günther

Added possibility to use Idiaps multi-thread queues; Added a '--long' option...

Added possibility to use Idiaps multi-thread queues; Added a '--long' option for jman list; Fixed bugs in re-submitting jobs.
parent cd74fd53
......@@ -202,6 +202,7 @@ class JobManager:
self.lock()
for job in self.get_jobs(job_ids):
job.refresh()
if job.status in status:
print(job.format(format, dependency_length, None if long else 43))
if print_array_jobs and job.array:
......
......@@ -95,11 +95,14 @@ class Job(Base):
"""Sets the status of this job to 'submitted'."""
self.status = 'submitted'
self.result = None
self.machine_name = None
if new_queue is not None:
self.queue_name = new_queue
for array_job in self.array:
array_job.status = 'submitted'
array_job.result = None
array_job.machine_name = None
def queue(self, new_job_id = None, new_job_name = None, queue_name = None):
"""Sets the status of this job to 'queued' or 'waiting'."""
......@@ -152,7 +155,6 @@ class Job(Base):
job.finish(0, -1)
def finish(self, result, array_id = None):
"""Sets the status of this job to 'success' or 'failure'."""
# check if there is any array job still running
......@@ -180,6 +182,20 @@ class Job(Base):
job.queue()
def refresh(self):
"""Refreshes the status information."""
if self.status == 'executing' and self.array:
new_result = 0
for array_job in self.array:
if array_job.status == 'failure' and new_result is not None:
new_result = array_job.result
elif array_job.status not in ('success', 'failure'):
new_result = None
if new_result is not None:
self.status = 'success' if new_result == 0 else 'failure'
self.result = new_result
def get_command_line(self):
"""Returns the command line for the job."""
# In python 2, the command line is unicode, which needs to be converted to string before pickling;
......@@ -192,11 +208,26 @@ class Job(Base):
# In python 3, the command line is bytes, which can be pickled directly
return loads(self.array_string) if isinstance(self.array_string, bytes) else loads(str(self.array_string))
def get_arguments(self):
"""Returns the additional options for the grid (such as the queue, memory requirements, ...)."""
# In python 2, the command line is unicode, which needs to be converted to string before pickling;
# In python 3, the command line is bytes, which can be pickled directly
return loads(self.grid_arguments) if isinstance(self.grid_arguments, bytes) else loads(str(self.grid_arguments))
args = loads(self.grid_arguments)['kwargs'] if isinstance(self.grid_arguments, bytes) else loads(str(self.grid_arguments))['kwargs']
retval = {}
if 'pe_opt' in args:
retval['pe_opt'] = args['pe_opt']
if 'memfree' in args and args['memfree'] is not None:
retval['memfree'] = args['memfree']
if 'hvmem' in args and args['hvmem'] is not None:
retval['hvmem'] = args['hvmem']
if 'env' in args and len(args['env']) > 0:
retval['env'] = args['env']
if 'io_big' in args and args['io_big']:
retval['io_big'] = True
return retval
def get_jobs_we_wait_for(self):
return [j.waited_for_job for j in self.jobs_we_have_to_wait_for if j.waited_for_job is not None]
......@@ -233,6 +264,11 @@ class Job(Base):
job_id = "%d" % self.id + (" [%d-%d:%d]" % self.get_array() if self.array else "")
status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "" )
queue = self.queue_name if self.machine_name is None else self.machine_name
if limit_command_line is None:
grid_opt = self.get_arguments()
if grid_opt:
# add additional information about the job at the end
command_line = "<" + ",".join(["%s=%s" % (key,value) for key,value in grid_opt.iteritems()]) + ">: " + command_line
if dependencies:
deps = str([dep.id for dep in self.get_jobs_we_wait_for()])
......
......@@ -18,6 +18,7 @@ import sys
import argparse
import logging
import string
from ..tools import make_shell, logger
from .. import local, sge
......@@ -95,6 +96,9 @@ def submit(args):
if args.array is not None: kwargs['array'] = get_array(args.array)
if args.log_dir is not None: kwargs['log_dir'] = args.log_dir
if args.dependencies is not None: kwargs['dependencies'] = args.dependencies
if args.parallel is not None:
kwargs['pe_opt'] = "pe_mth %d" % args.parallel
kwargs['memfree'] = "%d%s" % (int(args.memory.rstrip(string.ascii_letters)) * args.parallel, args.memory.lstrip(string.digits))
kwargs['dry_run'] = args.dry_run
kwargs['stop_on_failure'] = args.stop_on_failure
......@@ -121,7 +125,7 @@ def run_scheduler(args):
def list(args):
"""Lists the jobs in the given database."""
jm = setup(args)
jm.list(job_ids=args.job_ids, print_array_jobs=args.print_array_jobs, print_dependencies=args.print_dependencies, status=args.status, long=args.verbose > 1, ids_only=args.ids_only)
jm.list(job_ids=args.job_ids, print_array_jobs=args.print_array_jobs, print_dependencies=args.print_dependencies, status=args.status, long=args.verbose > 1 or args.long, ids_only=args.ids_only)
def communicate(args):
......@@ -203,8 +207,9 @@ def main(command_line_options = None):
from ..config import __version__
formatter = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
formatter_class=formatter)
# part of the hack to support aliases in subparsers
parser.register('action', 'parsers', AliasedSubParsersAction)
......@@ -221,9 +226,10 @@ def main(command_line_options = None):
cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')
# subcommand 'submit'
submit_parser = cmdparser.add_parser('submit', aliases=['sub'], help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.')
submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', help='the name of the SGE queue to submit the job to')
submit_parser = cmdparser.add_parser('submit', aliases=['sub'], formatter_class=formatter, help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.')
submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', choices=('q1d', 'q1w', 'q1m', 'q1dm', 'q1wm'), help='the name of the SGE queue to submit the job to')
submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting the job to the specified value, e.g. 8G to set the memory requirements to 8 gigabytes')
submit_parser.add_argument('-p', '--parallel', '--pe_mth', type=int, help='Sets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.')
submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
......@@ -236,7 +242,7 @@ def main(command_line_options = None):
submit_parser.set_defaults(func=submit)
# subcommand 're-submit'
resubmit_parser = cmdparser.add_parser('resubmit', aliases=['reset', 'requeue', 're'], help='Re-submits a list of jobs.')
resubmit_parser = cmdparser.add_parser('resubmit', aliases=['reset', 'requeue', 're'], formatter_class=formatter, help='Re-submits a list of jobs.')
resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Re-submit only the jobs with the given ids (by default, all finished jobs are re-submitted).')
resubmit_parser.add_argument('-k', '--keep-logs', action='store_true', help='Do not clean the log files of the old job before re-submitting.')
resubmit_parser.add_argument('-f', '--failed-only', action='store_true', help='Re-submit only jobs that have failed.')
......@@ -244,27 +250,28 @@ def main(command_line_options = None):
resubmit_parser.set_defaults(func=resubmit)
# subcommand 'stop'
stop_parser = cmdparser.add_parser('stop', help='Stops the execution of jobs in the grid.')
stop_parser = cmdparser.add_parser('stop', formatter_class=formatter, help='Stops the execution of jobs in the grid.')
stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Stop only the jobs with the given ids (by default, all jobs are stopped).')
stop_parser.set_defaults(func=stop)
# subcommand 'list'
list_parser = cmdparser.add_parser('list', aliases=['ls'], help='Lists jobs stored in the database. Use the -vv option to get a long listing.')
list_parser = cmdparser.add_parser('list', aliases=['ls'], formatter_class=formatter, help='Lists jobs stored in the database. Use the -vv option to get a long listing.')
list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='List only the jobs with the given ids (by default, all jobs are listed)')
list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Also list the array ids.')
list_parser.add_argument('-l', '--long', action='store_true', help='Prints additional information about the submitted job.')
list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
list_parser.add_argument('-o', '--ids-only', action='store_true', help='Prints ONLY the job ids (so that they can be parsed by automatic scripts).')
list_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given statuses; by default all jobs are deleted.')
list_parser.set_defaults(func=list)
# subcommand 'communicate'
stop_parser = cmdparser.add_parser('communicate', aliases = ['com'], help='Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.')
stop_parser = cmdparser.add_parser('communicate', aliases = ['com'], formatter_class=formatter, help='Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.')
stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Check only the jobs with the given ids (by default, all jobs are checked)')
stop_parser.set_defaults(func=communicate)
# subcommand 'report'
report_parser = cmdparser.add_parser('report', aliases=['rep', 'r'], help='Iterates through the result and error log files and prints out the logs.')
report_parser = cmdparser.add_parser('report', aliases=['rep', 'r', 'explain', 'why'], formatter_class=formatter, help='Iterates through the result and error log files and prints out the logs.')
report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs (by default, both logs are reported).')
report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs; use this option also to check error files for jobs with success status.')
......@@ -273,7 +280,7 @@ def main(command_line_options = None):
report_parser.set_defaults(func=report)
# subcommand 'delete'
delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'], help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.')
delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'], formatter_class=formatter, help='Removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.')
delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted).')
delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.')
delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
......@@ -282,7 +289,7 @@ def main(command_line_options = None):
delete_parser.set_defaults(func=delete)
# subcommand 'run_scheduler'
scheduler_parser = cmdparser.add_parser('run-scheduler', aliases=['sched', 'x'], help='Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the \'--local\' option.')
scheduler_parser = cmdparser.add_parser('run-scheduler', aliases=['sched', 'x'], formatter_class=formatter, help='Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the \'--local\' option.')
scheduler_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
scheduler_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Select the job ids that should be run (be default, all submitted and queued jobs are run).')
scheduler_parser.add_argument('-s', '--sleep-time', type=float, default=0.1, help='Set the sleep time between for the scheduler in seconds.')
......
......@@ -39,7 +39,7 @@ class JobManagerSGE(JobManager):
def _queue(self, kwargs):
"""The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
process it we have to split it twice (spaces and then on '='), create a
process it we have to split it twice (',' and then on '='), create a
dictionary and extract just the qname"""
if not 'hard resource_list' in kwargs: return 'all.q'
d = dict([reversed(k.split('=')) for k in kwargs['hard resource_list'].split(',')])
......@@ -100,6 +100,7 @@ class JobManagerSGE(JobManager):
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
job.refresh()
if job.status in ('queued', 'executing'):
status = qstat(job.id, context=self.context)
if len(status) == 0:
......@@ -110,6 +111,7 @@ class JobManagerSGE(JobManager):
self.session.commit()
self.unlock()
def resubmit(self, job_ids = None, failed_only = False, running_jobs = False):
"""Re-submit jobs automatically"""
self.lock()
......@@ -122,10 +124,10 @@ class JobManagerSGE(JobManager):
# re-submit job to the grid
if job.queue_name == 'local':
logger.warn("Re-submitting job '%s' locally (since no queue name is specified)." % job)
job.submit()
else:
logger.debug("Re-submitting job '%s' to the grid." % job)
self._submit_to_grid(job, job.name, job.get_array(), [dep.id for dep in job.get_jobs_we_wait_for()], job.log_dir)
self._submit_to_grid(job, job.name, job.get_array(), [dep.id for dep in job.get_jobs_we_wait_for()], job.log_dir, **job.get_arguments())
job.submit()
self.session.commit()
self.unlock()
......
......@@ -96,7 +96,7 @@ def str_(name):
def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
stderr='', env=[], array=None, context='grid', hostname=None,
mem=None, memfree=None, hvmem=None, pe_opt=None, io_big=False):
memfree=None, hvmem=None, pe_opt=None, io_big=False):
"""Submits a shell job to a given grid queue
Keyword parameters:
......@@ -154,12 +154,6 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
dictionary in which case we just setup using that context instead of
probing for a new one, what can be fast.
mem
@deprecated Please use memfree and hvmem options separately
If set, it asks the queue for a node with a minimum amount of memory,
setting both mem_free and h_vmem.
(cf. qsub -l mem_free=<...> -l h_vmem=<...>)
memfree
If set, it asks the queue for a node with a minimum amount of memory
Used only if mem is not set
......@@ -191,12 +185,8 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
if isinstance(queue, six.string_types) and queue not in ('all.q', 'default'):
scmd += ['-l', queue]
if mem:
scmd += ['-l', 'mem_free=%s' % mem]
scmd += ['-l', 'h_vmem=%s' % mem]
else:
if memfree: scmd += ['-l', 'mem_free=%s' % memfree]
if hvmem: scmd += ['-l', 'h_vmem=%s' % hvmem]
if memfree: scmd += ['-l', 'mem_free=%s' % memfree]
if hvmem: scmd += ['-l', 'h_vmem=%s' % hvmem]
if io_big: scmd += ['-l', 'io_big']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment