#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

"""A logging Idiap/SGE job manager
"""

from __future__ import print_function

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys

import argparse
import logging
import string

from ..tools import make_shell, logger
from .. import local, sge
from ..models import Status

GPU_QUEUES = ['gpu', 'lgpu', 'sgpu', 'gpum']
QUEUES = ['all.q', 'q1d', 'q1w', 'q1m', 'q1dm', 'q1wm'] + GPU_QUEUES


def appropriate_for_gpu(args, kwargs):
  # don't set these for GPU processing, or the maximum virtual memory will be
  # enforced via ulimit
  kwargs.pop('memfree', None)
  kwargs.pop('hvmem', None)

  if args.memory is None:
    return

  # if this is a GPU queue and args.memory is provided, set the gpumem flag,
  # stripping a trailing 'G' from the args.memory string if present
  if args.memory.isdigit():
    kwargs['gpumem'] = args.memory  # assign directly
  elif args.memory.endswith('G'):
    kwargs['gpumem'] = args.memory[:-1]  # remove G at the end


def setup(args):
  """Returns the JobManager and sets up the basic infrastructure"""

  kwargs = {'wrapper_script' : args.wrapper_script, 'debug' : args.verbose==3, 'database' : args.database}
  if args.local:
    jm = local.JobManagerLocal(**kwargs)
  else:
    jm = sge.JobManagerSGE(**kwargs)

  # set-up logging
  if args.verbose not in range(0,4):
    raise ValueError("The verbosity level %d does not exist. Please reduce the number of '--verbose' parameters in your call to a maximum of 3" % args.verbose)

  # set up the verbosity level of the logging system
  log_level = {
      0: logging.ERROR,
      1: logging.WARNING,
      2: logging.INFO,
      3: logging.DEBUG
    }[args.verbose]

  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
  logger.addHandler(handler)
  logger.setLevel(log_level)

  return jm

def get_array(array):
  if array is None:
    return None
  start = array.find('-')
  if start == -1:
    a = 1
    b = int(array)
    c = 1
  else:
    a = int(array[0:start])
    step = array.find(':')
    if step == -1:
      b = int(array[start+1:])
      c = 1
    else:
      b = int(array[start+1:step])
      c = int(array[step+1:])

  return (a,b,c)
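
# Example behaviour of get_array (derived from the parsing above):
#   get_array("10")     -> (1, 10, 1)   # only 'last' given
#   get_array("2-10")   -> (2, 10, 1)   # 'first-last'
#   get_array("2-10:2") -> (2, 10, 2)   # 'first-last:step'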


def get_ids(jobs):
  if jobs is None:
    return None
  indexes = []
  for job in jobs:
    if '-' not in job and '+' not in job:
      index = int(job)
      indexes.append(index)
    # check if a range is specified
    elif '-' in job and '+' not in job:
      first, last = job.split('-', 1)
      indexes.extend(range(int(first), int(last) + 1))
    # check if a plus sign is specified
    elif '+' in job and '-' not in job:
      first, add = job.split('+', 1)
      first, add = int(first), int(add)
      indexes.extend(range(first, first + add + 1))
  return indexes
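
# Example behaviour of get_ids (derived from the parsing above):
#   get_ids(["5"])    -> [5]
#   get_ids(["3-6"])  -> [3, 4, 5, 6]
#   get_ids(["10+2"]) -> [10, 11, 12]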


def get_memfree(memory, parallel):
  """Computes the memory required for the memfree field."""
  number = int(memory.rstrip(string.ascii_letters))
  memtype = memory.lstrip(string.digits)
  if not memtype:
    memtype = "G"
  return "%d%s" % (number*parallel, memtype)

def submit(args):
  """Submission command"""

  # set full path to command
  if args.job[0] == '--':
    del args.job[0]
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'verbosity' : args.verbose,
      'name': args.name,
      'env': args.env,
      'memfree': args.memory,
      'io_big': args.io_big,
  }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.exec_dir is not None:      kwargs['exec_dir'] = args.exec_dir
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies
  if args.qname != 'all.q':          kwargs['hvmem'] = args.memory
  if args.qname in GPU_QUEUES:
    appropriate_for_gpu(args, kwargs)
  if args.parallel is not None:
    kwargs['pe_opt'] = "pe_mth %d" % args.parallel
    if args.memory is not None:
      kwargs['memfree'] = get_memfree(args.memory, args.parallel)
  kwargs['dry_run'] = args.dry_run
  kwargs['stop_on_failure'] = args.stop_on_failure

  # submit the job(s)
  for _ in range(args.repeat):
    job_id = jm.submit(args.job, **kwargs)
    dependencies = kwargs.get('dependencies', [])
    dependencies.append(job_id)
    kwargs['dependencies'] = dependencies

  if args.print_id:
    print(job_id, end='')


def resubmit(args):
  """Re-submits the jobs with the given ids."""
  jm = setup(args)

  kwargs = {
      'cwd': True,
      'verbosity' : args.verbose
  }
  if args.qname is not None:
    kwargs['queue'] = args.qname
  if args.memory is not None:
    kwargs['memfree'] = args.memory
    if args.qname not in (None, 'all.q'):
      kwargs['hvmem'] = args.memory
    if args.qname in GPU_QUEUES:
      appropriate_for_gpu(args, kwargs)
  if args.parallel is not None:
    kwargs['pe_opt'] = "pe_mth %d" % args.parallel
    if args.memory is not None:
      kwargs['memfree'] = get_memfree(args.memory, args.parallel)
  if args.io_big:
    kwargs['io_big'] = True
  if args.no_io_big:
    kwargs['io_big'] = False

  jm.resubmit(get_ids(args.job_ids), args.also_success, args.running_jobs, args.overwrite_command, keep_logs=args.keep_logs, **kwargs)


def run_scheduler(args):
  """Runs the scheduler on the local machine. To stop it, please use Ctrl-C."""
  if not args.local:
    raise ValueError("The run-scheduler command can only be used with the '--local' command line option")
  jm = setup(args)
  jm.run_scheduler(parallel_jobs=args.parallel, job_ids=get_ids(args.job_ids), sleep_time=args.sleep_time, die_when_finished=args.die_when_finished, no_log=args.no_log_files, nice=args.nice, verbosity=args.verbose)


def list(args):
  """Lists the jobs in the given database."""
  jm = setup(args)
  jm.list(job_ids=get_ids(args.job_ids), print_array_jobs=args.print_array_jobs, print_dependencies=args.print_dependencies, status=args.status, long=args.long, print_times=args.print_times, ids_only=args.ids_only, names=args.names)


def communicate(args):
  """Uses qstat to get the status of the requested jobs."""
  if args.local:
    raise ValueError("The communicate command can only be used without the '--local' command line option")
  jm = setup(args)
  jm.communicate(job_ids=get_ids(args.job_ids))


def report(args):
  """Reports the results of the finished (and unfinished) jobs."""
  jm = setup(args)
  jm.report(job_ids=get_ids(args.job_ids), array_ids=get_ids(args.array_ids), output=not args.errors_only, error=not args.output_only, status=args.status, name=args.name)


def stop(args):
  """Stops (qdel's) the jobs with the given ids."""
  if args.local:
    raise ValueError("Stopping commands locally is not supported (please kill them yourself)")
  jm = setup(args)
  jm.stop_jobs(get_ids(args.job_ids))


def delete(args):
  """Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped."""
  jm = setup(args)
  # first, stop the jobs if they are running in the grid
  if not args.local and 'executing' in args.status:
    stop(args)
  # then, delete them from the database
  jm.delete(job_ids=get_ids(args.job_ids), array_ids=get_ids(args.array_ids), delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir, status=args.status)


def run_job(args):
  """Starts the wrapper script to execute a job, interpreting the JOB_ID and SGE_TASK_ID keywords that are set by the grid or by us."""
  jm = setup(args)
  job_id = int(os.environ['JOB_ID'])
  array_id = int(os.environ['SGE_TASK_ID']) if os.environ['SGE_TASK_ID'] != 'undefined' else None
  jm.run_job(job_id, array_id)
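  # Note: this sub-command is normally invoked by the wrapper script itself;
  # JOB_ID and SGE_TASK_ID are provided by SGE (or set by the local scheduler),
  # e.g. JOB_ID=4242 with SGE_TASK_ID=3 for the third task of an array job, or
  # SGE_TASK_ID='undefined' for non-array jobs.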


class AliasedSubParsersAction(argparse._SubParsersAction):
  """Hack taken from https://gist.github.com/471779 to allow aliases in
253 254 255 256 257 258 259 260 261
  argparse for python 2.x (this has been implemented on python 3.2)
  """

  class _AliasedPseudoAction(argparse.Action):
    def __init__(self, name, aliases, help):
      dest = name
      if aliases:
        dest += ' (%s)' % ','.join(aliases)
      sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
      sup.__init__(option_strings=[], dest=dest, help=help)

  def add_parser(self, name, **kwargs):
    if 'aliases' in kwargs:
      aliases = kwargs['aliases']
      del kwargs['aliases']
    else:
      aliases = []

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # Make the aliases work.
    for alias in aliases:
      self._name_parser_map[alias] = parser
    # Make the help text reflect them, first removing old help entry.
    if 'help' in kwargs:
      help = kwargs.pop('help')
      self._choices_actions.pop()
      pseudo_action = self._AliasedPseudoAction(name, aliases, help)
      self._choices_actions.append(pseudo_action)

    return parser


def main(command_line_options = None):

  from ..config import __version__

  formatter = argparse.ArgumentDefaultsHelpFormatter
  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=formatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', action = 'count', default = 0,
      help = "Increase the verbosity level from 0 (only error messages) to 1 (warnings), 2 (log messages), 3 (debug information) by adding the --verbose option as often as desired (e.g. '-vvv' for debug).")
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)
  parser.add_argument('-d', '--database', '--db', metavar='DATABASE', default = 'submitted.sql3',
      help='Replace the default database "submitted.sql3" with a database of your choice.')

  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # subcommand 'submit'
  submit_parser = cmdparser.add_parser('submit', aliases=['sub'], formatter_class=formatter, help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.')
  submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', choices=QUEUES, help='the name of the SGE queue to submit the job to')
  submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting '
                                                    'the job to a non-GPU queue, e.g., 8G to set the memory '
                                                    'requirements to 8 gigabytes. Sets gpumem parameter when '
                                                    'submitting the job to a GPU-based queue.')
  submit_parser.add_argument('-p', '--parallel', '--pe_mth', type=int, help='Sets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.')
  submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
  submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
  submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
  submit_parser.add_argument('-d', '--exec-dir', metavar='DIR', help='Sets the executing directory, where the script should be executed. If not given, jobs will be executed in the current directory')
  submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.')
  submit_parser.add_argument('-s', '--environment', metavar='KEY=VALUE', dest='env', nargs='*', default=[], help='Passes specific environment variables to the job.')
  submit_parser.add_argument('-t', '--array', '--parametric', metavar='(first-)last(:step)', help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).")
  submit_parser.add_argument('-z', '--dry-run', action='store_true', help='Do not really submit anything, just print out what would be submitted in this case')
  submit_parser.add_argument('-i', '--io-big', action='store_true', help='Sets "io_big" on the submitted jobs, so that they are only scheduled on machines that can handle high-throughput I/O.')
  submit_parser.add_argument('-r', '--repeat', type=int, metavar='N', default=1, help='Submits the job N times. Each job will depend on the job before.')
  submit_parser.add_argument('-o', '--print-id', action='store_true', help='Prints the new job id (so that it can be parsed by automatic scripts).')
  submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER, help = "The job that should be executed. Sometimes a -- is required to separate the job from other command line options.")
  submit_parser.set_defaults(func=submit)
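
  # Illustrative submit invocations (assuming the console script is installed
  # as 'jman'; the job scripts below are hypothetical):
  #   jman submit -q q1d -m 8G -- ./myscript.sh --option value
  #   jman submit -t 1-100:2 -n grid-test -- ./myscript.sh
  # With --repeat N, each submitted copy depends on the previous one.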

  # subcommand 're-submit'
  resubmit_parser = cmdparser.add_parser('resubmit', aliases=['reset', 'requeue', 're'], formatter_class=formatter, help='Re-submits a list of jobs.')
  resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Re-submit only the jobs with the given ids (by default, all finished jobs are re-submitted).')
  resubmit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', choices=QUEUES, help='Reset the SGE queue to submit the job to')
  resubmit_parser.add_argument('-m', '--memory', help='Resets both the h_vmem and the mem_free parameters when '
                                                      'submitting the job to a non-GPU queue, e.g., 8G '
                                                      'to set the memory requirements to 8 gigabytes. Resets gpumem '
                                                      'parameter when submitting the job to a GPU-based queue.')
  resubmit_parser.add_argument('-p', '--parallel', '--pe_mth', type=int, help='Resets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.')
  resubmit_parser.add_argument('-i', '--io-big', action='store_true', help='Resubmits the job to the "io_big" queue.')
  resubmit_parser.add_argument('-I', '--no-io-big', action='store_true', help='Resubmits the job NOT to the "io_big" queue.')
  resubmit_parser.add_argument('-k', '--keep-logs', action='store_true', help='Do not clean the log files of the old job before re-submitting.')
  resubmit_parser.add_argument('-s', '--also-success', action='store_true', help='Re-submit also jobs that have finished successfully.')
  resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting (use this flag with care).')
  resubmit_parser.add_argument('-o', '--overwrite-command', nargs=argparse.REMAINDER, help = "Overwrite the command line (of a single job) that should be executed (useful to keep job dependencies).")
  resubmit_parser.set_defaults(func=resubmit)

  # subcommand 'stop'
  stop_parser = cmdparser.add_parser('stop', formatter_class=formatter, help='Stops the execution of jobs in the grid.')
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Stop only the jobs with the given ids (by default, all jobs are stopped).')
  stop_parser.set_defaults(func=stop)

  # subcommand 'list'
  list_parser = cmdparser.add_parser('list', aliases=['ls'], formatter_class=formatter,  help='Lists jobs stored in the database. Use the -vv option to get a long listing.')
  list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='List only the jobs with the given ids (by default, all jobs are listed)')
  list_parser.add_argument('-n', '--names', metavar='NAME', nargs='+', help='List only the jobs with the given names (by default, all jobs are listed)')
  list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Also list the array ids.')
  list_parser.add_argument('-l', '--long', action='store_true', help='Prints additional information about the submitted job.')
  list_parser.add_argument('-t', '--print-times', action='store_true', help='Prints timing information on when jobs were submitted, executed and finished')
  list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
  list_parser.add_argument('-o', '--ids-only', action='store_true', help='Prints ONLY the job ids (so that they can be parsed by automatic scripts).')
  list_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='List only jobs that have the given statuses; by default all jobs are listed.')
  list_parser.set_defaults(func=list)

  # subcommand 'communicate'
  stop_parser = cmdparser.add_parser('communicate', aliases = ['com'], formatter_class=formatter, help='Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.')
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Check only the jobs with the given ids (by default, all jobs are checked)')
  stop_parser.set_defaults(func=communicate)


  # subcommand 'report'
  report_parser = cmdparser.add_parser('report', aliases=['rep', 'r', 'explain', 'why'], formatter_class=formatter, help='Iterates through the result and error log files and prints out the logs.')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs (by default, both logs are reported).')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='+', help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.add_argument('-n', '--name', help="Report only the jobs with the given name; by default all jobs are reported.")
  report_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Report only jobs that have the given statuses; by default all jobs are reported.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'], formatter_class=formatter, help='Removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Delete only the jobs with the given ids (by default, all jobs are deleted).')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='+', help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given statuses; by default all jobs are deleted.')
  delete_parser.set_defaults(func=delete)

  # subcommand 'run_scheduler'
  scheduler_parser = cmdparser.add_parser('run-scheduler', aliases=['sched', 'x'], formatter_class=formatter, help='Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the \'--local\' option.')
  scheduler_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  scheduler_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Select the job ids that should be run (by default, all submitted and queued jobs are run).')
  scheduler_parser.add_argument('-s', '--sleep-time', type=float, default=0.1, help='Set the sleep time between scheduler iterations, in seconds.')
  scheduler_parser.add_argument('-x', '--die-when-finished', action='store_true', help='Let the job manager die when it has finished all jobs of the database.')
  scheduler_parser.add_argument('-l', '--no-log-files', action='store_true', help='Overwrites the log file setup to print the results to the console.')
  scheduler_parser.add_argument('-n', '--nice', type=int, help='Jobs will be run with the given priority (can only be positive, i.e., to have lower priority)')
  scheduler_parser.set_defaults(func=run_scheduler)
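
  # Illustrative local usage (assuming the console script is installed as
  # 'jman'; the job script is hypothetical):
  #   jman --local submit -- ./myscript.sh      # log the job in the local database
  #   jman --local run-scheduler -p 4 -x        # run queued jobs, 4 in parallel, then exit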


  # subcommand 'run-job'; this should not be seen on the command line since it is actually a wrapper script
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)


  if command_line_options:
    args = parser.parse_args(command_line_options[1:])
    args.wrapper_script = command_line_options[0]
  else:
    args = parser.parse_args()
    args.wrapper_script = sys.argv[0]

  args.func(args)

  return 0