manager.py 11.6 KB
Newer Older
André Anjos's avatar
André Anjos committed
1

2 3
from __future__ import print_function

André Anjos's avatar
André Anjos committed
4
import os
5
import subprocess
6
import socket # to get the host name
7
from .models import Base, Job, ArrayJob, Status
8
from .tools import logger
André Anjos's avatar
André Anjos committed
9

10
import sqlalchemy
11

12
"""This file defines a minimum Job Manager interface."""


def _version_numbers(version_string):
  """Returns the leading numeric components of a version string as a list of integers.

  Only the leading digits of each dot-separated component are used, and parsing stops at
  the first component that does not start with a digit. Pre-release or local versions
  like ``'1.4.0b1'`` or ``'2.0.0rc1'`` therefore give ``[1, 4, 0]`` and ``[2, 0, 0]``
  instead of raising a ``ValueError``.
  """
  import re
  numbers = []
  for component in version_string.split('.'):
    match = re.match(r'\d+', component)
    if match is None:
      break
    numbers.append(int(match.group()))
  return numbers


# the installed SQLAlchemy version as a list of integers, comparable with lists like [0, 7, 8]
sqlalchemy_version = _version_numbers(sqlalchemy.__version__)

class JobManager:
  """This job manager defines the basic interface for handling jobs in the SQL database.

  All database access goes through a single session. :py:meth:`lock` opens the session
  and :py:meth:`unlock` closes it; trying to open a second session raises a RuntimeError.
  """

  def __init__(self, database, wrapper_script = './bin/jman', debug = False):
    """Sets up the SQLAlchemy engine and session factory for the given SQLite database file.

    Keyword parameters:

    database : str
      The path of the SQLite database file; it is stored as an absolute (real) path.
      The file itself is only created by :py:meth:`lock` when it does not exist yet.

    wrapper_script : str
      The command that this job manager was called with.

    debug : bool
      If enabled, the SQLAlchemy engine echoes all SQL statements.
    """
    self._database = os.path.realpath(database)
    # remembered so that engines re-created in lock() keep the same echo setting
    self._debug = debug
    self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=debug)
    self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

    # store the command that this job manager was called with
    self.wrapper_script = wrapper_script


  def __del__(self):
    """Removes the database file when it does not contain any jobs."""
    # __init__ might have failed before the database path was set
    database = getattr(self, '_database', None)
    if database is None or not os.path.isfile(database):
      return

    # in erroneous cases, the session might still be active, so don't create a deadlock here!
    if not hasattr(self, 'session'):
      self.lock()
    try:
      job_count = len(self.get_jobs())
    finally:
      # always release the session, even if the query failed
      self.unlock()

    if not job_count:
      logger.debug("Removed database file '%s' since database is empty" % database)
      os.remove(database)


  def lock(self):
    """Generates (and returns) a blocking session object to the database.

    The session is also stored as ``self.session`` and must be released with
    :py:meth:`unlock`. The database file (and its directory) is created if it does not
    exist yet.

    Raises RuntimeError if a session is already open.
    """
    if hasattr(self, 'session'):
      raise RuntimeError('Dead lock detected. Please do not try to lock the session when it is already locked!')

    if sqlalchemy_version < [0,7,8]:
      # for old sqlalchemy versions, in some cases it is required to re-generate the engine for each session
      self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=self._debug)
      self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

    # create the database if it does not exist yet
    if not os.path.exists(self._database):
      self._create()

    # now, create a session
    self.session = self._session_maker()
    logger.debug("Created new database session to '%s'" % self._database)
    return self.session


  def unlock(self):
    """Closes the session to the database that was opened by :py:meth:`lock`.

    Raises RuntimeError if no session is open.
    """
    if not hasattr(self, 'session'):
      raise RuntimeError('Error detected! The session that you want to close does not exist any more!')
    try:
      self.session.close()
    finally:
      # drop the session even if closing failed, so that lock() can be called again
      del self.session
    logger.debug("Closed database session of '%s'" % self._database)


  def _create(self):
    """Creates a new and empty database (including its parent directory)."""
    from .tools import makedirs_safe

    # create directory for sql database
    makedirs_safe(os.path.dirname(self._database))

    # create all the tables
    Base.metadata.create_all(self._engine)
    logger.debug("Created new empty database '%s'" % self._database)


  def get_jobs(self, job_ids = None):
    """Returns a list of jobs that are stored in the database.

    If ``job_ids`` is given and not empty, only the jobs with these ids are returned;
    otherwise all jobs are returned. Requires an open session (see :py:meth:`lock`).
    """
    q = self.session.query(Job)
    if job_ids:
      q = q.filter(Job.id.in_(job_ids))
    return list(q)


  def _job_and_array(self, job_id, array_id = None):
    """Returns the job with the given id and, optionally, its array job with the given id.

    Returns a ``(job, array_job)`` tuple. ``array_job`` is ``None`` when ``array_id`` is
    ``None``, and ``(None, None)`` is returned when the job is not in the database.
    Requires an open session (see :py:meth:`lock`).
    """
    jobs = self.get_jobs((job_id,))
    if len(jobs) > 1:
      logger.error("%d jobs with the same ID '%d' were detected in the database"%(len(jobs), job_id))
    elif not jobs:
      logger.warning("Job with ID '%d' was not found in the database."%job_id)
      return (None, None)

    job = jobs[0]
    if array_id is None:
      return (job, None)

    array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == job.unique).filter(ArrayJob.id == array_id))
    assert (len(array_job) == 1)
    return (job, array_job[0])


  def _get_dependent_job_ids(self, job):
    """Returns the set of ids of the given job and of all jobs that (transitively) wait for it.

    Each job is expanded only once, so the traversal terminates even when a job can be
    reached via several dependency paths.
    """
    job_ids = set()
    pending = [job]
    while pending:
      current = pending.pop()
      if current.id in job_ids:
        continue
      job_ids.add(current.id)
      pending.extend(current.get_jobs_waiting_for_us())
    return job_ids


  def _get_array_jobs(self, job_ids, array_ids):
    """Returns the array jobs with the given ``array_ids`` of the jobs with the given ``job_ids``.

    Exactly one job id is expected. An error is logged otherwise, and an empty list is
    returned when no job id is given at all. Requires an open session (see :py:meth:`lock`).
    """
    if not job_ids or len(job_ids) != 1:
      logger.error("If array ids are specified exactly one job id must be given.")
    if not job_ids:
      return []
    return list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))


  def run_job(self, job_id, array_id = None):
    """This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.

    The job is set to executing on this machine, its command line is run with
    :py:func:`subprocess.call`, and the return code (69 if the command could not be
    started) is passed to ``Job.finish``. If the job status is ``'failure'`` after being
    set to executing (a job it depends on has failed before), the command is not run.
    Instead, this job and all jobs that (transitively) wait for it are passed to
    ``self.stop_jobs``, which is not defined in this class (presumably provided by
    derived classes).

    Keyword parameters:

    job_id : int
      The id of the job to run; nothing happens if the job is not in the database.

    array_id : int or None
      The id of the array job to run, if applicable.
    """
    failed_job_ids = None
    command_line = None

    # get the job from the database
    self.lock()
    try:
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        return
      job = jobs[0]

      # get the machine name we are executing on; this might only work at idiap
      machine_name = socket.gethostname()

      # set the 'executing' status to the job
      job.execute(array_id, machine_name)

      if job.status == 'failure':
        # there has been a dependent job that has failed before;
        # stop this and all dependent jobs from execution (nothing is committed in this case)
        failed_job_ids = self._get_dependent_job_ids(job)
      else:
        # get the command line of the job
        command_line = job.get_command_line()
        self.session.commit()
    finally:
      self.unlock()

    if failed_job_ids is not None:
      stop_ids = list(failed_job_ids)
      # best effort: failing to stop the jobs must not crash this job, but is reported
      try:
        self.stop_jobs(stop_ids)
        logger.warning("Deleted dependent jobs '%s' since this job failed." % str(stop_ids))
      except Exception as e:
        logger.error("Could not stop the jobs '%s' that depend on the failed job '%s': %s" % (str(stop_ids), job_id, e))
      return

    # execute the command line of the job, and wait until it has finished
    try:
      result = subprocess.call(command_line)
    except Exception:
      result = 69 # ASCII: 'E'

    # set a new status and the results of the job
    self.lock()
    try:
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        logger.error("The job with id '%d' could not be found in the database!" % job_id)
        return
      jobs[0].finish(result, array_id)
      self.session.commit()
    finally:
      self.unlock()


  def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, status=Status, ids_only=False):
    """Lists the jobs currently added to the database.

    Keyword parameters:

    job_ids : [int] or None
      The ids of the jobs to list; all jobs are listed when empty or None.

    print_array_jobs : bool
      Also print the array jobs of each listed job.

    print_dependencies : bool
      Add a column with the dependencies of the jobs.

    long : bool
      If disabled, the width of the command-line column (43) is passed to ``Job.format``,
      presumably to shorten long command lines; if enabled, ``None`` is passed instead.

    status : container of str
      Only jobs whose status is in this container are listed.

    ids_only : bool
      Only print the ids of all jobs in the database, separated by spaces; ``job_ids``,
      ``status`` and all other options are ignored in this case.
    """
    if ids_only:
      self.lock()
      try:
        for job in self.get_jobs():
          print(job.id, end=" ")
      finally:
        self.unlock()
      return

    # configuration for jobs
    if print_dependencies:
      fields = ("job-id", "queue", "status", "job-name", "dependencies", "submitted command line")
      lengths = (20, 14, 14, 20, 30, 43)
      job_format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:^%d}  {5:<%d}" % lengths
      dependency_length = lengths[4]
    else:
      fields = ("job-id", "queue", "status", "job-name", "submitted command line")
      lengths = (20, 14, 14, 20, 43)
      job_format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:<%d}" % lengths
      dependency_length = 0
    # the command line is the last column in both layouts
    command_line_length = None if long else lengths[-1]

    array_format = "{0:>%d}  {1:^%d}  {2:^%d}" % lengths[:3]
    delimiter = job_format.format(*['='*k for k in lengths])
    array_delimiter = array_format.format(*["-"*k for k in lengths[:3]])
    header = [field.center(length) for field, length in zip(fields, lengths)]

    # print header
    print('  '.join(header))
    print(delimiter)

    self.lock()
    try:
      for job in self.get_jobs(job_ids):
        if job.status not in status:
          continue
        print(job.format(job_format, dependency_length, command_line_length))
        if print_array_jobs and job.array:
          print(array_delimiter)
          for array_job in job.array:
            print(array_job.format(array_format))
          print(array_delimiter)
    finally:
      self.unlock()


  def report(self, job_ids=None, array_ids=None, unfinished=False, output=True, error=True):
    """Iterates through the output and error files and writes the results to command line.

    Keyword parameters:

    job_ids : [int] or None
      The ids of the jobs to report; all jobs are reported when empty or None.

    array_ids : [int] or None
      If given, only these array jobs of the (single) job in ``job_ids`` are reported.

    unfinished : bool
      Report jobs regardless of their status. Otherwise only jobs with status
      ``'success'`` or ``'failure'`` are reported (only ``'failure'`` when ``error``
      is enabled and ``output`` is disabled).

    output : bool
      Print the contents of the non-empty output files.

    error : bool
      Print the contents of the non-empty error files.
    """
    # which finished jobs are reported when ``unfinished`` is disabled
    accepted_status = ('failure',) if error and not output else ('success', 'failure')

    def _print_file(file_name, description, delimiter):
      # prints an existing, non-empty log file followed by the given delimiter line
      if file_name is None or not os.path.exists(file_name) or os.stat(file_name).st_size == 0:
        return
      logger.info("Contents of %s file: '%s'" % (description, file_name))
      with open(file_name) as log_file:
        print(log_file.read().rstrip())
      print(delimiter)

    def _write_contents(job):
      # Writes the contents of the output and error files to command line
      out_file, err_file = job.std_out_file(), job.std_err_file()
      if output:
        _print_file(out_file, "output", "-"*20)
      if error:
        _print_file(err_file, "error", "-"*40)

    def _write_array_jobs(array_jobs):
      for array_job in array_jobs:
        if unfinished or array_job.status in accepted_status:
          print("Array Job", str(array_job.id), ":")
          _write_contents(array_job)

    self.lock()
    try:
      # check if an array job should be reported
      if array_ids:
        array_jobs = self._get_array_jobs(job_ids, array_ids)
        if array_jobs:
          print(array_jobs[0].job)
        _write_array_jobs(array_jobs)
      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          if job.array:
            if unfinished or job.status in accepted_status or job.status == 'executing':
              print(job)
              _write_array_jobs(job.array)
          elif unfinished or job.status in accepted_status:
            print(job)
            _write_contents(job)
          if job.log_dir is not None and job.status in accepted_status:
            print("-"*60)
    finally:
      self.unlock()


  def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True):
    """Deletes the jobs with the given ids from the database.

    Keyword parameters:

    job_ids : [int] or None
      The ids of the jobs to delete; all jobs are considered when empty or None.

    array_ids : [int] or None
      If given, only these array jobs of the (single) job in ``job_ids`` are deleted.
      The job itself is only considered afterwards if ``job.array`` is empty.
      NOTE(review): the deleted array jobs are not flushed before ``job.array`` is
      checked, so the collection may still contain them -- confirm.

    delete_logs : bool
      Remove the output and error log files of the deleted (array) jobs.

    delete_log_dir : bool
      Together with ``delete_logs``, remove the log directory of a deleted job if it is empty afterwards.

    status : container of str
      Only (array) jobs whose status is in this container are deleted.

    delete_jobs : bool
      If disabled, the database entries are kept (only log files are removed).
    """
    def _delete_dir_if_empty(log_dir):
      if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
        os.rmdir(log_dir)
        logger.info("Removed empty log directory '%s'" % log_dir)

    def _delete(job, try_to_delete_dir=False):
      # remove the log files of the (array) job and, if requested, its database entry
      if delete_logs:
        out_file, err_file = job.std_out_file(), job.std_err_file()
        if out_file and os.path.exists(out_file):
          os.remove(out_file)
          logger.debug("Removed output log file '%s'" % out_file)
        if err_file and os.path.exists(err_file):
          os.remove(err_file)
          logger.debug("Removed error log file '%s'" % err_file)
        if try_to_delete_dir:
          _delete_dir_if_empty(job.log_dir)
      if delete_jobs:
        self.session.delete(job)

    self.lock()
    try:
      # check if array ids are specified
      if array_ids:
        array_jobs = self._get_array_jobs(job_ids, array_ids)
        if array_jobs:
          job = array_jobs[0].job
          for array_job in array_jobs:
            if array_job.status in status:
              if delete_jobs:
                logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
              _delete(array_job)
          if not job.array:
            if job.status in status:
              if delete_jobs:
                logger.info("Deleting job '%d' from the database." % job.id)
              _delete(job, True)
      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          # delete all array jobs
          if job.array:
            for array_job in job.array:
              if array_job.status in status:
                if delete_jobs:
                  logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
                _delete(array_job)
          # delete this job
          if job.status in status:
            if delete_jobs:
              logger.info("Deleting job '%d' from the database." % job.id)
            _delete(job, True)

      self.session.commit()
    finally:
      self.unlock()