from __future__ import print_function

import os
import shutil
import socket # to get the host name
import subprocess
import sys
from distutils.version import LooseVersion

import sqlalchemy

from .models import Base, Job, ArrayJob, Status, times
from .tools import logger
class JobManager:
  """This job manager defines the basic interface for handling jobs in the SQL database."""

  def __init__(self, database = 'submitted.sql3', wrapper_script = None, debug = False):
    """Sets up the SQLite engine and session factory and locates the ``jman`` wrapper script.

    database : str
      Path of the SQLite database file; it is stored as an absolute (real) path.
      The file itself is only created when it is first locked (see ``lock``).

    wrapper_script : str or None
      Path of the ``jman`` executable. If ``None``, ``jman`` is searched for in the
      current directory, in ``./bin`` and then in the directories of ``PATH``.

    debug : bool
      Passed as ``echo`` to ``sqlalchemy.create_engine``, i.e., SQL statements are logged.

    Raises IOError if no wrapper script was given and ``jman`` cannot be found, or if the
    wrapper script does not exist.
    """
    self._database = os.path.realpath(database)
    # 'timeout' is the number of seconds sqlite waits for a lock held by another connection
    self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, connect_args={'timeout': 600}, echo=debug)
    self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

    if wrapper_script is None:
      # try to find the executable, search in the local and the bin path first
      which = getattr(shutil, 'which', None)
      if which is None:
        # Python 2 has no shutil.which; distutils is gone since Python 3.12, so only use it as fallback
        from distutils.spawn import find_executable as which
      search_path = os.pathsep.join(('.', 'bin', os.environ.get('PATH', os.defpath)))
      wrapper_script = which('jman', path=search_path)
      # resolve the path only when something was found;
      # os.path.realpath(None) would raise a TypeError instead of the IOError below
      if wrapper_script is not None:
        wrapper_script = os.path.realpath(wrapper_script)

    if wrapper_script is None:
      raise IOError("Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager.")
    if not os.path.exists(wrapper_script):
      raise IOError("Your wrapper_script cannot be found. Jobs will not be executable.")

    # store the command that this job manager was called with
    self.wrapper_script = wrapper_script

  def __del__(self):
    """Removes the database file when it does not contain any job any more.

    Exceptions raised in ``__del__`` are only reported as warnings by Python, so the
    database session is released even when reading the jobs fails.
    """
    # __init__ might have failed before the database path was set
    database = getattr(self, '_database', None)
    if database is None or not os.path.isfile(database):
      return

    # in erroneous cases, the session might still be active, so don't create a deadlock here!
    if not hasattr(self, 'session'):
      self.lock()
    try:
      job_count = len(self.get_jobs())
    finally:
      self.unlock()

    if not job_count:
      logger.debug("Removed database file '%s' since database is empty", database)
      os.remove(database)

  def lock(self):
    """Generates (and returns) a blocking session object to the database.

    The new session is also kept in ``self.session`` until ``unlock`` is called.
    Creates the database first if its file does not exist yet.
    Raises RuntimeError if a session is already open, since a second lock would deadlock.
    """
    if hasattr(self, 'session'):
      raise RuntimeError('Dead lock detected. Please do not try to lock the session when it is already locked!')

    outdated_sqlalchemy = LooseVersion(sqlalchemy.__version__) < LooseVersion('0.7.8')
    if outdated_sqlalchemy:
      # old sqlalchemy versions sometimes need a fresh engine for every new session
      self._engine = sqlalchemy.create_engine("sqlite:///" + self._database)
      self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

    # the database file (with its tables) is only created on demand
    database_missing = not os.path.exists(self._database)
    if database_missing:
      self._create()

    new_session = self._session_maker()
    self.session = new_session
    logger.debug("Created new database session to '%s'" % self._database)
    return new_session

  def unlock(self):
    """Closes the session to the database that was opened by ``lock``.

    Raises RuntimeError if there is no open session.
    """
    session_is_open = hasattr(self, 'session')
    if not session_is_open:
      raise RuntimeError('Error detected! The session that you want to close does not exist any more!')

    logger.debug("Closed database session of '%s'" % self._database)
    open_session = self.session
    open_session.close()
    # removing the attribute marks the manager as unlocked again
    del self.session

  def _create(self):
    """Creates a new and empty database, including the directory it is stored in."""
    from .tools import makedirs_safe

    # the directory for the sql database might not exist yet
    database_directory = os.path.dirname(self._database)
    makedirs_safe(database_directory)

    # create all tables registered in Base.metadata
    table_metadata = Base.metadata
    table_metadata.create_all(self._engine)
    logger.debug("Created new empty database '%s'" % self._database)

  def get_jobs(self, job_ids = None):
    """Returns a list of jobs that are stored in the database.

    If ``job_ids`` is ``None``, all jobs are returned; otherwise only the jobs whose
    ``unique`` id is contained in ``job_ids`` (an empty ``job_ids`` gives an empty list
    without querying the database). The list is sorted by the ``unique`` id.
    Requires an open session (see ``lock``).
    """
    if job_ids is None:
      selection = self.session.query(Job)
    elif not len(job_ids):
      return []
    else:
      selection = self.session.query(Job).filter(Job.unique.in_(job_ids))
    return sorted(selection, key=lambda entry: entry.unique)

  def _job_and_array(self, job_id, array_id = None):
    """Returns the job with the given id and, optionally, one of its array jobs.

    Returns a tuple ``(job, array_job)``; ``array_job`` is ``None`` if ``array_id`` is ``None``.
    If no job with the given id exists, ``(None, None)`` is returned.
    Requires an open session (see ``lock``).

    Raises ValueError if ``array_id`` is given, but does not identify exactly one array job of the job.
    """
    # get the job (and the array job) with the given id(s)
    jobs = self.get_jobs((job_id,))
    if not jobs:
      logger.warning("Job with ID '%s' was not found in the database.", job_id)
      return (None, None)
    if len(jobs) > 1:
      logger.error("%d jobs with the same ID '%s' were detected in the database", len(jobs), job_id)

    job = jobs[0]
    if array_id is None:
      return (job, None)

    array_jobs = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == job.unique).filter(ArrayJob.id == array_id))
    # explicit check instead of an assert, which would be stripped when running with -O
    if len(array_jobs) != 1:
      raise ValueError("Expected exactly one array job with id '%s' of job '%s', but found %d" % (array_id, job.unique, len(array_jobs)))
    return (job, array_jobs[0])

  def run_job(self, job_id, array_id = None):
    """This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.

    The job (or its array job ``array_id``) is marked as executing on this machine, its
    command line is executed in its execution directory, and the result is stored in the
    database. If the job has ``stop_on_failure`` set and its status is ``'failure'``
    afterwards, the job and all jobs (transitively) waiting for it are stopped via
    ``self.stop_jobs`` (not defined in this class -- presumably provided by subclasses).

    Errors while updating the job status are logged, not raised; the database session is
    released in any case. If the job disappears from the database, the function returns early.
    """
    # set the job's status in the database
    try:
      # get the job from the database
      self.lock()
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        return
      job = jobs[0]

      # get the machine name we are executing on; this might only work at idiap
      machine_name = socket.gethostname()

      # set the 'executing' status to the job
      job.execute(array_id, machine_name)

      self.session.commit()
    except Exception as e:
      logger.error("Caught exception '%s'", e)
    finally:
      # lock() itself might have failed, so only close a session that exists
      if hasattr(self, 'session'):
        self.unlock()

    # get the command line of the job from the database; does not need write access
    try:
      self.lock()
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        logger.error("The job with id '%d' could not be found in the database!", job_id)
        return
      command_line = jobs[0].get_command_line()
      exec_dir = jobs[0].get_exec_dir()
    finally:
      if hasattr(self, 'session'):
        self.unlock()

    logger.info("Starting job %d: %s", job_id, " ".join(command_line))

    # execute the command line of the job, and wait until it has finished
    try:
      result = subprocess.call(command_line, cwd=exec_dir)
      logger.info("Job %d finished with result %s", job_id, str(result))
    except Exception as e:
      logger.error("The job with id '%d' could not be executed: %s", job_id, e)
      result = 69 # ASCII: 'E'

    # set a new status and the results of the job
    try:
      self.lock()
      jobs = self.get_jobs((job_id,))
      if not jobs:
        # it seems that the job has been deleted in the meanwhile
        logger.error("The job with id '%d' could not be found in the database!", job_id)
        return

      job = jobs[0]
      job.finish(result, array_id)

      self.session.commit()

      # This might not be working properly, so use with care!
      if job.stop_on_failure and job.status == 'failure':
        # the job has failed; stop this and all (transitively) dependent jobs from execution
        dependent_job_ids = {job.unique}
        pending = list(job.get_jobs_waiting_for_us())
        while pending:
          dep = pending.pop()
          # a job reachable via several paths (or a cycle) is only expanded once
          if dep.unique in dependent_job_ids:
            continue
          dependent_job_ids.add(dep.unique)
          pending.extend(dep.get_jobs_waiting_for_us())

        # stop_jobs is called without an open session
        self.unlock()
        deps = sorted(dependent_job_ids)
        self.stop_jobs(deps)
        logger.warning("Stopped dependent jobs '%s' since this job failed.", str(deps))

    except Exception as e:
      logger.error("Caught exception '%s'", e)
    finally:
      if hasattr(self, 'session'):
        self.unlock()

  def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, print_times = False, status=Status, names=None, ids_only=False):
    """Lists the jobs currently added to the database.

    job_ids : ids of the jobs to list; ``None`` lists all jobs
    print_array_jobs : also print the array jobs (whose status is in ``status``) of each job
    print_dependencies : add a 'dependencies' column to the table
    long : if True, no command line limit is passed to ``Job.format``
    print_times : also print the times of each listed job and array job
    status : only jobs (and array jobs) whose status is contained here are listed
    names : if given, only jobs whose name is contained here are listed
    ids_only : only print the ids of the selected jobs, separated by spaces, on one line
    """
    # configuration for jobs
    if print_dependencies:
      fields = ("job-id", "grid-id", "queue", "status", "job-name", "dependencies", "submitted command line")
      lengths = (8, 20, 14, 14, 20, 30, 43)
      format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:^%d}  {5:^%d}  {6:<%d}" % lengths
      # the dependencies are printed in column 5, so use that column's width
      dependency_length = lengths[5]
    else:
      fields = ("job-id", "grid-id", "queue", "status", "job-name", "submitted command line")
      lengths = (8, 20, 14, 14, 20, 43)
      format = "{0:^%d}  {1:^%d}  {2:^%d}  {3:^%d}  {4:^%d}  {5:<%d}" % lengths
      dependency_length = 0

    array_format = "{0:^%d}  {1:>%d}  {2:^%d}  {3:^%d}" % lengths[:4]
    delimiter = format.format(*['='*k for k in lengths])
    array_delimiter = array_format.format(*["-"*k for k in lengths[:4]])
    header = [field.center(length) for field, length in zip(fields, lengths)]
    # the command line is limited to the width of the last column, unless long output is requested
    command_line_length = None if long else lengths[-1]

    # print header
    if not ids_only:
      print('  '.join(header))
      print(delimiter)

    self.lock()
    try:
      for job in self.get_jobs(job_ids):
        job.refresh()
        if job.status not in status or (names is not None and job.name not in names):
          continue

        if ids_only:
          # no further information, so that the output stays a plain list of ids
          print(job.unique, end=" ")
          continue

        print(job.format(format, dependency_length, command_line_length))
        if print_times:
          print(times(job))

        if print_array_jobs and job.array:
          print(array_delimiter)
          for array_job in job.array:
            if array_job.status in status:
              print(array_job.format(array_format))
              if print_times:
                print(times(array_job))
          print(array_delimiter)
    finally:
      self.unlock()

    if ids_only:
      # terminate the line of ids
      print()

  def report(self, job_ids=None, array_ids=None, output=True, error=True, status=Status, name=None):
    """Iterates through the output and error files and writes their contents to the command line.

    job_ids : ids of the jobs to report; ``None`` reports all jobs
    array_ids : if given, only these array jobs are reported (exactly one job id should be given then)
    output : print the contents of non-empty output files
    error : print the contents of non-empty error files
    status : only jobs whose status is contained here are reported (not applied to ``array_ids``)
    name : if given, only jobs with this name are reported (not applied to ``array_ids``)
    """
    def _has_contents(file_name):
      # True if the file name is set and the file exists and is not empty
      return file_name is not None and os.path.exists(file_name) and os.stat(file_name).st_size > 0

    def _print_file(file_name):
      # prints the contents of the file; the file is closed afterwards
      with open(file_name) as f:
        print(f.read().rstrip())

    def _write_contents(job):
      # Writes the contents of the output and error files to command line
      out_file, err_file = job.std_out_file(), job.std_err_file()
      if output and _has_contents(out_file):
        logger.info("Contents of output file: '%s'", out_file)
        _print_file(out_file)
        print("-"*20)
      if error and _has_contents(err_file):
        logger.info("Contents of error file: '%s'", err_file)
        _print_file(err_file)
        print("-"*40)

    def _write_array_jobs(array_jobs):
      for array_job in array_jobs:
        print("Array Job", str(array_job.id), ("(%s) :"%array_job.machine_name if array_job.machine_name is not None else ":"))
        _write_contents(array_job)

    self.lock()
    try:
      # check if an array job should be reported
      if array_ids:
        if job_ids is None or len(job_ids) != 1:
          logger.error("If array ids are specified exactly one job id must be given.")
        query = self.session.query(ArrayJob).join(Job)
        # as in get_jobs, job_ids=None means all jobs
        if job_ids is not None:
          query = query.filter(Job.unique.in_(job_ids))
        array_jobs = list(query.filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
        if array_jobs:
          print(array_jobs[0].job)
        _write_array_jobs(array_jobs)

      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          if name is not None and job.name != name:
            continue
          if job.status not in status:
            continue
          print(job)
          if job.array:
            _write_array_jobs(job.array)
          else:
            _write_contents(job)
          if job.log_dir is not None:
            print("-"*60)
    finally:
      self.unlock()

  def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True):
    """Deletes the jobs with the given ids from the database.

    job_ids : ids of the jobs to delete; ``None`` selects all jobs
    array_ids : if given, only these array jobs are deleted (exactly one job id should be given then);
                the job itself is deleted as well when none of its array jobs remain
    delete_logs : remove the output and error log files of the deleted (array) jobs
    delete_log_dir : remove the log directory of a deleted job if it is empty afterwards
    status : only jobs and array jobs whose status is contained here are deleted
    delete_jobs : if False, only the log files (and directories) are removed, the database is unchanged
    """
    def _delete_dir_if_empty(log_dir):
      if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
        os.rmdir(log_dir)
        logger.info("Removed empty log directory '%s'", log_dir)

    def _delete(job, try_to_delete_dir=False):
      # removes the log files of the job and (if requested) the job from the database
      if delete_logs:
        out_file, err_file = job.std_out_file(), job.std_err_file()
        if out_file and os.path.exists(out_file):
          os.remove(out_file)
          logger.debug("Removed output log file '%s'", out_file)
        if err_file and os.path.exists(err_file):
          os.remove(err_file)
          logger.debug("Removed error log file '%s'", err_file)
        if try_to_delete_dir:
          _delete_dir_if_empty(job.log_dir)
      if delete_jobs:
        self.session.delete(job)

    self.lock()
    try:
      # check if array ids are specified
      if array_ids:
        if job_ids is None or len(job_ids) != 1:
          logger.error("If array ids are specified exactly one job id must be given.")
        query = self.session.query(ArrayJob).join(Job)
        # as in get_jobs, job_ids=None means all jobs
        if job_ids is not None:
          query = query.filter(Job.unique.in_(job_ids))
        array_jobs = list(query.filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
        if array_jobs:
          job = array_jobs[0].job
          # identities of the array jobs removed from the database; array_jobs keeps them alive
          removed = set()
          for array_job in array_jobs:
            if array_job.status in status:
              if delete_jobs:
                logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
                removed.add(id(array_job))
              _delete(array_job)
          # if job.array was already loaded, the ORM keeps deleted objects in it until the
          # session is expired, so skip them explicitly when checking for remaining array jobs
          remaining = [array_job for array_job in job.array if id(array_job) not in removed]
          if not remaining:
            if job.status in status:
              if delete_jobs:
                logger.info("Deleting job '%d' from the database." % job.unique)
              _delete(job, delete_jobs)

      else:
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
          # delete all array jobs
          for array_job in (job.array or []):
            if array_job.status in status:
              if delete_jobs:
                logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
              _delete(array_job)
          # delete this job
          if job.status in status:
            if delete_jobs:
              logger.info("Deleting job '%d' from the database." % job.unique)
            _delete(job, delete_jobs)

      self.session.commit()
    finally:
      # closing the session without commit (i.e., after an error) discards the pending changes
      self.unlock()