jman.py 21.4 KB
Newer Older
1 2 3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

4 5
from __future__ import print_function

6 7 8 9 10 11 12 13 14 15 16 17 18 19
"""A logging Idiap/SGE job manager
"""

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys

import argparse
20
import logging
21
import string
22

23
from ..tools import make_shell, logger
24
from .. import local, sge
25
from ..models import Status
26

27
# Names of the SGE queues accepted by the -q/--queue option; the last four
# ('gpu', 'lgpu', 'sgpu', 'gpum') are treated specially as GPU queues below.
QUEUES = ['all.q', 'q1d', 'q1w', 'q1m', 'q1dm', 'q1wm', 'gpu', 'lgpu', 'sgpu', 'gpum']
28

29 30 31
def setup(args):
  """Returns the JobManager and sets up the basic infrastructure.

  Creates a local or SGE job manager depending on ``args.local`` and
  configures the ``gridtk`` logger according to the number of ``--verbose``
  options given on the command line.
  """

  # validate the verbosity level first, before the database is touched
  # BUG FIX: the error message previously interpolated an undefined name
  # 'level', which raised a NameError instead of the intended ValueError
  if args.verbose not in range(0,4):
    raise ValueError("The verbosity level %d does not exist. Please reduce the number of '--verbose' parameters in your call to maximum 3" % args.verbose)

  kwargs = {'wrapper_script' : args.wrapper_script, 'debug' : args.verbose==3, 'database' : args.database}
  if args.local:
    jm = local.JobManagerLocal(**kwargs)
  else:
    jm = sge.JobManagerSGE(**kwargs)

  # map the verbosity count (0..3) to a logging level
  log_level = {
      0: logging.ERROR,
      1: logging.WARNING,
      2: logging.INFO,
      3: logging.DEBUG
    }[args.verbose]

  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
  logger.addHandler(handler)
  logger.setLevel(log_level)

  return jm

57 58 59 60 61 62 63 64
def get_array(array):
  """Parses a ``(first-)last(:step)`` specification into an (a, b, c) tuple.

  Returns ``None`` when no specification was given. Without a dash, only
  the 'last' value is read and 'first' and 'step' both default to 1.
  """
  if array is None:
    return None

  dash = array.find('-')
  if dash == -1:
    # only 'last' given
    return (1, int(array), 1)

  first = int(array[:dash])
  colon = array.find(':')
  if colon == -1:
    # 'first-last' without a step
    return (first, int(array[dash+1:]), 1)

  # full 'first-last:step' form
  return (first, int(array[dash+1:colon]), int(array[colon+1:]))
76 77


78 79 80 81 82
def get_ids(jobs):
  """Expands a list of job id specifications into a flat list of integers.

  Each entry may be a plain id ``'N'``, an inclusive range ``'A-B'`` or a
  count ``'A+K'`` (the K+1 ids starting at A). Returns ``None`` when no
  specification was given. Entries containing both '-' and '+' are ignored.
  """
  if jobs is None:
    return None

  indexes = []
  for spec in jobs:
    has_dash = '-' in spec
    has_plus = '+' in spec
    if has_dash and not has_plus:
      # inclusive range 'A-B'
      lo, hi = spec.split('-', 1)
      indexes.extend(range(int(lo), int(hi) + 1))
    elif has_plus and not has_dash:
      # 'A+K': K additional ids after A
      base, extra = spec.split('+', 1)
      start = int(base)
      indexes.extend(range(start, start + int(extra) + 1))
    elif not has_dash and not has_plus:
      indexes.append(int(spec))
  return indexes


98 99 100 101 102 103 104 105
def get_memfree(memory, parallel):
  """Computes the memory required for the memfree field.

  Multiplies the numeric part of *memory* (e.g. '8G') by the number of
  *parallel* slots, keeping the unit suffix; a missing unit defaults to 'G'.
  """
  unit = memory.lstrip(string.digits)
  amount = int(memory.rstrip(string.ascii_letters))
  return "%d%s" % (amount * parallel, unit or "G")

