diff --git a/gridtk/local.py b/gridtk/local.py index 2ab28c723575f3c46cd992cdc660d1f89563da2c..015d4cacc38c55d9c4944ae45537d5f50be6a3ec 100644 --- a/gridtk/local.py +++ b/gridtk/local.py @@ -76,32 +76,37 @@ class JobManagerLocal(JobManager): self.unlock() - def stop_jobs(self, job_ids): - """Stops the jobs in the grid.""" + def stop_jobs(self, job_ids=None): + """Resets the status of the job to 'submitted' when they are labeled as 'executing'.""" self.lock() jobs = self.get_jobs(job_ids) for job in jobs: - if job.status == 'executing': - logger.info("Reset job '%s' in the database" % job) - job.status = 'submitted' + if job.status in ('executing', 'queued', 'waiting'): + logger.info("Reset job '%s' in the database" % job.name) + job.submit() self.session.commit() self.unlock() def stop_job(self, job_id, array_id = None): - """Stops the jobs in the grid.""" + """Resets the status of the given to 'submitted' when they are labeled as 'executing'.""" self.lock() job, array_job = self._job_and_array(job_id, array_id) if job is not None: - if job.status == 'executing': - logger.info("Reset job '%s' in the database" % job) + if job.status in ('executing', 'queued', 'waiting'): + logger.info("Reset job '%s' in the database" % job.name) job.status = 'submitted' - if array_job is not None and array_job.status == 'executing': + if array_job is not None and array_job.status in ('executing', 'queued', 'waiting'): logger.debug("Reset array job '%s' in the database" % array_job) array_job.status = 'submitted' + if array_job is None: + for array_job in job.array: + if array_job.status in ('executing', 'queued', 'waiting'): + logger.debug("Reset array job '%s' in the database" % array_job) + array_job.status = 'submitted' self.session.commit() self.unlock() @@ -257,10 +262,12 @@ class JobManagerLocal(JobManager): # This is the only way to stop: you have to interrupt the scheduler except KeyboardInterrupt: + if hasattr(self, 'session'): + self.unlock() logger.info("Stopping task scheduler due to user interrupt.") for task in running_tasks: logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None)) task[0].kill() - if hasattr(self, 'session'): - self.unlock() - self.stop_job(task[1], task[2] if len(task) > 2 else None) + self.stop_job(task[1]) + # stopp all jobs that are currently running or queued + self.stop_jobs() diff --git a/gridtk/manager.py b/gridtk/manager.py index 0fa2cb4012cc84879ead1ec0fe5557f0425e16c9..0b7aabbb376aa62638f601d86ea313f945a9d40e 100644 --- a/gridtk/manager.py +++ b/gridtk/manager.py @@ -123,7 +123,7 @@ class JobManager: self.unlock() try: self.stop_jobs(list(dependent_job_ids)) - logger.warn("Deleted dependent jobs '%s' since this job failed.") + logger.warn("Deleted dependent jobs '%s' since this job failed." % str(list(dependent_job_ids))) except: pass return @@ -243,7 +243,7 @@ class JobManager: def _delete_dir_if_empty(log_dir): if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir): os.rmdir(log_dir) - logger.info("Removed log directory '%s' since it was empty" % log_dir) + logger.info("Removed empty log directory '%s'" % log_dir) def _delete(job, try_to_delete_dir=False): # delete the job from the database @@ -271,11 +271,13 @@ class JobManager: job = array_jobs[0].job for array_job in array_jobs: if array_job.status in status: - logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id) + if delete_jobs: + logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id) _delete(array_job) if not job.array: if job.status in status: - logger.info("Deleting job '%d' from the database." % job.id) + if delete_jobs: + logger.info("Deleting job '%d' from the database." % job.id) _delete(job, True) else: @@ -286,11 +288,13 @@ class JobManager: if job.array: for array_job in job.array: if array_job.status in status: - logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id)) + if delete_jobs: + logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id)) _delete(array_job) # delete this job if job.status in status: - logger.info("Deleting job '%d' from the database." % job.id) + if delete_jobs: + logger.info("Deleting job '%d' from the database." % job.id) _delete(job, True) self.session.commit() diff --git a/gridtk/models.py b/gridtk/models.py index d133ddf145a70b9edf881a7f523002f1acccca93..aeda6d2276c529a4398f43ebcadf5d22841efa4d 100644 --- a/gridtk/models.py +++ b/gridtk/models.py @@ -124,7 +124,8 @@ class Job(Base): self.status = new_status for array_job in self.array: - array_job.status = new_status + if array_job.status not in ('success', 'failure'): + array_job.status = new_status def execute(self, array_id = None): diff --git a/gridtk/script/jman.py b/gridtk/script/jman.py index 54c739049ecaf378f1b9746090ac77a16fc16ddf..0fcc1afdd548d38dbeb1e776773fe3da4ed01196 100644 --- a/gridtk/script/jman.py +++ b/gridtk/script/jman.py @@ -106,7 +106,7 @@ def resubmit(args): """Re-submits the jobs with the given ids.""" jm = setup(args) if not args.keep_logs: - jm.delete(job_ids=args.job_ids, delete_jobs = False) + jm.delete(job_ids=args.job_ids, delete_jobs=False) jm.resubmit(args.job_ids, args.failed_only, args.running_jobs) @@ -224,7 +224,7 @@ def main(command_line_options = None): submit_parser = cmdparser.add_parser('submit', aliases=['sub'], help='Submits jobs to the SGE queue or to the local job scheduler and logs them in a database.') submit_parser.add_argument('-q', '--queue', metavar='QNAME', dest='qname', default='all.q', help='the name of the SGE queue to submit the job to') submit_parser.add_argument('-m', '--memory', help='Sets both the h_vmem and the mem_free parameters when submitting the job to the specified value, e.g. 8G to set the memory requirements to 8 gigabytes') - submit_parser.add_argument('-n', '--name', dest='name', help='Sets the jobname') + submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name') submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces') submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.') submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.') @@ -265,7 +265,7 @@ def main(command_line_options = None): report_parser = cmdparser.add_parser('report', aliases=['rep', 'r'], help='Iterates through the result and error log files and prints out the logs.') report_parser.add_argument('-e', '--errors-only', action='store_true', help='Only report the error logs (by default, both logs are reported).') report_parser.add_argument('-o', '--output-only', action='store_true', help='Only report the output logs (by default, both logs are reported).') - report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs.') + report_parser.add_argument('-u', '--unfinished-also', action='store_true', help='Report also the unfinished jobs; use this option also to check error files for jobs with success status.') report_parser.add_argument('-j', '--job-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given ids (by default, all finished jobs are reported)') report_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Report only the jobs with the given array ids. If specified, a single job-id must be given as well.') report_parser.set_defaults(func=report) @@ -276,7 +276,7 @@ def main(command_line_options = None): delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.') delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.') delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.') - delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given stati; by default all jobs are deleted.') + delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given statuses; by default all jobs are deleted.') delete_parser.set_defaults(func=delete) # subcommand 'run_scheduler'