Commit 5bbf0cd9 authored by Manuel Günther

Implemented scheduler independent of job submission; added logging output; improved job listing.

parent d751b3fe
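
Taken together, the changes below decouple job submission from job execution. A minimal usage sketch, assuming the package imports as gridtk and using only the JobManagerLocal API visible in this diff (the script path and options are made-up examples):

    # submit jobs first, then run the scheduler -- possibly from a separate process
    from gridtk.local import JobManagerLocal

    manager = JobManagerLocal(database='submitted.sql3')
    # submission now only records the job in the SQL database ...
    job_id = manager.submit(['./bin/my_experiment.py', '--verbose'], name='example')
    # ... while the scheduler loop picks up queued jobs and runs at most
    # 'parallel_jobs' of them concurrently, until interrupted (Ctrl-C)
    manager.run_scheduler(parallel_jobs=2, sleep_time=0.1)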
@@ -13,7 +13,7 @@ import copy, os, sys
import gdbm, anydbm
from cPickle import dumps, loads
from tools import makedirs_safe, logger, try_get_contents, try_remove_files
from .tools import makedirs_safe, logger, try_get_contents, try_remove_files
from .manager import JobManager
@@ -21,7 +21,7 @@ from .models import add_job, Job
class JobManagerLocal(JobManager):
"""Manages jobs run in parallel on the local machine."""
def __init__(self, database='submitted.sql3', sleep_time = 0.1, wrapper_script = './bin/jman'):
def __init__(self, **kwargs):
"""Initializes this object with a state file and a method for qsub'bing.
Keyword parameters:
@@ -31,8 +31,7 @@ class JobManagerLocal(JobManager):
does not exist it is initialized. If it exists, it is loaded.
"""
JobManager.__init__(self, database, wrapper_script)
self._sleep_time = sleep_time
JobManager.__init__(self, **kwargs)
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, **kwargs):
@@ -41,6 +40,7 @@ class JobManagerLocal(JobManager):
# add job to database
self.lock()
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir)
logger.debug("Added job '%s' to the database" % job)
# return the new job id
job_id = job.id
self.unlock()
@@ -52,18 +52,43 @@ class JobManagerLocal(JobManager):
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
accepted_old_status = ('failure',) if failed_only else ('success', 'failure')
for job in jobs:
# check if this job needs re-submission
if running_jobs or job.status == 'finished':
if not failed_only or job.result != 0:
job.status = 'waiting'
job.result = None
if job.array:
for array_job in job.array:
if running_jobs or array_job.status == 'finished':
if not failed_only or array_job.result != 0:
array_job.status = 'waiting'
array_job.result = None
if running_jobs or job.status in accepted_old_status:
# re-submit job to the grid
logger.debug("Re-submitted job '%s' to the database" % job)
job.submit()
self.session.commit()
self.unlock()
def stop_jobs(self, job_ids):
"""Stops the jobs in the grid."""
self.lock()
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status == 'executing':
logger.debug("Reset job '%s' in the database" % job)
job.status = 'submitted'
self.session.commit()
self.unlock()
def stop_job(self, job_id, array_id = None):
"""Stops the jobs in the grid."""
self.lock()
job, array_job = self._job_and_array(job_id, array_id)
if job.status == 'executing':
logger.debug("Reset job '%s' in the database" % job)
job.status = 'submitted'
if array_job is not None and array_job.status == 'executing':
logger.debug("Reset array job '%s' in the database" % array_job)
array_job.status = 'submitted'
self.session.commit()
self.unlock()
@@ -82,16 +107,19 @@ class JobManagerLocal(JobManager):
environ['SGE_TASK_ID'] = 'undefined'
# generate call to the wrapper script
command = [self.wrapper_script, '-l', 'run-job', self._database]
command = [self.wrapper_script, '-ld', self._database, 'run-job']
logger.info("Started execution of Job '%s'" % self._format_log(job_id, array_id))
# return the subprocess pipe to the process
try:
return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
logger.error("Could not execute job '%s' locally, reason:\n\t%s" % ("(%d:%d)"%(job_id, array_id) if array_id else str(job_id)), e)
logger.error("Could not execute job '%s' locally, reason:\n\t%s" % self._format_log(job_id, array_id), e)
return None
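
For orientation, the Popen call above amounts to roughly the following stand-alone snippet (paths are the defaults from this diff; the SGE_TASK_ID value is a made-up example):

    # rough equivalent of what _run_parallel_job() spawns; the array id,
    # if any, travels through the SGE_TASK_ID environment variable
    import os, subprocess
    environ = dict(os.environ)
    environ['SGE_TASK_ID'] = '3'  # hypothetical array id; 'undefined' for non-array jobs
    process = subprocess.Popen(['./bin/jman', '-ld', 'submitted.sql3', 'run-job'],
                               env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)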
def _report(self, process, job_id, array_id = None):
def _result_files(self, process, job_id, array_id = None):
"""Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
def write(file, process, std):
f = std if file is None else open(str(file), 'w')
@@ -106,120 +134,94 @@ class JobManagerLocal(JobManager):
out, err = job.std_out_file(), job.std_err_file()
log_dir = job.log_dir
job_id = job.id
array_id = array_job.id if array_job else None
self.unlock()
if log_dir: makedirs_safe(log_dir)
if log_dir:
makedirs_safe(log_dir)
# write stdout
write(out, process.stdout, sys.stdout)
# write stderr
write(err, process.stderr, sys.stderr)
if log_dir:
j = self._format_log(job_id, array_id)
logger.debug("Wrote output of job '%s' to file '%s'" % (j,out))
logger.debug("Wrote errors of job '%s' to file '%s'" % (j,err))
def run(self, parallel_jobs = 1, job_ids = None):
"""Runs the jobs stored in this job manager on the local machine."""
self.lock()
query = self.session.query(Job).filter(Job.status != 'finished')
if job_ids is not None:
query = query.filter(Job.id.in_(job_ids))
jobs = list(query)
# collect the jobs to execute
unfinished_jobs = [job.id for job in jobs]
def _format_log(self, job_id, array_id = None):
return ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
# collect the array jobs
unfinished_array_jobs = {}
for job in jobs:
if job.array:
unfinished_array_jobs[job.id] = [array.id for array in job.array if array.status != 'finished']
# collect the dependencies for the jobs
dependencies = {}
for job in jobs:
dependencies[job.id] = [waited.id for waited in job.get_jobs_we_wait_for()]
self.unlock()
def run_scheduler(self, parallel_jobs = 1, sleep_time = 0.1):
"""Starts the scheduler, which is constantly checking for jobs that should be ran."""
running_tasks = []
try:
# start the jobs
finished_array_jobs = {}
running_jobs = []
running_array_jobs = {}
while len(unfinished_jobs) > 0 or len(running_jobs) > 0:
# FIRST: check if some of the jobs finished
for task in running_jobs:
# check if the job is still running
process = task[0]
if process.poll() is not None:
# process ended
job_id = task[1]
if len(task) > 2:
# we have an array job
array_id = task[2]
while True:
# FIRST, check whether any running processes have finished; this does not need a lock
for task_index in range(len(running_tasks)-1, -1, -1):
task = running_tasks[task_index]
process = task[0]
if process.poll() is not None:
# process ended
job_id = task[1]
array_id = task[2] if len(task) > 2 else None
# report the result
self._report(process, job_id, array_id)
# remove from running and unfinished jobs
running_array_jobs[job_id].remove(array_id)
unfinished_array_jobs[job_id].remove(array_id)
if len(unfinished_array_jobs[job_id]) == 0:
del unfinished_array_jobs[job_id]
unfinished_jobs.remove(job_id)
else:
# non-array job
self._report(process, job_id)
unfinished_jobs.remove(job_id)
# in any case, remove the job from the list
running_jobs.remove(task)
# SECOND: run as many parallel jobs as desired
if len(running_jobs) < parallel_jobs:
# start new jobs
for job_id in unfinished_jobs:
# check if there are unsatisfied dependencies for this job
unsatisfied_dependencies = [dep for dep in dependencies[job_id]]
if len(unsatisfied_dependencies) == 0:
# all dependencies are met
if job_id in unfinished_array_jobs:
# execute one of the array jobs
for array_id in unfinished_array_jobs[job_id]:
# check if the current array_id still need to run
if job_id not in running_array_jobs or array_id not in running_array_jobs[job_id]:
# execute parallel job
process = self._run_parallel_job(job_id, array_id)
if process is not None:
# remember that we currently run this job
running_jobs.append((process, job_id, array_id))
if job_id in running_array_jobs:
running_array_jobs[job_id].add(array_id)
else:
running_array_jobs[job_id] = set([array_id])
else:
# remove the job from the list since it could not run
unfinished_array_jobs[job_id].remove(array_id)
# check if more jobs can be executed
if len(running_jobs) == parallel_jobs:
break
self._result_files(process, job_id, array_id)
logger.info("Job '%s' finished execution" % self._format_log(job_id, array_id))
# in any case, remove the job from the list
del running_tasks[task_index]
# SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
if len(running_tasks) < parallel_jobs:
# fetch all jobs from the database:
self.lock()
jobs = self.get_jobs()
# put all new jobs into the queue
for job in jobs:
if job.status == 'submitted':
job.queue()
# get all unfinished jobs
unfinished_jobs = [job for job in jobs if job.status in ('queued', 'executing')]
for job in unfinished_jobs:
if job.array:
# find array jobs that can run
for array_job in job.array:
if array_job.status == 'queued':
# start a new job from the array
process = self._run_parallel_job(job.id, array_job.id)
running_tasks.append((process, job.id, array_job.id))
# set the status to executing manually here to avoid jobs being run twice,
# e.g., when the loop is executed again before the asynchronous job has started
array_job.status = 'executing'
job.status = 'executing'
if len(running_tasks) == parallel_jobs:
break
else:
# execute job
if job_id not in running_jobs:
process = self._run_parallel_job(job_id)
if process is not None:
# remember that we currently run this job
running_jobs.append((process, job_id))
else:
# remove the job that could not be started
unfinished_jobs.remove(job_id)
if not len(running_jobs) and len(unfinished_jobs) != 0:
# This is a weird case, which leads to a dead lock.
# It seems that the is a dependence that cannot be fulfilled
# This might happen, when a single job should be executed, but it depends on another job...
raise RuntimeError("Dead lock detected. There are dependencies in the database that cannot be fulfilled. Did you try to run a job that has unfulfilled dependencies?")
# sleep for some time (default: 0.1 seconds)
time.sleep(self._sleep_time)
if job.status == 'queued':
# start a new job
process = self._run_parallel_job(job.id)
running_tasks.append((process, job.id))
# set the status to executing manually here to avoid jobs being run twice,
# e.g., when the loop is executed again before the asynchronous job has started
job.status = 'executing'
if len(running_tasks) == parallel_jobs:
break
self.session.commit()
self.unlock()
# THIRD: sleep the desired amount of time before re-checking
time.sleep(sleep_time)
# The only way to stop the scheduler is to interrupt it, e.g., with Ctrl-C
except KeyboardInterrupt:
logger.info("Stopping task scheduler due to user interrupt.")
for task in running_tasks:
logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
task[0].kill()
self.stop_job(task[1], task[2] if len(task) > 2 else None)
@@ -5,39 +5,49 @@ from .models import Base, Job, ArrayJob
from .tools import logger
import sqlalchemy
echo = False
"""This file defines a minimum Job Manager interface."""
class JobManager:
def __init__(self, sql_database, wrapper_script = './bin/jman'):
self._database = os.path.realpath(sql_database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=echo)
if not os.path.exists(self._database):
self._create()
def __init__(self, database, wrapper_script = './bin/jman', debug = False):
self._database = os.path.realpath(database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=debug)
# store the command that this job manager was called with
self.wrapper_script = wrapper_script
def __del__(self):
# remove the database if it is empty$
# remove the database if it is empty
if os.path.isfile(self._database):
self.lock()
# in erroneous cases, the session might still be active, so don't create a deadlock here!
if not hasattr(self, 'session'):
self.lock()
job_count = len(self.get_jobs())
self.unlock()
if not job_count:
logger.debug("Removed database file '%s' since database is empty" % self._database)
os.remove(self._database)
def lock(self):
"""Generates (and returns) a blocking session object to the database."""
if hasattr(self, 'session'):
raise RuntimeError('Deadlock detected. Please do not try to lock the session when it is already locked!')
Session = sqlalchemy.orm.sessionmaker()
if not os.path.exists(self._database):
self._create()
self.session = Session(bind=self._engine)
logger.debug("Created new database session to '%s'" % self._database)
return self.session
def unlock(self):
"""Closes the session to the database."""
if not hasattr(self, 'session'):
raise RuntimeError('Error detected! The session that you want to close does not exist any more!')
self.session.close()
logger.debug("Closed database session of '%s'" % self._database)
del self.session
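
The lock()/unlock() pair brackets every database access in this class. A minimal sketch of the intended calling pattern, assuming a JobManager instance called manager as in the first sketch (the status change is a hypothetical example; the try/finally is defensive and not used in the methods above):

    # open the session, modify, commit, and always close it again;
    # nested lock() calls and unmatched unlock() calls raise RuntimeError
    session = manager.lock()
    try:
        for job in manager.get_jobs():
            if job.status == 'submitted':  # hypothetical clean-up: re-queue
                job.status = 'queued'
        session.commit()  # persist the changes before releasing the session
    finally:
        manager.unlock()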
@@ -50,24 +60,26 @@ class JobManager:
# create all the tables
Base.metadata.create_all(self._engine)
logger.debug("Created new empty database '%s'" % self._database)
def get_jobs(self, job_ids = None):
"""Returns a list of jobs that are stored in the database."""
q = self.session.query(Job)
if job_ids:
q = q.filter(Job.id.in_(job_ids))
return list(q)
def _job_and_array(self, id, array_id=None):
def _job_and_array(self, job_id, array_id = None):
# get the job (and the array job) with the given id(s)
job = self.get_jobs((id,))
job = self.get_jobs((job_id,))
assert (len(job) == 1)
job = job[0]
job_id = job.unique
unique_id = job.unique
if array_id is not None:
array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == job_id).filter(ArrayJob.id == array_id))
array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == unique_id).filter(ArrayJob.id == array_id))
assert (len(array_job) == 1)
return (job, array_job[0])
else:
@@ -78,18 +90,22 @@ class JobManager:
"""This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable."""
# get the job from the database
self.lock()
job, array_job = self._job_and_array(job_id, array_id)
job.status = 'executing'
if array_job is not None:
array_job.status = 'executing'
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meantime
return
job = jobs[0]
# set the 'executing' status to the job
job.execute(array_id)
# get the command line of the job
command_line = job.get_command_line()
self.session.commit()
self.unlock()
# execute the command line of the job, and wait untils it has finished
# execute the command line of the job, and wait until it has finished
try:
result = subprocess.call(command_line)
except Exception:
@@ -97,40 +113,42 @@ class JobManager:
# set a new status and the results of the job
self.lock()
job, array_job = self._job_and_array(job_id, array_id)
if array_job is not None:
array_job.status = 'finished'
array_job.result = result
self.session.commit()
# check if there are still unfinished array jobs
if False not in [aj.status == 'finished' for aj in job.array]:
job.status = 'finished'
# check if there was any array job not finished with result 0
results = [aj.result for aj in job.array if aj.result != 0]
job.result = results[0] if len(results) else 0
else:
job.status = 'finished'
job.result = result
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meantime
return
job = jobs[0]
job.finish(result, array_id)
self.session.commit()
self.unlock()
def list(self, job_ids, print_array_jobs = False, print_dependencies = False):
def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False):
"""Lists the jobs currently added to the database."""
# configuration for jobs
fields = ("job-id", "queue", "status", "job-name", "arguments")
lengths = (20, 9, 14, 20, 43)
format = "{:^%d} {:^%d} {:^%d} {:^%d} {:^%d}" % lengths
array_format = "{:>%d} {:^%d} {:^%d}" % lengths[:3]
delimiter = format.format(*['='*k for k in lengths])
array_delimiter = array_format.format(*["-"*k for k in lengths[:3]])
header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
# print header
print ' '.join(header)
print delimiter
self.lock()
for job in self.get_jobs(job_ids):
print job
if print_dependencies:
waiting_jobs = [j.id for j in job.get_jobs_waiting_for_us()]
waited_for_jobs = [j.id for j in job.get_jobs_we_wait_for()]
if len(waiting_jobs):
print "These jobs wait for <Job %d>:" % job.id, waiting_jobs
if len(waited_for_jobs):
print "These jobs need to run before <Job %d>:" % job.id, waited_for_jobs
print job.format(format, print_dependencies, None if long else 43)
if print_array_jobs and job.array:
print array_delimiter
for array_job in job.array:
print array_job
print array_job.format(array_format)
print array_delimiter
self.unlock()
@@ -141,22 +159,23 @@ class JobManager:
# Writes the contents of the output and error files to command line
out_file, err_file = job.std_out_file(), job.std_err_file()
if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0:
print "Output file:", out_file
logger.info("Contents of output file: '%s'" % out_file)
print open(out_file).read().rstrip()
print "-"*20
if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0:
print "Error file:", err_file
logger.info("Contents of error file: '%s'" % err_file)
print open(err_file).read().rstrip()
print "-"*40
def _write_array_jobs(array_jobs):
for array_job in array_jobs:
if unfinished or array_job.status == 'finished':
if unfinished or array_job.status in ('success', 'failure'):
print "Array Job", str(array_job.id), ":"
_write_contents(array_job)
# check if an array job should be reported
self.lock()
# check if an array job should be reported
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified, exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
@@ -168,14 +187,15 @@ class JobManager:
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.array:
if (unfinished or job.status in ('finished', 'executing')):
if (unfinished or job.status in ('success', 'failure', 'executing')):
print job
_write_array_jobs(job.array)
else:
if unfinished or array_job.status == 'finished':
if unfinished or job.status in ('success', 'failure'):
print job
_write_contents(job)
print "-"*60
if job.log_dir is not None:
print "-"*60
self.unlock()
@@ -185,13 +205,18 @@ class JobManager:
def _delete_dir_if_empty(log_dir):
if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
os.rmdir(log_dir)
logger.info("Removed log directory '%s' since it was empty" % log_dir)
def _delete(job, try_to_delete_dir=False):
# delete the job from the database
if delete_logs:
out_file, err_file = job.std_out_file(), job.std_err_file()
if out_file and os.path.exists(out_file): os.remove(out_file)
if err_file and os.path.exists(err_file): os.remove(err_file)
if out_file and os.path.exists(out_file):
os.remove(out_file)
logger.debug("Removed output log file '%s'" % out_file)
if err_file and os.path.exists(err_file):
os.remove(err_file)
logger.debug("Removed error log file '%s'" % err_file)
if try_to_delete_dir:
_delete_dir_if_empty(job.log_dir)
if delete_jobs:
@@ -199,14 +224,18 @@ class JobManager:
self.lock()
# check if array ids are specified
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified, exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
logger.debug("Deleting array job '%d' of job '%d'" % array_job.id, job.id)
_delete(array_job)
if not job.array:
logger.info("Deleting job '%d'" % job.id)
_delete(job, True)
else:
@@ -216,10 +245,11 @@ class JobManager:
# delete all array jobs
if job.array:
for array_job in job.array:
logger.debug("Deleting array job '%d' of job '%d'" % (array_job.id, job.id))
_delete(array_job)
# delete this job
logger.info("Deleting job '%d'" % job.id)
_delete(job, True)
self.session.commit()
self.unlock()
@@ -11,7 +11,7 @@ from .tools import logger
Base = declarative_base()
Status = ('waiting', 'executing', 'finished')
Status = ('submitted', 'queued', 'waiting', 'executing', 'success', 'failure')
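
The extended status tuple encodes the new job life cycle. A rough sketch of the transitions as they can be inferred from this diff (the exact mapping of result codes to 'success'/'failure' happens in Job.finish(), which is not shown here):

    # submitted --queue()--> queued --execute()--> executing --finish(result)--> success  (result == 0)
    #                                                                       \--> failure  (result != 0)
    # 'waiting' presumably marks jobs whose dependencies have not finished yet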
class ArrayJob(Base):
"""This class defines one element of an array job."""
@@ -43,6 +43,14 @@ class ArrayJob(Base):
else: r = "%s" % self.status
return "%s : %s" % (n, r)
def format(self, format):
"""Formats the current job into a nicer string to fit into a table."""
job_id = "%d - %d" % (self.job.id, self.id)
status = "%s" % self.status + (" (%d)" % self.result if self.result is not None else "" )
return format.format(job_id, self.job.queue_name, status)
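
Finally, a hypothetical call that exercises the improved listing (assuming a manager instance as in the first sketch; long=True lifts the 43-character truncation of the arguments column):

    # print the job table, including one row per array job and the dependency lists
    manager.list(job_ids=None, print_array_jobs=True, print_dependencies=True, long=True)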