106 107 108 109
def submit(args):
  """Submission command.

  Builds the submission keyword arguments from the parsed command line and
  submits the job (``--repeat`` times, each repetition depending on the
  previously submitted ones) to the grid or the local scheduler.
  """

  # a leading '--' only separates the job from jman's own options
  if args.job[0] == '--':
    del args.job[0]
  # set full path to command
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'verbosity' : args.verbose,
      'name': args.name,
      'env': args.env,
      'memfree': args.memory,
      'io_big': args.io_big,
  }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.exec_dir is not None:      kwargs['exec_dir'] = args.exec_dir
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies
  if args.qname != 'all.q':          kwargs['hvmem'] = args.memory

  # if this is a GPU queue and args.memory is provided, we set the gpumem
  # flag instead of the (virtual) memory limits
  gpu_queue = args.qname in ('gpu', 'lgpu', 'sgpu', 'gpum')
  if gpu_queue and args.memory is not None:
    kwargs['gpumem'] = args.memory
    # don't set these for GPU processing or the maximum virtual memory will be
    # set on ulimit
    kwargs.pop('memfree', None)
    kwargs.pop('hvmem', None)

  if args.parallel is not None:
    kwargs['pe_opt'] = "pe_mth %d" % args.parallel
    # BUG FIX: do not re-introduce 'memfree' for GPU queues -- it was
    # deliberately removed above
    if args.memory is not None and not gpu_queue:
      kwargs['memfree'] = get_memfree(args.memory, args.parallel)

  kwargs['dry_run'] = args.dry_run
  kwargs['stop_on_failure'] = args.stop_on_failure

  # submit the job(s); each repetition depends on all jobs submitted before it
  for _ in range(args.repeat):
    job_id = jm.submit(args.job, **kwargs)
    dependencies = kwargs.get('dependencies', [])
    dependencies.append(job_id)
    kwargs['dependencies'] = dependencies

  if args.print_id:
    print (job_id, end='')

156 157

def resubmit(args):
  """Re-submits the jobs with the given ids.

  Only the options explicitly given on the command line (queue, memory,
  parallelism, io_big) override the stored job parameters.
  """
  jm = setup(args)

  kwargs = {
      'cwd': True,
      'verbosity' : args.verbose
  }
  if args.qname is not None:
    kwargs['queue'] = args.qname

  gpu_queue = args.qname in ('gpu', 'lgpu', 'sgpu', 'gpum')
  if args.memory is not None:
    kwargs['memfree'] = args.memory
    if args.qname not in (None, 'all.q'):
      kwargs['hvmem'] = args.memory
    # if this is a GPU queue, we set the gpumem flag instead of the (virtual)
    # memory limits
    if gpu_queue:
      kwargs['gpumem'] = args.memory
      # don't set these for GPU processing or the maximum virtual memory will
      # be set on ulimit
      kwargs.pop('memfree', None)
      kwargs.pop('hvmem', None)

  if args.parallel is not None:
    kwargs['pe_opt'] = "pe_mth %d" % args.parallel
    # BUG FIX: get_memfree crashed with an AttributeError when --memory was
    # not given; also do not re-introduce 'memfree' for GPU queues
    if args.memory is not None and not gpu_queue:
      kwargs['memfree'] = get_memfree(args.memory, args.parallel)

  if args.io_big:
    kwargs['io_big'] = True
  if args.no_io_big:
    kwargs['io_big'] = False

  jm.resubmit(get_ids(args.job_ids), args.also_success, args.running_jobs, args.overwrite_command, keep_logs=args.keep_logs, **kwargs)
188

189

190 191
def run_scheduler(args):
  """Runs the scheduler on the local machine. To stop it, please use Ctrl-C."""
  # the scheduler only makes sense for the local job manager
  if not args.local:
    raise ValueError("The execute command can only be used with the '--local' command line option")
  jm = setup(args)
  jm.run_scheduler(
      parallel_jobs=args.parallel,
      job_ids=get_ids(args.job_ids),
      sleep_time=args.sleep_time,
      die_when_finished=args.die_when_finished,
      no_log=args.no_log_files,
      nice=args.nice,
      verbosity=args.verbose)
