#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST

"""Defines the job manager which can help you managing submitted grid jobs.
"""

from __future__ import print_function

from .manager import JobManager
from .setshell import environ
from .models import add_job, Job
from .tools import logger, qsub, qstat, qdel, make_shell, makedirs_safe

import os
import sys
import re


class JobManagerSGE(JobManager):
  """The JobManager will submit and control the status of submitted jobs"""

  def __init__(self, context='grid', **kwargs):
    """Initializes this object with a state file and a method for qsub'bing.

    Keyword parameters:

    statefile
      The file containing a valid status database for the manager. If the file
      does not exist it is initialized. If it exists, it is loaded.

    context
      The context to provide when setting up the environment to call the SGE
      utilities such as qsub, qstat and qdel (normally 'grid', which also
      happens to be the default).
    """

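    # environ() (from .setshell) resolves the environment of the given context;
    # it is stored here and passed to every qsub/qstat/qdel call issued below.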
    self.context = environ(context)
    JobManager.__init__(self, **kwargs)


  def _queue(self, kwargs):
    """The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
    process it we have to split it twice (first on ',' and then on '='), create a
    dictionary and extract just the qname"""
    if 'hard resource_list' not in kwargs: return 'all.q'
    d = dict([k.split('=') for k in kwargs['hard resource_list'].split(',')])
    for k in d:
      if k[0] == 'q' and d[k] == 'TRUE': return k
    return 'all.q'
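    # Illustrative example (hypothetical qstat entry): a 'hard resource_list' of
    # 'q1d=TRUE,mem=4G' parses into {'q1d': 'TRUE', 'mem': '4G'} and 'q1d' is
    # returned; without a 'q*=TRUE' entry, the default 'all.q' is returned.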


  def _submit_to_grid(self, job, name, array, dependencies, log_dir, verbosity, **kwargs):
    # ... what we will actually submit to the grid is a wrapper script that will call the desired command...
    # get the name of the file that was called originally
    jman = self.wrapper_script
    python = sys.executable

    # get the grid id's for the dependencies and remove duplicates
    dependent_jobs = self.get_jobs(dependencies)
    deps = sorted(list(set([j.id for j in dependent_jobs])))

    # make sure log directory is created and is a directory
    makedirs_safe(job.log_dir)
    assert os.path.isdir(job.log_dir), "Please make sure --log-dir `{}' either does not exist or is a directory.".format(job.log_dir)

    # generate call to the wrapper script
    command = make_shell(python, [jman, '-d%s' % ('v'*verbosity), self._database, 'run-job'])
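    # (the assembled call looks roughly like: <python> <jman> -dv <database> run-job,
    # with the number of 'v's given by the verbosity level)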
    q_array = "%d-%d:%d" % array if array else None
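    # 'array' is expected to be a (start, stop, step) tuple, e.g. (1, 10, 1),
    # which is rendered into the SGE task range syntax "1-10:1"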
    grid_id = qsub(command, context=self.context, name=name, deps=deps, array=q_array, stdout=log_dir, stderr=log_dir, **kwargs)

    # get the result of qstat
    status = qstat(grid_id, context=self.context)

    # set the grid id of the job
    job.queue(new_job_id = int(status['job_number']), new_job_name = status['job_name'], queue_name = self._queue(status))

    logger.info("Submitted job '%s' with dependencies '%s' to the SGE grid." % (job, str(deps)))

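    # warn about option combinations that the grid configuration cannot satisfy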
    if 'io_big' in kwargs and kwargs['io_big'] and ('queue' not in kwargs or kwargs['queue'] == 'all.q'):
      logger.warn("This job will never be executed since the 'io_big' flag is not available for the 'all.q'.")
    if 'pe_opt' in kwargs and ('queue' not in kwargs or kwargs['queue'] not in ('q1dm', 'q_1day_mth', 'q1wm', 'q_1week_mth')):
      logger.warn("This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead." % kwargs['queue'] if 'queue' in kwargs else 'all.q')
    if 'gpumem' in kwargs and 'queue' in kwargs and kwargs['queue'] in ('gpu', 'lgpu', 'sgpu') and int(re.sub(r"\D", "", kwargs['gpumem'])) > 24:
      logger.warn("This job will never be executed since the GPU queue '%s' cannot have more than 24GB of memory." % kwargs['queue'])

    assert job.id == grid_id
    return job.unique


  def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = "logs", dry_run = False, verbosity = 0, stop_on_failure = False, **kwargs):
    """Submits a job that will be executed in the grid."""
    # add job to database
    self.lock()
    job = add_job(self.session, command_line, name, dependencies, array, exec_dir=exec_dir, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
    logger.info("Added job '%s' to the database." % job)
    if dry_run:
      print("Would have added the Job")
      print(job)
      print("to the database to be executed in the grid with options:", str(kwargs))
      self.session.delete(job)
      logger.info("Deleted job '%s' from the database due to dry-run option" % job)
      job_id = None

    else:
      job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, verbosity, **kwargs)

    self.session.commit()
    self.unlock()

    return job_id


  def communicate(self, job_ids = None):
    """Communicates with the SGE grid (using qstat) to see if jobs are still running."""
    self.lock()
    # iterate over all jobs
    jobs = self.get_jobs(job_ids)
    for job in jobs:
      job.refresh()
      if job.status in ('queued', 'executing', 'waiting') and job.queue_name != 'local':
        status = qstat(job.id, context=self.context)
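        # an empty qstat result means the grid no longer knows this job, although
        # our database still marks it as running; treat it as failed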
        if len(status) == 0:
          job.status = 'failure'
          job.result = 70 # ASCII: 'F'
          logger.warn("The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files." % job)
          for array_job in job.array:
            if array_job.status in ('queued', 'executing'):
              array_job.status = 'failure'
              array_job.result = 70 # ASCII: 'F'


    self.session.commit()
    self.unlock()


  def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, verbosity=0, keep_logs=False, **kwargs):
    """Re-submit jobs automatically"""
    self.lock()
    # iterate over all jobs
    jobs = self.get_jobs(job_ids)
    if new_command is not None:
      if len(jobs) == 1:
        jobs[0].set_command_line(new_command)
      else:
        logger.warn("Ignoring new command since no single job id was specified")
    accepted_old_status = ('submitted', 'success', 'failure') if also_success else ('submitted', 'failure',)
    for job in jobs:
      # check if this job needs re-submission
      if running_jobs or job.status in accepted_old_status:
        grid_status = qstat(job.id, context=self.context)
        if len(grid_status) != 0:
          logger.warn("Deleting job '%d' since it was still running in the grid." % job.unique)
          qdel(job.id, context=self.context)
        # re-submit job to the grid
        arguments = job.get_arguments()
        arguments.update(**kwargs)
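        # if the job goes (back) to the default queue, drop resource requests
        # ('hvmem', 'pe_opt', 'io_big') that only make sense for special queues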
        if ('queue' not in arguments or arguments['queue'] == 'all.q'):
          for arg in ('hvmem', 'pe_opt', 'io_big'):
            if arg in arguments:
              del arguments[arg]
        job.set_arguments(kwargs=arguments)
        # delete old status and result of the job
        if not keep_logs:
          self.delete_logs(job)
        job.submit()
        if job.queue_name == 'local' and 'queue' not in arguments:
          logger.warn("Re-submitting job '%s' locally (since no queue name is specified)." % job)
        else:
          deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
          logger.debug("Re-submitting job '%s' with dependencies '%s' to the grid." % (job, deps))
          self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, verbosity, **arguments)

        # commit after each job so that the job can already be found in the database when it starts executing in the grid
        self.session.commit()
    self.unlock()


  def run_job(self, job_id, array_id = None):
    """Overwrites the run-job command from the manager to extract the correct job id before calling base class implementation."""
    # get the unique job id from the given grid id
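    # (after submission, Job.id holds the SGE grid id, while Job.unique is the
    # database key expected by the base class implementation)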
    self.lock()
    jobs = list(self.session.query(Job).filter(Job.id == job_id))
    if len(jobs) != 1:
      self.unlock()
      raise ValueError("Could not find job id '%d' in the database'" % job_id)
    job_id = jobs[0].unique
    self.unlock()
    # call base class implementation with the corrected job id
    return JobManager.run_job(self, job_id, array_id)


  def stop_jobs(self, job_ids):
    """Stops the jobs in the grid."""
    self.lock()

    jobs = self.get_jobs(job_ids)
    for job in jobs:
      if job.status in ('executing', 'queued', 'waiting'):
        qdel(job.id, context=self.context)
        logger.info("Stopped job '%s' in the SGE grid." % job)
        job.submit()

    self.session.commit()
    self.unlock()