jman.py 13.3 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
4
# Wed 24 Aug 2011 16:13:31 CEST
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

"""A logging Idiap/SGE job manager
"""

# Text appended after the top-level ``--help`` output: main() passes it as the
# ArgumentParser epilog (argparse expands the ``%(prog)s`` placeholders).
__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys
import anydbm
from cPickle import dumps

import argparse

23
from .. import local, sge
24
from ..tools import make_shell, random_logdir, logger
25
26
27
28
29

def setup(args):
  """Creates the job manager selected on the command line.

  ``--local`` selects ``local.JobManagerLocal``; otherwise
  ``sge.JobManagerSGE`` is used.  A user-supplied database (``args.db``) is
  forwarded as the ``database`` keyword.  With ``--debug`` the package logger
  is switched to DEBUG level and given a ``logging.StreamHandler``.

  Returns the constructed job manager.
  """
  manager_class = local.JobManagerLocal if args.local else sge.JobManagerSGE
  options = {'database': args.db} if args.db else {}
  manager = manager_class(**options)

  # set-up logging
  if args.debug:
    import logging
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

  return manager

44
45
46
47
48
49
50
51
def get_array(array):
  """Parses an SGE-style array specification ``[start-]stop[:step]``.

  Keyword arguments:

  array
    The specification string (e.g. ``"10"``, ``"2-10"`` or ``"1-100:5"``),
    or ``None``.

  Returns ``None`` when ``array`` is ``None``; otherwise a tuple of ints
  ``(start, stop, step)`` where ``start`` and ``step`` default to 1.

  Raises ValueError if any of the present fields is not an integer (this
  includes an empty step, as in ``"1-10:"``).
  """
  if array is None:
    return None

  if '-' not in array:
    # only the stop value was given
    return (1, int(array), 1)

  start, _, rest = array.partition('-')
  stop, has_step, step = rest.partition(':')
  # the whole remainder after ':' is the step (not just its first character)
  return (int(start), int(stop), int(step) if has_step else 1)
63
64
65
66
67
68


def submit(args):
  """Submission command.

  Turns the command (``args.job``) into an absolute path, wraps it with the
  current Python interpreter when ``--python`` is given or the command ends
  in ``.py``, and submits it through the job manager returned by
  :py:func:`setup`.  With ``--dry-run`` only a one-line summary is printed
  and nothing is submitted.

  Raises ValueError if no command was given.
  """
  # argparse.REMAINDER yields an empty list when no command is given
  if not args.job:
    raise ValueError("The submit command requires a command to be executed")

  # set full path to command
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  # automatically set interpreter if required
  if args.python or os.path.splitext(args.job[0])[1] in ('.py',):
    args.job = make_shell(sys.executable, args.job)

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'name': args.name,
      'env': args.env,
      'memfree': args.memory,
      'hvmem': args.memory,
      'io_big': args.io_big,
      }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies

  if args.dry_run:
    # the submit parser stores dependencies in ``args.dependencies``;
    # ``args.deps`` does not exist for this command
    print('-> Job %s to %s with queue: %s memory: %s array: %s deps: %s env: %s io_big: %s' % (
        args.job, args.qname, args.qname, args.memory, args.array,
        args.dependencies, args.env, args.io_big))
    return

  # submit the job
  jm.submit(args.job, **kwargs)

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

def explain(args):
  """Explain action: prints the details of one or more jobs.

  Job identifiers in ``args.jobid`` are given as ``<job>`` or
  ``<job>.<array-index>``; without identifiers every job returned by
  ``jm.keys()`` is explained.  For each job its description and command line
  are printed; with ``--verbose`` the stdout and stderr logs (header and
  contents) are printed as well.  Jobs are separated by a line of dashes.
  """
  # NOTE(review): this command is not registered in main() and relies on the
  # job manager supporting ``keys()`` and ``[]``; confirm it is still in use.
  jm = setup(args)

  if args.jobid:
    jobs = [[int(n) for n in k.split('.', 1)] for k in args.jobid]
    for v in jobs:
      if len(v) == 1: v.append(None)  # no array index given
  else:
    jobs = [(k, None) for k in jm.keys()]

  first_time = True
  for job_id, array_id in jobs:
    if not first_time: print(79 * '-')
    first_time = False
    J = jm[job_id]
    print("Job %s" % (J,))
    print("Command line: %s" % (J.command_line(),))

    # both logs (header and contents) are only shown in verbose mode
    if args.verbose:
      print("%s stdout (%s)" % (J.name(array_id), J.stdout_filename(array_id)))
      print(J.stdout(array_id))
      print("%s stderr (%s)" % (J.name(array_id), J.stderr_filename(array_id)))
      print(J.stderr(array_id))

def resubmit(args):
  """Re-submits jobs recorded in another job-manager database.

  Every job of ``args.fromdb`` (or only those listed in ``args.jobid``) is
  re-submitted through the manager returned by :py:func:`setup`.  With
  ``args.cleanup`` the old job's log files are removed and its entry is
  deleted from the source database.
  """
  # NOTE(review): ``JobManager`` and ``get_logdirs`` are not imported in the
  # visible part of this module and this command is not registered in main();
  # confirm whether it is still reachable.
  jm = setup(args)
  fromjm = JobManager(args.fromdb)
  jobs = fromjm.keys()
  if args.jobid: jobs = args.jobid
  for k in jobs:
    old_job = fromjm[k]

    args.stdout, args.stderr = get_logdirs(args.stdout, args.stderr, args.logbase)

    new_job = jm.resubmit(old_job, args.stdout, args.stderr, args.deps, args.failed_only)

    if args.verbose:
      if isinstance(new_job, (tuple, list)):
        # use a dedicated loop name: re-binding ``k`` here would make the
        # cleanup below delete the wrong database entry
        for job in new_job: print('Re-submitted job %s' % (job,))
      else:
        print('Re-submitted job %s' % (new_job,))
    else:
      if isinstance(new_job, (tuple, list)):
        print('Re-submitted %d jobs' % len(new_job))
      else:
        print('Re-submitted job %s' % (new_job.name(),))

    if args.cleanup:
      if args.verbose:
        old_job.rm_stdout(verbose='  ')
        old_job.rm_stderr(verbose='  ')
      else:
        old_job.rm_stdout()
        old_job.rm_stderr()
      del fromjm[k]
      print('  deleted job %s from database' % old_job.name())

166

167
168
169
170
171
def execute(args):
  """Runs the collected jobs on this machine.

  Only allowed together with the global ``--local`` flag; otherwise a
  ValueError is raised before any job manager is created.  ``--parallel``
  sets how many jobs run at once and ``--job-ids`` restricts the run to the
  given jobs.
  """
  if args.local:
    manager = setup(args)
    manager.run(parallel_jobs=args.parallel, job_ids=args.job_ids)
    return
  raise ValueError("The execute command can only be used with the '--local' command line option")


def ls(args):
  """Prints the jobs registered in the selected database."""
  manager = setup(args)
  manager.list()


def report(args):
  """Prints the logs of finished (and, with ``-u``, unfinished) jobs.

  ``--errors-only`` suppresses the output logs and ``--output-only``
  suppresses the error logs.
  """
  manager = setup(args)
  show_output = not args.errors_only
  show_errors = not args.output_only
  manager.report(grid_ids=args.job_ids,
                 array_ids=args.array_ids,
                 unfinished=args.unfinished_also,
                 output=show_output,
                 error=show_errors)


def delete(args):
  """Removes jobs from the job manager.

  Log files are deleted unless ``--keep-logs`` is given, and the log
  directory is deleted unless ``--keep-log-dir`` is given.
  """
  manager = setup(args)
  remove_logs = not args.keep_logs
  remove_log_dir = not args.keep_log_dir
  manager.delete(grid_ids=args.job_ids,
                 array_ids=args.array_ids,
                 delete_logs=remove_logs,
                 delete_log_dir=remove_log_dir)


def run_job(args):
  """Runs one job (or one task of an array job) through the job manager.

  The job id is read from the ``JOB_ID`` environment variable (required).
  The array index is read from ``SGE_TASK_ID``.  A missing variable, or the
  value ``'undefined'``, means the job is not an array job, and ``None`` is
  passed on as the array id.
  """
  jm = setup(args)
  job_id = int(os.environ['JOB_ID'])
  # tolerate a missing SGE_TASK_ID instead of failing with KeyError
  task_id = os.environ.get('SGE_TASK_ID', 'undefined')
  array_id = int(task_id) if task_id != 'undefined' else None
  jm.run_job(job_id, array_id)

199

200
class AliasedSubParsersAction(argparse._SubParsersAction):
  """Sub-parsers action whose ``add_parser`` accepts an ``aliases`` keyword.

  Hack taken from https://gist.github.com/471779 to allow aliases in
  argparse for python 2.x (this has been implemented on python 3.2)
  """

  class _AliasedPseudoAction(argparse.Action):
    """Help-only entry that lists a sub-command as ``name (alias1,alias2)``."""

    def __init__(self, name, aliases, help):
      label = '%s (%s)' % (name, ','.join(aliases)) if aliases else name
      base = super(AliasedSubParsersAction._AliasedPseudoAction, self)
      base.__init__(option_strings=[], dest=label, help=help)

  def add_parser(self, name, **kwargs):
    """Adds sub-parser ``name``; every entry of ``aliases`` maps to it too."""
    aliases = kwargs.pop('aliases', [])

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # each alias resolves to the very same sub-parser object
    self._name_parser_map.update((alias, parser) for alias in aliases)

    # the base class popped 'help' from its own copy of the keywords and
    # appended a plain help entry; swap that entry for one showing aliases
    if 'help' in kwargs:
      self._choices_actions[-1] = self._AliasedPseudoAction(
          name, aliases, kwargs.pop('help'))

    return parser

