Commit def4cfd0 authored by Manuel Günther's avatar Manuel Günther

Implemented resubmission of jobs to another queue; enabled jman's --job-ids...

Implemented resubmission of jobs to another queue; enabled jman's --job-ids parameter to be more sophisticated; listed command line is now wrapped with '' e.g. to allow directories with spaces; local jobs can now be run with priority; added warning when queue combination is not valid (and jobs will never execute); fixed small bugs.
parent b40364d1
......@@ -154,6 +154,11 @@ Usually, it is a good idea to combine the ``-a`` option with ``-j``, which will
$ bin/jman -vv list -a -j [job_id_1] [job_id_2]
Note that the ``-j`` option is in general relatively smart.
You can use it to select a range of job ids, e.g., ``-j 1-4 6-8``.
In this case, please make sure that there are no spaces between the job ids and the ``-`` separator.
Any specified job id that is not available in the database will simply be ignored, including job ids that fall inside the given ranges.
Inspecting log files
--------------------
......@@ -170,6 +175,7 @@ E.g.:
will print the contents of the output and error log file from the job with the desired ID (and only the array job with the given ID).
To report only the output or only the error logs, you can use the ``-o`` or ``-e`` option, respectively.
When some (array-)jobs are still running, use the ``-u`` option to list their current output and/or error logs.
Hopefully, that helps in debugging the problem!
......@@ -198,9 +204,9 @@ E.g. use:
.. code-block:: sh
$ bin/jman -vv delete -s success
$ bin/jman -vv delete -s success -j 10-20
to delete all jobs and the logs of all successfully finished jobs from the database.
to delete all jobs and the logs of all successfully finished jobs with job ids from 10 to 20 from the database.
Other command line tools
......
......@@ -41,6 +41,9 @@ class JobManagerLocal(JobManager):
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed on the local machine during a call to "run".
All kwargs will simply be ignored."""
# remove duplicate dependencies
dependencies = sorted(list(set(dependencies)))
# add job to database
self.lock()
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir, stop_on_failure=stop_on_failure)
......@@ -59,12 +62,12 @@ class JobManagerLocal(JobManager):
return job_id
def resubmit(self, job_ids = None, failed_only = False, running_jobs = False):
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, **kwargs):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
accepted_old_status = ('failure',) if failed_only else ('success', 'failure')
accepted_old_status = ('success', 'failure') if also_success else ('failure',)
for job in jobs:
# check if this job needs re-submission
if running_jobs or job.status in accepted_old_status:
......@@ -82,7 +85,7 @@ class JobManagerLocal(JobManager):
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status in ('executing', 'queued', 'waiting'):
if job.status in ('executing', 'queued', 'waiting') and job.queue_name == 'local':
logger.info("Reset job '%s' in the database" % job.name)
job.submit()
......@@ -115,7 +118,7 @@ class JobManagerLocal(JobManager):
#####################################################################
###### Methods to run the jobs in parallel on the local machine #####
def _run_parallel_job(self, job_id, array_id = None, no_log = False):
def _run_parallel_job(self, job_id, array_id = None, no_log = False, nice = None):
"""Executes the code for this job on the local machine."""
environ = copy.deepcopy(os.environ)
environ['JOB_ID'] = str(job_id)
......@@ -127,6 +130,9 @@ class JobManagerLocal(JobManager):
# generate call to the wrapper script
command = [self.wrapper_script, '-ld', self._database, 'run-job']
if nice is not None:
command = ['nice', '-n%d'%nice] + command
job, array_job = self._job_and_array(job_id, array_id)
logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id, len(job.array)), job.name))
# create log files
......@@ -152,7 +158,7 @@ class JobManagerLocal(JobManager):
def _format_log(self, job_id, array_id = None, array_count = 0):
return ("%d (%d/%d)" % (job_id, array_id, array_count)) if array_id is not None and array_count else ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False):
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False, nice = None):
"""Starts the scheduler, which is constantly checking for jobs that should be ran."""
running_tasks = []
try:
......@@ -178,6 +184,7 @@ class JobManagerLocal(JobManager):
logger.info("Job '%s' finished execution with result %s" % (self._format_log(job_id, array_id), result))
# in any case, remove the job from the list
del running_tasks[task_index]
# SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
if len(running_tasks) < parallel_jobs:
# get all unfinished jobs:
......@@ -185,7 +192,7 @@ class JobManagerLocal(JobManager):
jobs = self.get_jobs(job_ids)
# put all new jobs into the queue
for job in jobs:
if job.status == 'submitted':
if job.status == 'submitted' and job.queue_name == 'local':
job.queue()
# get all unfinished jobs that are submitted to the local queue
......@@ -202,7 +209,7 @@ class JobManagerLocal(JobManager):
for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
array_job = queued_array_jobs[i]
# start a new job from the array
process = self._run_parallel_job(job.id, array_job.id, no_log=no_log)
process = self._run_parallel_job(job.id, array_job.id, no_log=no_log, nice=nice)
if process is None:
continue
running_tasks.append((process, job.id, array_job.id))
......@@ -215,7 +222,7 @@ class JobManagerLocal(JobManager):
else:
if job.status == 'queued':
# start a new job
process = self._run_parallel_job(job.id, no_log=no_log)
process = self._run_parallel_job(job.id, no_log=no_log, nice=nice)
if process is None:
continue
running_tasks.append((process, job.id))
......@@ -245,5 +252,5 @@ class JobManagerLocal(JobManager):
logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
task[0].kill()
self.stop_job(task[1])
# stopp all jobs that are currently running or queued
# stop all jobs that are currently running or queued
self.stop_jobs()
......@@ -209,7 +209,8 @@ class JobManager:
if print_array_jobs and job.array:
print(array_delimiter)
for array_job in job.array:
print(array_job.format(array_format))
if array_job.status in status:
print(array_job.format(array_format))
print(array_delimiter)
self.unlock()
......
......@@ -214,6 +214,7 @@ class Job(Base):
# In python 2, the command line is unicode, which needs to be converted to string before pickling;
# In python 3, the command line is bytes, which can be pickled directly
args = loads(self.grid_arguments)['kwargs'] if isinstance(self.grid_arguments, bytes) else loads(str(self.grid_arguments))['kwargs']
# in any case, the commands have to be converted to str
retval = {}
if 'pe_opt' in args:
retval['pe_opt'] = args['pe_opt']
......@@ -226,8 +227,14 @@ class Job(Base):
if 'io_big' in args and args['io_big']:
retval['io_big'] = True
# also add the queue
if self.queue_name is not None:
retval['queue'] = str(self.queue_name)
return retval
def set_arguments(self, **kwargs):
  """Serializes the given keyword arguments (the grid submission parameters)
  into the ``grid_arguments`` column using ``dumps`` (presumably pickle --
  ``get_arguments`` reads it back with ``loads``; verify against the import)."""
  self.grid_arguments = dumps(kwargs)
def get_jobs_we_wait_for(self):
  """Returns the list of Job objects that this job depends on, skipping
  dangling dependency records (entries whose ``waited_for_job`` is None,
  e.g., when the dependency was deleted from the database)."""
  return [j.waited_for_job for j in self.jobs_we_have_to_wait_for if j.waited_for_job is not None]
......@@ -243,6 +250,16 @@ class Job(Base):
return os.path.join(self.log_dir, (self.name if self.name else 'job') + ".e" + str(self.id)) if self.log_dir else None
def _cmdline(self):
  """Renders the stored command line as a single display string.
  Option tokens (starting with '-') are emitted verbatim; all other tokens
  are wrapped in single quotes so that, e.g., paths containing spaces remain
  readable as one argument. Each token is followed by a trailing space."""
  pieces = []
  for token in self.get_command_line():
    # NOTE: token[0] (not startswith) keeps the original behavior for empty tokens
    if token[0] == '-':
      pieces.append("%s " % token)
    else:
      pieces.append("'%s' " % token)
  return "".join(pieces)
def __str__(self):
id = "%d" % self.id
if self.machine_name: m = "%s - %s" % (self.queue_name, self.machine_name)
......@@ -253,11 +270,11 @@ class Job(Base):
else: n = "<Job: %s>" % id
if self.result is not None: r = "%s (%d)" % (self.status, self.result)
else: r = "%s" % self.status
return "%s | %s : %s -- %s" % (n, m, r, " ".join(self.get_command_line()))
return "%s | %s : %s -- %s" % (n, m, r, self._cmdline())
def format(self, format, dependencies = 0, limit_command_line = None):
"""Formats the current job into a nicer string to fit into a table."""
command_line = " ".join(self.get_command_line())
command_line = self._cmdline()
if limit_command_line is not None and len(command_line) > limit_command_line:
command_line = command_line[:limit_command_line-3] + '...'
......@@ -271,7 +288,7 @@ class Job(Base):
command_line = "<" + ",".join(["%s=%s" % (key,value) for key,value in grid_opt.iteritems()]) + ">: " + command_line
if dependencies:
deps = str([dep.id for dep in self.get_jobs_we_wait_for()])
deps = str(sorted(list(set([dep.id for dep in self.get_jobs_we_wait_for()]))))
if dependencies < len(deps):
deps = deps[:dependencies-3] + '...'
return format.format(job_id, queue, status, self.name, deps, command_line)
......
This diff is collapsed.
......@@ -42,10 +42,10 @@ class JobManagerSGE(JobManager):
process it we have to split it twice (',' and then on '='), create a
dictionary and extract just the qname"""
if not 'hard resource_list' in kwargs: return 'all.q'
d = dict([reversed(k.split('=')) for k in kwargs['hard resource_list'].split(',')])
if not 'TRUE' in d: return 'all.q'
return d['TRUE']
d = dict([k.split('=') for k in kwargs['hard resource_list'].split(',')])
for k in d:
if k[0] == 'q' and d[k] == 'TRUE': return k
return 'all.q'
def _submit_to_grid(self, job, name, array, dependencies, log_dir, **kwargs):
......@@ -54,6 +54,9 @@ class JobManagerSGE(JobManager):
jman = self.wrapper_script
python = sys.executable
# remove duplicate dependencies
dependencies = sorted(list(set(dependencies)))
# generate call to the wrapper script
command = make_shell(python, [jman, '-d', self._database, 'run-job'])
q_array = "%d-%d:%d" % array if array else None
......@@ -67,6 +70,11 @@ class JobManagerSGE(JobManager):
logger.info("Submitted job '%s' to the SGE grid." % job)
if 'io_big' in kwargs and kwargs['io_big'] and ('queue' not in kwargs or kwargs['queue'] == 'all.q'):
logger.warn("This job will never be executed since the 'io_big' flag is not available for the 'all.q'.")
if 'pe_opt' in kwargs and ('queue' not in kwargs or kwargs['queue'] not in ('q1dm', 'q_1day_mth', 'q1wm', 'q_1week_mth')):
logger.warn("This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead." % kwargs['queue'] if 'queue' in kwargs else 'all.q')
assert job.id == grid_id
return grid_id
......@@ -112,22 +120,31 @@ class JobManagerSGE(JobManager):
self.unlock()
def resubmit(self, job_ids = None, failed_only = False, running_jobs = False):
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, **kwargs):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
accepted_old_status = ('failure',) if failed_only else ('success', 'failure')
accepted_old_status = ('success', 'failure') if also_success else ('failure',)
for job in jobs:
# check if this job needs re-submission
if running_jobs or job.status in accepted_old_status:
grid_status = qstat(job.id, context=self.context)
if len(grid_status) != 0:
logger.warn("Deleting job '%d' since it was still running in the grid." % job.id)
qdel(job.id, context=self.context)
# re-submit job to the grid
if job.queue_name == 'local':
arguments = job.get_arguments()
arguments.update(**kwargs)
job.set_arguments(kwargs=arguments)
# delete old status and result of the job
job.submit()
if job.queue_name == 'local' and 'queue' not in arguments:
logger.warn("Re-submitting job '%s' locally (since no queue name is specified)." % job)
else:
logger.debug("Re-submitting job '%s' to the grid." % job)
self._submit_to_grid(job, job.name, job.get_array(), [dep.id for dep in job.get_jobs_we_wait_for()], job.log_dir, **job.get_arguments())
job.submit()
deps = [dep.id for dep in job.get_jobs_we_wait_for()]
logger.debug("Re-submitting job '%s' with dependencies '%s' to the grid." % (job, deps))
self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, **arguments)
self.session.commit()
self.unlock()
......@@ -143,6 +160,10 @@ class JobManagerSGE(JobManager):
qdel(job.id, context=self.context)
logger.info("Stopped job '%s' in the SGE grid." % job)
job.status = 'submitted'
for array_job in job.array:
if array_job.status in ('executing', 'queued', 'waiting'):
array_job.status = 'submitted'
self.session.commit()
self.unlock()
......@@ -194,7 +194,7 @@ class GridTKTest(unittest.TestCase):
jman.main(['./bin/jman', '--database', self.database, 'report'])
# clean-up
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete', '--job-ids', '1-2'])
# check that the database and the log files are gone
self.assertEqual(len(os.listdir(self.temp_dir)), 0)
......
......@@ -9,7 +9,7 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)):
setup(
name='gridtk',
version='1.1.1a0',
version='1.1.3a0',
description='SGE Grid and Local Submission and Monitoring Tools for Idiap',
url='http://github.com/idiap/gridtk',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment