Commit a0e86541 authored by André Anjos

Initial version
*.py?
*~
*.egg-info
*.swp
#!/idiap/group/torch5spro/nightlies/externals/v2/linux-x86_64/pyenv/bin/python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 17:21:28 CEST
import os
import sys
install_dir = os.path.realpath(os.path.dirname(sys.argv[0]))
sys.path.append(install_dir)
from gridtk.scripts.grid import main
main()
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST
"""Defines the job manager which can help you managing submitted grid jobs.
"""
import os
import time
import logging
import anydbm
from cPickle import dumps, loads
from .tools import qsub, qstat, qdel
from .setshell import environ
class Job:
"""The job class describes a job"""
def __init__(self, data, args, kwargs):
self.data = data
self.data['user_args'] = args
self.data['user_kwargs'] = kwargs
def id(self):
"""Returns my own numerical id"""
return int(self.data['job_number'])
def age(self, short=True):
"""Returns a string representation indicating, approximately, how much time
has ellapsed since the job was submitted. The input argument must be a
string as defined in the filed 'submission_time' """
translate = {
's': 'second',
'm': 'minute',
'h': 'hour',
'd': 'day',
'w': 'week',
}
s = time.mktime(time.strptime(self.data['submission_time']))
diff = time.time() - s
unit = 's'
if diff > 60: # more than a minute
unit = 'm'
diff /= 60.
if diff > 60: # more than an hour
unit = 'h'
diff /= 60.
if diff > 24: # more than a day
diff /= 24.
unit = 'd'
if diff > 7: # more than a week
diff /= 7.
unit = 'w'
value = int(round(diff))
if short:
return "%d%s" % (value, unit)
else:
plural = "" if value == 1 else "s"
return "%d %s%s" % (value, translate[unit], plural)
def stdout_filename(self):
"""Returns the stdout filename for this job, with the full path"""
base_dir = self.data['sge_o_home']
if self.data.has_key('cwd'): base_dir = self.data['cwd']
# add-on output directory
if self.data.has_key('stdout_path_list'):
p = self.data['stdout_path_list'].split(':')[2]
if p[0] == os.sep: base_dir = p
else: base_dir = os.path.join(base_dir, p)
return os.path.join(base_dir, self.data['job_name'] +
'.o%s' % self.data['job_number'])
def stderr_filename(self):
"""Returns the stderr filename for this job, with the full path"""
base_dir = self.data['sge_o_home']
if self.data.has_key('cwd'): base_dir = self.data['cwd']
# add-on error directory
if self.data.has_key('stderr_path_list'):
p = self.data['stderr_path_list'].split(':')[2]
if p[0] == os.sep: base_dir = p
else: base_dir = os.path.join(base_dir, p)
return os.path.join(base_dir, self.data['job_name'] +
'.e%s' % self.data['job_number'])
def check(self):
"""Checks if the job was detected to be completed"""
err_file = self.stderr_filename()
try:
if os.stat(err_file).st_size != 0:
logging.debug("Job %s has a stderr file with size != 0" % \
self.data['job_number'])
return False
except OSError, e:
logging.warn("Could not find error file '%s'" % err_file)
logging.debug("Zero size error log at '%s'" % err_file)
return True
def __str__(self):
"""Returns a string containing a short job description"""
return "%s @%s (%s ago) %s" % (self.data['job_number'],
self.data['hard'].split('=')[1], self.age(short=False),
' '.join(self.data['user_args'][0]))
def row(self, fmt):
"""Returns a string containing the job description suitable for a table"""
return fmt % (self.data['job_number'],
self.data['hard'].split('=')[1], self.age(),
' '.join(self.data['user_args'][0]))
def stderr(self):
"""Returns a string with the contents of the stderr file"""
err_file = self.stderr_filename()
try:
return open(err_file, 'rt').read()
except (IOError, OSError), e:
logging.warn("Could not find error file '%s'" % err_file)
return ""
def stdout(self):
"""Returns a string with the contents of the stdout file"""
out_file = self.stdout_filename()
try:
return open(out_file, 'rt').read()
except (IOError, OSError), e:
logging.warn("Could not find output file '%s'" % out_file)
return ""
def has_key(self, key):
return self.data.has_key(key)
def keys(self):
return self.data.keys()
def values(self):
return self.data.values()
def __getitem__(self, key):
return self.data[key]
def __setitem__(self, key, value):
self.data[key] = value
def __delitem__(self, key):
del self.data[key]
class JobManager:
"""The JobManager will submit and control the status of submitted jobs"""
def __init__(self, statefile='.jobmanager.db', context='grid'):
"""Intializes this object with a state file and a method for qsub'bing.
Keyword parameters:
statefile
The file containing a valid status database for the manager. If the file
does not exist it is initialized. If it exists, it is loaded.
context
The context to provide when setting up the environment to call the SGE
utilities such as qsub, qstat and qdel (normally 'grid', which is also the
default)
"""
self.state_file = statefile
self.state_db = anydbm.open(self.state_file, 'c')
self.job = {}
logging.debug("Loading previous state...")
for k in self.state_db.keys():
ki = loads(k)
self.job[ki] = loads(self.state_db[k])
logging.debug("Job %d loaded" % ki)
self.context = environ(context)
def __del__(self):
"""Safely terminates the JobManager"""
db = anydbm.open(self.state_file, 'n') # erase previously recorded jobs
for k in sorted(self.job.keys()): db[dumps(k)] = dumps(self.job[k])
if not self.job:
logging.debug("Removing file %s because there are no more jobs to store" \
% self.state_file)
os.unlink(self.state_file)
def submit(self, *args, **kwargs):
"""Calls the configure qsub method and registers the job"""
kwargs['context'] = self.context
jobid = qsub(*args, **kwargs)
del kwargs['context']
self.job[jobid] = Job(qstat(jobid, context=self.context), args, kwargs)
return self.job[jobid]
def resubmit(self, job, dependencies=[]):
"""Re-submit jobs automatically"""
if dependencies: job['user_kwargs']['deps'] = dependencies
return self.submit(job['user_args'][0], **job['user_kwargs'])
def keys(self):
return self.job.keys()
def __getitem__(self, key):
return self.job[key]
def __delitem__(self, key):
if not self.job.has_key(key): raise KeyError, key
qdel(key, context=self.context)
del self.job[key]
def __str__(self):
"""Returns the status of each job still being tracked"""
# configuration
fields = ("job-id", "queue", "age", "arguments")
lengths = (8, 5, 3, 55)
marker = '='
# work
fmt = "%%%ds %%%ds %%%ds %%-%ds" % lengths
delimiter = fmt % tuple([k*marker for k in lengths])
header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
header = ' '.join(header)
return '\n'.join([header] + [delimiter] + \
[self[k].row(fmt) for k in self.job])
def clear(self):
"""Clear the whole job queue"""
for k in self.keys(): del self[k]
def describe(self, key):
"""Returns a string explaining a certain job"""
return str(self[key])
def stdout(self, key):
"""Gets the output of a certain job"""
return self[key].stdout()
def stderr(self, key):
"""Gets the error output of a certain job"""
return self[key].stderr()
def refresh(self):
"""Conducts a qstat over all jobs in the cache. If the job is not present
anymore check the logs directory for output and error files. If the size of
the error file is different than zero, warn the user.
Returns two lists: jobs that work and jobs that require attention
(error file does not have size 0).
"""
success = []
error = []
for k in sorted(self.job.keys()):
d = qstat(k, context=self.context)
if not d: #job has finished. check
status = self.job[k].check()
if status:
success.append(self.job[k])
del self.job[k]
logging.debug("Job %d completed successfuly" % k)
else:
error.append(self.job[k])
del self.job[k]
logging.debug("Job %d probably did not complete successfuly" % k)
return success, error
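# Example usage (illustrative sketch only; the script name below is
# hypothetical, and this module is normally driven through the 'jman' utility):
#
#   from gridtk.manager import JobManager
#   manager = JobManager()                    # opens or creates .jobmanager.db
#   job = manager.submit('myscript.sh', queue='all.q', cwd=True)
#   good, bad = manager.refresh()             # poll SGE, split finished jobs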
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 27 Jul 2011 14:36:06 CEST
"""Executes a given command within the context of a shell script that has its
enviroment set like Idiap's 'SETSHELL grid' does."""
import os
import sys
def main():
if len(sys.argv) < 2:
print __doc__
print "usage: %s <command> [arg [arg ...]]" % \
os.path.basename(sys.argv[0])
sys.exit(1)
from ..setshell import replace
replace('grid', sys.argv[1:])
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 09:20:40 CEST
"""Wrappers for Idiap's SETSHELL functionality
"""
import os
import sys
import signal
import subprocess
import logging
def environ(context):
"""Retrieves the environment for a particular SETSHELL context"""
BASEDIRSETSHELL = os.environ['BASEDIRSETSHELL']
dosetshell = '%s/setshell/bin/dosetshell' % BASEDIRSETSHELL
command = [dosetshell, '-s', 'sh', context]
# First things first, we get the path to the temp file created by dosetshell
try:
logging.debug("Executing: '%s'", ' '.join(command))
p = subprocess.Popen(command, stdout = subprocess.PIPE)
except OSError as e:
# occurs when the file is not executable or not found
raise OSError, "Error executing '%s': %s (%d)" % \
(' '.join(command), e.strerror, e.errno)
try:
source = p.communicate()[0]
source = source.strip()
except KeyboardInterrupt: # the user CTRL-C'ed
os.kill(p.pid, signal.SIGTERM)
sys.exit(signal.SIGTERM)
# We now have the name of the source file; source it and then erase it
command2 = ['bash', '-c', 'source %s && env' % source]
try:
logging.debug("Executing: '%s'", ' '.join(command2))
p2 = subprocess.Popen(command2, stdout = subprocess.PIPE)
except OSError as e:
# occurs when the file is not executable or not found
raise OSError, "Error executing '%s': %s (%d)" % \
(' '.join(command2), e.strerror, e.errno)
new_environ = dict(os.environ)
for line in p2.stdout:
(key, _, value) = line.partition("=")
new_environ[key.strip()] = value.strip()
try:
p2.communicate()
except KeyboardInterrupt: # the user CTRL-C'ed
os.kill(p2.pid, signal.SIGTERM)
sys.exit(signal.SIGTERM)
if os.path.exists(source): os.unlink(source)
logging.debug("Discovered environment for context '%s':", context)
if logging.getLogger().isEnabledFor(logging.DEBUG):
for k in sorted(new_environ.keys()):
logging.debug(" %s = %s", k, new_environ[k])
return new_environ
def sexec(context, command, error_on_nonzero=True):
"""Executes a command within a particular Idiap SETSHELL context"""
if isinstance(context, (str, unicode)): E = environ(context)
else: E = context
try:
logging.debug("Executing: '%s'", ' '.join(command))
p = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=E)
(stdout, stderr) = p.communicate() #note: stderr will be 'None'
if p.returncode != 0:
if error_on_nonzero:
raise RuntimeError, \
"Execution of '%s' exited with status != 0 (%d): %s" % \
(' '.join(command), p.returncode, stdout)
else:
logging.debug("Execution of '%s' exited with status != 0 (%d): %s" % \
(' '.join(command), p.returncode, stdout))
return stdout.strip()
except KeyboardInterrupt: # the user CTRL-C'ed
os.kill(p.pid, signal.SIGTERM)
sys.exit(signal.SIGTERM)
def replace(context, command):
E = environ(context)
os.execvpe(command[0], command, E)
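# Example usage (illustrative sketch only):
#
#   from gridtk.setshell import environ, sexec
#   E = environ('grid')           # environment of Idiap's 'SETSHELL grid' context
#   output = sexec(E, ['qstat'])  # run a command within that environment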
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 09:26:46 CEST
"""Functions that replace the shell based utilities for the grid submission and
probing.
"""
import os
import re
import errno
import logging
# Constant regular expressions
WHITE_SPACE = re.compile(r'\s+')
def makedirs_safe(fulldir):
"""Creates a directory if it does not exists. Takes into consideration
concurrent access support. Works like the shell's 'mkdir -p'.
"""
try:
if not os.path.exists(fulldir): os.makedirs(fulldir)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST: pass
else: raise
def qsub(command, queue='all.q', cwd=True, name=None, deps=[], stdout='',
stderr='', env=[], context='grid'):
"""Submits a shell job to a given grid queue
Keyword parameters:
command
The command to be submitted to the grid
queue
A valid queue name
cwd
If the job should change to the current working directory before starting
name
An optional name to set for the job. If not given, defaults to the script
name being launched.
deps
Job ids to which this job will be dependent on
stdout
The standard output directory. If not given, defaults to what qsub has as a
default.
stderr
The standard error directory (if not given, defaults to the stdout
directory).
env
This is a list of extra variables that will be set on the environment
running the command of your choice.
context
The setshell context in which we should try a 'qsub'. Normally you don't
need to change the default. This variable can also be set to a context
dictionary, in which case we just set up using that context instead of
probing for a new one, which can be faster.
Returns the job id assigned to this job (an integer)
"""
scmd = ['qsub', '-l', 'qname=%s' % queue]
if cwd: scmd += ['-cwd']
if name: scmd += ['-N', name]
if deps: scmd += ['-hold_jid', ','.join(['%d' % k for k in deps])]
if stdout:
if not cwd:
# pivot, temporarily, to home directory
curdir = os.path.realpath(os.curdir)
os.chdir(os.environ['HOME'])
if not os.path.exists(stdout): makedirs_safe(stdout)
if not cwd:
# go back
os.chdir(os.path.realpath(curdir))
scmd += ['-o', stdout]
if stderr:
if not os.path.exists(stderr): makedirs_safe(stderr)
scmd += ['-e', stderr]
elif stdout: #just re-use the stdout settings
scmd += ['-e', stdout]
scmd += ['-terse'] # simplified job identifiers returned by the command line
for k in env: scmd += ['-v', k]
if not isinstance(command, (list, tuple)): command = [command]
scmd += command
logging.debug("Qsub command '%s'", ' '.join(scmd))
from .setshell import sexec
jobid = sexec(context, scmd)
return int(jobid.split('.',1)[0])
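# Example usage (illustrative sketch only; the script and job names below are
# hypothetical):
#
#   jobid = qsub('myscript.sh', queue='all.q', cwd=True, name='test-job')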
def make_shell(shell, command, kwargs):
"""Returns a single command given a shell and a command to be qsub'ed
Keyword parameters (see the help of qsub() for details on extra arguments
that may be supplied):
shell
The path to the shell to use when submitting the job.
command
The script path to be submitted
Returns the args and kwargs parameters to be supplied to qsub()
"""
return (['-S', shell] + command, kwargs)
def make_python_wrapper(wrapper, command, kwargs):
"""Returns a single command given a python wrapper and a command to be
qsub'ed by that wrapper.
Keyword parameters (see the help of qsub() for details on extra arguments
that may be supplied):
wrapper
This is the python wrapper to be used for prefixing the environment in
which the **command** will execute. This parameter must be either a path to
the wrapper or a list with the wrapper and **wrapper** command options.
command
The script path to be submitted
Returns the args and kwargs parameters to be supplied to qsub()
"""
if not isinstance(wrapper, (list, tuple)): wrapper = [wrapper]
if not isinstance(command, (list, tuple)): command = [command]
return make_shell('/usr/bin/python', wrapper + ['--'] + command, kwargs)
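# Example usage (illustrative sketch only; the wrapper and script paths below
# are hypothetical):
#
#   args, kwargs = make_python_wrapper('/path/to/shell.py', 'myscript.py',
#     {'queue': 'all.q'})
#   jobid = qsub(args, **kwargs)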
def make_torch_wrapper(torch, debug, command, kwargs):
"""Submits a command using the Torch python wrapper so the **command**
executes in a valid Torch context.
Keyword parameters (see the help of qsub() for details on extra arguments
that may be supplied):
torch
This is the root directory for the torch installation you would like to use
for wrapping the execution of **command**.
debug
If set, this flag will switch the torch libraries to debug versions with
symbols loaded.
command
The script path to be submitted
Returns the args and kwargs parameters to be supplied to qsub()
"""
binroot = os.path.join(torch, 'bin')
shell = os.path.join(binroot, 'shell.py')
if not os.path.exists(shell):
raise RuntimeError, 'Cannot locate wrapper "%s"' % shell
wrapper = [shell]
if debug: wrapper += ['--debug']
# adds OVERWRITE_TORCH5SPRO_BINROOT to the execution environment
if not kwargs.has_key('env'): kwargs['env'] = []
kwargs['env'].append('OVERWRITE_TORCH5SPRO_BINROOT=%s' % binroot)
return make_python_wrapper(wrapper, command, kwargs)
def qstat(jobid, context='grid'):
"""Queries status of a given job.
Keyword parameters:
jobid
The job identifier as returned by qsub()
context
The setshell context in which we should try a 'qstat'. Normally you don't
need to change the default. This variable can also be set to a context
dictionary, in which case we just set up using that context instead of
probing for a new one, which can be faster.
Returns a dictionary with the specific job properties
"""
scmd = ['qstat', '-j', '%d' % jobid, '-f']
logging.debug("Qstat command '%s'", ' '.join(scmd))
from .setshell import sexec
data = sexec(context, scmd, error_on_nonzero=False)
# some parsing:
retval = {}
for line in data.split('\n'):
s = line.strip()
if s.lower().find('do not exist') != -1: return {}
if not s or s.find(10*'=') != -1: continue
key, value = WHITE_SPACE.split(s, 1)
key = key.rstrip(':')
retval[key] = value
return retval
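# Example usage (illustrative sketch only; the job id below is hypothetical):
#
#   info = qstat(4007, context='grid')
#   if not info: print 'job 4007 has already left the queue'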
def qdel(jobid, context='grid'):
"""Halts a given job.
Keyword parameters:
jobid
The job identifier as returned by qsub()
context
The setshell context in which we should try a 'qdel'. Normally you don't
need to change the default. This variable can also be set to a context
dictionary, in which case we just set up using that context instead of
probing for a new one, which can be faster.
"""
scmd = ['qdel', '%d' % jobid]
logging.debug("Qdel command '%s'", ' '.join(scmd))
from .setshell import sexec
sexec(context, scmd, error_on_nonzero=False)
#!/idiap/group/torch5spro/nightlies/externals/v2/linux-x86_64/pyenv/bin/python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 17:21:46 CEST
import os
import sys
install_dir = os.path.realpath(os.path.dirname(sys.argv[0]))
sys.path.append(install_dir)
from gridtk.scripts.jman import main
main()
.. vim: set fileencoding=utf-8 :
.. Andre Anjos <andre.anjos@idiap.ch>
.. Thu 25 Aug 2011 14:23:15 CEST
=================
SGE Job Manager
=================
The Job Manager is a Python wrapper around SGE utilities like `qsub`, `qstat`
and `qdel`. It interacts with these tools to submit and manage grid jobs,
making up a complete workflow ecosystem.
Every time you interact with the Job Manager, a local database file (normally
named `.jobmanager.db`) is read or written so that it preserves its state
across decoupled calls. The database contains all the information about jobs
that the Job Manager requires to:
* submit jobs (includes wrapped jobs or Torch5spro specific jobs)
* probe for submitted jobs
* query SGE for submitted jobs
* identify problems with submitted jobs
* cleanup logs from submitted jobs
* easily re-submit jobs if problems occur
Many of these features are also achievable using the stock SGE utilities; the
Job Manager just makes them dead simple.
Submitting a job
----------------
To interact with the Job Manager we use the `jman` utility. Make sure your
shell environment is set up so that it can be reached without typing the full
path. The first task you may need to pursue is to submit jobs. Here is how:
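As an illustrative sketch, the submission ultimately boils down to the
following call into the `JobManager` API that `jman` wraps (the job script
name below is hypothetical)::

  from gridtk.manager import JobManager

  manager = JobManager()  # reads or creates .jobmanager.db in the current directory
  job = manager.submit('myscript.sh', queue='all.q', cwd=True)
  print job               # job id, queue, age and arguments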