Commit dcf089c2 authored by Manuel Günther

Increased timeout for parallel access to 600 seconds; separated reading from...

Increased timeout for parallel access to 600 seconds; separated reading from writing accesses during job execution.
parent 931decce
......@@ -17,7 +17,7 @@ class JobManager:
def __init__(self, database, wrapper_script = './bin/jman', debug = False):
self._database = os.path.realpath(database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, echo=debug)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, connect_args={'timeout': 600}, echo=debug)
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
# store the command that this job manager was called with
......@@ -111,43 +111,52 @@ class JobManager:
def run_job(self, job_id, array_id = None):
"""This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable."""
# get the job from the database
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
return
job = jobs[0]
# get the machine name we are executing on; this might only work at idiap
machine_name = socket.gethostname()
# set the 'executing' status to the job
job.execute(array_id, machine_name)
if job.status == 'failure':
# there has been a dependent job that has failed before
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs[0]
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.id for dep in new])
# set the job's status in the database
try:
# get the job from the database
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
return
job = jobs[0]
# get the machine name we are executing on; this might only work at idiap
machine_name = socket.gethostname()
# set the 'executing' status to the job
job.execute(array_id, machine_name)
""" This is currently not working, as it seems...
if job.status == 'failure':
# there has been a dependent job that has failed before
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs[0]
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.id for dep in new])
self.unlock()
try:
self.stop_jobs(list(dependent_job_ids))
logger.warn("Stopped dependent jobs '%s' since this job failed." % str(list(dependent_job_ids)))
except:
pass
return
"""
self.session.commit()
except Exception:
pass
finally:
self.unlock()
try:
self.stop_jobs(list(dependent_job_ids))
logger.warn("Deleted dependent jobs '%s' since this job failed." % str(list(dependent_job_ids)))
except:
pass
return
# get the command line of the job
# get the command line of the job from the database; does not need write access
self.lock()
job = self.get_jobs((job_id,))[0]
command_line = job.get_command_line()
self.session.commit()
self.unlock()
# execute the command line of the job, and wait until it has finished
......@@ -157,19 +166,23 @@ class JobManager:
result = 69 # ASCII: 'E'
# set a new status and the results of the job
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
logger.error("The job with id '%d' could not be found in the database!" % job_id)
self.unlock()
return
try:
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
logger.error("The job with id '%d' could not be found in the database!" % job_id)
self.unlock()
return
job = jobs[0]
job.finish(result, array_id)
job = jobs[0]
job.finish(result, array_id)
self.session.commit()
self.unlock()
self.session.commit()
except Exception:
pass
finally:
self.unlock()
def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, status=Status, names=None, ids_only=False):
......
......@@ -9,7 +9,7 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)):
setup(
name='gridtk',
version='1.1.4a0',
version='1.1.5a0',
description='SGE Grid and Local Submission and Monitoring Tools for Idiap',
url='http://github.com/idiap/gridtk',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment