Commit 14cb0de3 authored by Manuel Günther's avatar Manuel Günther

Added possibility to resubmit job with new command line; few bug fixes.

parent 261c1d66
......@@ -62,11 +62,16 @@ class JobManagerLocal(JobManager):
return job_id
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, **kwargs):
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, **kwargs):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
if new_command is not None:
if len(jobs) == 1:
jobs[0].set_command_line(new_command)
else:
logger.warn("Ignoring new command since no single job id was specified")
accepted_old_status = ('success', 'failure') if also_success else ('failure',)
for job in jobs:
# check if this job needs re-submission
......
......@@ -249,7 +249,7 @@ class JobManager:
def _write_array_jobs(array_jobs):
for array_job in array_jobs:
if unfinished or array_job.status in accepted_status:
print("Array Job", str(array_job.id), ":")
print("Array Job", str(array_job.id), ("(%s) :"%array_job.machine_name if array_job.machine_name is not None else ":"))
_write_contents(array_job)
self.lock()
......@@ -314,7 +314,7 @@ class JobManager:
for array_job in array_jobs:
if array_job.status in status:
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.unique)
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
_delete(array_job)
if not job.array:
if job.status in status:
......
......@@ -203,6 +203,11 @@ class Job(Base):
# In python 3, the command line is bytes, which can be pickled directly
return loads(self.command_line) if isinstance(self.command_line, bytes) else loads(str(self.command_line))
def set_command_line(self, command_line):
"""Sets / overwrites the command line for the job."""
self.command_line = dumps(command_line)
def get_array(self):
"""Returns the array arguments for the job; usually a string."""
# In python 2, the command line is unicode, which needs to be converted to string before pickling;
......
......@@ -155,7 +155,7 @@ def resubmit(args):
kwargs['io_big'] = False
jm.resubmit(get_ids(args.job_ids), args.also_success, args.running_jobs, **kwargs)
jm.resubmit(get_ids(args.job_ids), args.also_success, args.running_jobs, args.overwrite_command, **kwargs)
def run_scheduler(args):
......@@ -296,6 +296,7 @@ def main(command_line_options = None):
resubmit_parser.add_argument('-k', '--keep-logs', action='store_true', help='Do not clean the log files of the old job before re-submitting.')
resubmit_parser.add_argument('-s', '--also-success', action='store_true', help='Re-submit also jobs that have finished successfully.')
resubmit_parser.add_argument('-a', '--running-jobs', action='store_true', help='Re-submit even jobs that are running or waiting (use this flag with care).')
resubmit_parser.add_argument('-o', '--overwrite-command', nargs=argparse.REMAINDER, help = "Overwrite the command line (of a single job) that should be executed (useful to keep job dependencies).")
resubmit_parser.set_defaults(func=resubmit)
# subcommand 'stop'
......
......@@ -126,11 +126,16 @@ class JobManagerSGE(JobManager):
self.unlock()
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, **kwargs):
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, **kwargs):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
if new_command is not None:
if len(jobs) == 1:
jobs[0].set_command_line(new_command)
else:
logger.warn("Ignoring new command since no single job id was specified")
accepted_old_status = ('submitted', 'success', 'failure') if also_success else ('submitted', 'failure',)
for job in jobs:
# check if this job needs re-submission
......
......@@ -106,7 +106,7 @@ class GridTKTest(unittest.TestCase):
# reset the job 1
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs'])
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs', '--overwrite-command', script_1])
# now, start the local execution of the job in a parallel job
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
......
......@@ -9,7 +9,7 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)):
setup(
name='gridtk',
version='1.1.6a0',
version='1.1.7a0',
description='SGE Grid and Local Submission and Monitoring Tools for Idiap',
url='http://github.com/idiap/gridtk',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment