#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST

"""Defines the job manager which can help you managing submitted grid jobs.
"""

from __future__ import print_function

import subprocess
import time
import copy, os, sys

if sys.version_info[0] >= 3:
  from pickle import dumps, loads
else:
  from cPickle import dumps, loads

from .tools import makedirs_safe, logger, str_


from .manager import JobManager
from .models import add_job, Job
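
# A minimal usage sketch. The constructor keywords are handled by the base
# JobManager (defined elsewhere), so 'database' and 'wrapper_script' below are
# assumptions, and 'submitted.sql3' is an illustrative file name only:
#
#   manager = JobManagerLocal(database='submitted.sql3', wrapper_script='jman')
#   manager.submit(['echo', 'hello'], name='hello')
#   manager.run_scheduler(parallel_jobs=2, die_when_finished=True)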

class JobManagerLocal(JobManager):
  """Manages jobs run in parallel on the local machine."""
  def __init__(self, **kwargs):
    """Initializes this object with a state file.

    Keyword parameters:

    statefile
      The file containing a valid status database for the manager. If the file
      does not exist it is initialized. If it exists, it is loaded.

    """
    JobManager.__init__(self, **kwargs)


  def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
    """Submits a job that will be executed on the local machine during a call to "run_scheduler".
    All kwargs will simply be ignored."""
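    # A hedged usage sketch (the command line, name and log directory are
    # illustrative values, not defined in this file):
    #   job_id = manager.submit(['echo', 'hello'], name='hello', log_dir='./logs')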
    # remove duplicate dependencies
    dependencies = sorted(list(set(dependencies)))

    # add job to database
    self.lock()
    job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir, stop_on_failure=stop_on_failure)
    logger.info("Added job '%s' to the database" % job)

    if dry_run:
      print("Would have added the Job", job, "to the database to be executed locally.")
      self.session.delete(job)
      logger.info("Deleted job '%s' from the database due to dry-run option" % job)
      job_id = None
    else:
      job_id = job.unique

    # return the new job id
    self.unlock()
    return job_id


  def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, **kwargs):
    """Re-submit jobs automatically"""
    self.lock()
    # iterate over all jobs
    jobs = self.get_jobs(job_ids)
    if new_command is not None:
      if len(jobs) == 1:
        jobs[0].set_command_line(new_command)
      else:
        logger.warning("Ignoring new command since no single job id was specified")
    accepted_old_status = ('success', 'failure') if also_success else ('failure',)
    for job in jobs:
      # check if this job needs re-submission
      if running_jobs or job.status in accepted_old_status:
        # re-submit job to the grid
        logger.info("Re-submitted job '%s' to the database" % job)
        job.submit('local')

    self.session.commit()
    self.unlock()


  def stop_jobs(self, job_ids=None):
    """Resets the status of the job to 'submitted' when they are labeled as 'executing'."""
89 90 91 92
    self.lock()

    jobs = self.get_jobs(job_ids)
    for job in jobs:
      if job.status in ('executing', 'queued', 'waiting') and job.queue_name == 'local':
        logger.info("Reset job '%s' in the database" % job.name)
        job.submit()

    self.session.commit()
    self.unlock()

  def stop_job(self, job_id, array_id = None):
    """Resets the status of the given to 'submitted' when they are labeled as 'executing'."""
102 103 104
    self.lock()

    job, array_job = self._job_and_array(job_id, array_id)
    if job is not None:
      if job.status in ('executing', 'queued', 'waiting'):
        logger.info("Reset job '%s' in the database" % job.name)
        job.status = 'submitted'

      if array_job is not None and array_job.status in ('executing', 'queued', 'waiting'):
        logger.debug("Reset array job '%s' in the database" % array_job)
        array_job.status = 'submitted'
      if array_job is None:
        for array_job in job.array:
          if array_job.status in ('executing', 'queued', 'waiting'):
            logger.debug("Reset array job '%s' in the database" % array_job)
            array_job.status = 'submitted'

    self.session.commit()
    self.unlock()


#####################################################################
###### Methods to run the jobs in parallel on the local machine #####

  def _run_parallel_job(self, job_id, array_id = None, no_log = False, nice = None):
    """Executes the code for this job on the local machine."""
    environ = copy.deepcopy(os.environ)
    environ['JOB_ID'] = str(job_id)
    if array_id:
      environ['SGE_TASK_ID'] = str(array_id)
    else:
      environ['SGE_TASK_ID'] = 'undefined'

    # generate call to the wrapper script
    command = [self.wrapper_script, '-ld', self._database, 'run-job']

    if nice is not None:
      command = ['nice', '-n%d'%nice] + command
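      # e.g., for nice=10 this yields:
      #   ['nice', '-n10', <wrapper_script>, '-ld', <database>, 'run-job']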

    job, array_job = self._job_and_array(job_id, array_id)
    logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id, len(job.array)), job.name))
    # create log files
    if no_log or job.log_dir is None:
      out, err = sys.stdout, sys.stderr
    else:
      makedirs_safe(job.log_dir)
      # create line-buffered files for writing output and error status
      if array_job is not None:
        out, err = open(array_job.std_out_file(), 'w', 1), open(array_job.std_err_file(), 'w', 1)
      else:
        out, err = open(job.std_out_file(), 'w', 1), open(job.std_err_file(), 'w', 1)

    # return the subprocess pipe to the process
    try:
      return subprocess.Popen(command, env=environ, stdout=out, stderr=err, bufsize=1)
    except OSError as e:
      logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id, len(job.array)), e, job.get_command_line()))
      job.finish(117, array_id) # report failure with exit code 117 (ASCII 'u')
      return None


  def _format_log(self, job_id, array_id = None, array_count = 0):
    # formats the job id for logging, e.g., '42 (3/10)', '42 (3)' or '42'
    if array_id is not None and array_count:
      return "%d (%d/%d)" % (job_id, array_id, array_count)
    if array_id is not None:
      return "%d (%d)" % (job_id, array_id)
    return "%d" % job_id

  def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False, nice = None):
    """Starts the scheduler, which is constantly checking for jobs that should be ran."""
    running_tasks = []
    try:

      # keep the scheduler alive until every job is finished or the KeyboardInterrupt is caught
      while True:
        # Flag that might be set in some rare cases and that prevents the scheduler from dying
        repeat_execution = False
        # FIRST, check if any of the running processes have finished
        for task_index in range(len(running_tasks)-1, -1, -1):
          task = running_tasks[task_index]
          process = task[0]

          if process.poll() is not None:
            # process ended
            job_id = task[1]
            array_id = task[2] if len(task) > 2 else None
            self.lock()
            job, array_job = self._job_and_array(job_id, array_id)
            if array_job: job = array_job
            result = "%s (%d)" % (job.status, job.result) if job.result is not None else "%s (?)" % job.status
            self.unlock()
            logger.info("Job '%s' finished execution with result %s" % (self._format_log(job_id, array_id), result))
            # in any case, remove the job from the list
            del running_tasks[task_index]

        # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
        if len(running_tasks) < parallel_jobs:
          # get all unfinished jobs:
          self.lock()
          jobs = self.get_jobs(job_ids)
          # put all new jobs into the queue
          for job in jobs:
            if job.status == 'submitted' and job.queue_name == 'local':
              job.queue()

          # get all unfinished jobs that are submitted to the local queue
          unfinished_jobs = [job for job in jobs if job.status in ('queued', 'executing') and job.queue_name == 'local']
          for job in unfinished_jobs:
            if job.array:
              # find array jobs that can run
              queued_array_jobs = [array_job for array_job in job.array if array_job.status == 'queued']
              if not len(queued_array_jobs):
                job.finish(0, -1)
                repeat_execution = True
              else:
                # there are new array jobs to run
                for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
                  array_job = queued_array_jobs[i]
                  # start a new job from the array
                  process = self._run_parallel_job(job.unique, array_job.id, no_log=no_log, nice=nice)
                  if process is None:
                    continue
                  running_tasks.append((process, job.unique, array_job.id))
                  # set the status to 'executing' manually here to avoid jobs being run twice,
                  # e.g., when the loop is executed again before the asynchronous job has actually started
                  array_job.status = 'executing'
                  job.status = 'executing'
                  if len(running_tasks) == parallel_jobs:
                    break
            else:
              if job.status == 'queued':
                # start a new job
                process = self._run_parallel_job(job.unique, no_log=no_log, nice=nice)
                if process is None:
                  continue
                running_tasks.append((process, job.unique))
                # set the status to 'executing' manually here to avoid jobs being run twice,
                # e.g., when the loop is executed again before the asynchronous job has actually started
                job.status = 'executing'
            if len(running_tasks) == parallel_jobs:
              break

          self.session.commit()
          self.unlock()

        # if no jobs are running after the submission step, the whole queue should be finished
        if die_when_finished and not repeat_execution and len(running_tasks) == 0:
          logger.info("Stopping task scheduler since there are no more jobs running.")
          break

        # THIRD: sleep the desired amount of time before re-checking
        time.sleep(sleep_time)

    # This is the only way to stop: you have to interrupt the scheduler
    except KeyboardInterrupt:
      if hasattr(self, 'session'):
        self.unlock()
      logger.info("Stopping task scheduler due to user interrupt.")
      for task in running_tasks:
        logger.warning("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
        task[0].kill()
        self.stop_job(task[1])
      # stop all jobs that are currently running or queued
      self.stop_jobs(job_ids)