#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 16:13:31 CEST

"""A logging Idiap/SGE job manager
"""

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys
import anydbm
from cPickle import dumps

import argparse

23
from .. import local, sge
24
from ..tools import make_shell, random_logdir, logger
25 26 27 28

def setup(args):
  """Returns the JobManager and sets up the basic infrastructure.

  Keyword parameters:

  args
    The parsed command line namespace.  Reads ``wrapper_script``, ``db``,
    ``local``, ``debug`` and (when present) ``verbose``.

  Returns a ``local.JobManagerLocal`` when ``--local`` was given, otherwise a
  ``sge.JobManagerSGE``.
  """
  import logging

  # set-up logging first, so that messages emitted while the job manager is
  # constructed are already shown.  This function may run several times in
  # one process, so a console handler is only attached if none is present
  # yet; otherwise every message would be printed once per call.
  if args.debug or getattr(args, 'verbose', False):
    if not any(type(handler) is logging.StreamHandler for handler in logger.handlers):
      logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
  else:
    logger.setLevel(logging.WARNING)

  kwargs = {'wrapper_script' : args.wrapper_script}
  # only override the database location when the user gave one explicitly
  if args.db: kwargs['database'] = args.db

  if args.local:
    jm = local.JobManagerLocal(**kwargs)
  else:
    jm = sge.JobManagerSGE(**kwargs)

  return jm

47 48 49 50 51 52 53 54
def get_array(array):
  """Parses an ``--array`` specification into a ``(start, stop, step)`` tuple.

  The accepted format is ``[start-]stop[:step]``; ``start`` and ``step``
  default to 1.  Returns ``None`` when ``array`` is ``None``.  Raises
  ``ValueError`` when a part is not an integer.

  >>> get_array('10')
  (1, 10, 1)
  >>> get_array('2-20:4')
  (2, 20, 4)
  """
  if array is None:
    return None
  bounds, colon, step = array.partition(':')
  first, dash, last = bounds.partition('-')
  if dash:
    start, stop = int(first), int(last)
  else:
    start, stop = 1, int(first)
  # the step may have several digits: parse the whole remainder (the previous
  # implementation read only the single character following the ':')
  return (start, stop, int(step) if colon else 1)
66 67 68 69 70 71


def submit(args):
  """Submission command.

  Makes the command path absolute, wraps ``.py`` scripts (or any command when
  ``--python`` is given) with the current interpreter and hands the job to the
  job manager.  With ``--dry-run`` the job is only described on stdout and
  the job manager is not created at all.

  Raises ``ValueError`` when no command was given.
  """
  if not args.job:
    raise ValueError("No command was given to be submitted")

  # set full path to command
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  # automatically set interpreter if required
  if args.python or os.path.splitext(args.job[0])[1] in ('.py',):
    args.job = make_shell(sys.executable, args.job)

  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'name': args.name,
      'env': args.env,
      'memfree': args.memory,
      'hvmem': args.memory,
      'io_big': args.io_big,
      }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies

  if args.dry_run:
    # a single print() argument behaves identically on python 2 and 3; the
    # dependencies live in 'args.dependencies' (there is no 'args.deps')
    print('-> Job %s to %s with queue: %s memory: %s array: %s deps: %s env: %s io_big: %s' % (
        args.job, args.qname, args.qname, args.memory, args.array,
        args.dependencies, args.env, args.io_big))
    return

  # submit the job
  jm = setup(args)
  jm.submit(args.job, **kwargs)

108 109

def resubmit(args):
  """Re-submits the jobs whose ids were given on the command line.

  With ``--clean`` the job manager's ``delete`` is first called with
  ``delete_jobs=False`` to clean the log files of the old run; the
  ``--failed-only`` and ``--running-jobs`` flags are forwarded to
  ``resubmit``.
  """
  manager = setup(args)
  ids = args.job_ids
  if args.clean:
    manager.delete(job_ids=ids, delete_jobs = False)
  manager.resubmit(ids, args.failed_only, args.running_jobs)
115

116

117 118 119 120 121
def execute(args):
  """Runs the registered jobs on this machine.

  Only valid together with ``--local``; raises ``ValueError`` otherwise.
  ``--parallel`` selects how many jobs run at the same time and
  ``--job-ids`` restricts the execution to the given jobs.
  """
  if args.local:
    manager = setup(args)
    manager.run(parallel_jobs=args.parallel, job_ids=args.job_ids)
    return
  raise ValueError("The execute command can only be used with the '--local' command line option")


125
def list(args):
  """Lists the jobs stored in the given database.

  Note: this command function shadows the builtin ``list`` for the rest of
  this module; the name is kept because ``main`` registers it as
  ``func=list``.
  """
  manager = setup(args)
  manager.list(args.job_ids,
               args.print_array_jobs,
               args.print_dependencies)
129 130 131 132 133


def report(args):
  """Prints the logs of the finished (and optionally unfinished) jobs.

  ``--errors-only`` suppresses the output logs and ``--output-only`` the
  error logs; by default both are reported.
  """
  manager = setup(args)
  show_output = not args.errors_only
  show_error = not args.output_only
  manager.report(job_ids=args.job_ids, array_ids=args.array_ids,
                 unfinished=args.unfinished_also,
                 output=show_output, error=show_error)


def stop(args):
  """Stops (qdel's) the grid jobs with the given ids.

  Not available with ``--local``: raises ``ValueError`` in that case.
  """
  if not args.local:
    manager = setup(args)
    manager.stop_jobs(args.job_ids)
    return
  raise ValueError("Stopping commands locally is not supported (please kill them yourself)")
143 144 145


def delete(args):
  """Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped.

  Uses ``--job-ids``/``--array-ids`` to select the jobs; log files are removed
  unless ``--keep-logs`` is given, and the log directory unless
  ``--keep-log-dir`` is given.
  """
  jm = setup(args)
  # first, stop the jobs if they are running in the grid.  The manager created
  # above is reused: going through stop() would construct a second job
  # manager and run the logging set-up a second time.
  if not args.local:
    jm.stop_jobs(args.job_ids)
  # then, delete them from the database
  jm.delete(job_ids=args.job_ids, array_ids=args.array_ids, delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir)
153 154 155


def run_job(args):
  """Starts the wrapper script to execute a job, interpreting the JOB_ID and SGE_TASK_ID keywords that are set by the grid or by us.

  ``JOB_ID`` must be set to an integer.  ``SGE_TASK_ID`` holds the array
  index; the value ``'undefined'`` or a missing variable both mean "not an
  array job" and are passed on as ``None``.
  """
  jm = setup(args)
  job_id = int(os.environ['JOB_ID'])
  # tolerate an unset SGE_TASK_ID instead of failing with a KeyError
  task_id = os.environ.get('SGE_TASK_ID', 'undefined')
  array_id = None if task_id == 'undefined' else int(task_id)
  jm.run_job(job_id, array_id)

162

163
class AliasedSubParsersAction(argparse._SubParsersAction):
  """Sub-parsers action that additionally accepts an ``aliases`` keyword.

  Hack taken from https://gist.github.com/471779 to allow aliases in argparse
  for python 2.x (this has been implemented on python 3.2).  Every alias is
  mapped to the same parser object, and the help listing shows the command
  as ``name (alias1,alias2)``.
  """

  class _AliasedPseudoAction(argparse.Action):
    """Placeholder action that only serves to render a command in the help."""

    def __init__(self, name, aliases, help):
      label = '%s (%s)' % (name, ','.join(aliases)) if aliases else name
      parent = super(AliasedSubParsersAction._AliasedPseudoAction, self)
      parent.__init__(option_strings=[], dest=label, help=help)

  def add_parser(self, name, **kwargs):
    alias_names = kwargs.pop('aliases', [])

    new_parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # route every alias to the very same parser object
    for alias_name in alias_names:
      self._name_parser_map[alias_name] = new_parser

    # replace the help entry the base class just appended by one that also
    # lists the aliases
    if 'help' in kwargs:
      entry = self._AliasedPseudoAction(name, alias_names, kwargs.pop('help'))
      self._choices_actions[-1] = entry

    return new_parser

197

