models.py 15.6 KB
Newer Older
1
import sqlalchemy
2
from sqlalchemy import Table, Column, Integer, DateTime, String, Boolean, ForeignKey
3
from sqlalchemy.orm import backref
4
from sqlalchemy.ext.declarative import declarative_base
5
from .tools import Enum, relationship
6 7

import os
8
import sys
9
from datetime import datetime
10 11 12 13 14

if sys.version_info[0] >= 3:
  from pickle import dumps, loads
else:
  from cPickle import dumps, loads
15

16
from .tools import logger
17 18 19

# Declarative base class from which all ORM models of this module derive.
Base = declarative_base()

# All states that a job, or a single element of an array job, can be in.
# The first entry is the state assigned to a freshly created array element.
Status = (
  'submitted',
  'queued',
  'waiting',
  'executing',
  'success',
  'failure',
)

class ArrayJob(Base):
  """One element of an array job.

  Every element belongs to a :py:class:`Job` (through ``job_id``) and keeps its
  own status, result, machine name and timing information.
  """
  __tablename__ = 'ArrayJob'

  unique = Column(Integer, primary_key = True)        # database-internal primary key
  id = Column(Integer)                                # index of this element inside the array
  job_id = Column(Integer, ForeignKey('Job.unique'))  # Job.unique of the owning job

  status = Column(Enum(*Status))
  result = Column(Integer)
  machine_name = Column(String(10))

  submit_time = Column(DateTime)
  start_time = Column(DateTime)
  finish_time = Column(DateTime)

  # The owning job; the backref makes all elements of a job available as ``Job.array``.
  job = relationship("Job", backref='array', order_by=id)

  def __init__(self, id, job_id):
    # identification of the element and its job
    self.id, self.job_id = id, job_id
    # a new element starts in the first state, without any result
    self.status, self.result = Status[0], None
    # the machine is filled in later, by the Job class, when the element is executed
    self.machine_name = None
    # submitted right now; neither started nor finished yet
    self.submit_time = datetime.now()
    self.start_time = self.finish_time = None

  def std_out_file(self):
    """Returns the stdout log file of this element (the job's file plus ``.<id>``), or None if the job has no log directory."""
    if not self.job.log_dir:
      return None
    return ".".join((self.job.std_out_file(), str(self.id)))

  def std_err_file(self):
    """Returns the stderr log file of this element (the job's file plus ``.<id>``), or None if the job has no log directory."""
    if not self.job.log_dir:
      return None
    return ".".join((self.job.std_err_file(), str(self.id)))

  def __str__(self):
    header = "<ArrayJob %d> of <Job %d>" % (self.id, self.job.id)
    if self.result is None:
      state = self.status
    else:
      state = "%s (%d)" % (self.status, self.result)
    return "%s : %s" % (header, state)

  def format(self, format):
    """Formats the current job into a nicer string to fit into a table.

    ``format`` is a ``str.format`` template that receives an empty first
    column, the ``"<job id> - <element id>"`` string, the machine (or, if not
    known yet, the job's queue) and the status with an optional result.
    """
    job_id = "%d - %d" % (self.job.id, self.id)
    if self.machine_name is not None:
      queue = self.machine_name
    else:
      queue = self.job.queue_name
    status = "%s" % self.status
    if self.result is not None:
      status += " (%d)" % self.result
    return format.format("", job_id, queue, status)


