Commit 65553a4c authored by Manuel Günther's avatar Manuel Günther

Added option to ignore warnings while refreshing (jman refresh...

Added option to ignore warnings while refreshing (jman refresh --ignore-warnings); added 'job-name' to the list that is shown on 'jman ls'.
parent 874462bc
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST
# Wed 24 Aug 2011 13:06:25 CEST
"""Defines the job manager which can help you managing submitted grid jobs.
"""
......@@ -45,7 +45,7 @@ def try_remove_files(filename, recurse, verbose):
os.unlink(filename)
if verbose: print verbose + ("removed `%s'" % filename)
d = os.path.dirname(filename)
if recurse and os.path.exists(d) and not os.listdir(d):
if recurse and os.path.exists(d) and not os.listdir(d):
os.removedirs(d)
if verbose: print verbose + ("recursively removed `%s'" % d)
......@@ -81,7 +81,7 @@ class Job:
def is_array(self):
"""Determines if this job is an array or not."""
return bool(self.array)
def array_bounds(self):
......@@ -106,19 +106,19 @@ class Job:
s = time.mktime(time.strptime(self.data['submission_time']))
diff = time.time() - s
unit = 's'
if diff > 60: # more than a minute
unit = 'm'
diff /= 60.
if diff > 60: # more than an hour
unit = 'h'
diff /= 60.
if diff > 24: # more than a day
diff /= 24.
unit = 'd'
if diff > 7: # more than a week
diff /= 7.
unit = 'w'
......@@ -133,7 +133,7 @@ class Job:
return "%d %s%s" % (value, translate[unit], plural)
def queue(self):
"""The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
"""The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
process it we have to split it twice (spaces and then on '='), create a
dictionary and extract just the qname"""
......@@ -143,7 +143,7 @@ class Job:
return d['TRUE']
def __std_filename__(self, indicator, instance):
base_dir = self.data['sge_o_home']
if self.data.has_key('cwd'): base_dir = self.data['cwd']
......@@ -153,14 +153,14 @@ class Job:
if p[0] == os.sep: base_dir = p
else: base_dir = os.path.join(base_dir, p)
retval = os.path.join(base_dir, self.data['job_name'] +
retval = os.path.join(base_dir, self.data['job_name'] +
'.%s%s' % (indicator, self.data['job_number']))
if self.array:
start, stop, step = self.array
l = range(start, stop+1, step)
if isinstance(instance, (long, int)):
if instance not in l:
if instance not in l:
raise RuntimeError, "instance is not part of parametric array"
return retval + '.%d' % instance
else:
......@@ -187,9 +187,9 @@ class Job:
def stderr_filename(self, instance=None):
"""Returns the stderr filename for this job, with the full path"""
return self.__std_filename__('e', instance)
def stderr(self, instance=None):
"""Returns a string with the contents of the stderr file"""
......@@ -202,7 +202,7 @@ class Job:
try_remove_files(self.stderr_filename(instance), recurse, verbose)
def check(self):
def check(self, ignore_warnings=False):
"""Checks if the job is in error state. If this job is a parametric job, it
will return an error state if **any** of the parametrized jobs are in error
state."""
......@@ -211,7 +211,15 @@ class Job:
try:
if os.stat(name).st_size != 0:
logging.debug("Job %s has a stderr file with size != 0" % jobname)
return False
if not ignore_warnings:
return False
# read the contents of the log file to ignore the annoying warning messages
is_error = False
f = open(name,'r')
for line in f:
is_error = is_error or (line and 'WARNING' not in line)
return not is_error
except OSError, e:
logging.warn("Could not find error file '%s'" % name)
return True
......@@ -252,8 +260,8 @@ class Job:
def __str__(self):
"""Returns a string containing a short job description"""
return "%s @%s (%s ago) %s" % (self.name(),
self.queue(), self.age(short=False), ' '.join(self.args[0]))
return "%s @%s (%s ago) %s %s" % (self.name(),
self.queue(), self.age(short=False), self.kwargs['name'], ' '.join(self.args[0]))
def row(self, fmt, maxcmd=0):
"""Returns a string containing the job description suitable for a table."""
......@@ -262,7 +270,7 @@ class Job:
if maxcmd and len(cmdline) > maxcmd:
cmdline = cmdline[:(maxcmd-3)] + '...'
return fmt % (self.name(), self.queue(), self.age(), cmdline)
return fmt % (self.name(), self.queue(), self.age(), self.kwargs['name'], cmdline)
def has_key(self, key):
return self.data.has_key(key)
......@@ -296,7 +304,7 @@ class JobManager:
context
The context to provide when setting up the environment to call the SGE
utilities such as qsub, qstat and qdel (normally 'grid', which also
utilities such as qsub, qstat and qdel (normally 'grid', which also
happens to be default)
"""
......@@ -315,7 +323,7 @@ class JobManager:
db = anydbm.open(self.state_file, 'n') # erase previously recorded jobs
for k in sorted(self.job.keys()): db[dumps(k)] = dumps(self.job[k])
if not self.job:
if not self.job:
logging.debug("Removing file %s because there are no more jobs to store" \
% self.state_file)
os.unlink(self.state_file)
......@@ -329,7 +337,7 @@ class JobManager:
self.job[jobid] = Job(qstat(jobid, context=self.context), args, kwargs)
return self.job[jobid]
def resubmit(self, job, stdout='', stderr='', dependencies=[],
def resubmit(self, job, stdout='', stderr='', dependencies=[],
failed_only=False):
"""Re-submit jobs automatically"""
......@@ -370,12 +378,12 @@ class JobManager:
"""Returns the status of each job still being tracked"""
# configuration
fields = ("job-id", "queue", "age", "arguments")
lengths = (20, 5, 3, 43)
fields = ("job-id", "queue", "age", "job-name", "arguments")
lengths = (20, 7, 3, 20, 43)
marker = '='
# work
fmt = "%%%ds %%%ds %%%ds %%-%ds" % lengths
fmt = "%%%ds %%%ds %%%ds %%%ds %%-%ds" % lengths
delimiter = fmt % tuple([k*marker for k in lengths])
header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
header = ' '.join(header)
......@@ -399,11 +407,11 @@ class JobManager:
"""Gets the error output of a certain job"""
return self[key].stderr(instance)
def refresh(self):
def refresh(self, ignore_warnings=False):
"""Conducts a qstat over all jobs in the cache. If the job is not present
anymore check the logs directory for output and error files. If the size of
the error file is different than zero, warn the user.
Returns two lists: jobs that work and jobs that require attention
(error file does not have size 0).
"""
......@@ -412,7 +420,7 @@ class JobManager:
for k in sorted(self.job.keys()):
d = qstat(k, context=self.context)
if not d: #job has finished. check
status = self.job[k].check()
status = self.job[k].check(ignore_warnings)
if status:
success.append(self.job[k])
del self.job[k]
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 16:13:31 CEST
# Wed 24 Aug 2011 16:13:31 CEST
"""A logging Idiap/SGE job manager
"""
......@@ -48,15 +48,15 @@ def save_jobs(j, name):
"""Saves jobs in a database"""
db = anydbm.open(name, 'c')
for k in j:
for k in j:
ki = int(k['job_number'])
db[dumps(ki)] = dumps(k)
def refresh(args):
"""Refresh action"""
jm = setup(args)
(good, bad) = jm.refresh()
(good, bad) = jm.refresh(args.ignore_warnings)
if good:
if args.verbose:
......@@ -77,7 +77,7 @@ def refresh(args):
if args.faildb: save_jobs(bad, args.faildb)
def delete(args):
jm = setup(args)
jobs = jm.keys()
if args.jobid: jobs = args.jobid
......@@ -85,23 +85,23 @@ def delete(args):
if jm.has_key(k):
J = jm[k]
if args.also_logs:
if args.verbose:
if args.verbose:
J.rm_stdout(verbose=' ', recurse = not args.keep_log_dir)
J.rm_stderr(verbose=' ', recurse = not args.keep_log_dir)
else:
else:
J.rm_stdout()
J.rm_stderr()
del jm[k]
if args.verbose: print "Deleted job %s" % J
else: print "Deleted job", J.name()
else: # did not find specific key on database
print "Ignored job %d (not found on manager)" % k
def get_logdirs(stdout, stderr, logbase):
"""Calculates the stdout and stderr log directories based on a combination
of user options.
Keyword parameters
stdout
......@@ -112,14 +112,14 @@ def get_logdirs(stdout, stderr, logbase):
logbase
User setting for logbase
Returns a tuple (stdout, stderr) with the absolute path names resolved.
"""
# setup the base logdir
if not logbase:
if not logbase:
basedir = os.path.abspath(os.curdir)
else:
else:
basedir = os.path.abspath(logbase)
if not stdout:
......@@ -142,7 +142,7 @@ def submit(args):
"""Submission command"""
# set full path to command
if not os.path.isabs(args.job[0]):
if not os.path.isabs(args.job[0]):
args.job[0] = os.path.abspath(args.job[0])
# automatically set interpreter if required
......@@ -203,7 +203,7 @@ def explain(args):
if args.verbose:
print "%s stdout (%s)" % (J.name(k[1]), J.stdout_filename(k[1]))
print J.stdout(k[1])
if args.verbose:
if args.verbose:
print "%s stderr (%s)" % (J.name(k[1]), J.stderr_filename(k[1]))
print J.stderr(k[1])
......@@ -219,8 +219,8 @@ def resubmit(args):
args.stdout, args.stderr = get_logdirs(args.stdout, args.stderr, args.logbase)
J = jm.resubmit(O, args.stdout, args.stderr, args.deps, args.failed_only)
if args.verbose:
if args.verbose:
if isinstance(J, (tuple, list)):
for k in J: print 'Re-submitted job', J
else:
......@@ -242,7 +242,7 @@ def resubmit(args):
print ' deleted job %s from database' % O.name()
class AliasedSubParsersAction(argparse._SubParsersAction):
"""Hack taken from https://gist.github.com/471779 to allow aliases in
"""Hack taken from https://gist.github.com/471779 to allow aliases in
argparse for python 2.x (this has been implemented on python 3.2)
"""
......@@ -252,7 +252,7 @@ class AliasedSubParsersAction(argparse._SubParsersAction):
if aliases:
dest += ' (%s)' % ','.join(aliases)
sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
sup.__init__(option_strings=[], dest=dest, help=help)
sup.__init__(option_strings=[], dest=dest, help=help)
def add_parser(self, name, **kwargs):
if 'aliases' in kwargs:
......@@ -289,10 +289,10 @@ def main():
action='store_true', help='increase verbosity for this script')
parser.add_argument('-g', '--debug', dest='debug', default=False,
action='store_true', help='prints out lots of debugging information')
parser.add_argument('-V', '--version', action='version',
parser.add_argument('-V', '--version', action='version',
version='GridTk version %s' % __version__)
cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')
# subcommand 'list'
lsparser = cmdparser.add_parser('list', aliases=['ls'],
help='lists jobs stored in the database')
......@@ -302,8 +302,9 @@ def main():
# subcommand 'refresh'
refparser = cmdparser.add_parser('refresh', aliases=['ref'],
help='refreshes the current list of executing jobs by querying SGE, updates the databases of currently executing jobs. If you wish, it may optionally save jobs that executed successfuly and/or failed execution')
refparser.add_argument('-s', '--success-db', default='success.db', dest='successdb', metavar="DB", help='if you provide a name of a file, jobs that have succeeded will be saved on this file (defaults to "%(default)s")')
refparser.add_argument('-f', '--fail-db', dest='faildb', default='failure.db', metavar="DB", help='if you provide a name of a file, jobs that have failed will be saved on this file (defaults to "%(default)s")')
refparser.add_argument('-s', '--success-db', default='success.db', dest='successdb', metavar="DB", help='jobs that have succeeded will be saved on this file (defaults to "%(default)s")')
refparser.add_argument('-f', '--fail-db', dest='faildb', default='failure.db', metavar="DB", help='jobs that have failed will be saved on this file (defaults to "%(default)s")')
refparser.add_argument('-w', '--ignore-warnings', action="store_true", help='if enabled, warnings are not counted as errors')
refparser.add_argument('db', metavar='DATABASE', help='replace the default database to be refreshed by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
refparser.set_defaults(func=refresh)
......@@ -327,7 +328,7 @@ def main():
subparser = cmdparser.add_parser('submit', aliases=['sub'],
help='submits self-contained jobs to the SGE queue and logs them in a private database')
subparser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
subparser.add_argument('-q', '--queue', metavar='QNAME',
subparser.add_argument('-q', '--queue', metavar='QNAME',
dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
#this is ON by default as it helps job management
#subparser.add_argument('-c', '--cwd', default=False, action='store_true',
......@@ -354,11 +355,11 @@ def main():
# subcommand 'resubmit'
resubparser = cmdparser.add_parser('resubmit', aliases=['resub', 're'],
help='resubmits all jobs in a given database, exactly like they were submitted the first time')
resubparser.add_argument('fromdb', metavar='DATABASE',
help='the name of the database to re-submit the jobs from')
resubparser.add_argument('db', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
resubparser.add_argument('-j', '--jobid', dest='jobid', metavar='ID', nargs='*', type=int, default=[], help='by default I\'ll re-submit all jobs, unless you limit giving job identifiers')
resubparser.add_argument('-r', '--cleanup', dest='cleanup', default=False, action='store_true', help='if set I\'ll also remove the old logs if they exist and the re-submitted job from the re-submission database. Note that cleanup always means to cleanup the entire job entries and files. If the job was a parametric job, all output and error files will also be removed.')
resubparser.add_argument('-x', '--dependencies', '--deps', dest='deps', type=int, default=[], metavar='ID', nargs='*', help='when you re-submit jobs, dependencies are reset; if you need dependencies, add them using this option')
......
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='gridtk',
version='0.3.0',
version='0.3.1',
description='SGE Grid Submission and Monitoring Tools for Idiap',
url='https://github.com/idiap/gridtk',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment