Commit bdb3341c authored by André Anjos's avatar André Anjos 💬
Browse files

Modifications to support parametric jobs

parent 14ae1e01
......@@ -14,22 +14,79 @@ from cPickle import dumps, loads
from .tools import qsub, qstat, qdel
from .setshell import environ
import re
JOB_ARRAY_SPLIT = re.compile(r'^(?P<m>\d+)-(?P<n>\d+):(?P<s>\d+)$')
def try_get_contents(filename):
  """Loads and returns the contents of a certain filename.

  Returns an empty string (and logs a warning) if the file cannot be read,
  so callers can concatenate log contents without special-casing missing
  files."""
  try:
    # 'with' guarantees the descriptor is closed even if read() fails;
    # also catch IOError: on Python 2 a missing file raises IOError, not
    # OSError (the two are merged into OSError on Python 3).
    with open(filename, 'rt') as f:
      return f.read()
  except (IOError, OSError):
    logging.warn("Could not find file '%s'" % filename)
    return ''
def try_remove_files(filename, verbose):
  """Safely removes files from the filesystem.

  filename: a single path or a tuple/list of paths; paths that do not exist
    are silently skipped.
  verbose: when truthy, a string prefix printed before each removal message.
  """
  # NOTE: a stray 'import pdb; pdb.set_trace()' debugger breakpoint was
  # removed from this function.
  # normalize to a list so single paths and collections share one code path
  if isinstance(filename, (tuple, list)):
    candidates = filename
  else:
    candidates = [filename]
  for k in candidates:
    if os.path.exists(k):
      os.unlink(k)
      # '%s', not '%f': k is a path string, not a float
      if verbose: print(verbose + ("removed `%s'" % k))
class Job:
"""The job class describes a job"""
def __init__(self, data, args, kwargs):
  """Initializes the job from a qstat data dictionary.

  data: dictionary of job fields (as returned by qstat)
  args: the original positional user arguments, kept for re-submission
  kwargs: the original keyword user arguments, kept for re-submission
  """
  self.data = data
  self.data['user_args'] = args
  self.data['user_kwargs'] = kwargs
  self.args = args
  self.kwargs = kwargs
  # 'job-array tasks' is only present for parametric (array) jobs; it has
  # the shape 'm-n:s' and is parsed into an integer (start, stop, step)
  # triple. Use 'in' instead of dict.has_key(), which is gone in Python 3.
  if 'job-array tasks' in self.data:
    b = JOB_ARRAY_SPLIT.match(self.data['job-array tasks']).groupdict()
    self.array = (int(b['m']), int(b['n']), int(b['s']))
  else:
    self.array = None
def id(self):
  """Returns my own numerical id."""
  job_number = self.data['job_number']
  return int(job_number)
def name(self, instance=None):
  """Returns the job name (the SGE job number, as a string).

  For parametric (array) jobs: pass an integer ``instance`` to get the name
  of that single sub-job ('<id>.<instance>'); otherwise the whole range is
  appended ('<id>.<start>-<stop>:<step>').
  """
  if self.is_array():
    if isinstance(instance, (int, long)):
      return self.data['job_number'] + '.%d' % instance
    else:
      return self.data['job_number'] + '.%d-%d:%d' % self.array
  else:
    return self.data['job_number']
def is_array(self):
  """Tells whether this job is a parametric (array) job or not."""
  # self.array is either None or a non-empty (start, stop, step) triple
  return self.array is not None
def array_bounds(self):
  """If this job is an array (parametric) job, returns a tuple containing 3
  elements indicating the start, end and step of the parametric job. Returns
  None if this job is not parametric."""
  return self.array