196 197


198
# NOTE: intentionally shadows the builtin 'list'; the name is the public
# dispatch target registered via set_defaults(func=list) below.
def list(args):
  """Lists the jobs in the given database."""
  jm = setup(args)
  jm.list(
      job_ids=get_ids(args.job_ids),
      print_array_jobs=args.print_array_jobs,
      print_dependencies=args.print_dependencies,
      status=args.status,
      long=args.long,
      print_times=args.print_times,
      ids_only=args.ids_only,
      names=args.names)
202 203 204 205 206 207 208


def communicate(args):
  """Uses qstat to get the status of the requested jobs."""
  # talking to the grid makes no sense for the local job manager
  if args.local:
    raise ValueError("The communicate command can only be used without the '--local' command line option")
  jm = setup(args)
  jm.communicate(job_ids=get_ids(args.job_ids))
210 211 212 213 214


def report(args):
  """Reports the results of the finished (and unfinished) jobs."""
  jm = setup(args)
  # -e/--errors-only suppresses the output log, -o/--output-only the error log
  jm.report(
      job_ids=get_ids(args.job_ids),
      array_ids=get_ids(args.array_ids),
      output=not args.errors_only,
      error=not args.output_only,
      status=args.status,
      name=args.name)
216 217 218 219 220 221 222


def stop(args):
  """Stops (qdel's) the jobs with the given ids."""
  # there is no portable way to kill locally running jobs from here
  if args.local:
    raise ValueError("Stopping commands locally is not supported (please kill them yourself)")
  jm = setup(args)
  jm.stop_jobs(get_ids(args.job_ids))
224 225 226


def delete(args):
  """Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped."""
  jm = setup(args)
  # jobs executing in the grid must be qdel'ed before their entries disappear
  if not args.local and 'executing' in args.status:
    stop(args)
  # now remove them from the database (and, unless told otherwise, their logs)
  jm.delete(
      job_ids=get_ids(args.job_ids),
      array_ids=get_ids(args.array_ids),
      delete_logs=not args.keep_logs,
      delete_log_dir=not args.keep_log_dir,
      status=args.status)
234 235 236


def run_job(args):
  """Starts the wrapper script to execute a job, interpreting the JOB_ID and SGE_TASK_ID keywords that are set by the grid or by us."""
  jm = setup(args)
  job_id = int(os.environ['JOB_ID'])
  # ROBUSTNESS FIX: SGE sets SGE_TASK_ID to the literal string 'undefined'
  # for non-array jobs, but the variable may be absent entirely outside the
  # grid; treat a missing variable the same as 'undefined'
  task_id = os.environ.get('SGE_TASK_ID', 'undefined')
  array_id = int(task_id) if task_id != 'undefined' else None
  jm.run_job(job_id, array_id)

243

244
class AliasedSubParsersAction(argparse._SubParsersAction):
  """Hack taken from https://gist.github.com/471779 to allow aliases in
  argparse for python 2.x (this has been implemented on python 3.2)
  """

  class _AliasedPseudoAction(argparse.Action):
    """Placeholder action whose only purpose is to render the aliases in the
    sub-command help listing."""
    def __init__(self, name, aliases, help):
      # show the aliases next to the command name, e.g. "submit (sub)"
      dest = name if not aliases else '%s (%s)' % (name, ','.join(aliases))
      super(AliasedSubParsersAction._AliasedPseudoAction, self).__init__(
          option_strings=[], dest=dest, help=help)

  def add_parser(self, name, **kwargs):
    # extract the aliases before delegating; the base class does not know them
    aliases = kwargs.pop('aliases', [])

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # register each alias so it resolves to the very same sub-parser
    for alias in aliases:
      self._name_parser_map[alias] = parser

    # replace the help entry the base class just appended with one that
    # also lists the aliases
    if 'help' in kwargs:
      self._choices_actions.pop()
      self._choices_actions.append(
          self._AliasedPseudoAction(name, aliases, kwargs.pop('help')))

    return parser

