tools.py 8.71 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 09:26:46 CEST 

"""Functions that replace the shell based utilities for the grid submission and
probing.
"""

import os
import re
import logging
13
14
import hashlib
import random
André Anjos's avatar
André Anjos committed
15
16

# Constant regular expressions
# Splits a "key:   value" line of qstat output at the first colon that is
# followed by whitespace. Written as a raw string so that "\s" reaches the
# regex engine untouched instead of being an invalid string escape.
QSTAT_FIELD_SEPARATOR = re.compile(r':\s+')
André Anjos's avatar
André Anjos committed
18

19
20
21
22
23
24
def random_logdir():
  """Generates a random log directory for placing the command output

  A random integer between 100000 and 999999 (inclusive) is hashed with MD5
  and the first six hexadecimal digits of the digest are used, two at a time,
  as the names of three nested directories (e.g. ``ab/cd/ef`` on POSIX).

  Returns the relative path as a string. Nothing is created on disk.
  """

  seed = str(random.randint(100000, 999999))
  # hashlib only accepts bytes on Python 3. The seed is made of ASCII digits
  # only, so encoding it is a no-op on Python 2 and keeps digests unchanged.
  x = hashlib.md5(seed.encode('ascii')).hexdigest()
  return os.path.join(x[:2], x[2:4], x[4:6])

André Anjos's avatar
André Anjos committed
25
26
27
28
29
30
31
32
33
34
35
def makedirs_safe(fulldir):
  """Creates a directory if it does not exists. Takes into consideration
  concurrent access support. Works like the shell's 'mkdir -p'.

  Keyword parameters:

  fulldir
    The path of the directory to create, including any missing parents.

  Raises OSError for any failure other than "already exists" (EEXIST), which
  is ignored because another process may have created the directory between
  the existence check and the creation attempt.
  """

  # Imported locally so this function does not depend on a module-level
  # import of 'errno' being present.
  import errno

  try:
    if not os.path.exists(fulldir): os.makedirs(fulldir)
  except OSError as exc: # Python >2.5
    if exc.errno != errno.EEXIST: raise

Laurent EL SHAFEY's avatar
Laurent EL SHAFEY committed
36
def _prepare_logdir(path, cwd):
  """Creates the log directory ``path`` for a job about to be submitted.

  When ``cwd`` is true the directory is created relative to the current
  working directory. Otherwise the process pivots temporarily to the
  directory named by the HOME environment variable before creating it
  (presumably because a job submitted without '-cwd' starts from there --
  confirm against your SGE setup). The original working directory is always
  restored, even if the creation fails.
  """

  if cwd:
    makedirs_safe(path)
    return

  curdir = os.path.realpath(os.curdir)
  os.chdir(os.environ['HOME']) # pivot, temporarily, to home directory
  try:
    makedirs_safe(path)
  finally:
    os.chdir(curdir) # go back


