Commit 06e02286 authored by Manuel Günther

Job ids are now different from SGE ids (this makes it easier to handle jobs in the database).

parent 23c79fd4
......@@ -55,7 +55,7 @@ class JobManagerLocal(JobManager):
logger.info("Deleted job '%s' from the database due to dry-run option" % job)
job_id = None
else:
job_id = job.id
job_id = job.unique
# return the new job id
self.unlock()
......@@ -209,10 +209,10 @@ class JobManagerLocal(JobManager):
for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
array_job = queued_array_jobs[i]
# start a new job from the array
process = self._run_parallel_job(job.id, array_job.id, no_log=no_log, nice=nice)
process = self._run_parallel_job(job.unique, array_job.id, no_log=no_log, nice=nice)
if process is None:
continue
running_tasks.append((process, job.id, array_job.id))
running_tasks.append((process, job.unique, array_job.id))
# we here set the status to executing manually to avoid jobs to be run twice
# e.g., if the loop is executed while the asynchronous job did not start yet
array_job.status = 'executing'
......@@ -222,10 +222,10 @@ class JobManagerLocal(JobManager):
else:
if job.status == 'queued':
# start a new job
process = self._run_parallel_job(job.id, no_log=no_log, nice=nice)
process = self._run_parallel_job(job.unique, no_log=no_log, nice=nice)
if process is None:
continue
running_tasks.append((process, job.id))
running_tasks.append((process, job.unique))
# we here set the status to executing manually to avoid jobs to be run twice
# e.g., if the loop is executed while the asynchronous job did not start yet
job.status = 'executing'
......
......@@ -83,7 +83,7 @@ class JobManager:
"""Returns a list of jobs that are stored in the database."""
q = self.session.query(Job)
if job_ids:
q = q.filter(Job.id.in_(job_ids))
q = q.filter(Job.unique.in_(job_ids))
return sorted(list(q), key=lambda job: job.unique)
......@@ -128,7 +128,7 @@ class JobManager:
# there has been a dependent job that has failed before
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.id])
dependent_job_ids = set([dep.id for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs[0]
new = dep.get_jobs_waiting_for_us()
......@@ -174,20 +174,20 @@ class JobManager:
"""Lists the jobs currently added to the database."""
# configuration for jobs
if print_dependencies:
fields = ("job-id", "queue", "status", "job-name", "dependencies", "submitted command line")
lengths = (20, 14, 14, 20, 30, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:<%d}" % lengths
fields = ("job-id", "grid-id", "queue", "status", "job-name", "dependencies", "submitted command line")
lengths = (8, 20, 14, 14, 20, 30, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:^%d} {6:<%d}" % lengths
dependency_length = lengths[4]
else:
fields = ("job-id", "queue", "status", "job-name", "submitted command line")
lengths = (20, 14, 14, 20, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:<%d}" % lengths
fields = ("job-id", "grid-id", "queue", "status", "job-name", "submitted command line")
lengths = (8, 20, 14, 14, 20, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:<%d}" % lengths
dependency_length = 0
if ids_only:
self.lock()
for job in self.get_jobs():
print(job.id, end=" ")
print(job.unique, end=" ")
self.unlock()
return
......@@ -233,7 +233,7 @@ class JobManager:
def _write_array_jobs(array_jobs):
for array_job in array_jobs:
if unfinished or array_job.status in accepted_status:
print("Array Job", str(array_job.id), ":")
print("Array Job", str(array_job.unique), ":")
_write_contents(array_job)
self.lock()
......@@ -242,7 +242,7 @@ class JobManager:
# check if an array job should be reported
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs: print(array_jobs[0].job)
_write_array_jobs(array_jobs)
......@@ -292,18 +292,18 @@ class JobManager:
# check if array ids are specified
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.id.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
if array_job.status in status:
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.unique)
_delete(array_job)
if not job.array:
if job.status in status:
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.id)
logger.info("Deleting job '%d' from the database." % job.unique)
_delete(job, True)
else:
......@@ -315,12 +315,12 @@ class JobManager:
for array_job in job.array:
if array_job.status in status:
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
_delete(array_job)
# delete this job
if job.status in status:
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.id)
logger.info("Deleting job '%d' from the database." % job.unique)
_delete(job, True)
self.session.commit()
......
......@@ -70,7 +70,7 @@ class Job(Base):
queue_name = Column(String(20)) # The name of the queue
machine_name = Column(String(10)) # The name of the machine in which the job is run
grid_arguments = Column(String(255)) # The kwargs arguments for the job submission (e.g. in the grid)
id = Column(Integer, unique = True) # The ID of the job as given from the grid
id = Column(Integer) # The ID of the job as given from the grid
log_dir = Column(String(255)) # The directory where the log files will be put to
array_string = Column(String(255)) # The array string (only needed for re-submission)
stop_on_failure = Column(Boolean) # An indicator whether to stop depending jobs when this job finishes with an error
......@@ -102,6 +102,7 @@ class Job(Base):
array_job.status = 'submitted'
array_job.result = None
array_job.machine_name = None
self.id = self.unique
def queue(self, new_job_id = None, new_job_name = None, queue_name = None):
......@@ -261,7 +262,7 @@ class Job(Base):
return c
def __str__(self):
id = "%d" % self.id
id = "%d (%d)" % (self.unique, self.id)
if self.machine_name: m = "%s - %s" % (self.queue_name, self.machine_name)
else: m = self.queue_name
if self.array: a = "[%d-%d:%d]" % self.get_array()
......@@ -291,9 +292,9 @@ class Job(Base):
deps = str(sorted(list(set([dep.id for dep in self.get_jobs_we_wait_for()]))))
if dependencies < len(deps):
deps = deps[:dependencies-3] + '...'
return format.format(job_id, queue, status, self.name, deps, command_line)
return format.format(self.unique, job_id, queue, status, self.name, deps, command_line)
else:
return format.format(job_id, queue, status, self.name, command_line)
return format.format(self.unique, job_id, queue, status, self.name, command_line)
......
......@@ -136,7 +136,7 @@ class JobManagerSGE(JobManager):
if running_jobs or job.status in accepted_old_status:
grid_status = qstat(job.id, context=self.context)
if len(grid_status) != 0:
logger.warn("Deleting job '%d' since it was still running in the grid." % job.id)
logger.warn("Deleting job '%d' since it was still running in the grid." % job.unique)
qdel(job.id, context=self.context)
# re-submit job to the grid
arguments = job.get_arguments()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment