jman.py 13.4 KB
Newer Older
1 2 3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
4
# Wed 24 Aug 2011 16:13:31 CEST
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

"""A logging Idiap/SGE job manager
"""

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys

import argparse
20
import logging
21

22
from ..tools import make_shell, logger
23
from .. import local, sge
24 25 26 27

def setup(args):
  """Returns the JobManager and sets up the basic infrastructure"""

28
  kwargs = {'wrapper_script' : args.wrapper_script, 'debug' : args.verbose==3, 'database' : args.database}
29
  if args.local:
30
    jm = local.JobManagerLocal(**kwargs)
31
  else:
32
    jm = sge.JobManagerSGE(**kwargs)
33 34

  # set-up logging
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
  if args.verbose not in range(0,4):
    raise ValueError("The verbosity level %d does not exist. Please reduce the number of '--verbose' parameters in your call to maximum 3" % level)

  # set up the verbosity level of the logging system
  log_level = {
      0: logging.ERROR,
      1: logging.WARNING,
      2: logging.INFO,
      3: logging.DEBUG
    }[args.verbose]

  handler = logging.StreamHandler()
  handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
  logger.addHandler(handler)
  logger.setLevel(log_level)
50 51 52

  return jm

53 54 55 56 57 58 59 60
def get_array(array):
  if array is None:
    return None
  start = array.find('-')
  if start == -1:
    a = 1
    b = int(array)
    c = 1
61
  else:
62 63 64 65 66 67 68 69
    a = int(array[0:start])
    step = array.find(':')
    if step == -1:
      b = int(array[start+1:])
      c = 1
    else:
      b = int(array[start+1:step])
      c = int(array[step+1])
70

71
  return (a,b,c)
72 73 74 75 76 77


def submit(args):
  """Submission command"""

  # set full path to command
78 79
  if args.job[0] == '--':
    del args.job[0]
80
  if not os.path.isabs(args.job[0]):
81 82 83 84 85 86 87 88 89 90
    args.job[0] = os.path.abspath(args.job[0])

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'name': args.name,
      'env': args.env,
      'memfree': args.memory,
      'hvmem': args.memory,
91
      'io_big': args.io_big,
92 93
      }

94 95 96 97
  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies

98 99 100 101 102 103
  if args.dry_run:
    print '-> Job', args.job, 'to', args.qname, 'with',
    print 'queue:', args.qname,
    print 'memory:', args.memory,
    print 'array:', args.array,
    print 'deps:', args.deps,
104 105
    print 'env:', args.env,
    print 'io_big:', args.io_big
106 107
    return

108 109 110
  # submit the job
  job_id = jm.submit(args.job, **kwargs)

111 112

def resubmit(args):
113
  """Re-submits the jobs with the given ids."""
114
  jm = setup(args)
115
  if not args.keep_logs:
116
    jm.delete(job_ids=args.job_ids, delete_jobs = False)
117
  jm.resubmit(args.job_ids, args.failed_only, args.running_jobs)
118

119

120 121
def run_scheduler(args):
  """Runs the scheduler on the local machine. To stop it, please use Ctrl-C."""
122 123 124
  if not args.local:
    raise ValueError("The execute command can only be used with the '--local' command line option")
  jm = setup(args)
125
  jm.run_scheduler(parallel_jobs=args.parallel, sleep_time=args.sleep_time)
126 127


128
def list(args):
129 130
  """Lists the jobs in the given database."""
  jm = setup(args)
131
  jm.list(job_ids=args.job_ids, print_array_jobs=args.print_array_jobs, print_dependencies=args.print_dependencies, long=args.long)
132 133 134 135 136


def report(args):
  """Reports the results of the finished (and unfinished) jobs."""
  jm = setup(args)
137 138 139 140 141 142 143 144 145
  jm.report(job_ids=args.job_ids, array_ids=args.array_ids, unfinished=args.unfinished_also, output=not args.errors_only, error=not args.output_only)


def stop(args):
  """Stops (qdel's) the jobs with the given ids."""
  if args.local:
    raise ValueError("Stopping commands locally is not supported (please kill them yourself)")
  jm = setup(args)
  jm.stop_jobs(args.job_ids)
146 147 148


def delete(args):
149
  """Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped."""
150
  jm = setup(args)
151 152 153 154 155
  # first, stop the jobs if they are running in the grid
  if not args.local:
    stop(args)
  # then, delete them from the database
  jm.delete(job_ids=args.job_ids, array_ids=args.array_ids, delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir)
156 157 158


def run_job(args):
159
  """Starts the wrapper script to execute a job, interpreting the JOB_ID and SGE_TASK_ID keywords that are set by the grid or by us."""
160 161 162 163 164
  jm = setup(args)
  job_id = int(os.environ['JOB_ID'])
  array_id = int(os.environ['SGE_TASK_ID']) if os.environ['SGE_TASK_ID'] != 'undefined' else None
  jm.run_job(job_id, array_id)

165

166
class AliasedSubParsersAction(argparse._SubParsersAction):
167
  """Hack taken from https://gist.github.com/471779 to allow aliases in
168 169 170 171 172 173 174 175 176
  argparse for python 2.x (this has been implemented on python 3.2)
  """

  class _AliasedPseudoAction(argparse.Action):
    def __init__(self, name, aliases, help):
      dest = name
      if aliases:
        dest += ' (%s)' % ','.join(aliases)
      sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