def qsub(command, queue=None, cwd=True, name=None, deps=None, stdout='',
    stderr='', env=None, array=None, context='grid', mem=None, hostname=None,
    pe_opt=None):
  """Submits a shell job to a given grid queue

  Keyword parameters:

  command
    The command to be submitted to the grid. Either a single string or a
    list/tuple with the command and its arguments.

  queue
    A valid queue name or None, to use the default queue. The names 'all.q'
    and 'default' are also treated as "use the default queue". Any other
    name is passed to qsub through a '-l' option.

  cwd
    If the job should change to the current working directory before starting

  name
    An optional name to set for the job. If not given, defaults to the script
    name being launched.

  deps
    Job ids (integers) to which this job will be dependent on

  stdout
    The standard output directory. If not given, defaults to what qsub has as a
    default. The directory is created if it does not exist.

  stderr
    The standard error directory (if not given, defaults to the stdout
    directory). The directory is created if it does not exist.

  env
    This is a list of extra variables that will be set on the environment
    running the command of your choice. Each entry is passed to qsub through
    its own '-v' option.

  array
    If set should be either:

    1. a string in the form m[-n[:s]] which indicates the starting range 'm',
       the closing range 'n' and the step 's'.
    2. an integer value indicating the total number of jobs to be submitted.
       This is equivalent to setting the parameter to a string "1-k:1" where
       "k" is the passed integer value
    3. a tuple that contains either 1, 2 or 3 elements indicating the start,
       end and step arguments ("m", "n", "s").

    The minimum value for "m" is 1. Giving "0" is an error.

    If submitted with this option, the job to be created will be an SGE
    parametric job. In this mode SGE does not allow individual control of each
    job. The environment variable SGE_TASK_ID will be set on the executing
    process automatically by SGE and indicates the unique identifier in the
    range for which the current job instance is for.

  context
    The setshell context in which we should try a 'qsub'. Normally you don't
    need to change the default. This variable can also be set to a context
    dictionary in which case we just setup using that context instead of
    probing for a new one, what can be fast.

  mem
    If set, it asks the queue for a node with a minimum amount of memory
    (cf. qsub -l mem_free=<...> -l h_vmem=<...>)

  hostname
    If set, it asks the queue to use only a subset of the available nodes
    Symbols: | for OR, & for AND, ! for NOT, etc.
    (cf. qsub -l hostname=<...>)

  pe_opt
    If set, add a -pe option when launching a job (for instance pe_exclusive* 1-)

  Returns the job id assigned to this job, as an integer. It is taken from
  the part of the qsub output that precedes the first '.'.

  Raises RuntimeError if ``array`` is a tuple or list whose length is not
  between 1 and 3.
  """

  scmd = ['qsub']

  if isinstance(queue, str) and queue not in ('all.q', 'default'):
    scmd += ['-l', queue]

  if mem:
    scmd += ['-l', 'mem_free=%s' % mem]
    scmd += ['-l', 'h_vmem=%s' % mem]

  if hostname: scmd += ['-l', 'hostname=%s' % hostname]

  if pe_opt: scmd += ['-pe'] + pe_opt.split()

  if cwd: scmd += ['-cwd']

  if name: scmd += ['-N', name]

  if deps: scmd += ['-hold_jid', ','.join(['%d' % k for k in deps])]

  if stdout:
    _prepare_logdir(stdout, cwd)
    scmd += ['-o', stdout]

  if stderr:
    _prepare_logdir(stderr, cwd)
    scmd += ['-e', stderr]
  elif stdout: #just re-use the stdout settings
    scmd += ['-e', stdout]

  scmd += ['-terse'] # simplified job identifiers returned by the command line

  for k in (env or ()): scmd += ['-v', k]

  if array is not None:
    if isinstance(array, (tuple, list)):
      if not 1 <= len(array) <= 3:
        raise RuntimeError("Array tuple should have length between 1 and 3")
      # builds "m", "m-n" or "m-n:s" depending on how many values were given
      spec = '%s' % array[0]
      if len(array) > 1: spec += '-%s' % array[1]
      if len(array) > 2: spec += ':%s' % array[2]
    else:
      # strings and integers are handed over to qsub as they are
      spec = '%s' % array
    # '-t' is only ever emitted together with its value
    scmd += ['-t', spec]

  if not isinstance(command, (list, tuple)): command = [command]
  scmd += command

  logging.debug("Qsub command '%s'", ' '.join(scmd))
  from .setshell import sexec
  jobid = sexec(context, scmd)
  return int(jobid.split('.',1)[0])

André Anjos's avatar
André Anjos committed
177
def make_shell(shell, command):
  """Returns a single command given a shell and a command to be qsub'ed

  Keyword parameters:

  shell
    The path to the shell to use when submitting the job.

  command
    The script path to be submitted. Either a single string or a list/tuple
    with the script and its arguments.

  Returns the command parameters to be supplied to qsub(), as a new list in
  the form ['-S', shell, <command items...>].
  """

  # A bare script path (or a tuple) cannot be concatenated to a list, so the
  # command is normalised first.
  if not isinstance(command, (list, tuple)): command = [command]
  return ['-S', shell] + list(command)
André Anjos's avatar
André Anjos committed
192