def age(self, short=True):
"""Returns a string representation indicating, approximately, how much time
has ellapsed since the job was submitted. The input argument must be a
string as defined in the filed 'submission_time' """
string as defined in the filed 'submission_time'"""
translate = {
's': 'second',
......@@ -68,8 +125,14 @@ class Job:
plural = "" if value == 1 else "s"
return "%d %s%s" % (value, translate[unit], plural)
def stdout_filename(self):
"""Returns the stdout filename for this job, with the full path"""
def queue(self):
  """Returns the name of the SGE queue this job runs on.

  The 'hard resource_list' field looks like 'qname=all.q mem=128M': split
  it on whitespace, then each entry on '=', build a dictionary and pick the
  'qname' entry."""
  entries = self.data['hard resource_list'].split()
  resources = dict(entry.split('=') for entry in entries)
  return resources['qname']
def __std_filename__(self, indicator, instance):
base_dir = self.data['sge_o_home']
if self.data.has_key('cwd'): base_dir = self.data['cwd']
......@@ -80,77 +143,116 @@ class Job:
if p[0] == os.sep: base_dir = p
else: base_dir = os.path.join(base_dir, p)
return os.path.join(base_dir, self.data['job_name'] +
'.o%s' % self.data['job_number'])
retval = os.path.join(base_dir, self.data['job_name'] +
'.%s%s' % (indicator, self.data['job_number']))
def stderr_filename(self):
"""Returns the stderr filename for this job, with the full path"""
base_dir = self.data['sge_o_home']
if self.data.has_key('cwd'): base_dir = self.data['cwd']
if self.array:
start, stop, step = self.array
l = range(start, stop+1, step)
if isinstance(instance, (long, int)):
if instance not in l:
raise RuntimeError, "instance is not part of parametric array"
return retval + '.%d' % instance
else:
return tuple([retval + '.%d' % k for k in l])
# add-on error directory
if self.data.has_key('stderr_path_list'):
p = self.data['stderr_path_list'].split(':')[2]
if p[0] == os.sep: base_dir = p
else: base_dir = os.path.join(base_dir, p)
return os.path.join(base_dir, self.data['job_name'] +
'.e%s' % self.data['job_number'])
return retval
def check(self):
"""Checks if the job was detected to be completed"""
def stdout_filename(self, instance=None):
  """Returns the stdout filename for this job, with the full path.

  NOTE(review): for parametric jobs, __std_filename__ presumably returns a
  tuple of per-instance paths when ``instance`` is None -- confirm against
  its implementation."""
  # removed a stray dead assignment ('err_file = self.stderr_filename()')
  # left over from the pre-refactor check() implementation
  return self.__std_filename__('o', instance)
try:
if os.stat(err_file).st_size != 0:
logging.debug("Job %s has a stderr file with size != 0" % \
self.data['job_number'])
return False
except OSError, e:
logging.warn("Could not find error file '%s'" % err_file)
def stdout(self, instance=None):
  """Returns a string with the contents of the stdout file.

  For a parametric job, when ``instance`` is None, the outputs of all
  sub-jobs are concatenated, separated by newlines."""
  # removed interleaved leftovers referencing an undefined 'err_file'
  # (logging.debug(...)/return True) that would raise NameError here
  if self.array and instance is None:
    return '\n'.join([try_get_contents(k) for k in self.stdout_filename()])
  else:
    return try_get_contents(self.stdout_filename(instance))
def __str__(self):
"""Returns a string containing a short job description"""
def rm_stdout(self, instance=None, verbose=False):
  """Removes the stdout log file(s) of this job, if they exist.

  instance: optional sub-job number for parametric jobs (None removes all)
  verbose: prefix string handed to try_remove_files for progress printing
  """
  # removed a leftover 'return "%s @%s ..." % (...)' (old __str__ body)
  # that would have returned before the removal ever happened
  try_remove_files(self.stdout_filename(instance), verbose)
def row(self, fmt):
"""Returns a string containing the job description suitable for a table"""
def stderr_filename(self, instance=None):
  """Returns the stderr filename for this job, with the full path.

  NOTE(review): for parametric jobs, __std_filename__ presumably returns a
  tuple of per-instance paths when ``instance`` is None -- confirm against
  its implementation."""
  return self.__std_filename__('e', instance)
def stderr(self, instance=None):
  """Returns a string with the contents of the stderr file.

  For a parametric job, when ``instance`` is None, the error outputs of all
  sub-jobs are concatenated, separated by newlines."""
  # removed an interleaved leftover 'return fmt % (...)' (old row() body)
  # that referenced an undefined 'fmt' and returned prematurely
  if self.array and instance is None:
    return '\n'.join([try_get_contents(k) for k in self.stderr_filename()])
  else:
    return try_get_contents(self.stderr_filename(instance))
def stderr(self):
"""Returns a string with the contents of the stderr file"""
def rm_stderr(self, instance=None, verbose=False):
  """Removes the stderr log file(s) of this job, if they exist.

  instance: optional sub-job number for parametric jobs (None removes all)
  verbose: prefix string handed to try_remove_files for progress printing
  """
  # removed a dead leftover assignment 'err_file = self.stderr_filename()'
  # whose result was never used
  try_remove_files(self.stderr_filename(instance), verbose)
try:
return open(err_file, 'rt').read()
except OSError, e:
logging.warn("Could not find error file '%s'" % err_file)
def check(self):
  """Checks the error state of this job based on its stderr file(s).

  A job is considered fine when its stderr file is missing or has zero
  size. For a parametric (array) job, returns a tuple with one boolean per
  sub-job (True when that sub-job looks fine); for a regular job, returns a
  single boolean."""

  def check_file(name, jobname):
    # a non-empty stderr file is our only evidence of a failed job
    try:
      if os.stat(name).st_size != 0:
        logging.debug("Job %s has a stderr file with size != 0" % jobname)
        return False
    except OSError:
      # missing stderr file: assume nothing was written, i.e. success
      logging.warn("Could not find error file '%s'" % name)
    return True

  if self.array:
    start, stop, step = self.array
    files = self.stderr_filename()
    jobnames = [self.name(k) for k in range(start, stop+1, step)]
    return tuple([check_file(*args) for args in zip(files, jobnames)])
  else:
    return check_file(self.stderr_filename(), self.name())
def check_array(self):
  """Checks if any of the jobs in a parametric job array has failed.

  Returns a list of sub-job identifiers that failed. Raises RuntimeError if
  this job is not a parametric (array) job."""
  if not self.array:
    raise RuntimeError('Not a parametric job')

  def check_file(name, jobname):
    # same criterion as check(): a non-empty stderr file flags a failure
    try:
      if os.stat(name).st_size != 0:
        logging.debug("Job %s has a stderr file with size != 0" % jobname)
        return False
    except OSError:
      # fixed: the message used to interpolate the enclosing loop variable
      # 'f' (a closure accident) instead of the 'name' parameter
      logging.warn("Could not find error file '%s'" % name)
    return True

  start, stop, step = self.array
  files = self.stderr_filename()
  ids = range(start, stop+1, step)
  jobnames = [self.name(k) for k in ids]
  retval = []
  for i, jobname, f in zip(ids, jobnames, files):
    if not check_file(f, jobname): retval.append(i)
  return retval
return ""
def __str__(self):
"""Returns a string containing a short job description"""
def stdout(self):
"""Returns a string with the contents of the stdout file"""
return "%s @%s (%s ago) %s" % (self.name(),
self.queue(), self.age(short=False), ' '.join(self.args[0]))
out_file = self.stdout_filename()
def row(self, fmt):
"""Returns a string containing the job description suitable for a table."""
try:
return open(out_file, 'rt').read()
except OSError, e:
logging.warn("Could not find output file '%s'" % output_file)
cmdline = ' '.join(self.args[0])
if len(cmdline) > fmt[-1]:
cmdline = cmdline[:(fmt[-1]-3)] + '...'
return ""
return fmt % (self.name(), self.queue(), self.age(), cmdline)
def has_key(self, key):
  """Tells whether the underlying qstat data contains the given field.

  Kept under its historical name for callers, but implemented with the
  'in' operator since dict.has_key() was removed in Python 3."""
  return key in self.data
......@@ -173,7 +275,7 @@ class Job:
class JobManager:
"""The JobManager will submit and control the status of submitted jobs"""
def __init__(self, statefile='.jobmanager.db', context='grid'):
def __init__(self, statefile='submitted.db', context='grid'):
"""Intializes this object with a state file and a method for qsub'bing.
Keyword parameters:
......@@ -217,11 +319,20 @@ class JobManager:
self.job[jobid] = Job(qstat(jobid, context=self.context), args, kwargs)
return self.job[jobid]
def resubmit(self, job, dependencies=[]):
def resubmit(self, job, dependencies=[], failed_only=False):
"""Re-submit jobs automatically"""
if dependencies: job['user_kwargs']['deps'] = dependencies
return self.submit(job['user_args'][0], **job['user_kwargs'])
if dependencies: job.kwargs['deps'] = dependencies
if failed_only and job.is_array():
retval = []
for k in job.check_array():
job.kwargs['array'] = (k,k,1)
retval.append(self.submit(job.args[0], **job.kwargs))
return retval
else: #either failed_only is not set or submit the job as it was, entirely
return self.submit(job.args[0], **job.kwargs)
def keys(self):
  """Returns the identifiers of all jobs currently tracked by this manager.

  NOTE(review): self.job presumably maps job ids to Job objects -- confirm
  against the (not visible here) manager initialization."""
  return self.job.keys()
