#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 16:13:31 CEST

"""A logging Idiap/SGE job manager
"""

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys
import anydbm
from cPickle import dumps

import argparse

from .. import local, sge
from ..tools import make_shell, random_logdir, logger

def setup(args):
  """Returns the JobManager and sets up the basic infrastructure

  Keyword parameters:

  args
    The parsed command line. The attributes ``db``, ``local`` and ``debug``
    are read from it.

  Returns a ``local.JobManagerLocal`` when ``args.local`` is set and an
  ``sge.JobManagerSGE`` otherwise. When ``args.debug`` is set, a stream
  handler is attached to the package logger and its level is lowered to
  ``DEBUG``.
  """

  # only forward the database when the user gave one, so that the job manager
  # keeps its own default otherwise
  kwargs = {}
  if args.db: kwargs['database'] = args.db

  if args.local:
    jm = local.JobManagerLocal(**kwargs)
  else:
    jm = sge.JobManagerSGE(**kwargs)

  # set-up logging
  if args.debug:
    import logging
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

  return jm

def get_array(array):
  """Parses a parametric (array) job specification

  Keyword parameters:

  array
    Either ``None`` or a string in one of the forms ``stop``, ``start-stop``
    or ``start-stop:step``.

  Returns ``None`` if ``array`` is ``None``, otherwise the tuple
  ``(start, stop, step)`` of integers; ``start`` and ``step`` default to 1.

  Raises ``ValueError`` if one of the fields is not an integer.
  """

  if array is None:
    return None

  dash = array.find('-')
  if dash == -1:
    # only the stop value was given
    return (1, int(array), 1)

  first = int(array[:dash])
  colon = array.find(':')
  if colon == -1:
    return (first, int(array[dash+1:]), 1)

  # slice up to the end of the string, so that steps with more than one digit
  # are read completely
  return (first, int(array[dash+1:colon]), int(array[colon+1:]))


def submit(args):
  """Submission command

  Keyword parameters:

  args
    The parsed command line of the ``submit`` sub-command. ``args.job`` is
    modified in place: its first entry is made absolute and, if requested,
    the whole command is wrapped with the current python interpreter.

  With ``args.dry_run`` set, a one-line summary of the submission is printed
  and nothing is submitted. Otherwise, the job is handed to the job manager
  returned by :py:func:`setup`.
  """

  # set full path to command
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  # automatically set interpreter if required
  if args.python or os.path.splitext(args.job[0])[1] in ('.py',):
    args.job = make_shell(sys.executable, args.job)

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'name': args.name,
      'env': args.env,
      # a single command line value feeds both memory related parameters
      'memfree': args.memory,
      'hvmem': args.memory,
      'io_big': args.io_big,
      }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies

  if args.dry_run:
    # the dependencies are stored by the parser as ``args.dependencies``
    summary = [
        '-> Job', args.job, 'to', args.qname, 'with',
        'queue:', args.qname,
        'memory:', args.memory,
        'array:', args.array,
        'deps:', args.dependencies,
        'env:', args.env,
        'io_big:', args.io_big,
        ]
    print(' '.join(str(item) for item in summary))
    return

  # submit the job
  jm.submit(args.job, **kwargs)


def explain(args):
  """Explain action

  Keyword parameters:

  args
    The parsed command line. ``args.jobid`` is an optional list of strings
    of the form ``<job>`` or ``<job>.<array>`` (both integers); if it is
    empty, all keys of the job manager are used. ``args.verbose`` enables
    the log headers and the printing of the standard output log.

  For each selected job, the job itself and its command line are printed;
  consecutive jobs are separated by a line of 79 dashes.
  """

  jm = setup(args)

  if args.jobid:
    jobs = []
    for spec in args.jobid:
      ids = [int(n) for n in spec.split('.', 1)]
      # no array index given
      if len(ids) == 1: ids.append(None)
      jobs.append(ids)
  else:
    jobs = [(k, None) for k in jm.keys()]

  for index, (job_id, array_id) in enumerate(jobs):
    if index: print(79*'-')
    J = jm[job_id]
    print("Job %s" % (J,))
    print("Command line: %s" % (J.command_line(),))
    if args.verbose:
      print("%s stdout (%s)" % (J.name(array_id), J.stdout_filename(array_id)))
      print(J.stdout(array_id))
      print("%s stderr (%s)" % (J.name(array_id), J.stderr_filename(array_id)))
    # NOTE(review): the error log is printed even when not verbose, while the
    # output log is not -- confirm that this asymmetry is intended
    print(J.stderr(array_id))