André Anjos's avatar
André Anjos committed
193
def make_python_wrapper(wrapper, command):
  """Returns a single command given a python wrapper and a command to be
  qsub'ed by that wrapper.

  Keyword parameters:

  wrapper
    This is the python wrapper to be used for prefixing the environment in
    which the **command** will execute. This parameter must be either a path to
    the wrapper or a list/tuple with the wrapper and **wrapper** command
    options.

  command
    The script path to be submitted. Either a single string or a list/tuple
    with the script and its arguments.

  Returns the wrapper command to be supplied to qsub(). The command is built
  by make_shell() using '/usr/bin/python' as the shell.
  """

  if not isinstance(wrapper, (list, tuple)): wrapper = [wrapper]
  if not isinstance(command, (list, tuple)): command = [command]
  # Both are converted to lists so that mixing a tuple with a list does not
  # fail on concatenation.
  return make_shell('/usr/bin/python', list(wrapper) + list(command))
André Anjos's avatar
André Anjos committed
213

214
def make_torch_wrapper(torch, debug, command):
  """Builds a command that uses the Torch python wrapper so the **command**
  executes in a valid Torch context.

  Keyword parameters:

  torch
    This is the root directory for the torch installation you would like to use
    for wrapping the execution of **command**. The wrapper is expected at
    ``<torch>/bin/shell.py``.

  debug
    If set, '--debug' is passed to the wrapper. This is meant to switch the
    torch libraries to debug versions with symbols loaded.

  command
    The script path to be submitted

  Returns a tuple with the command and the environment parameter to be
  supplied to qsub(). The first element is the value returned by
  make_python_wrapper(); the second is a single string in the form
  'OVERWRITE_TORCH5SPRO_BINROOT=<torch>/bin'.

  Raises RuntimeError if the wrapper script cannot be found.
  """

  binroot = os.path.join(torch, 'bin')
  shell = os.path.join(binroot, 'shell.py')
  if not os.path.exists(shell):
    raise RuntimeError('Cannot locate wrapper "%s"' % shell)

  wrapper = [shell]

  if debug: wrapper += ['--debug']

  env = 'OVERWRITE_TORCH5SPRO_BINROOT=%s' % binroot

  return make_python_wrapper(wrapper, command), env
André Anjos's avatar
André Anjos committed
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278

def qstat(jobid, context='grid'):
  """Queries status of a given job.

  Keyword parameters:

  jobid
    The job identifier as returned by qsub()

  context
    The setshell context in which 'qstat' is executed. Normally you don't
    need to change the default. This variable can also be set to a context
    dictionary, which is then handed over to the executor as it is.

  Returns a dictionary with the specific job properties. Each "key: value"
  line printed by qstat becomes one entry. The dictionary is empty if any
  line of the output contains the text 'do not exist'.
  """

  command_line = ['qstat', '-j', '%d' % jobid, '-f']

  logging.debug("Qstat command '%s'", ' '.join(command_line))

  from .setshell import sexec
  output = sexec(context, command_line, error_on_nonzero=False)

  # some parsing:
  properties = {}
  ruler = '=' * 10 # marks the separator lines, which carry no data
  for raw in output.split('\n'):
    entry = raw.strip()
    if 'do not exist' in entry.lower():
      return {}
    if entry and ruler not in entry:
      fields = QSTAT_FIELD_SEPARATOR.split(entry, 1)
      if len(fields) == 2:
        key, value = fields
        properties[key] = value

  return properties

def qdel(jobid, context='grid'):
  """Halts a given job.

  Keyword parameters:

  jobid
    The job identifier as returned by qsub()

  context
    The setshell context in which 'qdel' is executed. Normally you don't
    need to change the default. This variable can also be set to a context
    dictionary, which is then handed over to the executor as it is.

  Nothing is returned. The command is run with error_on_nonzero=False.
  """

  command_line = ['qdel']
  command_line.append('%d' % jobid)

  logging.debug("Qdel command '%s'", ' '.join(command_line))

  from .setshell import sexec
  sexec(context, command_line, error_on_nonzero=False)