......@@ -242,7 +353,7 @@ class JobManager:
# configuration
fields = ("job-id", "queue", "age", "arguments")
lengths = (8, 5, 3, 55)
lengths = (16, 5, 3, 47)
marker = '='
# work
......@@ -262,13 +373,13 @@ class JobManager:
"""Returns a string explaining a certain job"""
return str(self[key])
def stdout(self, key):
def stdout(self, key, instance=None):
"""Gets the output of a certain job"""
return self[key].stdout()
return self[key].stdout(instance)
def stderr(self, key):
def stderr(self, key, instance=None):
"""Gets the error output of a certain job"""
return self[key].stderr()
return self[key].stderr(instance)
def refresh(self):
"""Conducts a qstat over all jobs in the cache. If the job is not present
......
......@@ -31,7 +31,7 @@ def setup(args):
jm = JobManager(**kwargs)
# set-up logging
if args.verbose:
if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)
......@@ -58,21 +58,22 @@ def refresh(args):
(good, bad) = jm.refresh()
if good:
print "These jobs finished well:"
for k in good: print k
if args.verbose:
print "These jobs finished well:"
for k in good: print k
else:
print "%d job(s) finished well" % len(good)
if args.successdb: save_jobs(good, args.successdb)
if bad:
print "These jobs require attention:"
for k in bad: print k
if args.faildb: save_jobs(bad, args.faildb)
def remove(f):
"""Remove files in a safe way"""
if args.verbose:
print "These jobs require attention:"
for k in bad: print k
else:
print "%d job(s) need attention" % len(bad)
if os.path.exists(f):
os.unlink(f)
print " removed `%s'" % f
if args.faildb: save_jobs(bad, args.faildb)
def delete(args):
......@@ -83,11 +84,17 @@ def delete(args):
if jm.has_key(k):
J = jm[k]
del jm[k]
print "Deleted job %s" % J
if args.also_logs:
remove(J.stdout_filename())
remove(J.stderr_filename())
else:
if args.verbose:
J.rm_stdout(verbose=' ')
J.rm_stderr(verbose=' ')
else:
J.rm_stdout()
J.rm_stderr()
if args.verbose: print "Deleted job %s" % J
else: print "Deleted job", J.name()
else: # did not find specific key on database
print "Ignored job %d (not found on manager)" % k
def submit(args):
......@@ -101,9 +108,11 @@ def submit(args):
'stdout': args.stdout,
'stderr': args.stderr,
'env': args.env,
'array': args.array,
}
jobid = jm.submit(args.job, **kwargs)
print 'Submitted', jm.describe(jobid)
job = jm.submit(args.job, **kwargs)
if args.verbose: print 'Submitted', job
else: print 'Job', job.name(), 'submitted'
def wsubmit(args):
......@@ -116,11 +125,13 @@ def wsubmit(args):
'stdout': args.stdout,
'stderr': args.stderr,
'env': args.env,
'array': args.array,
}
command = make_python_wrapper(args.wrapper, args.job)
job = jm.submit(command, **kwargs)
job = jm.submit(args.wrapper, args.job, **kwargs)
print 'Submitted (wrapped)', job
if args.verbose: print 'Submitted (wrapped)', job
else: print 'Job', job.name(), 'submitted'
def tsubmit(args):
......@@ -133,46 +144,37 @@ def tsubmit(args):
'stdout': args.stdout,
'stderr': args.stderr,
'env': args.env,
'array': args.array,
}
command, kwargs = make_torch_wrapper(args.torch, args.torch_debug,
args.job, kwargs)
job = jm.submit(command, **kwargs)
print 'Submitted (torch\'d)', job
if args.verbose: print 'Submitted (wrapped)', job
else: print 'Job', job.name(), 'submitted'
def explain(args):
"""Explain action"""
jm = setup(args)
jobs = jm.keys()
if args.jobid: jobs = args.jobid
if args.jobid:
jobs = [[int(n) for n in k.split('.', 1)] for k in args.jobid]
for v in jobs:
if len(v) == 1: v.append(None)
else:
jobs = [(k, None) for k in jm.keys()]
first_time = True
for k in jobs:
if not first_time: print 79*'-'
first_time = False
J = jm[k]
J = jm[k[0]]
print "Job", J
print "Command line:", J['user_args'], J['user_kwargs']
print
print "%d stdout (%s)" % (k, J.stdout_filename())
print J.stdout()
print
print "%d stderr (%s)" % (k, J.stderr_filename())
print J.stderr()
def cleanup(args):
"""Cleanup action"""
jm = setup(args)
jobs = jm.keys()
if args.jobid: jobs = args.jobid
for k in jobs:
J = jm[k]
print 'Cleaning-up logs for job', J
remove(J.stdout_filename())
remove(J.stderr_filename())
if args.remove_job:
del jm[k]
print ' deleted job %s from database' % J['job_number']
print "Command line:", J.args, J.kwargs
print "%s stdout (%s)" % (J.name(k[1]), J.stdout_filename(k[1]))
print J.stdout(k[1])
print "%s stderr (%s)" % (J.name(k[1]), J.stderr_filename(k[1]))
print J.stderr(k[1])
def resubmit(args):
......@@ -182,31 +184,47 @@ def resubmit(args):
if args.jobid: jobs = args.jobid
for k in jobs:
O = fromjm[k]
J = jm.resubmit(O, args.deps)
print 'Re-submitted job', J
J = jm.resubmit(O, args.deps, args.failed_only)
if args.verbose:
if isinstance(J, (tuple, list)):
for k in J: print 'Re-submitted job', J
else:
print 'Re-submitted job', J
else:
if isinstance(J, (tuple, list)):
print 'Re-submitted %d jobs' % len(J)
else:
print 'Re-submitted job', J.name()
if args.cleanup:
remove(O.stdout_filename())
remove(O.stderr_filename())
if args.verbose:
O.rm_stdout(verbose=' ')
O.rm_stderr(verbose=' ')
else:
O.rm_stdout()
O.rm_stderr()
del fromjm[k]
print ' deleted job %s from database' % O['job_number']
print ' deleted job %s from database' % O.name()
def add_submission_options(parser):
"""Adds standard submission options to a given parser"""
parser.add_argument('-q', '--queue', metavar='QNAME',
dest='qname', default='all.q',
help='the name of the SGE queue to submit the job to (defaults to %(default)s)')
dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
#this is ON by default as it helps job management
#parser.add_argument('-c', '--cwd', default=False, action='store_true',
# dest='cwd', help='Makes SGE switch to the current working directory before executing the job')
parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
parser.add_argument('-x', '--dependencies', '--deps', dest='deps', type=int,
default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option an a list of job identifiers separated by spaces')
parser.add_argument('-o', '--stdout', '--out', metavar='DIR', dest='stdout', default='logs', help='Set the standard output of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory or the home directory if the option --cwd was not given')
parser.add_argument('-e', '--stderr', '--err', metavar='DIR', dest='stderr', default='logs', help='Set the standard error of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory or the home directory if the option --cwd was not given')
parser.add_argument('-o', '--stdout', '--out', metavar='DIR', dest='stdout', default='logs', help='Set the standard output of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to "%(default)s")')
parser.add_argument('-e', '--stderr', '--err', metavar='DIR', dest='stderr', default='logs', help='Set the standard error of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to "%(default)s")')
parser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
dest='env', nargs='*', default=[],
help='Passes specific environment variables to the job')
parser.add_argument('-t', '--array', '--parametric', metavar='n[-m[:s]]',
dest='array', help='Creates a parametric (array) job. You must specify the starting range "n" (>=1), the stopping (inclusive) range "m" and the step "s". Read the qsub command man page for details')
class AliasedSubParsersAction(argparse._SubParsersAction):
"""Hack taken from https://gist.github.com/471779 to allow aliases in
......@@ -253,42 +271,29 @@ def main():
parser.add_argument('-d', '--database', metavar='FILE', dest='db', help='replace the default database by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
parser.add_argument('-v', '--verbose', dest='verbose', default=False,
action='store_true', help='increase verbosity for this script')
parser.add_argument('-g', '--debug', dest='debug', default=False,
action='store_true', help='prints out lots of debugging information')
cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')
# subcommand 'list'
lsparser = cmdparser.add_parser('list', aliases=['ls'],
help='lists jobs stored in the database')
lsparser.add_argument('-f', '--full', dest='full', default=False,
action='store_true', help='increases information on job lists')
lsparser.set_defaults(func=ls)
# subcommand 'refresh'
refparser = cmdparser.add_parser('refresh', aliases=['ref'],
help='refreshes the current list of executing jobs by querying SGE, updates the databases of currently executing jobs. If you wish, it may optionally save jobs that executed successfuly and/or failed execution')
refparser.add_argument('-s', '--no-success-db', default='success.db', action='store_false', dest='successdb', help='if you provide a name of a file, jobs that have succeeded will be saved on this file')
refparser.add_argument('-f', '--no-fail-db', dest='faildb', default='failure.db', action='store_false',
help='if you provide a name of a file, jobs that have failed will be saved on this file')
refparser.add_argument('-s', '--no-success-db', default='success.db', action='store_false', dest='successdb', help='if you provide a name of a file, jobs that have succeeded will be saved on this file (defaults to "%(default)s")')
refparser.add_argument('-f', '--no-fail-db', dest='faildb', default='failure.db', action='store_false', help='if you provide a name of a file, jobs that have failed will be saved on this file (defaults to "%(default)s")')
refparser.set_defaults(func=refresh)
# subcommand 'explain'
exparser = cmdparser.add_parser('explain', aliases=['why'],
help='explains why jobs failed in a database')
exparser.add_argument('db', metavar='FILE',
help='the name of the database to explain the jobs from')
exparser.add_argument('jobid', metavar='ID', nargs='*', type=int,
default=[], help='by default I\'ll explain all jobs, unless you limit giving job identifiers')
exparser.add_argument('jobid', metavar='ID', nargs='*', type=str,
default=[], help='by default I\'ll explain all jobs, unless you limit giving job identifiers. Identifiers that contain a "." (dot) limits the explanation of a certain job only to a subjob in a parametric array. Everything that comes after the dot is ignored if the job is non-parametric.')
exparser.set_defaults(func=explain)
# subcommand 'cleanup'
cleanparser = cmdparser.add_parser('cleanup', aliases=['clean', 'mrproper'],
help='remove all logging traces of a job - this action only makes sense for completed jobs')
cleanparser.add_argument('db', metavar='FILE',
help='the name of the database to cleanup the jobs from')
cleanparser.add_argument('jobid', metavar='ID', nargs='*', type=int,
default=[], help='by default I\'ll clean-up all jobs, unless you limit giving job identifiers')
cleanparser.add_argument('-r', '--remove-job', dest='remove_job', default=False, action='store_true', help='if set I\'ll also remove the job reference from the database')
cleanparser.set_defaults(func=cleanup)
# subcommand 'delete'
delparser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
......@@ -318,7 +323,7 @@ def main():
help='submits a job that will be executed inside the context of a torch release')
add_submission_options(tsubparser)
tsubparser.set_defaults(func=tsubmit)
tsubparser.add_argument('-t', '--torch', '--torch-root', metavar='DIR',
tsubparser.add_argument('-T', '--torch', '--torch-root', metavar='DIR',
default='/idiap/group/torch5spro/nightlies/last', help='the root directory of a valid torch installation (defaults to %(default)s)')
tsubparser.add_argument('-D', '--torch-debug', dest='torch_debug', default=False, action='store_true', help='if set I\'ll setup the torch environment in debug mode')
tsubparser.add_argument('job', metavar='command', nargs='+')
......@@ -330,8 +335,11 @@ def main():
help='the name of the database to re-submit the jobs from')
resubparser.add_argument('jobid', metavar='ID', nargs='*', type=int,
default=[], help='by default I\'ll re-submit all jobs, unless you limit giving job identifiers')
resubparser.add_argument('-r', '--cleanup', dest='cleanup', default=False, action='store_true', help='if set I\'ll also remove the old logs if they exist and the re-submitted job from the re-submission database')
resubparser.add_argument('-x', '--dependencies', '--deps', dest='deps', type=int, default=[], metavar='ID', nargs='*', help='when you re-submit jobs, dependencies are reset; if you need dependencies, add them using this variable')