jman.py 14.8 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
4
# Wed 24 Aug 2011 16:13:31 CEST
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

"""A logging Idiap/SGE job manager
"""

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import os
import sys
import anydbm
from cPickle import dumps

import argparse

23
from .. import local, sge
24
from ..tools import make_shell, random_logdir, logger
25
26
27
28

def setup(args):
  """Creates the job manager requested on the command line and configures logging.

  Reads ``wrapper_script``, ``db``, ``local`` and ``debug`` from the parsed
  command line ``args`` and returns the job manager instance.
  """

  # constructor options; the database is only forwarded when one was given
  options = dict(wrapper_script=args.wrapper_script)
  if args.db:
    options['database'] = args.db

  # '--local' selects the local job manager, otherwise the SGE one is used
  manager_class = local.JobManagerLocal if args.local else sge.JobManagerSGE
  jm = manager_class(**options)

  # set-up logging: debug output goes to a stream handler, otherwise only
  # warnings and above are let through
  import logging
  if not args.debug:
    logger.setLevel(logging.WARNING)
  else:
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

  return jm

47
48
49
50
51
52
53
54
def get_array(array):
  """Parses an array job specification into a ``(start, stop, step)`` tuple.

  Accepted forms are ``stop``, ``start-stop`` and ``start-stop:step``; a
  missing start or step defaults to 1.

  Returns ``None`` when ``array`` is ``None``.  Raises ``ValueError`` when one
  of the components is not an integer.
  """
  if array is None:
    return None

  dash = array.find('-')
  if dash == -1:
    # only the stop value was given
    return (1, int(array), 1)

  first = int(array[:dash])
  colon = array.find(':')
  if colon == -1:
    return (first, int(array[dash+1:]), 1)

  last = int(array[dash+1:colon])
  # take everything after the colon; indexing a single character would
  # silently truncate multi-digit steps such as '1-100:10'
  step = int(array[colon+1:])
  return (first, last, step)
66
67
68
69
70
71


def submit(args):
  """Submission command

  Turns the command in ``args.job`` into an absolute path, wraps it with the
  current python interpreter when ``--python`` is given or the command ends in
  ``.py``, and submits it through the job manager returned by ``setup()``.
  With ``--dry-run`` the request is only printed and nothing is submitted.
  """

  # set full path to command
  if not os.path.isabs(args.job[0]):
    args.job[0] = os.path.abspath(args.job[0])

  # automatically set interpreter if required
  if args.python or os.path.splitext(args.job[0])[1] in ('.py',):
    args.job = make_shell(sys.executable, args.job)

  jm = setup(args)
  kwargs = {
      'queue': args.qname,
      'cwd': True,
      'name': args.name,
      'env': args.env,
      # the single --memory value is used for both memory parameters
      'memfree': args.memory,
      'hvmem': args.memory,
      'io_big': args.io_big,
      }

  if args.array is not None:         kwargs['array'] = get_array(args.array)
  if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
  if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies

  if args.dry_run:
    # the option is stored as 'dependencies'; there is no 'deps' attribute
    summary = [
        '-> Job', args.job, 'to', args.qname, 'with',
        'queue:', args.qname,
        'memory:', args.memory,
        'array:', args.array,
        'deps:', args.dependencies,
        'env:', args.env,
        'io_big:', args.io_big,
        ]
    # a single pre-joined string prints identically with the print statement
    # and with the print function
    print(' '.join(str(item) for item in summary))
    return

  # submit the job
  jm.submit(args.job, **kwargs)

108
109

def resubmit(args):
  """Re-submits the jobs with the given ids.

  When ``--clean`` is given, the job manager's ``delete`` is first called for
  the same ids with ``delete_jobs=False``.
  """
  manager = setup(args)
  ids = args.job_ids
  if args.clean:
    manager.delete(job_ids=ids, delete_jobs = False)
  manager.resubmit(ids, args.failed_only, args.running_jobs)
115

116

117
118
119
120
121
def execute(args):
  """Executes the collected jobs on the local machine.

  Raises ``ValueError`` unless the ``--local`` option was given.
  """
  if not args.local:
    raise ValueError("The execute command can only be used with the '--local' command line option")
  # run the selected jobs with the requested degree of parallelism
  setup(args).run(parallel_jobs=args.parallel, job_ids=args.job_ids)


125
def list(args):
  """Lists the jobs in the given database.

  The job ids and the two print flags from the command line are handed to the
  job manager's ``list`` method unchanged.
  """
  manager = setup(args)
  selection = (args.job_ids, args.print_array_jobs, args.print_dependencies)
  manager.list(*selection)
129
130
131
132
133


def report(args):
  """Reports the results of the finished (and unfinished) jobs.

  ``--errors-only`` switches the output logs off and ``--output-only``
  switches the error logs off; both are reported when neither is given.
  """
  manager = setup(args)
  options = dict(
      job_ids=args.job_ids,
      array_ids=args.array_ids,
      unfinished=args.unfinished_also,
      output=not args.errors_only,
      error=not args.output_only,
      )
  manager.report(**options)


def stop(args):
  """Stops (qdel's) the jobs with the given ids.

  Raises ``ValueError`` when the ``--local`` option is given.
  """
  if args.local:
    raise ValueError("Stopping commands locally is not supported (please kill them yourself)")
  setup(args).stop_jobs(args.job_ids)
143
144
145


def delete(args):
  """Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped.

  Unless ``--local`` is given, the jobs are stopped first; afterwards they are
  removed from the database.  ``--keep-logs`` and ``--keep-log-dir`` are
  inverted into the ``delete_logs`` / ``delete_log_dir`` flags.
  """
  jm = setup(args)
  # first, stop the jobs if they are running in the grid; the manager created
  # above is re-used so that setup() (and the debug log handler it installs)
  # is not run a second time
  if not args.local:
    jm.stop_jobs(args.job_ids)
  # then, delete them from the database
  jm.delete(job_ids=args.job_ids, array_ids=args.array_ids, delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir)
153
154
155


def run_job(args):
  """Starts the wrapper script to execute a job, interpreting the JOB_ID and SGE_TASK_ID keywords that are set by the grid or by us.

  Both environment variables must be present; an SGE_TASK_ID of
  ``'undefined'`` is passed on as ``None``.
  """
  manager = setup(args)
  job_id = int(os.environ['JOB_ID'])
  task = os.environ['SGE_TASK_ID']
  array_id = None if task == 'undefined' else int(task)
  manager.run_job(job_id, array_id)

162

163
class AliasedSubParsersAction(argparse._SubParsersAction):
  """Sub-parsers action accepting an ``aliases`` keyword in ``add_parser()``.

  Hack taken from https://gist.github.com/471779 to allow aliases in
  argparse for python 2.x (this has been implemented on python 3.2)
  """

  class _AliasedPseudoAction(argparse.Action):
    """Help entry whose name is the command followed by its aliases in parentheses."""

    def __init__(self, name, aliases, help):
      label = name if not aliases else name + ' (%s)' % ','.join(aliases)
      super(AliasedSubParsersAction._AliasedPseudoAction, self).__init__(
          option_strings=[], dest=label, help=help)

  def add_parser(self, name, **kwargs):
    """Adds the sub-parser ``name`` and registers every alias for the same parser."""
    # 'aliases' is removed before delegating to the base class
    aliases = kwargs.pop('aliases', [])

    parser = super(AliasedSubParsersAction, self).add_parser(name, **kwargs)

    # Make the aliases work.
    self._name_parser_map.update((alias, parser) for alias in aliases)

    # Make the help text reflect them: the entry appended by the base class
    # is replaced with one that also shows the aliases.
    if 'help' in kwargs:
      self._choices_actions.pop()
      entry = self._AliasedPseudoAction(name, aliases, kwargs.pop('help'))
      self._choices_actions.append(entry)

    return parser

197

198
def main(command_line_options = None):
  """Parses the command line and runs the selected sub-command.

  ``command_line_options``, when given and non-empty, is a list whose first
  element is the wrapper script and whose remaining elements are the command
  line arguments; otherwise ``sys.argv`` is used.  The handler registered for
  the chosen sub-command is called with the parsed arguments.  Returns 0.
  """

  from ..config import __version__

  parser = argparse.ArgumentParser(description=__doc__, epilog=__epilog__,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  # part of the hack to support aliases in subparsers
  parser.register('action', 'parsers', AliasedSubParsersAction)

  # general options
  parser.add_argument('-v', '--verbose', dest='verbose', default=False,
      action='store_true', help='increase verbosity for this script')
  parser.add_argument('-g', '--debug', dest='debug', default=False,
      action='store_true', help='prints out lots of debugging information')
  parser.add_argument('-V', '--version', action='version',
      version='GridTk version %s' % __version__)
  parser.add_argument('-l', '--local', action='store_true',
        help = 'Uses the local job manager instead of the SGE one.')
  cmdparser = parser.add_subparsers(title='commands', help='commands accepted by %(prog)s')

  # help text shared by the sub-commands that accept '-d/--db/--database'
  db_help = 'replace the default database to be used by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database'

  # subcommand 'submit'
  submit_parser = cmdparser.add_parser('submit', aliases=['sub'],
      help='submits self-contained jobs to the SGE queue and logs them in a private database')
  submit_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help=db_help)
  submit_parser.add_argument('-q', '--queue', metavar='QNAME',
      dest='qname', default='all.q', help='the name of the SGE queue to submit the job to (defaults to "%(default)s")')
  submit_parser.add_argument('-m', '--memory', dest='memory', help='Sets both the h_vmem **and** the mem_free parameters when submitting the job to the specified value (e.g. 8G to set the memory requirements to 8 gigabytes)')
  submit_parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
  submit_parser.add_argument('-x', '--dependencies', type=int,
      default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option a list of job identifiers separated by spaces')
  submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected. If the jobs are executed locally, by default the result is written to console.')
  submit_parser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
      dest='env', nargs='*', default=[],
      help='Passes specific environment variables to the job')
  # the metavar mirrors the 'start-stop:step' syntax that get_array() parses
  submit_parser.add_argument('-t', '--array', '--parametric', metavar='[start-]stop[:step]',
      dest='array', help='Creates a parametric (array) job. You must specify the stop value, but start (default=1) and step (default=1) can be specified as well.')
  submit_parser.add_argument('-p', '--py', '--python', dest='python', default=False,
      action='store_true', help='Wrap execution of your command using the current python interpreter')
  submit_parser.add_argument('-z', '--dry-run',
      action='store_true', help='Do not really submit anything, just print out what would submit in this case')
  submit_parser.add_argument('-I', '--io-big', dest='io_big', default=False,
      action='store_true', help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput')
  submit_parser.add_argument('job', metavar='command', nargs=argparse.REMAINDER)
  submit_parser.set_defaults(func=submit)

  # re-submit parser
  resubmit_parser = cmdparser.add_parser('resubmit', aliases=['re'],
      help='Re-submits a list of jobs')
  resubmit_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help=db_help)
  resubmit_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Re-submit only the jobs with the given ids (by default, all jobs in the database are candidates)')
  resubmit_parser.add_argument('-c', '--clean', action='store_true', help='Clean the log files of the old job before re-submitting')
  resubmit_parser.add_argument('-f', '--failed-only', action='store_true', help='Re-submit only jobs that have failed')
  resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting')
  resubmit_parser.set_defaults(func=resubmit)

  # stop parser
  stop_parser = cmdparser.add_parser('stop', help='Stops the execution of jobs in the grid')
  stop_parser.add_argument('-d', '--db', '--database', metavar='DATABASE', help=db_help)
  stop_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Stop only the jobs with the given ids (by default, all jobs are stopped)')
  stop_parser.set_defaults(func=stop)

  # subcommand 'list'
  list_parser = cmdparser.add_parser('list', aliases=['ls'],
      help='lists jobs stored in the database')
  list_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database by one provided by you; this option is only required if you are running outside the directory where you originally submitted the jobs from or if you have altered manually the location of the JobManager database', nargs='?')
  list_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='List only the jobs with the given ids (by default, all jobs are listed)')
  list_parser.add_argument('-a', '--print-array-jobs', action='store_true', help='Print the array jobs as well.')
  list_parser.add_argument('-x', '--print-dependencies', action='store_true', help='Print the dependencies of the jobs as well.')
  list_parser.set_defaults(func=list)

  # report parser
  report_parser = cmdparser.add_parser('report', aliases=['ref', 'r'],
      help='Iterates through the result and error log files and prints out the logs')
  report_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).')
  report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs  (by default, both logs are reported).')
  report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.')
  report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)')
  report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  report_parser.set_defaults(func=report)

  # subcommand 'delete'
  delete_parser = cmdparser.add_parser('delete', aliases=['del', 'rm', 'remove'],
      help='removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue')
  delete_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be reported by one provided by you', nargs='?')
  delete_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given ids (by default, all jobs are deleted)')
  delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well.')
  delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
  delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
  delete_parser.set_defaults(func=delete)

  # subcommand 'execute'
  execute_parser = cmdparser.add_parser('execute', aliases=['exe', 'x'],
      help='Executes the registered jobs on the local machine; only valid in combination with the \'--local\' option.')
  execute_parser.add_argument('-d', '--db', metavar='DATABASE', help='replace the default database to be executed by one provided by you', nargs='?')
  execute_parser.add_argument('-p', '--parallel', type=int, default=1, help='Select the number of parallel jobs that you want to execute locally')
  execute_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Execute only the jobs with the given ids (by default, all unfinished jobs are executed)')
  execute_parser.set_defaults(func=execute)

  # subcommand 'run-job'; this is not seen on the command line since it is actually a wrapper script
  run_parser = cmdparser.add_parser('run-job', help=argparse.SUPPRESS)
  run_parser.add_argument('db', metavar='DATABASE', nargs='?', help=argparse.SUPPRESS)
  run_parser.set_defaults(func=run_job)

  # the first element of an explicit option list plays the role of sys.argv[0]
  if command_line_options:
    args = parser.parse_args(command_line_options[1:])
    args.wrapper_script = command_line_options[0]
  else:
    args = parser.parse_args()
    args.wrapper_script = sys.argv[0]

  # dispatch to the handler registered through set_defaults(func=...)
  args.func(args)

  return 0