Commit d599f2ef authored by Manuel Günther

Added functionality to re-submit jobs; some bug fixes; added first unit test (so far it tests local execution only).
parent ff2d46ea
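
The unit test referenced above only covers local execution. A minimal sketch of what such a test can look like (the module path, test name, and submitted command are assumptions for illustration, not taken from this commit):

import os
import tempfile
import unittest

class TestLocalExecution(unittest.TestCase):
    def test_submit(self):
        # assumed import path for the local job manager
        from gridtk.local import JobManagerLocal
        # use a throw-away database so the test does not touch real state
        database = os.path.join(tempfile.mkdtemp(), 'submitted.sql3')
        jm = JobManagerLocal(database=database, wrapper_script='./bin/jman')
        # submit a trivial job; the exact submit() signature is an assumption
        job_id = jm.submit(['echo', 'hello'])
        self.assertTrue(job_id is not None)
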
@@ -9,3 +9,8 @@ MANIFEST
develop-eggs
eggs
bin
sphinx
.project
.pydevproject
.settings
@@ -21,7 +21,7 @@ from .models import add_job, Job
class JobManagerLocal(JobManager):
"""Manages jobs run in parallel on the local machine."""
def __init__(self, database='submitted.sql3', sleep_time = 0.1):
def __init__(self, database='submitted.sql3', sleep_time = 0.1, wrapper_script = './bin/jman'):
"""Initializes this object with a state file and a method for qsub'bing.
Keyword parameters:
@@ -31,7 +31,7 @@ class JobManagerLocal(JobManager):
does not exist it is initialized. If it exists, it is loaded.
"""
JobManager.__init__(self, database)
JobManager.__init__(self, database, wrapper_script)
self._sleep_time = sleep_time
@@ -47,6 +47,31 @@ class JobManagerLocal(JobManager):
return job_id
def resubmit(self, job_ids = None, failed_only = False, running_jobs = False):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
# check if this job needs re-submission
if running_jobs or job.status == 'finished':
if not failed_only or job.result != 0:
job.status = 'waiting'
job.result = None
if job.array:
for array_job in job.array:
if running_jobs or array_job.status == 'finished':
if not failed_only or array_job.result != 0:
array_job.status = 'waiting'
array_job.result = None
self.session.commit()
self.unlock()
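
In other words, resubmit() only touches jobs in state 'finished' (unless running_jobs is set) and, with failed_only, only those whose result is non-zero; matching jobs and their array jobs are put back to 'waiting' with a cleared result. A short usage sketch (assuming a manager with previously submitted jobs):

jm = JobManagerLocal(database='submitted.sql3')
# re-queue only the finished jobs that ended with a non-zero result
jm.resubmit(failed_only=True)
# re-queue two specific jobs, whatever their result was
jm.resubmit(job_ids=[1, 2])
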
#####################################################################
###### Methods to run the jobs in parallel on the local machine #####
def _run_parallel_job(self, job_id, array_id = None):
"""Executes the code for this job on the local machine."""
environ = copy.deepcopy(os.environ)
@@ -56,10 +81,8 @@ class JobManagerLocal(JobManager):
else:
environ['SGE_TASK_ID'] = 'undefined'
# get the name of the file that was called originally
jman = os.path.realpath(sys.argv[0])
# generate call to the wrapper script
command = [jman, '-l', 'run-job', self.database]
command = [self.wrapper_script, '-l', 'run-job', self._database]
# return the subprocess pipe to the process
try:
return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -97,8 +120,8 @@ class JobManagerLocal(JobManager):
"""Runs the jobs stored in this job manager on the local machine."""
self.lock()
query = self.session.query(Job).filter(Job.status != 'finished')
if job_ids:
query.filter(Job.id.in_(job_ids))
if job_ids is not None:
query = query.filter(Job.id.in_(job_ids))
jobs = list(query)
@@ -114,7 +137,7 @@ class JobManagerLocal(JobManager):
# collect the dependencies for the jobs
dependencies = {}
for job in jobs:
dependencies[job.id] = [dependent.id for dependent in job.dependent_jobs]
dependencies[job.id] = [waited.id for waited in job.get_jobs_we_wait_for()]
self.unlock()
@@ -155,7 +178,7 @@ class JobManagerLocal(JobManager):
# start new jobs
for job_id in unfinished_jobs:
# check if there are unsatisfied dependencies for this job
unsatisfied_dependencies = [dep for dep in dependencies[job_id] if dep in unfinished_jobs]
unsatisfied_dependencies = [dep for dep in dependencies[job_id]]
if len(unsatisfied_dependencies) == 0:
# all dependencies are met
@@ -191,6 +214,12 @@ class JobManagerLocal(JobManager):
# remove the job that could not be started
unfinished_jobs.remove(job_id)
if not len(running_jobs) and len(unfinished_jobs) != 0:
# This is a weird case that leads to a dead lock:
# it seems that there is a dependency that cannot be fulfilled.
# This might happen when a single job should be executed, but it depends on another job...
raise RuntimeError("Dead lock detected. There are dependencies in the database that cannot be fulfilled. Did you try to run a job that has unfulfilled dependencies?")
# sleep for some time (default: 0.1 seconds)
time.sleep(self._sleep_time)
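
The dead-lock guard above triggers when nothing is running but unfinished jobs remain, which happens when a job is scheduled to run without the job it waits for. A hypothetical sequence that would raise the error (the scheduler entry point and its keyword names are assumptions based on the surrounding code):

jm = JobManagerLocal(database='submitted.sql3')
jm.submit(['./produce.sh'])                    # becomes job 1
jm.submit(['./consume.sh'], dependencies=[1])  # job 2 waits for job 1
# running only job 2 leaves its dependency on job 1 unfulfillable:
jm.run_scheduler(job_ids=[2])                  # raises the RuntimeError above
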
import bob
import os
import subprocess
from .models import Base, Job, ArrayJob
from .tools import logger
import sqlalchemy
echo = False
"""This file defines a minimum Job Manager interface."""
class JobManager:
def __init__(self, sql_database):
self.database = os.path.realpath(sql_database)
if not os.path.exists(self.database):
self.create()
def __init__(self, sql_database, wrapper_script = './bin/jman'):
self._database = os.path.realpath(sql_database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=echo)
if not os.path.exists(self._database):
self._create()
# store the command that this job manager was called with
self.wrapper_script = wrapper_script
# get the next free job id (simply as the largest ID in the database + 1)
# self.lock()
# self.next_job_id = max([job.id for job in self.session.query(Job)] + [0]) + 1
# self.unlock()
def __del__(self):
# remove the database if it is empty
self.lock()
job_count = len(self.get_jobs())
self.unlock()
if not job_count:
os.remove(self.database)
# remove the database if it is empty
if os.path.isfile(self._database):
self.lock()
job_count = len(self.get_jobs())
self.unlock()
if not job_count:
os.remove(self._database)
def lock(self):
self.session = bob.db.utils.SQLiteConnector(self.database).session(echo=echo)
Session = sqlalchemy.orm.sessionmaker()
self.session = Session(bind=self._engine)
return self.session
def unlock(self):
self.session.close()
del self.session
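
With the switch from bob.db.utils to a plain sessionmaker bound to the shared engine, every database access in the manager follows the same lock / query / unlock discipline. A minimal sketch of that pattern (wrapping it in try/finally is a suggestion, not what the code above does):

session = jm.lock()       # opens a fresh session bound to self._engine
try:
    for job in jm.get_jobs():
        print job         # queries go through jm.session while locked
finally:
    jm.unlock()           # closes and drops the session
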
def create(self):
def _create(self):
"""Creates a new and empty database."""
from .tools import makedirs_safe
# create directory for sql database
makedirs_safe(os.path.dirname(self.database))
makedirs_safe(os.path.dirname(self._database))
# create an engine
engine = bob.db.utils.create_engine_try_nolock('sqlite', self.database, echo=echo)
# create all the tables
Base.metadata.create_all(engine)
def list(self):
"""Lists the jobs currently added to the database."""
self.lock()
for job in self.get_jobs():
print job
self.unlock()
Base.metadata.create_all(self._engine)
def get_jobs(self, grid_ids = None):
def get_jobs(self, job_ids = None):
q = self.session.query(Job)
if grid_ids:
q = q.filter(Job.grid_id.in_(grid_ids))
if job_ids:
q = q.filter(Job.id.in_(job_ids))
return list(q)
def _job_and_array(self, grid_id, array_id=None):
def _job_and_array(self, id, array_id=None):
# get the job (and the array job) with the given id(s)
job = self.get_jobs((grid_id,))
job = self.get_jobs((id,))
assert (len(job) == 1)
job = job[0]
job_id = job.id
job_id = job.unique
if array_id is not None:
array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == job_id).filter(ArrayJob.id == array_id))
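
Note the renaming at work here: Job.unique is now the primary key, Job.id is the user-visible (grid) id, and ArrayJob.job_id references Job.unique. A sketch of the resulting lookup pattern (assuming an open session):

job = session.query(Job).filter(Job.id == 42).one()  # look up by the visible id
arrays = session.query(ArrayJob).filter(ArrayJob.job_id == job.unique)  # join via the primary key
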
@@ -99,7 +92,7 @@ class JobManager:
# execute the command line of the job, and wait until it has finished
try:
result = subprocess.call(command_line)
except Error:
except Exception:
result = 69 # ASCII: 'E'
# set a new status and the results of the job
@@ -123,15 +116,36 @@ class JobManager:
self.unlock()
def report(self, grid_ids=None, array_ids=None, unfinished=False, output=True, error=True):
def list(self, job_ids, print_array_jobs = False, print_dependencies = False):
"""Lists the jobs currently added to the database."""
self.lock()
for job in self.get_jobs(job_ids):
print job
if print_dependencies:
waiting_jobs = [j.id for j in job.get_jobs_waiting_for_us()]
waited_for_jobs = [j.id for j in job.get_jobs_we_wait_for()]
if len(waiting_jobs):
print "These jobs wait for <Job %d>:" % job.id, waiting_jobs
if len(waited_for_jobs):
print "These jobs need to run before <Job %d>:" % job.id, waited_for_jobs
if print_array_jobs and job.array:
for array_job in job.array:
print array_job
self.unlock()
def report(self, job_ids=None, array_ids=None, unfinished=False, output=True, error=True):
"""Iterates through the output and error files and write the results to command line."""
def _write_contents(job):
# Writes the contents of the output and error files to command line
out_file, err_file = job.std_out_file(), job.std_err_file()
if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size:
if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0:
print "Output file:", out_file
print open(out_file).read().rstrip()
print "-"*20
if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size:
if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0:
print "Error file:", err_file
print open(err_file).read().rstrip()
print "-"*40
@@ -144,14 +158,14 @@ class JobManager:
# check if an array job should be reported
self.lock()
if array_ids:
if len(grid_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.grid_id.in_(grid_ids)).filter(Job.id == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs: print array_jobs[0].job
_write_array_jobs(array_jobs)
else:
# iterate over all jobs
jobs = self.get_jobs(grid_ids)
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.array:
if (unfinished or job.status in ('finished', 'executing')):
@@ -166,7 +180,7 @@ class JobManager:
self.unlock()
def delete(self, grid_ids, array_ids = None, delete_logs = True, delete_log_dir = False):
def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False):
"""Deletes the jobs with the given ids from the database."""
def _delete_dir_if_empty(log_dir):
if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
@@ -185,8 +199,8 @@ class JobManager:
self.lock()
if array_ids:
if len(grid_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.grid_id.in_(grid_ids)).filter(Job.id == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
@@ -196,7 +210,7 @@ class JobManager:
else:
# iterate over all jobs
jobs = self.get_jobs(grid_ids)
jobs = self.get_jobs(job_ids)
for job in jobs:
# delete all array jobs
if job.array:
@@ -208,5 +222,3 @@ class JobManager:
self.session.commit()
self.unlock()
@@ -7,6 +7,7 @@ from sqlalchemy.ext.declarative import declarative_base
import os
from cPickle import dumps, loads
from .tools import logger
Base = declarative_base()
@@ -18,7 +19,7 @@ class ArrayJob(Base):
unique = Column(Integer, primary_key = True)
id = Column(Integer)
job_id = Column(Integer, ForeignKey('Job.id'))
job_id = Column(Integer, ForeignKey('Job.unique'))
status = Column(Enum(*Status))
result = Column(Integer)
@@ -36,33 +37,44 @@ class ArrayJob(Base):
def std_err_file(self):
return self.job.std_err_file() + "." + str(self.id) if self.job.log_dir else None
def __str__(self):
n = "<ArrayJob %d> of <Job %d>" % (self.id, self.job.id)
if self.result is not None: r = "%s (%d)" % (self.status, self.result)
else: r = "%s" % self.status
return "%s : %s" % (n, r)
class Job(Base):
"""This class defines one Job that was submitted to the Job Manager."""
__tablename__ = 'Job'
id = Column(Integer, primary_key = True) # The ID of the job (not corresponding to the grid ID)
command_line = Column(String(255)) # The command line to execute, converted to one string
name = Column(String(20)) # A hand-chosen name for the task
arguments = Column(String(255)) # The kwargs arguments for the job submission (e.g. in the grid)
grid_id = Column(Integer, unique = True) # The ID of the job as given from the grid
log_dir = Column(String(255)) # The directory where the log files will be put to
unique = Column(Integer, primary_key = True) # The unique ID of the job (not corresponding to the grid ID)
command_line = Column(String(255)) # The command line to execute, converted to one string
name = Column(String(20)) # A hand-chosen name for the task
arguments = Column(String(255)) # The kwargs arguments for the job submission (e.g. in the grid)
id = Column(Integer, unique = True) # The ID of the job as given from the grid
log_dir = Column(String(255)) # The directory where the log files will be put to
array_string = Column(String(255)) # The array string (only needed for re-submission)
status = Column(Enum(*Status))
result = Column(Integer)
def __init__(self, command_line, name = None, log_dir = None, **kwargs):
"""Constructor taking the job id from the grid."""
def __init__(self, command_line, name = None, log_dir = None, array_string = None, **kwargs):
"""Constructs a Job object without an ID (needs to be set later)."""
self.command_line = dumps(command_line)
self.name = name
self.status = Status[0]
self.result = None
self.log_dir = log_dir
self.array_string = dumps(array_string)
self.arguments = dumps(kwargs)
def get_command_line(self):
return loads(str(self.command_line))
def get_array(self):
return loads(str(self.array_string))
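
The command line, the array string, and the keyword arguments are stored pickled (via cPickle.dumps) in plain string columns and unpickled by these accessors. A small round-trip sketch:

from cPickle import dumps, loads
command_line = ['./my_script.py', '--verbose']
stored = dumps(command_line)               # what lands in the Job.command_line column
assert loads(str(stored)) == command_line  # what get_command_line() returns
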
def set_arguments(self, **kwargs):
previous = self.get_arguments()
previous.update(kwargs)
@@ -71,16 +83,23 @@ class Job(Base):
def get_arguments(self):
return loads(str(self.arguments))
def get_jobs_we_wait_for(self):
return [j.waited_for_job for j in self.jobs_we_have_to_wait_for if j.waited_for_job is not None]
def get_jobs_waiting_for_us(self):
return [j.waiting_job for j in self.jobs_that_wait_for_us if j.waiting_job is not None]
def std_out_file(self, array_id = None):
return os.path.join(self.log_dir, "o" + str(self.grid_id)) if self.log_dir else None
return os.path.join(self.log_dir, (self.name if self.name else 'job') + ".o" + str(self.id)) if self.log_dir else None
def std_err_file(self, array_id = None):
return os.path.join(self.log_dir, "e" + str(self.grid_id)) if self.log_dir else None
return os.path.join(self.log_dir, (self.name if self.name else 'job') + ".e" + str(self.id)) if self.log_dir else None
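
Log files are hence no longer named after the bare grid id ("o<grid_id>") but after the job name and the visible id. A tiny sketch of the new naming (values hypothetical):

job = Job(command_line=['echo', 'hello'], name='extract', log_dir='./logs')
job.id = 42                # normally assigned by add_job or the grid
print job.std_out_file()   # ./logs/extract.o42
print job.std_err_file()   # ./logs/extract.e42
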
def __str__(self):
id = "%d" % self.grid_id
if self.array: j = "%s (%d-%d)" % (self.id, self.array[0].id, self.array[-1].id)
id = "%d" % self.id
if self.array: j = "%s (%d-%d)" % (id, self.array[0].id, self.array[-1].id)
else: j = "%s" % id
if self.name is not None: n = "<Job: %s - '%s'>" % (j, self.name)
else: n = "<Job: %s>" % j
@@ -88,95 +107,50 @@ class Job(Base):
else: r = "%s" % self.status
return "%s : %s -- %s" % (n, r, " ".join(self.get_command_line()))
def execute(self, manager, index = None):
"""Executes the code for this job on the local machine."""
import copy
environ = copy.deepcopy(os.environ)
manager.lock()
job = manager.get_jobs(self.id)
if 'JOB_ID' in environ:
# we execute a job in the grid
wait_for_job = True
else:
# we execute a job locally
environ['JOB_ID'] = str(self.id)
if index:
environ['SGE_TASK_ID'] = str(index.id)
self.status = "executing"
# return the subprocess pipe to the process
try:
import subprocess
return subprocess.Popen(self.get_command_line(), env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
self.status = "finished"
raise
class JobDependence(Base):
"""This table defines a many-to-many relationship between Jobs."""
__tablename__ = 'JobDependence'
id = Column(Integer, primary_key=True)
dependent_job_id = Column('dependent_job_id', Integer, ForeignKey('Job.id'))
dependent_job = relationship('Job', backref = 'dependent_jobs', primaryjoin=(Job.id == dependent_job_id), order_by=id) # A list of Jobs that this one depends on
depending_job_id = Column('depending_job_id', Integer, ForeignKey('Job.id'))
depending_job = relationship('Job', backref = 'depending_jobs', primaryjoin=(Job.id == depending_job_id), order_by=id) # A list of Jobs that this one depends on
waiting_job_id = Column(Integer, ForeignKey('Job.unique')) # The ID of the waiting job
waited_for_job_id = Column(Integer, ForeignKey('Job.unique')) # The ID of the job to wait for
def __init__(self, depending_job, dependent_job):
self.dependent_job = dependent_job
self.depending_job = depending_job
# This is twisted: The 'jobs_we_have_to_wait_for' field in the Job class needs to be joined with the waiting job id, so that jobs_we_have_to_wait_for.waiting_job is correct
# Honestly, I am lost but it seems to work...
waiting_job = relationship('Job', backref = 'jobs_we_have_to_wait_for', primaryjoin=(Job.unique == waiting_job_id), order_by=id) # The job that is waited for
waited_for_job = relationship('Job', backref = 'jobs_that_wait_for_us', primaryjoin=(Job.unique == waited_for_job_id), order_by=id) # The job that waits
def __init__(self, waiting_job_id, waited_for_job_id):
self.waiting_job_id = waiting_job_id
self.waited_for_job_id = waited_for_job_id
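
The dependence rows now store the two Job.unique keys, and the relationships surface on Job as jobs_we_have_to_wait_for / jobs_that_wait_for_us, which get_jobs_we_wait_for() and get_jobs_waiting_for_us() unwrap. A sketch of both directions (assuming an open session with two committed jobs):

# make job_b wait for job_a
session.add(JobDependence(job_b.unique, job_a.unique))
session.commit()
print [j.id for j in job_b.get_jobs_we_wait_for()]     # -> [job_a.id]
print [j.id for j in job_a.get_jobs_waiting_for_us()]  # -> [job_b.id]
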
def add_grid_job(session, data, command_line, kwargs):
"""Helper function to create a job from the results of the grid execution via qsub."""
# create job
job = Job(data=data, command_line=command_line, kwargs=kwargs)
session.add(job)
session.flush()
session.refresh(job)
# add dependent jobs
if 'deps' in kwargs:
dependencies = session.query(Job).filter(id.in_(kwargs['deps']))
assert(len(list(dependencies)) == len(kwargs['deps']))
for d in dependecies:
session.add(JobDependence(job, d))
# create array job if desired
if 'job-array tasks' in data:
import re
b = re.compile(r'^(?P<m>\d+)-(?P<n>\d+):(?P<s>\d+)$').match(data['job-array tasks']).groupdict()
(start, stop, step) = (int(b['m']), int(b['n']), int(b['s']))
# add array jobs
for i in range(start, stop+1, step):
session.add(ArrayJob(i, job.id))
session.commit()
return job
def add_job(session, command_line, name=None, dependencies=[], array=None, log_dir=None, **kwargs):
"""Helper function to create a job that will run on the local machine."""
job = Job(command_line=command_line, name=name, log_dir=log_dir, kwargs=kwargs)
def add_job(session, command_line, name = 'job', dependencies = [], array = None, log_dir = None, **kwargs):
"""Helper function to create a job, add the dependencies and the array jobs."""
job = Job(command_line=command_line, name=name, log_dir=log_dir, array_string=array, kwargs=kwargs)
session.add(job)
session.flush()
session.refresh(job)
# by default grid_id and id are identical, but the grid_id might be overwritten later on
job.grid_id = job.id
# by default id and unique id are identical, but the id might be overwritten later on
job.id = job.unique
for d in dependencies:
session.add(JobDependence(job, d))
depending = list(session.query(Job).filter(Job.id == d))
if len(depending):
session.add(JobDependence(job.unique, depending[0].unique))
else:
logger.warn("Could not find dependent job with id %d in database" % d)
if array:
(start, stop, step) = array
# add array jobs
for i in range(start, stop+1, step):
session.add(ArrayJob(i, job.id))
session.add(ArrayJob(i, job.unique))
session.commit()
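
Putting the pieces together, add_job stores the job, resolves each dependency by its visible id, and expands the array tuple into ArrayJob rows. A hedged usage sketch (the session setup, and that add_job returns the created Job, are assumptions; the return statement is not shown in this hunk, and Base/add_job are assumed to be in scope from the models module):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///submitted.sql3')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

job1 = add_job(session, ['./prepare.sh'], name='prepare')
# the second job waits for the first and runs as array tasks 1..10
job2 = add_job(session, ['./work.sh'], name='work', dependencies=[job1.id], array=(1, 10, 1))
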
@@ -26,7 +26,7 @@ from ..tools import make_shell, random_logdir, logger
def setup(args):
"""Returns the JobManager and sets up the basic infrastructure"""
kwargs = {}
kwargs = {'wrapper_script' : args.wrapper_script}
if args.db: kwargs['database'] = args.db
if args.local:
jm = local.JobManagerLocal(**kwargs)
@@ -34,10 +34,13 @@ def setup(args):
jm = sge.JobManagerSGE(**kwargs)
# set-up logging
import logging
if args.debug:
import logging
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
return jm
@@ -103,65 +106,10 @@ def submit(args):
job_id = jm.submit(args.job, **kwargs)
def explain(args):
"""Explain action"""
jm = setup(args)
if args.jobid:
jobs = [[int(n) for n in k.split('.', 1)] for k in args.jobid]
for v in jobs:
if len(v) == 1: v.append(None)
else:
jobs = [(k, None) for k in jm.keys()]
first_time = True
for k in jobs:
if not first_time: print 79*'-'
first_time = False
J = jm[k[0]]
print "Job", J
print "Command line:", J.command_line()
if args.verbose:
print "%s stdout (%s)" % (J.name(k[1]), J.stdout_filename(k[1]))
print J.stdout(k[1])
if args.verbose:
print "%s stderr (%s)" % (J.name(k[1]), J.stderr_filename(k[1]))
print J.stderr(k[1])
def resubmit(args):
"""Re-submits the jobs with the given ids."""
jm = setup(args)
fromjm = JobManager(args.fromdb)
jobs = fromjm.keys()
if args.jobid: jobs = args.jobid
for k in jobs:
O = fromjm[k]
args.stdout, args.stderr = get_logdirs(args.stdout, args.stderr, args.logbase)
J = jm.resubmit(O, args.stdout, args.stderr, args.deps, args.failed_only)
if args.verbose:
if isinstance(J, (tuple, list)):
for k in J: print 'Re-submitted job', J
else:
print 'Re-submitted job', J
else:
if isinstance(J, (tuple, list)):
print 'Re-submitted %d jobs' % len(J)
else:
print 'Re-submitted job', J.name()