manager.py 11.7 KB
Newer Older
André Anjos's avatar
André Anjos committed
1

2 3
from __future__ import print_function

André Anjos's avatar
André Anjos committed
4
import os
5
import subprocess
6
import socket # to get the host name
7
from .models import Base, Job, ArrayJob, Status
8
from .tools import logger
André Anjos's avatar
André Anjos committed
9

10
import sqlalchemy
11

12
"""This file defines a minimum Job Manager interface."""
13
def _parse_version(version):
  """Converts a dotted version string into a list of integers.

  Only the leading digits of each component are used, and parsing stops at the
  first component that does not start with a digit.  This way, pre-release
  versions such as '1.4.0b1' or '2.0.0rc1' yield [1, 4, 0] and [2, 0, 0]
  instead of raising a ValueError.
  """
  numbers = []
  for component in version.split('.'):
    digits = ''
    for character in component:
      if not character.isdigit():
        break
      digits += character
    if not digits:
      break
    numbers.append(int(digits))
  return numbers

# the version of sqlalchemy as a list of integers, e.g., [0, 7, 8]
sqlalchemy_version = _parse_version(sqlalchemy.__version__)
14

15
class JobManager:
16
  """This job manager defines the basic interface for handling jobs in the SQL database."""
17

18 19 20
  def __init__(self, database, wrapper_script = './bin/jman', debug = False):
    """Sets up the job manager for the given SQLite database file.

    ``database`` is the path of the database file; it is stored as its real path.
    ``wrapper_script`` is the command that this job manager was called with.
    ``debug`` is handed to the SQL engine as its ``echo`` flag.
    """
    # resolve the database file first, everything else is bound to this path
    self._database = os.path.realpath(database)

    # one engine and one session factory per job manager
    url = "sqlite:///"+self._database
    engine = sqlalchemy.create_engine(url, echo=debug)
    self._engine = engine
    self._session_maker = sqlalchemy.orm.sessionmaker(bind=engine)

    # remember the command that this job manager was called with
    self.wrapper_script = wrapper_script
25 26


27
  def __del__(self):
    """Removes the database file when this job manager is destroyed and the database contains no jobs."""
    # __init__ might have failed before the database path was stored
    database = getattr(self, '_database', None)
    if database is None or not os.path.isfile(database):
      return

    # in erroneous cases, the session might still be active, so don't create a deadlock here!
    if not hasattr(self, 'session'):
      self.lock()
    try:
      job_count = len(self.get_jobs())
    finally:
      # close the session even if the query fails, so that it is not leaked
      self.unlock()

    # remove the database if it is empty
    if not job_count:
      logger.debug("Removed database file '%s' since database is empty" % database)
      os.remove(database)
38

39

40
  def lock(self):
    """Opens a new session to the database, stores it in ``self.session`` and returns it.

    Raises a RuntimeError when a session is already open.
    The database is created first if its file does not exist yet.
    """
    already_locked = hasattr(self, 'session')
    if already_locked:
      raise RuntimeError('Dead lock detected. Please do not try to lock the session when it is already locked!')

    database = self._database

    # old sqlalchemy versions sometimes require a fresh engine for each session
    needs_fresh_engine = sqlalchemy_version < [0,7,8]
    if needs_fresh_engine:
      self._engine = sqlalchemy.create_engine("sqlite:///"+database)
      self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

    # the database file has to exist before a session is opened
    database_exists = os.path.exists(database)
    if not database_exists:
      self._create()

    # finally, open the session and remember it
    session = self._session_maker()
    self.session = session
    logger.debug("Created new database session to '%s'" % database)
    return session
58

59

60
  def unlock(self):
    """Closes the currently open database session and removes it from this object.

    Raises a RuntimeError when no session is open.
    """
    if hasattr(self, 'session'):
      logger.debug("Closed database session of '%s'" % self._database)
      open_session = self.session
      open_session.close()
      # forget the session, so that lock() can open a new one
      delattr(self, 'session')
    else:
      raise RuntimeError('Error detected! The session that you want to close does not exist any more!')
67

68

69
  def _create(self):
    """Creates the directory of the database file and all tables of a new, empty database."""
    from .tools import makedirs_safe

    db_file = self._database

    # the sql database needs its directory to exist
    db_directory = os.path.dirname(db_file)
    makedirs_safe(db_directory)

    # let the models create all of their tables
    metadata = Base.metadata
    metadata.create_all(self._engine)
    logger.debug("Created new empty database '%s'" % db_file)
79 80


81

82
  def get_jobs(self, job_ids = None):
    """Returns the jobs stored in the database as a list, sorted by their ``unique`` attribute.

    When ``job_ids`` is given and not empty, only jobs with one of these ids are returned.
    The query runs on ``self.session``, so a session has to be open.
    """
    query = self.session.query(Job)
    if job_ids:
      # restrict the query to the requested ids
      query = query.filter(Job.id.in_(job_ids))

    jobs = list(query)
    jobs.sort(key=lambda entry: entry.unique)
    return jobs
André Anjos's avatar
André Anjos committed
88 89