278

279
def main(command_line_options = None):
  """Builds the command line interface, dispatches to the selected
  sub-command and returns 0 on success.

  If ``command_line_options`` is given, its first entry is used as the
  wrapper script and the rest is parsed as the command line; otherwise
  ``sys.argv`` is used.
  """

  from ..config import __version__

  formatter = argparse.ArgumentDefaultsHelpFormatter
  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=formatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', action = 'count', default = 0,
      help = "Increase the verbosity level from 0 (only error messages) to 1 (warnings), 2 (log messages), 3 (debug information) by adding the --verbose option as often as desired (e.g. '-vvv' for debug).")
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)
  parser.add_argument('-d', '--database', '--db', metavar='DATABASE', default = 'submitted.sql3',
      help='replace the default database "submitted.sql3" by one provided by you.')

  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # subcommand 'submit'
  submit_parser = cmdparser.add_parser('submit', aliases=['sub'], formatter_class=formatter, help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.')
  submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', choices=QUEUES, help='the name of the SGE queue to submit the job to')
  submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting '
                                                    'the job to a non-GPU queue, e.g., 8G to set the memory '
                                                    'requirements to 8 gigabytes. Sets gpumem parameter when '
                                                    'submitting the job to a GPU-based queue.')
  submit_parser.add_argument('-p', '--parallel', '--pe_mth', type=int, help='Sets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.')
  submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
  submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
  submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
  submit_parser.add_argument('-d', '--exec-dir', metavar='DIR', help='Sets the executing directory, where the script should be executed. If not given, jobs will be executed in the current directory')
  submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.')
  submit_parser.add_argument('-s', '--environment', metavar='KEY=VALUE', dest='env', nargs='*', default=[], help='Passes specific environment variables to the job.')
  submit_parser.add_argument('-t', '--array', '--parametric', metavar='(first-)last(:step)', help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).")
  submit_parser.add_argument('-z', '--dry-run', action='store_true', help='Do not really submit anything, just print out what would submit in this case')
  submit_parser.add_argument('-i', '--io-big', action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput.')
  submit_parser.add_argument('-r', '--repeat', type=int, metavar='N', default=1, help='Submits the job N times. Each job will depend on the job before.')
  submit_parser.add_argument('-o', '--print-id', action='store_true', help='Prints the new job id (so that they can be parsed by automatic scripts).')
  submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER, help = "The job that should be executed. Sometimes a -- is required to separate the job from other command line options.")
  submit_parser.set_defaults(func=submit)

  # subcommand 're-submit'
  resubmit_parser = cmdparser.add_parser('resubmit', aliases=['reset', 'requeue', 're'], formatter_class=formatter, help='Re-submits a list of jobs.')
  resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Re-submit only the jobs with the given ids (by default, all finished jobs are re-submitted).')
  resubmit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', choices=QUEUES, help='Reset the SGE queue to submit the job to')
  resubmit_parser.add_argument('-m', '--memory', help='Resets both the h_vmem and the mem_free parameters when '
                                                      'submitting the job to a non-GPU queue, e.g., 8G '
                                                      'to set the memory requirements to 8 gigabytes. Resets gpumem '
                                                      'parameter when submitting the job to a GPU-based queue.')
  resubmit_parser.add_argument('-p', '--parallel', '--pe_mth', type=int, help='Resets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.')
  resubmit_parser.add_argument('-i', '--io-big', action='store_true', help='Resubmits the job to the "io_big" queue.')
  resubmit_parser.add_argument('-I', '--no-io-big', action='store_true', help='Resubmits the job NOT to the "io_big" queue.')
  resubmit_parser.add_argument('-k', '--keep-logs', action='store_true', help='Do not clean the log files of the old job before re-submitting.')
  resubmit_parser.add_argument('-s', '--also-success', action='store_true', help='Re-submit also jobs that have finished successfully.')
  resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting (use this flag with care).')
  resubmit_parser.add_argument('-o', '--overwrite-command', nargs=argparse.REMAINDER, help = "Overwrite the command line (of a single job) that should be executed (useful to keep job dependencies).")
  resubmit_parser.set_defaults(func=resubmit)

  # subcommand 'stop'
  stop_parser = cmdparser.add_parser('stop', formatter_class=formatter, help='Stops the execution of jobs in the grid.')
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Stop only the jobs with the given ids (by default, all jobs are stopped).')
  stop_parser.set_defaults(func=stop)

  # subcommand 'list'
  list_parser = cmdparser.add_parser('list', aliases=['ls'], formatter_class=formatter,  help='Lists jobs stored in the database. Use the -vv option to get a long listing.')
  list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='List only the jobs with the given ids (by default, all jobs are listed)')
  list_parser.add_argument('-n', '--names', metavar='NAME', nargs='+', help='List only the jobs with the given names (by default, all jobs are listed)')
  list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Also list the array ids.')
  list_parser.add_argument('-l', '--long', action='store_true', help='Prints additional information about the submitted job.')
  # BUG FIX: typo 'submited' corrected in the help text
  list_parser.add_argument('-t', '--print-times', action='store_true', help='Prints timing information on when jobs were submitted, executed and finished')
  list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
  list_parser.add_argument('-o', '--ids-only', action='store_true', help='Prints ONLY the job ids (so that they can be parsed by automatic scripts).')
  # BUG FIX: the help text was copy-pasted from the 'delete' command
  list_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='List only jobs that have the given statuses; by default all jobs are listed.')
  list_parser.set_defaults(func=list)

  # subcommand 'communicate'
  communicate_parser = cmdparser.add_parser('communicate', aliases = ['com'], formatter_class=formatter, help='Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.')
  communicate_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Check only the jobs with the given ids (by default, all jobs are checked)')
  communicate_parser.set_defaults(func=communicate)


  # subcommand 'report'
  report_parser = cmdparser.add_parser('report', aliases=['rep', 'r', 'explain', 'why'], formatter_class=formatter, help='Iterates through the result and error log files and prints out the logs.')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs  (by default, both logs are reported).')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='+', help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.add_argument('-n', '--name', help="Report only the jobs with the given name; by default all jobs are reported.")
  report_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Report only jobs that have the given statuses; by default all jobs are reported.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'], formatter_class=formatter, help='Removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Delete only the jobs with the given ids (by default, all jobs are deleted).')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='+', help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given statuses; by default all jobs are deleted.')
  delete_parser.set_defaults(func=delete)

  # subcommand 'run_scheduler'
  scheduler_parser = cmdparser.add_parser('run-scheduler', aliases=['sched', 'x'], formatter_class=formatter, help='Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the \'--local\' option.')
  scheduler_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  scheduler_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='+', help='Select the job ids that should be run (be default, all submitted and queued jobs are run).')
  scheduler_parser.add_argument('-s', '--sleep-time', type=float, default=0.1, help='Set the sleep time between for the scheduler in seconds.')
  scheduler_parser.add_argument('-x', '--die-when-finished', action='store_true', help='Let the job manager die when it has finished all jobs of the database.')
  scheduler_parser.add_argument('-l', '--no-log-files', action='store_true', help='Overwrites the log file setup to print the results to the console.')
  # BUG FIX: closed the unbalanced parenthesis in the help text
  scheduler_parser.add_argument('-n', '--nice', type=int, help='Jobs will be run with the given priority (can only be positive, i.e., to have lower priority)')
  scheduler_parser.set_defaults(func=run_scheduler)


  # subcommand 'run-job'; this should not be seen on the command line since it is actually a wrapper script
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)


  if command_line_options:
    args = parser.parse_args(command_line_options[1:])
    args.wrapper_script = command_line_options[0]
  else:
    args = parser.parse_args()
    args.wrapper_script = sys.argv[0]

  # ROBUSTNESS FIX: under python 3, subparsers are optional by default, so a
  # missing sub-command used to crash with an AttributeError on args.func
  if not hasattr(args, 'func'):
    parser.error('no command was given; see --help for the list of commands')

  args.func(args)

  return 0