def resubmit(args):
  """Re-submits the jobs stored in the database ``args.fromdb``

  Keyword parameters:

  args
    The parsed command line. Jobs are taken from ``args.jobid`` if given,
    otherwise all keys of the source database are used. With
    ``args.cleanup`` set, the logs of the original job are removed and the
    job is deleted from the source database after re-submission.
  """

  jm = setup(args)
  # NOTE(review): JobManager and get_logdirs are not imported in the visible
  # part of this module -- confirm where they are supposed to come from
  fromjm = JobManager(args.fromdb)
  # snapshot the keys: entries may be deleted from ``fromjm`` inside the loop
  jobs = list(fromjm.keys())
  if args.jobid: jobs = args.jobid
  for k in jobs:
    O = fromjm[k]

    args.stdout, args.stderr = get_logdirs(args.stdout, args.stderr, args.logbase)

    J = jm.resubmit(O, args.stdout, args.stderr, args.deps, args.failed_only)

    if args.verbose:
      if isinstance(J, (tuple, list)):
        # use a separate loop variable: ``k`` is still required below to
        # delete the right entry from the source database
        for new_job in J: print('Re-submitted job %s' % (new_job,))
      else:
        print('Re-submitted job %s' % (J,))
    else:
      if isinstance(J, (tuple, list)):
        print('Re-submitted %d jobs' % len(J))
      else:
        print('Re-submitted job %s' % (J.name(),))

    if args.cleanup:
      if args.verbose:
        O.rm_stdout(verbose='  ')
        O.rm_stderr(verbose='  ')
      else:
        O.rm_stdout()
        O.rm_stderr()
      del fromjm[k]
      print('  deleted job %s from database' % O.name())



def execute(args):
  """Executes the collected jobs on the local machine.

  Keyword parameters:

  args
    The parsed command line; ``args.parallel`` and ``args.job_ids`` are
    forwarded to the ``run`` method of the job manager.

  Raises ``ValueError`` if ``args.local`` is not set.
  """
  if not args.local:
    raise ValueError("The execute command can only be used with the '--local' command line option")
  jm = setup(args)
  jm.run(parallel_jobs=args.parallel, job_ids=args.job_ids)


def ls(args):
  """Lists the jobs of the job manager selected by the command line.

  The job manager is created through :py:func:`setup`; its ``list`` method
  is then called.
  """
  setup(args).list()


def report(args):
  """Reports the logs of the finished (and, on request, unfinished) jobs.

  The ``--errors-only`` and ``--output-only`` flags are turned into the
  ``output`` and ``error`` switches of the ``report`` method of the job
  manager, respectively.
  """
  manager = setup(args)
  options = dict(
      grid_ids=args.job_ids,
      array_ids=args.array_ids,
      unfinished=args.unfinished_also,
      output=not args.errors_only,
      error=not args.output_only,
      )
  manager.report(**options)


def delete(args):
  """Removes the selected jobs from the job manager.

  The ``--keep-logs`` and ``--keep-log-dir`` flags are inverted into the
  ``delete_logs`` and ``delete_log_dir`` switches of the ``delete`` method
  of the job manager.
  """
  manager = setup(args)
  options = dict(
      grid_ids=args.job_ids,
      array_ids=args.array_ids,
      delete_logs=not args.keep_logs,
      delete_log_dir=not args.keep_log_dir,
      )
  manager.delete(**options)


def run_job(args):
  """Runs the job identified by the environment of the current process.

  The job id is read from the ``JOB_ID`` environment variable and the array
  id from ``SGE_TASK_ID``; the value ``undefined`` of the latter is mapped
  to ``None``. Both are handed to the ``run_job`` method of the job manager.
  """
  manager = setup(args)
  grid_id = int(os.environ['JOB_ID'])
  task = os.environ['SGE_TASK_ID']
  if task == 'undefined':
    task_id = None
  else:
    task_id = int(task)
  manager.run_job(grid_id, task_id)