class Job(Base):
  """This class defines one Job that was submitted to the Job Manager.

  The command line, the grid arguments and the array specification are stored
  pickled in string columns and unpickled on access (see ``_unpickle``).
  The elements of an array job are reachable as ``self.array`` (backref
  created by :py:class:`ArrayJob`); dependencies between jobs are reachable as
  ``self.jobs_we_have_to_wait_for`` and ``self.jobs_that_wait_for_us``
  (backrefs created by :py:class:`JobDependence`).
  """
  __tablename__ = 'Job'

  unique = Column(Integer, primary_key = True) # The unique ID of the job (not corresponding to the grid ID)
  command_line = Column(String(255))           # The command line to execute, stored pickled
  name = Column(String(20))                    # A hand-chosen name for the task
  queue_name = Column(String(20))              # The name of the queue
  machine_name = Column(String(10))            # The name of the machine in which the job is run
  grid_arguments = Column(String(255))         # The kwargs arguments for the job submission (e.g. in the grid), stored pickled
  id = Column(Integer)                         # The ID of the job as given from the grid
  exec_dir = Column(String(255))               # The directory in which the command should be executed
  log_dir = Column(String(255))                # The directory where the log files will be put to
  array_string = Column(String(255))           # The array specification, stored pickled (only needed for re-submission)
  stop_on_failure = Column(Boolean)            # An indicator whether to stop depending jobs when this job finishes with an error

  submit_time = Column(DateTime)
  start_time = Column(DateTime)
  finish_time = Column(DateTime)

  status = Column(Enum(*Status))
  result = Column(Integer)

  def __init__(self, command_line, name = None, exec_dir = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
    """Constructs a Job object without an ID (needs to be set later).

    ``command_line`` and ``array_string`` are stored pickled; all remaining
    keyword arguments are pickled together into ``grid_arguments``.
    """
    self.command_line = dumps(command_line)
    self.name = name
    self.queue_name = queue_name   # will be set during the queue command later
    # NOTE: submit() below resets machine_name to None, so this value does not survive construction;
    # it will be set during the execute command later
    self.machine_name = machine_name
    self.grid_arguments = dumps(kwargs)
    self.exec_dir = exec_dir
    self.log_dir = log_dir
    self.stop_on_failure = stop_on_failure
    self.array_string = dumps(array_string)
    self.submit()

  def submit(self, new_queue = None):
    """Sets the status of this job and of all its array elements to 'submitted'.

    Results, machine names and timing information are reset for the job and
    for its array elements; the job is optionally moved to ``new_queue``.
    """
    self.status = 'submitted'
    self.result = None
    self.machine_name = None
    if new_queue is not None:
      self.queue_name = new_queue

    now = datetime.now()
    for array_job in self.array:
      array_job.status = 'submitted'
      array_job.result = None
      array_job.machine_name = None
      # reset the element timing as well, so that no stale times survive a re-submission
      array_job.submit_time = now
      array_job.start_time = None
      array_job.finish_time = None

    self.submit_time = now
    self.start_time = None
    self.finish_time = None

  def queue(self, new_job_id = None, new_job_name = None, queue_name = None):
    """Sets the status of this job to 'queued', 'waiting' or 'failure'.

    The job becomes 'failure' when ``stop_on_failure`` is set and one of the
    jobs it waits for has failed, 'waiting' when one of these jobs has not
    finished yet, and 'queued' otherwise. Queued jobs that wait for this job
    are reset accordingly, and all unfinished array elements take over the
    new status.
    """
    # update the job id (i.e., when the job is executed in the grid)
    if new_job_id is not None:
      self.id = new_job_id
    if new_job_name is not None:
      self.name = new_job_name
    if queue_name is not None:
      self.queue_name = queue_name

    self.result = None

    # check if we have to wait for another job to finish
    states = [job.status for job in self.get_jobs_we_wait_for()]
    if self.stop_on_failure and 'failure' in states:
      # A failed dependency decides the outcome, independently of the order in which
      # dependencies are visited and of whether other dependencies are still running.
      new_status = 'failure'
    elif any(state not in ('success', 'failure') for state in states):
      new_status = 'waiting'
    else:
      new_status = 'queued'

    # reset the queued jobs that depend on us to waiting (or failure) status
    for job in self.get_jobs_waiting_for_us():
      if job.status == 'queued':
        job.status = 'failure' if new_status == 'failure' else 'waiting'

    self.status = new_status
    for array_job in self.array:
      if array_job.status not in ('success', 'failure'):
        array_job.status = new_status

  def execute(self, array_id = None, machine_name = None):
    """Sets the status of this job (and optionally of one array element) to 'executing'.

    With ``array_id``, the array element with that id is marked as executing,
    gets its start time and, if given, its machine name. Without ``array_id``,
    ``machine_name`` (if given) is stored on the job itself. The job's start
    time is set on the first call only.
    """
    self.status = 'executing'
    if array_id is not None:
      for array_job in self.array:
        if array_job.id == array_id:
          array_job.status = 'executing'
          if machine_name is not None:
            array_job.machine_name = machine_name
          # record the start time independently of whether the machine is known;
          # otherwise times() cannot compute the execution time of finished elements
          array_job.start_time = datetime.now()
    elif machine_name is not None:
      self.machine_name = machine_name

    if self.start_time is None:
      self.start_time = datetime.now()

    # sometimes, the 'finish' command did not work for array jobs,
    # so check if any old job still has the 'executing' flag set
    for job in self.get_jobs_we_wait_for():
      if job.array and job.status == 'executing':
        job.finish(0, -1)

  def finish(self, result, array_id = None):
    """Sets the status of this job (or of one of its array elements) to 'success' or 'failure'.

    With ``array_id``, only the array element with that id is finished with
    ``result``; the job itself is finished only once all of its array elements
    have finished. Without ``array_id``, the job is finished directly. The
    job's result is ``result`` unless that is 0, in which case it is the first
    non-zero result among the finished array elements (in order), or 0.
    Once the job is finished, all 'waiting' jobs that wait for it are re-queued.
    """
    # check if there is any array job still running
    new_status = 'success' if result == 0 else 'failure'
    new_result = result
    finished = True
    if array_id is not None:
      for array_job in self.array:
        if array_job.id == array_id:
          array_job.status = new_status
          array_job.result = result
          array_job.finish_time = datetime.now()
        if array_job.status not in ('success', 'failure'):
          finished = False
        elif new_result == 0:
          new_result = array_job.result

    if finished:
      # There was no array job, or all array jobs finished
      self.status = 'success' if new_result == 0 else 'failure'
      self.result = new_result
      self.finish_time = datetime.now()

      # update all waiting jobs
      for job in self.get_jobs_waiting_for_us():
        if job.status == 'waiting':
          job.queue()

  def refresh(self):
    """Refreshes the status information of an executing array job.

    When all array elements are finished, the job becomes 'success' or
    'failure'; its result is the result of the last failed element found, or 0.
    The finish time is not touched here.
    """
    if self.status == 'executing' and self.array:
      new_result = 0
      for array_job in self.array:
        if array_job.status == 'failure' and new_result is not None:
          new_result = array_job.result
        elif array_job.status not in ('success', 'failure'):
          new_result = None
      if new_result is not None:
        self.status = 'success' if new_result == 0 else 'failure'
        self.result = new_result

  @staticmethod
  def _unpickle(value):
    """Unpickles the content of one of the pickled string columns.

    The stored value may come back as ``bytes`` (unpickled directly) or as a
    text string, which is encoded before unpickling (the original comments
    attribute this to Python 2 returning unicode and Python 3 returning bytes).
    NOTE: unpickling can execute arbitrary code, so the job database must be trusted.
    """
    return loads(value if isinstance(value, bytes) else value.encode())

  def get_command_line(self):
    """Returns the command line for the job, as it was given to the constructor."""
    return self._unpickle(self.command_line)

  def set_command_line(self, command_line):
    """Sets / overwrites the command line for the job."""
    self.command_line = dumps(command_line)

  def get_exec_dir(self):
    """Returns the canonical path (``os.path.realpath``) of the execution directory, or None if none was set."""
    return str(os.path.realpath(self.exec_dir)) if self.exec_dir is not None else None

  def get_array(self):
    """Returns the array specification of the job, as it was given to the constructor."""
    return self._unpickle(self.array_string)

  def get_arguments(self):
    """Returns the additional options for the grid (such as the queue, memory requirements, ...).

    Only options that are set are returned, in the order pe_opt, memfree,
    hvmem, gpumem, env, io_big, queue.
    """
    # The grid arguments are expected under the 'kwargs' key (add_job passes kwargs=...);
    # a job created without any grid arguments has no such key.
    args = self._unpickle(self.grid_arguments).get('kwargs', {})

    retval = {}
    if 'pe_opt' in args:
      retval['pe_opt'] = args['pe_opt']
    for key in ('memfree', 'hvmem', 'gpumem'):
      if args.get(key) is not None:
        retval[key] = args[key]
    # an empty or missing environment is not reported
    if args.get('env'):
      retval['env'] = args['env']
    if args.get('io_big'):
      retval['io_big'] = True

    # also add the queue
    if self.queue_name is not None:
      retval['queue'] = str(self.queue_name)

    return retval

  def set_arguments(self, **kwargs):
    """Sets / overwrites the grid arguments; get_arguments() reads them from the ``kwargs`` keyword."""
    self.grid_arguments = dumps(kwargs)

  def get_jobs_we_wait_for(self):
    """Returns the jobs that this job depends on."""
    return [j.waited_for_job for j in self.jobs_we_have_to_wait_for if j.waited_for_job is not None]

  def get_jobs_waiting_for_us(self):
    """Returns the jobs that depend on this job."""
    return [j.waiting_job for j in self.jobs_that_wait_for_us if j.waiting_job is not None]

  def _log_file(self, stream):
    """Returns ``<log_dir>/<name>.<stream><id>`` (name defaults to 'job'), or None if there is no log directory."""
    if not self.log_dir:
      return None
    return os.path.join(self.log_dir, (self.name if self.name else 'job') + "." + stream + str(self.id))

  def std_out_file(self, array_id = None):
    """Returns the stdout log file of the job, or None; ``array_id`` is accepted but not used here."""
    return self._log_file("o")

  def std_err_file(self, array_id = None):
    """Returns the stderr log file of the job, or None; ``array_id`` is accepted but not used here."""
    return self._log_file("e")

  def _cmdline(self):
    """Returns the command line as one display string.

    Elements starting with '-' are written as-is, all others in single
    quotes; each element is followed by a space.
    """
    return "".join(
      ("%s " if str(cmd).startswith('-') else "'%s' ") % (cmd,)
      for cmd in self.get_command_line()
    )

  def __str__(self):
    ids = "%d (%d)" % (self.unique, self.id)
    if self.machine_name:
      m = "%s - %s" % (self.queue_name, self.machine_name)
    else:
      m = self.queue_name
    # the array specification is (start, stop, step); tuple() guards against a list
    a = "[%d-%d:%d]" % tuple(self.get_array()) if self.array else ""
    if self.name is not None:
      n = "<Job: %s %s - '%s'>" % (ids, a, self.name)
    else:
      n = "<Job: %s>" % ids
    if self.result is not None:
      r = "%s (%d)" % (self.status, self.result)
    else:
      r = "%s" % self.status
    return "%s | %s : %s -- %s" % (n, m, r, self._cmdline())

  def format(self, format, dependencies = 0, limit_command_line = None):
    """Formats the current job into a nicer string to fit into a table.

    ``format`` is a ``str.format`` template that receives the unique id, the
    job id, the queue (at most 12 characters), the status, the name,
    optionally the dependencies, and the command line. A non-zero
    ``dependencies`` adds the sorted unique ids of the jobs we wait for,
    truncated to that many characters. ``limit_command_line`` truncates the
    command line likewise; without it, the grid options and the execution
    directory are added to the command line.
    """
    command_line = self._cmdline()
    if limit_command_line is not None and len(command_line) > limit_command_line:
      command_line = command_line[:limit_command_line-3] + '...'

    job_id = "%d" % self.id + (" [%d-%d:%d]" % tuple(self.get_array()) if self.array else "")
    status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "")
    queue = self.queue_name if self.machine_name is None else self.machine_name

    if limit_command_line is None:
      grid_opt = self.get_arguments()
      if grid_opt:
        # prepend the grid options of the job to the command line
        command_line = "<" + ",".join(["%s=%s" % (key, value) for key, value in grid_opt.items()]) + ">: " + command_line
      if self.exec_dir is not None:
        command_line += "; [Executed in directory: '%s']" % self.exec_dir

    if dependencies:
      deps = str(sorted({dep.unique for dep in self.get_jobs_we_wait_for()}))
      if dependencies < len(deps):
        deps = deps[:dependencies-3] + '...'
      return format.format(self.unique, job_id, queue[:12], status, self.name, deps, command_line)
    else:
      return format.format(self.unique, job_id, queue[:12], status, self.name, command_line)


class JobDependence(Base):
  """This table defines a many-to-many relationship between Jobs.

  Each row states that the job with unique id ``waiting_job_id`` has to wait
  for the job with unique id ``waited_for_job_id`` to finish.
  """
  __tablename__ = 'JobDependence'
  id = Column(Integer, primary_key=True)
  waiting_job_id = Column(Integer, ForeignKey('Job.unique')) # The unique ID of the waiting job
  waited_for_job_id = Column(Integer, ForeignKey('Job.unique')) # The unique ID of the job to wait for

  # Both relationships point to Job through different foreign keys, so each one is
  # disambiguated with an explicit primaryjoin. The backrefs create two collections on Job:
  # - Job.jobs_we_have_to_wait_for: the rows in which that job is the waiting job,
  #   so row.waited_for_job is one of the jobs it depends on;
  # - Job.jobs_that_wait_for_us: the rows in which that job is waited for,
  #   so row.waiting_job is one of the jobs depending on it.
  waiting_job = relationship('Job', backref = 'jobs_we_have_to_wait_for', primaryjoin=(Job.unique == waiting_job_id), order_by=id) # The job that waits
  waited_for_job = relationship('Job', backref = 'jobs_that_wait_for_us', primaryjoin=(Job.unique == waited_for_job_id), order_by=id) # The job that is waited for

  def __init__(self, waiting_job_id, waited_for_job_id):
    self.waiting_job_id = waiting_job_id
    self.waited_for_job_id = waited_for_job_id


def add_job(session, command_line, name = 'job', dependencies = None, array = None, exec_dir=None, log_dir = None, stop_on_failure = False, **kwargs):
  """Helper function to create a job, add the dependencies and the array jobs.

  Parameters:
    session: the database session; the new job is flushed (to obtain its
      unique id) and everything is committed at the end.
    command_line: the command line of the job (stored pickled).
    name: a hand-chosen name for the job.
    dependencies: unique ids of jobs the new job has to wait for (default: none).
      Self-dependencies and ids not found in the database are skipped with a warning.
    array: optional ``(start, stop, step)``; one ArrayJob is added for each
      index in ``range(start, stop+1, step)``, i.e. ``stop`` is inclusive.
    exec_dir, log_dir, stop_on_failure: stored on the job.
    **kwargs: grid arguments, stored under the ``kwargs`` key of the job's
      grid arguments.

  Returns the new Job; its ``id`` is initialized to its ``unique`` id.
  """
  # None instead of a mutable [] default, which would be shared between calls
  if dependencies is None:
    dependencies = []

  job = Job(command_line=command_line, name=name, exec_dir=exec_dir, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)

  session.add(job)
  session.flush()
  session.refresh(job)

  # by default id and unique id are identical, but the id might be overwritten later on
  job.id = job.unique

  for d in dependencies:
    if d == job.unique:
      logger.warn("Adding self-dependency of job %d is not allowed" % d)
      continue
    # 'unique' is the primary key, so there is at most one match
    depending = session.query(Job).filter(Job.unique == d).first()
    if depending is not None:
      session.add(JobDependence(job.unique, depending.unique))
    else:
      logger.warn("Could not find dependent job with id %d in database" % d)

  if array:
    (start, stop, step) = array
    # add array jobs
    for i in range(start, stop+1, step):
      session.add(ArrayJob(i, job.unique))

  session.commit()

  return job

def times(job):
  """Returns a string containing timing information for the given job, which might be a :py:class:`Job` or an :py:class:`ArrayJob`.

  The start time and waiting time are reported only when a start time is
  recorded. The finish time is reported when set, and the execution time only
  when the start time is recorded as well, so a job that finished without a
  recorded start time does not raise a TypeError.
  """
  timing = "Submitted: %s" % job.submit_time.ctime()
  if job.start_time is not None:
    timing += "\nStarted  : %s \t Job waited  : %s" % (job.start_time.ctime(), job.start_time - job.submit_time)
  if job.finish_time is not None:
    timing += "\nFinished : %s" % job.finish_time.ctime()
    if job.start_time is not None:
      timing += " \t Job executed: %s" % (job.finish_time - job.start_time)
  return timing