90
  def _job_and_array(self, job_id, array_id = None):
    """Returns the pair (job, array job) for the given id(s).

    The result is (None, None) when no job with the given id exists, and the
    array job is None when no ``array_id`` is given.
    """
    matches = self.get_jobs((job_id,))
    count = len(matches)
    if count == 0:
      logger.warn("Job with ID '%d' was not found in the database."%job_id)
      return (None, None)
    if count > 1:
      # report the duplicates, but continue with the first job
      logger.error("%d jobs with the same ID '%d' were detected in the database"%(count, job_id))

    job = matches[0]
    unique_id = job.unique

    if array_id is None:
      return (job, None)

    # look up the array job that belongs to the job and has the requested array id
    query = self.session.query(ArrayJob)
    query = query.filter(ArrayJob.job_id == unique_id)
    query = query.filter(ArrayJob.id == array_id)
    array_jobs = list(query)
    assert (len(array_jobs) == 1)
    return (job, array_jobs[0])
André Anjos's avatar
André Anjos committed
108 109


110 111 112 113
  def run_job(self, job_id, array_id = None):
    """This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.

    The job is marked as executing, its command line is run in a sub-process,
    and the result of that call is stored in the database afterwards.
    If the job is in 'failure' state after being marked as executing, it is not
    run; instead, this job and all jobs that (directly or indirectly) wait for
    it are handed to ``self.stop_jobs``.
    Nothing is done when the job does not exist in the database.
    """
    command_line = None
    dependent_job_ids = None

    # get the job from the database
    self.lock()
    try:
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        return
      job = jobs[0]

      # get the machine name we are executing on; this might only work at idiap
      machine_name = socket.gethostname()

      # set the 'executing' status to the job
      job.execute(array_id, machine_name)

      if job.status == 'failure':
        # there has been a dependent job that has failed before
        # collect this and all dependent jobs to stop them from execution
        pending = list(job.get_jobs_waiting_for_us())
        dependent_job_ids = set([dep.id for dep in pending] + [job.id])
        while pending:
          # take the job out of the queue, otherwise the loop would never end
          dep = pending.pop(0)
          for waiting in dep.get_jobs_waiting_for_us():
            # visit each job only once, also when dependencies share jobs
            if waiting.id not in dependent_job_ids:
              dependent_job_ids.add(waiting.id)
              pending.append(waiting)
      else:
        # get the command line of the job
        command_line = job.get_command_line()
        self.session.commit()
    finally:
      # always release the session, so that later calls to lock() do not fail
      self.unlock()

    if dependent_job_ids is not None:
      # stopping the jobs is done on a best-effort basis
      try:
        self.stop_jobs(list(dependent_job_ids))
        logger.warn("Deleted dependent jobs '%s' since this job failed." % str(list(dependent_job_ids)))
      except Exception as e:
        logger.warn("Could not stop dependent jobs '%s': %s" % (str(list(dependent_job_ids)), e))
      return

    # execute the command line of the job, and wait until it has finished
    try:
      result = subprocess.call(command_line)
    except Exception as e:
      logger.error("The command line of job '%s' could not be executed: %s" % (job_id, e))
      result = 69 # ASCII: 'E'

    # set a new status and the results of the job
    self.lock()
    try:
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        logger.error("The job with id '%d' could not be found in the database!" % job_id)
        return

      jobs[0].finish(result, array_id)
      self.session.commit()
    finally:
      self.unlock()


173
  def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, status=Status, names=None, ids_only=False):
    """Lists the jobs currently added to the database.

    ``job_ids``: only jobs with these ids are listed; all jobs if empty or None.
    ``print_array_jobs``: additionally list the array jobs of each listed job.
    ``print_dependencies``: add a column with the dependencies of the jobs.
    ``long``: do not limit the length of the command line column.
    ``status``: only jobs (and array jobs) with one of these statuses are listed.
    ``names``: if not None, only jobs with one of these names are listed.
    ``ids_only``: print only the ids of all jobs in the database, separated by spaces; all other options are ignored.
    """
    if ids_only:
      self.lock()
      try:
        for job in self.get_jobs():
          print(job.id, end=" ")
      finally:
        self.unlock()
      return

    # configuration for jobs
    if print_dependencies:
      fields = ("job-id", "queue", "status", "job-name", "dependencies", "submitted command line")
      lengths = (20, 14, 14, 20, 30, 43)
      job_format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:^%d}  {5:<%d}" % lengths
      dependency_length = lengths[4]
    else:
      fields = ("job-id", "queue", "status", "job-name", "submitted command line")
      lengths = (20, 14, 14, 20, 43)
      job_format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:<%d}" % lengths
      dependency_length = 0

    # array jobs only use the first three columns
    array_format = "{0:>%d}  {1:^%d}  {2:^%d}" % lengths[:3]
    delimiter = job_format.format(*['='*k for k in lengths])
    array_delimiter = array_format.format(*["-"*k for k in lengths[:3]])
    header = [field.center(length) for field, length in zip(fields, lengths)]

    # print header
    print('  '.join(header))
    print(delimiter)

    self.lock()
    try:
      for job in self.get_jobs(job_ids):
        job.refresh()
        # the status always has to match; the name only if names are given
        if job.status in status and (names is None or job.name in names):
          # the command line is limited to the width of its column, unless the long format is requested
          print(job.format(job_format, dependency_length, None if long else lengths[-1]))
          if print_array_jobs and job.array:
            print(array_delimiter)
            for array_job in job.array:
              if array_job.status in status:
                print(array_job.format(array_format))
            print(array_delimiter)
    finally:
      # release the session also when listing fails
      self.unlock()


  def report(self, job_ids=None, array_ids=None, unfinished=False, output=True, error=True):
    """Iterates through the output and error files and writes the results to command line.

    ``job_ids``: only these jobs are reported; all jobs if empty or None.
    ``array_ids``: if given, only these array jobs are reported; exactly one job id should be given in this case.
    ``unfinished``: report jobs regardless of their status.
    ``output``: print the contents of non-empty output files.
    ``error``: print the contents of non-empty error files.
    """
    # only failed jobs are of interest when just the error files are requested
    accepted_status = ('failure',) if error and not output else ('success', 'failure')

    def _write_contents(job):
      # Writes the contents of the output and error files to command line
      out_file, err_file = job.std_out_file(), job.std_err_file()
      if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0:
        logger.info("Contents of output file: '%s'" % out_file)
        # close the file right after reading
        with open(out_file) as f:
          print(f.read().rstrip())
        print("-"*20)
      if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0:
        logger.info("Contents of error file: '%s'" % err_file)
        with open(err_file) as f:
          print(f.read().rstrip())
        print("-"*40)

    def _write_array_jobs(array_jobs):
      # Writes the contents of the files of all array jobs that should be reported
      for array_job in array_jobs:
        if unfinished or array_job.status in accepted_status:
          print("Array Job", str(array_job.id), ":")
          _write_contents(array_job)

    self.lock()
    try:
      # check if an array job should be reported
      if array_ids:
        # job_ids might be None, which has no length
        if not job_ids or len(job_ids) != 1:
          logger.error("If array ids are specified exactly one job id must be given.")
        if job_ids:
          array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
        else:
          # without any job id, there is nothing to select the array jobs from
          array_jobs = []
        if array_jobs: print(array_jobs[0].job)
        _write_array_jobs(array_jobs)

      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          if job.array:
            if unfinished or job.status in accepted_status or job.status == 'executing':
              print(job)
              _write_array_jobs(job.array)
          else:
            if unfinished or job.status in accepted_status:
              print(job)
              _write_contents(job)
          if job.log_dir is not None and job.status in accepted_status:
            print("-"*60)
    finally:
      # release the session also when reporting fails
      self.unlock()


267
  def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True):
    """Deletes the jobs with the given ids from the database.

    ``job_ids``: the jobs to delete; all jobs if empty or None.
    ``array_ids``: if given, only these array jobs are deleted; exactly one job id should be given in this case.
    ``delete_logs``: remove the output and error log files of the deleted jobs.
    ``delete_log_dir``: remove the log directory as well, if it is empty after the log files were removed.
    ``status``: only jobs (and array jobs) with one of these statuses are deleted.
    ``delete_jobs``: remove the jobs from the database; if False, only the log files are handled.
    """
    def _delete_dir_if_empty(log_dir):
      # removes the log directory, but only if requested and nothing is left inside
      if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
        os.rmdir(log_dir)
        logger.info("Removed empty log directory '%s'" % log_dir)

    def _delete(job, try_to_delete_dir=False):
      # delete the log files of the job and the job from the database
      if delete_logs:
        out_file, err_file = job.std_out_file(), job.std_err_file()
        if out_file and os.path.exists(out_file):
          os.remove(out_file)
          logger.debug("Removed output log file '%s'" % out_file)
        if err_file and os.path.exists(err_file):
          os.remove(err_file)
          logger.debug("Removed error log file '%s'" % err_file)
        if try_to_delete_dir:
          _delete_dir_if_empty(job.log_dir)
      if delete_jobs:
        self.session.delete(job)

    self.lock()
    try:
      # check if array ids are specified
      if array_ids:
        if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
        array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
        if array_jobs:
          job = array_jobs[0].job
          for array_job in array_jobs:
            if array_job.status in status:
              if delete_jobs:
                # both ids have to be passed as one tuple to the format string
                logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
              _delete(array_job)
          # delete the job itself when it has no array jobs
          if not job.array:
            if job.status in status:
              if delete_jobs:
                logger.info("Deleting job '%d' from the database." % job.id)
              _delete(job, True)

      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          # delete all array jobs
          if job.array:
            for array_job in job.array:
              if array_job.status in status:
                if delete_jobs:
                  logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
                _delete(array_job)
          # delete this job
          if job.status in status:
            if delete_jobs:
              logger.info("Deleting job '%d' from the database." % job.id)
            _delete(job, True)

      self.session.commit()
    finally:
      # release the session also when deleting fails; nothing is committed in that case
      self.unlock()