#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST

"""Defines the job manager which can help you managing submitted grid jobs.
"""

from __future__ import print_function

from .manager import JobManager
from .setshell import environ
from .models import add_job, Job
from .tools import logger, qsub, qstat, qdel, make_shell

import sys
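
# A minimal usage sketch (illustrative only -- constructor keyword arguments
# other than 'context' are forwarded to the base JobManager and depend on your
# local configuration):
#
#   manager = JobManagerSGE(context='grid')
#   manager.submit(['./my_script.py', '--verbose'], name='test', log_dir='logs')
#   manager.communicate()   # poll qstat and update the status database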

class JobManagerSGE(JobManager):
  """The JobManager will submit and control the status of submitted jobs"""

  def __init__(self, context='grid', **kwargs):
    """Initializes this object with a state file and a method for qsub'bing.

    Keyword parameters:

    statefile
      The file containing a valid status database for the manager. If the file
      does not exist it is initialized. If it exists, it is loaded.

    context
      The context to provide when setting up the environment to call the SGE
      utilities such as qsub, qstat and qdel (normally 'grid', which is also
      the default).
    """

    self.context = environ(context)
    JobManager.__init__(self, **kwargs)


  def _queue(self, kwargs):
    """The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
    process it we have to split it twice (',' and then on '='), create a
    dictionary and extract just the qname"""
    if 'hard resource_list' not in kwargs: return 'all.q'
    d = dict([k.split('=') for k in kwargs['hard resource_list'].split(',')])
    for k in d:
      if k[0] == 'q' and d[k] == 'TRUE': return k
    return 'all.q'
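  # Example (illustrative): for a 'hard resource_list' of 'q1d=TRUE,mem=4G' the
  # parsed dictionary is {'q1d': 'TRUE', 'mem': '4G'} and _queue() returns 'q1d';
  # without any '<qname>=TRUE' entry it falls back to 'all.q'.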


  def _submit_to_grid(self, job, name, array, dependencies, log_dir, **kwargs):
    # what we actually submit to the grid is a wrapper script that calls the desired command
    # get the name of the wrapper script that was originally called
    jman = self.wrapper_script
    python = sys.executable

    # remove duplicate dependencies
    dependencies = sorted(set(dependencies))

    # generate call to the wrapper script
    command = make_shell(python, [jman, '-d', self._database, 'run-job'])
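    # (illustrative) the resulting wrapper call looks roughly like:
    #   /usr/bin/python /path/to/jman -d <database file> run-job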
    q_array = "%d-%d:%d" % array if array else None
    grid_id = qsub(command, context=self.context, name=name, deps=dependencies, array=q_array, stdout=log_dir, stderr=log_dir, **kwargs)

    # get the result of qstat
    status = qstat(grid_id, context=self.context)

    # set the grid id of the job
    job.queue(new_job_id = int(status['job_number']), new_job_name = status['job_name'], queue_name = self._queue(status))

    logger.info("Submitted job '%s' to the SGE grid." % job)

    if 'io_big' in kwargs and kwargs['io_big'] and ('queue' not in kwargs or kwargs['queue'] == 'all.q'):
      logger.warn("This job will never be executed since the 'io_big' flag is not available for the 'all.q'.")
    if 'pe_opt' in kwargs and ('queue' not in kwargs or kwargs['queue'] not in ('q1dm', 'q_1day_mth', 'q1wm', 'q_1week_mth')):
      logger.warn("This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead." % kwargs['queue'] if 'queue' in kwargs else 'all.q')

    assert job.id == grid_id
    return grid_id


  def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
    """Submits a job that will be executed in the grid."""
    # add job to database
    self.lock()
    job = add_job(self.session, command_line, name, dependencies, array, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
    logger.info("Added job '%s' to the database." % job)
    if dry_run:
      print("Would have added the Job")
      print(job)
      print("to the database to be executed in the grid with options:", str(kwargs))
      self.session.delete(job)
      logger.info("Deleted job '%s' from the database due to dry-run option" % job)
      job_id = None

    else:
      job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, **kwargs)

    self.session.commit()
    self.unlock()

    return job_id
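
  # Example (illustrative): submit(['ls', '-l'], name='list', dry_run=True) only
  # prints what would have been submitted and removes the job from the database
  # again, so nothing is sent to the grid.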


  def communicate(self, job_ids = None):
    """Communicates with the SGE grid (using qstat) to see if jobs are still running."""
    self.lock()
    # iterate over all jobs
    jobs = self.get_jobs(job_ids)
    for job in jobs:
      job.refresh()
      if job.status in ('queued', 'executing', 'waiting'):
        status = qstat(job.id, context=self.context)
        if len(status) == 0:
          job.status = 'failure'
          job.result = 70 # ASCII: 'F'
          logger.warn("The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files." % job)
          for array_job in job.array:
            if array_job.status in ('queued', 'executing'):
              array_job.status = 'failure'
              array_job.result = 70 # ASCII: 'F'


    self.session.commit()
    self.unlock()


  def resubmit(self, job_ids = None, also_success = False, running_jobs = False, **kwargs):
    """Re-submit jobs automatically"""
    self.lock()
    # iterate over all jobs
    jobs = self.get_jobs(job_ids)
    accepted_old_status = ('success', 'failure') if also_success else ('failure',)
    for job in jobs:
      # check if this job needs re-submission
      if running_jobs or job.status in accepted_old_status:
        grid_status = qstat(job.id, context=self.context)
        if len(grid_status) != 0:
          logger.warn("Deleting job '%d' since it was still running in the grid." % job.unique)
          qdel(job.id, context=self.context)
        # re-submit job to the grid
        arguments = job.get_arguments()
        arguments.update(**kwargs)
        job.set_arguments(kwargs=arguments)
        # delete old status and result of the job
        job.submit()
        if job.queue_name == 'local' and 'queue' not in arguments:
          logger.warn("Re-submitting job '%s' locally (since no queue name is specified)." % job)
        else:
          deps = [dep.id for dep in job.get_jobs_we_wait_for()]
          logger.debug("Re-submitting job '%s' with dependencies '%s' to the grid." % (job, deps))
          self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, **arguments)

    self.session.commit()
    self.unlock()


  def run_job(self, job_id, array_id = None):
    """Overwrites the run-job command from the manager to extract the correct job id before calling base class implementation."""
    # get the unique job id from the given grid id
    self.lock()
    job = self.session.query(Job).filter(Job.id == job_id)
    job_id = list(job)[0].unique
    self.unlock()
    # call base class implementation with the corrected job id
    return JobManager.run_job(self, job_id, array_id)


  def stop_jobs(self, job_ids):
    """Stops the jobs in the grid."""
    self.lock()

    jobs = self.get_jobs(job_ids)
    for job in jobs:
      if job.status in ('executing', 'queued', 'waiting'):
        qdel(job.id, context=self.context)
        logger.info("Stopped job '%s' in the SGE grid." % job)
        job.status = 'submitted'
        for array_job in job.array:
          if array_job.status in ('executing', 'queued', 'waiting'):
            array_job.status = 'submitted'


    self.session.commit()
    self.unlock()