198
def main(command_line_options = None):
  """Builds the command line parser, parses the options and runs the command.

  Keyword parameters:

  command_line_options
    Optional ``sys.argv``-like list.  Element 0 is stored as
    ``args.wrapper_script`` and the remaining elements are parsed; when this
    is ``None`` or empty, ``sys.argv`` is used instead.

  Returns 0 once the selected sub-command has returned.
  """

  from ..config import __version__

  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', dest='verbose', default=False,
      action='store_true', help='increase verbosity for this script')
  parser.add_argument('-g', '--debug', dest='debug', default=False,
      action='store_true', help='prints out lots of debugging information')
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)

  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # subcommand 'submit'
  submit_parser = cmdparser.add_parser('submit', aliases=['sub'],
      help='submits self-contained jobs to the SGE queue and logs them in a private database')
  submit_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
  submit_parser.add_argument('-q', '--queue', metavar='QNAME',
      dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
  submit_parser.add_argument('-m', '--memory', dest='memory', help='Sets both the h_vmem **and** the mem_free parameters when submitting the job to the specified value (e.g. 8G to set the memory requirements to 8 gigabytes)')
  submit_parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
  submit_parser.add_argument('-x', '--dependencies', type=int,
      default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option a list of job identifiers separated by spaces')
  submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected. If the jobs are executed locally, by default the result is written to console.')
  submit_parser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
      dest='env', nargs='*', default=[],
      help='Passes specific environment variables to the job')
  # the metavar mirrors what get_array() parses: 'start-stop:step'
  submit_parser.add_argument('-t', '--array', '--parametric', metavar='[start-]stop[:step]',
      dest='array', help='Creates a parametric (array) job. You must specify the stop value, but start (default=1) and step (default=1) can be specified as well.')
  submit_parser.add_argument('-p', '--py', '--python', dest='python', default=False,
      action='store_true', help='Wrap execution of your command using the current python interpreter')
  submit_parser.add_argument('-z', '--dry-run',
      action='store_true', help='Do not really submit anything, just print out what would be submitted in this case')
  submit_parser.add_argument('-I', '--io-big', dest='io_big', default=False,
      action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput')
  submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER)
  submit_parser.set_defaults(func=submit)

  # re-submit parser
  resubmit_parser = cmdparser.add_parser('resubmit', aliases=['re'],
      help='Re-submits a list of jobs')
  resubmit_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
  resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Re-submit only the jobs with the given ids (by default, all jobs are re-submitted)')
  resubmit_parser.add_argument('-c', '--clean', action='store_true', help='Clean the log files of the old job before re-submitting')
  resubmit_parser.add_argument('-f', '--failed-only', action='store_true', help='Re-submit only jobs that have failed')
  resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting')
  resubmit_parser.set_defaults(func=resubmit)

  # stop parser
  stop_parser = cmdparser.add_parser('stop', help='Stops the execution of jobs in the grid')
  stop_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Stop only the jobs with the given ids (by default, all jobs are stopped)')
  stop_parser.set_defaults(func=stop)

  # subcommand 'list'
  list_parser = cmdparser.add_parser('list', aliases=['ls'],
      help='lists jobs stored in the database')
  list_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
  list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='List only the jobs with the given ids (by default, all jobs are listed)')
  list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Print the array jobs as well.')
  list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
  list_parser.set_defaults(func=list)

  # report parser
  report_parser = cmdparser.add_parser('report', aliases=['ref', 'r'],
      help='Iterates through the result and error log files and prints out the logs')
  report_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs (by default, both logs are reported).')
  report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
      help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
  delete_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted)')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.set_defaults(func=delete)

  # subcommand 'execute'
  execute_parser = cmdparser.add_parser('execute', aliases=['exe', 'x'],
      help='Executes the registered jobs on the local machine; only valid in combination with the \'--local\' option.')
  execute_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be executed by one provided by you', nargs='?')
  execute_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  execute_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Execute only the jobs with the given ids (by default, all unfinished jobs are executed)')
  execute_parser.set_defaults(func=execute)

  # subcommand 'run-job'; this is not seen on the command line since it is actually a wrapper script
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.add_argument('db', metavar='DATABASE', nargs='?', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)

  if command_line_options:
    args = parser.parse_args(command_line_options[1:])
    args.wrapper_script = command_line_options[0]
  else:
    args = parser.parse_args()
    args.wrapper_script = sys.argv[0]

  # newer argparse versions do not enforce that a sub-command is given, in
  # which case no 'func' default is set; fail with a usage message instead of
  # an AttributeError
  if not hasattr(args, 'func'):
    parser.error('a command is required; see --help for the list of commands')

  args.func(args)

  return 0