models.py 6.06 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
import sqlalchemy
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from bob.db.sqlalchemy_migration import Enum, relationship
from sqlalchemy.orm import backref
from sqlalchemy.ext.declarative import declarative_base

import os

from cPickle import dumps, loads

# Declarative base class shared by all ORM models of this module.
Base = declarative_base()

# Allowed values of the 'status' column of Job and ArrayJob;
# newly created jobs start in Status[0] ('waiting').
Status = ('waiting', 'executing', 'finished')

class ArrayJob(Base):
  """This class defines one element of an array job."""
  __tablename__ = 'ArrayJob'

  unique = Column(Integer, primary_key = True)    # Database key of this row
  id = Column(Integer)                            # Task index of this element within its array job
  job_id = Column(Integer, ForeignKey('Job.id'))  # The Job this element belongs to
  status = Column(Enum(*Status))                  # One of Status; starts as Status[0]
  result = Column(Integer)                        # None until set elsewhere

  # order_by must be given to the backref: on this many-to-one side it has no
  # effect, while Job.array has to be sorted by task index so that array[0] and
  # array[-1] are the first and the last task (as Job.__str__ assumes).
  job = relationship("Job", backref=backref('array', order_by=id))

  def __init__(self, id, job_id):
    """Creates the array element with task index ``id`` for the Job ``job_id``, in state Status[0]."""
    self.id = id
    self.job_id = job_id
    self.status = Status[0]
    self.result = None

  def std_out_file(self):
    """Returns the job's stdout log path with ".<task index>" appended, or None if the job has no log_dir."""
    return self.job.std_out_file() + "." + str(self.id) if self.job.log_dir else None

  def std_err_file(self):
    """Returns the job's stderr log path with ".<task index>" appended, or None if the job has no log_dir."""
    return self.job.std_err_file() + "." + str(self.id) if self.job.log_dir else None