234

235
236
237
238
239
240
241
242
243
244
245
246
247
248
def main():
  """Command-line entry point.

  Builds the argument parser (global options plus the list, submit, execute,
  report, delete and hidden run-job sub-commands), dispatches to the function
  stored as ``func`` by the chosen sub-command and exits with status 0 when
  it returns.
  """

  from ..config import __version__

  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', dest='verbose', default=False,
      action='store_true', help='increase verbosity for this script')
  parser.add_argument('-g', '--debug', dest='debug', default=False,
      action='store_true', help='prints out lots of debugging information')
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)

  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # subcommand 'list'
  lsparser = cmdparser.add_parser('list', aliases=['ls'],
      help='lists jobs stored in the database')
  lsparser.add_argument('db', metavar='DATABASE', help='replace the default database by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
  lsparser.set_defaults(func=ls)

  # subcommand 'submit'
  subparser = cmdparser.add_parser('submit', aliases=['sub'],
      help='submits self-contained jobs to the SGE queue and logs them in a private database')
  subparser.add_argument('-d', '--db', '--database', metavar='DATABASE', help='replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database')
  subparser.add_argument('-q', '--queue', metavar='QNAME',
      dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
  # There is deliberately no '--cwd' option: submit() always asks the job
  # manager to switch to the current working directory ('cwd': True), as this
  # helps job management.
  subparser.add_argument('-m', '--memory', dest='memory', help='Sets both the h_vmem **and** the mem_free parameters when submitting the job to the specified value (e.g. 8G to set the memory requirements to 8 gigabytes)')
  subparser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
  subparser.add_argument('-x', '--dependencies', type=int,
      default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option a list of job identifiers separated by spaces')
  subparser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected. If the jobs are executed locally, by default the result is written to console.')
  subparser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
      dest='env', nargs='*', default=[],
      help='Passes specific environment variables to the job')
  # the metavar must match the format parsed by get_array(): start-stop:step
  subparser.add_argument('-t', '--array', '--parametric', metavar='[start-]stop[:step]',
      dest='array', help='Creates a parametric (array) job. You must specify the stop value, but start (default=1) and step (default=1) can be specified as well, e.g. "1-10:2".')
  subparser.add_argument('-p', '--py', '--python', dest='python', default=False,
      action='store_true', help='Wrap execution of your command using the current python interpreter')
  subparser.add_argument('-z', '--dry-run',
      action='store_true', help='Do not really submit anything, just print out what would be submitted in this case')
  subparser.add_argument('-I', '--io-big', dest='io_big', default=False,
      action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput')
  subparser.add_argument('job', metavar='command', nargs=argparse.REMAINDER)
  subparser.set_defaults(func=submit)

  # subcommand 'execute'
  execute_parser = cmdparser.add_parser('execute', aliases=['exe', 'x'],
      help='Executes the registered jobs on the local machine; only valid in combination with the \'--local\' option.')
  execute_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be executed by one provided by you', nargs='?')
  execute_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  execute_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Execute only the jobs with the given ids (by default, all unfinished jobs are executed)')
  execute_parser.set_defaults(func=execute)

  # subcommand 'report'
  report_parser = cmdparser.add_parser('report', aliases=['ref', 'r'],
      help='Iterates through the result and error log files and prints out the logs')
  report_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs  (by default, both logs are reported).')
  report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
      help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
  delete_parser.add_argument('db', metavar='DATABASE', help='replace the default database to be deleted from by one provided by you', nargs='?')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted)')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.set_defaults(func=delete)

  # hidden subcommand 'run-job'; the job and array ids are not options:
  # run_job() reads them from the JOB_ID and SGE_TASK_ID environment variables
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.add_argument('db', metavar='DATABASE', nargs='?', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)

  args = parser.parse_args()

  # Since Python 3.3 sub-commands are optional, so a bare invocation would
  # otherwise fail with an AttributeError on ``args.func``.
  if not hasattr(args, 'func'):
    parser.error('a command is required; use --help for the list of commands')

  args.func(args)

  sys.exit(0)