class AliasedSubParsersAction(argparse._SubParsersAction):
  """Hack taken from https://gist.github.com/471779 to allow aliases in
  argparse for python 2.x (this has been implemented on python 3.2)

  To be used, this class must be registered on the parser as the 'parsers'
  action before ``add_subparsers`` is called.
  """

  class _AliasedPseudoAction(argparse.Action):
    """Help-only entry that shows a command as ``name (alias1,alias2)``."""

    def __init__(self, name, aliases, help):
      dest = name
      if aliases:
        dest += ' (%s)' % ','.join(aliases)
      sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
      sup.__init__(option_strings=[], dest=dest, help=help)

  def add_parser(self, name, **kwargs):
    """Adds a sub-command parser, optionally reachable through aliases.

    Keyword parameters:

    name
      The primary name of the sub-command.

    aliases
      (optional) An iterable with alternative names for the sub-command.

    All other keyword arguments are forwarded to the base class.

    Returns the parser created for the sub-command.
    """
    # the aliases are handled here and never reach the base class
    aliases = kwargs.pop('aliases', [])

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # Make the aliases work.
    for alias in aliases:
      self._name_parser_map[alias] = parser
    # Make the help text reflect them, first removing old help entry.
    if 'help' in kwargs:
      help_text = kwargs.pop('help')
      self._choices_actions.pop()
      pseudo_action = self._AliasedPseudoAction(name, aliases, help_text)
      self._choices_actions.append(pseudo_action)

    return parser



def main():
  """Command line entry point of the job manager

  Builds the argument parser with its sub-commands (``list``, ``submit``,
  ``execute``, ``report``, ``delete`` and the hidden ``run-job``), parses the
  command line and calls the function bound to the selected sub-command.
  The process exits with status 0 once that function returns.
  """

  from ..config import __version__

  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', dest='verbose', default=False,
      action='store_true', help='increase verbosity for this script')
  parser.add_argument('-g', '--debug', dest='debug', default=False,
      action='store_true', help='prints out lots of debugging information')
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)
  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # subcommand 'list'
  lsparser = cmdparser.add_parser('list', aliases=['ls'],
      help='lists jobs stored in the database')
  lsparser.add_argument('db', metavar='DATABASE', help='replace the default database by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
  lsparser.set_defaults(func=ls)

  # subcommand 'submit'
  subparser = cmdparser.add_parser('submit', aliases=['sub'],
      help='submits self-contained jobs to the SGE queue and logs them in a private database')
  subparser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
  subparser.add_argument('-q', '--queue', metavar='QNAME',
      dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
  #this is ON by default as it helps job management
  #subparser.add_argument('-c', '--cwd', default=False, action='store_true',
  #    dest='cwd', help='Makes SGE switch to the current working directory before executing the job')
  subparser.add_argument('-m', '--memory', dest='memory', help='Sets both the h_vmem **and** the mem_free parameters when submitting the job to the specified value (e.g. 8G to set the memory requirements to 8 gigabytes)')
  subparser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
  subparser.add_argument('-x', '--dependencies', type=int,
      default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option an a list of job identifiers separated by spaces')
  subparser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected. If the jobs are executed locally, by default the result is written to console.')
  subparser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
      dest='env', nargs='*', default=[],
      help='Passes specific environment variables to the job')
  # the metavar mirrors the syntax parsed by get_array(): "start-stop:step"
  subparser.add_argument('-t', '--array', '--parametric', metavar='[start-]stop[:step]',
      dest='array', help='Creates a parametric (array) job. You must specify the stop value, but start (default=1) and step (default=1) can be specified as well.')
  subparser.add_argument('-p', '--py', '--python', dest='python', default=False,
      action='store_true', help='Wrap execution of your command using the current python interpreter')
  subparser.add_argument('-z', '--dry-run',
      action='store_true', help='Do not really submit anything, just print out what would submit in this case')
  subparser.add_argument('-I', '--io-big', dest='io_big', default=False,
      action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput')
  # everything after the options is the command to be submitted
  subparser.add_argument('job', metavar='command', nargs=argparse.REMAINDER)
  subparser.set_defaults(func=submit)

  # subcommand 'execute'
  execute_parser = cmdparser.add_parser('execute', aliases=['exe', 'x'],
      help='Executes the registered jobs on the local machine; only valid in combination with the \'--local\' option.')
  execute_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be executed by one provided by you', nargs='?')
  execute_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  execute_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Execute only the jobs with the given ids (by default, all unfinished jobs are executed)')
  execute_parser.set_defaults(func=execute)

  # subcommand 'report'
  report_parser = cmdparser.add_parser('report', aliases=['ref', 'r'],
      help='Iterates through the result and error log files and prints out the logs')
  report_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs  (by default, both logs are reported).')
  report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
      help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
  delete_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted)')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.set_defaults(func=delete)

  # subcommand 'run-job'; hidden from the help message
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.add_argument('db', metavar='DATABASE', nargs='?', help=argparse.SUPPRESS)
#  run_parser.add_argument('--job-id', required = True, type=int, help=argparse.SUPPRESS)
#  run_parser.add_argument('--array-id', type=int, help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)

  args = parser.parse_args()

  # every sub-command registers its implementation as ``func``
  args.func(args)

  sys.exit(0)