177
      sup.__init__(option_strings=[], dest=dest, help=help)
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

  def add_parser(self, name, **kwargs):
    if 'aliases' in kwargs:
      aliases = kwargs['aliases']
      del kwargs['aliases']
    else:
      aliases = []

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # Make the aliases work.
    for alias in aliases:
      self._name_parser_map[alias] = parser
    # Make the help text reflect them, first removing old help entry.
    if 'help' in kwargs:
      help = kwargs.pop('help')
      self._choices_actions.pop()
      pseudo_action = self._AliasedPseudoAction(name, aliases, help)
      self._choices_actions.append(pseudo_action)

    return parser

200

201
def main(command_line_options = None):
202 203 204 205

  from ..config import __version__

  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
206
      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
207 208 209 210
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
211 212
  parser.add_argument('-v', '--verbose', action = 'count', default = 0,
      help = "Increase the verbosity level from 0 (only error messages) to 1 (warnings), 2 (log messages), 3 (debug information) by adding the --verbose option as often as desired (e.g. '-vvv' for debug).")
213
  parser.add_argument('-V', '--version', action='version',
214
      version='GridTk version %s' % __version__)
215 216
  parser.add_argument('-d', '--database', '--db', metavar='DATABASE', default = 'submitted.sql3',
      help='replace the default database "submitted.sql3" by one provided by you.')
217 218 219

  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
220
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')
221

222
  # subcommand 'submit'
223 224 225
  submit_parser = cmdparser.add_parser('submit', aliases=['sub'], help='submits self-contained jobs to the SGE queue and logs them in a private database')
  submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', help='the name of the SGE queue to submit the job to')
  submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting the job to the specified value, e.g. 8G to set the memory requirements to 8 gigabytes')
226
  submit_parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
227
  submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option an a list of job identifiers separated by spaces')
228
  submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected. If the jobs are executed locally, by default the result is written to console.')
229 230 231 232 233
  submit_parser.add_argument('-s', '--environment', metavar='KEY=VALUE', dest='env', nargs='*', default=[], help='Passes specific environment variables to the job')
  submit_parser.add_argument('-t', '--array', '--parametric', metavar='(start:)stop(-step)', help='Creates a parametric (array) job. You must specify the stop value, but start (default=1) and step (default=1) can be specified as well.')
  submit_parser.add_argument('-z', '--dry-run', action='store_true', help='Do not really submit anything, just print out what would submit in this case')
  submit_parser.add_argument('-I', '--io-big', action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput')
  submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER, help = "The job that should be executed")
234 235
  submit_parser.set_defaults(func=submit)

236 237
  # subcommand 're-submit'
  resubmit_parser = cmdparser.add_parser('resubmit', aliases=['reset', 're'],
238 239
      help='Re-submits a list of jobs')
  resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='List only the jobs with the given ids (by default, all jobs are listed)')
240
  resubmit_parser.add_argument('-k', '--keep-logs', action='store_true', help='Do not clean the log files of the old job before re-submitting')
241 242 243 244
  resubmit_parser.add_argument('-f', '--failed-only', action='store_true', help='Re-submit only jobs that have failed')
  resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting')
  resubmit_parser.set_defaults(func=resubmit)

245
  # subcommand 'stop'
246 247 248
  stop_parser = cmdparser.add_parser('stop', help='Stops the execution of jobs in the grid')
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Stop only the jobs with the given ids (by default, all jobs are stopped)')
  stop_parser.set_defaults(func=stop)
249

250 251 252 253
  # subcommand 'list'
  list_parser = cmdparser.add_parser('list', aliases=['ls'],
      help='lists jobs stored in the database')
  list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='List only the jobs with the given ids (by default, all jobs are listed)')
254
  list_parser.add_argument('-l', '--long', action='store_true', help='Lists the complete command line (will be shortened otherwise).')
255 256 257
  list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
  list_parser.set_defaults(func=list)
258

259 260
  # subcommand 'report'
  report_parser = cmdparser.add_parser('report', aliases=['rep', 'r'],
261 262 263 264 265 266 267
      help='Iterates through the result and error log files and prints out the logs')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs  (by default, both logs are reported).')
  report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.set_defaults(func=report)
268

269 270 271 272 273 274 275 276 277
  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
      help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted)')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.set_defaults(func=delete)

278
  # subcommand 'execute'
279 280
  execute_parser = cmdparser.add_parser('run-scheduler', aliases=['sched', 'x'],
      help='Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the \'--local\' option.')
281
  execute_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
282 283
  execute_parser.add_argument('-s', '--sleep-time', type=float, default=0.1, help='Set the sleep time between for the scheduler in seconds.')
  execute_parser.set_defaults(func=run_scheduler)
284

285

286
  # subcommand 'run-job'; this is not seen on the command line since it is actually a wrapper script
287 288
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)
289

290 291 292 293 294 295 296

  if command_line_options:
    args = parser.parse_args(command_line_options[1:])
    args.wrapper_script = command_line_options[0]
  else:
    args = parser.parse_args()
    args.wrapper_script = sys.argv[0]
297 298 299

  args.func(args)

300
  return 0