class Job(Base):
  """This class defines one Job that was submitted to the Job Manager."""
  __tablename__ = 'Job'

  id = Column(Integer, primary_key = True) # The ID of the job (not corresponding to the grid ID)
  command_line = Column(String(255))       # The command line to execute, pickled into one string
  name = Column(String(20))                # A hand-chosen name for the task
  arguments = Column(String(255))          # The pickled kwargs of the job submission (e.g. in the grid)
  grid_id = Column(Integer, unique = True) # The ID of the job as given from the grid
  log_dir = Column(String(255))            # The directory where the log files will be put to
  # NOTE(review): pickled command lines and arguments can exceed 255 characters;
  # confirm the database does not truncate them (a Text column would be safer).

  status = Column(Enum(*Status))           # One of Status; a new job starts as Status[0]
  result = Column(Integer)                 # None until set elsewhere (presumably the exit code -- TODO confirm)

  def __init__(self, command_line, name = None, log_dir = None, **kwargs):
    """Creates a new job in state Status[0], without result.

    Keyword arguments:
    command_line -- the command to execute, a sequence of strings; stored pickled
    name -- an optional hand-chosen name for the job
    log_dir -- the optional directory for the log files
    kwargs -- further submission options; stored pickled in 'arguments'
    """
    self.command_line = dumps(command_line)
    self.name = name
    self.status = Status[0]
    self.result = None
    self.log_dir = log_dir
    self.arguments = dumps(kwargs)

  # NOTE: unpickling can execute arbitrary code; the getters below must only
  # ever read data written by this class, never an untrusted database.
  def get_command_line(self):
    """Returns the unpickled command line (a sequence of strings)."""
    return loads(str(self.command_line))

  def set_arguments(self, **kwargs):
    """Merges the given keyword arguments into the stored (pickled) arguments."""
    previous = self.get_arguments()
    previous.update(kwargs)
    self.arguments = dumps(previous)

  def get_arguments(self):
    """Returns the unpickled dictionary of stored arguments."""
    return loads(str(self.arguments))

  def std_out_file(self, array_id = None):
    """Returns the path of the stdout log file, or None if no log_dir is set.

    array_id is currently ignored; ArrayJob appends its own task index to this path.
    """
    return os.path.join(self.log_dir, "o" + str(self.grid_id)) if self.log_dir else None

  def std_err_file(self, array_id = None):
    """Returns the path of the stderr log file, or None if no log_dir is set.

    array_id is currently ignored; ArrayJob appends its own task index to this path.
    """
    return os.path.join(self.log_dir, "e" + str(self.grid_id)) if self.log_dir else None


  def __str__(self):
    """Returns a one-line summary: id (or array range), name, status, result and command line."""
    # "%s" instead of "%d": grid_id may still be None (add_grid_job does not set
    # it), and "%d" % None raises TypeError. For integers both give the same text.
    grid_id = "%s" % self.grid_id
    if self.array:
      # assumes self.array is sorted by task index
      j = "%s (%d-%d)" % (self.id, self.array[0].id, self.array[-1].id)
    else:
      j = grid_id
    if self.name is not None:
      n = "<Job: %s - '%s'>" % (j, self.name)
    else:
      n = "<Job: %s>" % j
    if self.result is not None:
      r = "%s (%d)" % (self.status, self.result)
    else:
      r = "%s" % self.status
    return "%s : %s -- %s" % (n, r, " ".join(self.get_command_line()))

  def execute(self, manager, index = None):
    """Starts the command of this job as a subprocess on the local machine.

    Keyword arguments:
    manager -- the job manager; its lock() is called and get_jobs(self.id) is queried
    index -- optional array element (presumably an ArrayJob); its id is exported as SGE_TASK_ID

    Returns the subprocess.Popen object with stdout and stderr piped.
    Sets self.status to "executing"; if the process cannot be started, sets it
    to "finished" and re-raises the OSError.
    """
    import subprocess

    # os.environ.copy() returns a plain dict. A (deep)copy of os.environ would
    # keep its putenv-calling __setitem__, so the assignments below would leak
    # into the environment of the current process.
    environ = os.environ.copy()

    # NOTE(review): the lock is not released here; confirm the manager releases it.
    manager.lock()
    # NOTE(review): the result of this query is unused; kept in case the call is needed.
    manager.get_jobs(self.id)

    if 'JOB_ID' not in environ:
      # executed locally (not in the grid): export our own id as JOB_ID
      environ['JOB_ID'] = str(self.id)
    if index is not None:
      environ['SGE_TASK_ID'] = str(index.id)
    self.status = "executing"

    # return the subprocess pipe to the process
    try:
      return subprocess.Popen(self.get_command_line(), env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError:
      self.status = "finished"
      raise



class JobDependence(Base):
  """This table defines a many-to-many relationship between Jobs.

  Each row is one edge: ``depending_job`` is the job that waits and
  ``dependent_job`` is the job it waits for (add_job creates
  ``JobDependence(new_job, dependency)``).
  """
  __tablename__ = 'JobDependence'
  id = Column(Integer, primary_key=True)
  dependent_job_id = Column('dependent_job_id', Integer, ForeignKey('Job.id'))
  # The job that depending_job waits for. order_by is given to the backref, since
  # it has no effect on this many-to-one side; Job.dependent_jobs lists the rows
  # in which a job is waited for, sorted by row id.
  dependent_job = relationship('Job', backref = backref('dependent_jobs', order_by=id), primaryjoin=(Job.id == dependent_job_id))
  depending_job_id = Column('depending_job_id', Integer, ForeignKey('Job.id'))
  # The job that waits. Job.depending_jobs lists the rows in which a job waits,
  # i.e. that job's own dependencies, sorted by row id.
  depending_job = relationship('Job', backref = backref('depending_jobs', order_by=id), primaryjoin=(Job.id == depending_job_id))

  def __init__(self, depending_job, dependent_job):
    """Records that ``depending_job`` depends on (waits for) ``dependent_job``."""
    self.dependent_job = dependent_job
    self.depending_job = depending_job


def add_grid_job(session, data, command_line, kwargs):
  """Helper function to create a job from the results of the grid execution via qsub.

  Keyword arguments:
  session -- the SQLAlchemy session used to persist the job
  data -- dict describing the grid submission; an optional 'job-array tasks'
          entry of the form 'start-stop:step' creates one ArrayJob for every
          index in range(start, stop+1, step)
  command_line -- the command to execute, a sequence of strings
  kwargs -- dict of submission options; an optional 'deps' entry lists the ids
            of existing jobs this job depends on

  Returns the newly created and committed Job.

  Raises ValueError if a dependency id does not exist or if the
  'job-array tasks' entry cannot be parsed.
  """
  import re

  # create job; 'data' and 'kwargs' are passed as keywords on purpose, so both
  # end up in the job's pickled arguments
  job = Job(data=data, command_line=command_line, kwargs=kwargs)
  # NOTE(review): unlike add_job, grid_id is not set here -- confirm the caller assigns it.

  session.add(job)
  session.flush()
  session.refresh(job)

  # add dependent jobs
  if 'deps' in kwargs:
    wanted = set(kwargs['deps'])
    # skip the query for an empty id list (an empty IN clause is pointless)
    dependencies = session.query(Job).filter(Job.id.in_(list(wanted))).all() if wanted else []
    if len(dependencies) != len(wanted):
      found = set(d.id for d in dependencies)
      raise ValueError("Unknown dependency job id(s): %s" % sorted(wanted - found))
    for d in dependencies:
      session.add(JobDependence(job, d))

  # create array job if desired
  if 'job-array tasks' in data:
    tasks = data['job-array tasks']
    match = re.match(r'^(?P<m>\d+)-(?P<n>\d+):(?P<s>\d+)$', tasks)
    if match is None:
      raise ValueError("Cannot parse 'job-array tasks' entry %r; expected 'start-stop:step'" % (tasks,))
    (start, stop, step) = (int(match.group('m')), int(match.group('n')), int(match.group('s')))
    # add array jobs; stop is inclusive
    for i in range(start, stop+1, step):
      session.add(ArrayJob(i, job.id))

  session.commit()
  return job


def add_job(session, command_line, name=None, dependencies=None, array=None, log_dir=None, **kwargs):
  """Helper function to create a job that will run on the local machine.

  Keyword arguments:
  session -- the SQLAlchemy session used to persist the job
  command_line -- the command to execute, a sequence of strings
  name -- an optional hand-chosen name for the job
  dependencies -- an optional iterable of Job objects this job depends on
  array -- an optional (start, stop, step) tuple; one ArrayJob is created for
           every index in range(start, stop+1, step), i.e. stop is inclusive
  log_dir -- the optional directory for the log files
  kwargs -- further submission options, stored in the job's pickled arguments

  Returns the newly created and committed Job.
  """
  if dependencies is None:
    dependencies = []

  # 'kwargs' is passed as one keyword on purpose: the stored arguments become
  # {'kwargs': {...}}, a layout readers of Job.get_arguments() may rely on.
  job = Job(command_line=command_line, name=name, log_dir=log_dir, kwargs=kwargs)

  session.add(job)
  session.flush()
  session.refresh(job)

  # by default grid_id and id are identical, but the grid_id might be overwritten later on
  job.grid_id = job.id

  for d in dependencies:
    session.add(JobDependence(job, d))

  if array:
    (start, stop, step) = array
    # add array jobs; stop is inclusive
    for i in range(start, stop+1, step):
      session.add(ArrayJob(i, job.id))

  session.commit()

  return job