Commit 5b40fbe5 authored by Manuel Günther's avatar Manuel Günther

Added an option to run submitted jobs in a given directory (not tested in grid mode)

parent 99da06a6
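
For illustration, a minimal usage sketch of the new option through the Python API (the database file, script and directory names below are placeholders, not part of this commit):

from gridtk.local import JobManagerLocal

# open (or create) a local job database; 'submitted.sql3' is a placeholder path
jm = JobManagerLocal(database='submitted.sql3')

# the new exec_dir parameter makes the job run inside the given directory;
# without it, jobs are executed in the current working directory as before
jm.submit(['bash', 'my_script.sh'], name='example',
          exec_dir='/path/to/workdir', log_dir='logs')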
@@ -38,7 +38,7 @@ class JobManagerLocal(JobManager):
JobManager.__init__(self, **kwargs)
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed on the local machine during a call to "run".
All kwargs will simply be ignored."""
# remove duplicate dependencies
@@ -46,7 +46,7 @@ class JobManagerLocal(JobManager):
# add job to database
self.lock()
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir, stop_on_failure=stop_on_failure)
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, exec_dir=exec_dir, log_dir=log_dir, stop_on_failure=stop_on_failure)
logger.info("Added job '%s' to the database", job)
if dry_run:
@@ -158,7 +158,7 @@ class JobManagerLocal(JobManager):
try:
return subprocess.Popen(command, env=environ, stdout=out, stderr=err, bufsize=1)
except OSError as e:
logger.error("Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- command:\t%s", job.name, self._format_log(job_id, array_id, len(job.array)), e, " ".join(job.get_command_line()), " ".join(command))
logger.error("Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s", job.name, self._format_log(job_id, array_id, len(job.array)), e, " ".join(job.get_command_line()), "." if job.exec_dir is None else job.exec_dir, " ".join(command))
job.finish(117, array_id) # ASCII 'u'
return None
@@ -262,8 +262,11 @@ class JobManagerLocal(JobManager):
self.unlock()
logger.info("Stopping task scheduler due to user interrupt.")
for task in running_tasks:
logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
task[0].kill()
logger.warn("Killing job '%s' that was still running.", self._format_log(task[1], task[2] if len(task) > 2 else None))
try:
task[0].kill()
except OSError as e:
logger.error("Killing job '%s' was not successful: '%s'", self._format_log(task[1], task[2] if len(task) > 2 else None), e)
self.stop_job(task[1])
# stop all jobs that are currently running or queued
self.stop_jobs(job_ids)
@@ -147,11 +147,12 @@ class JobManager:
self.lock()
job = self.get_jobs((job_id,))[0]
command_line = job.get_command_line()
exec_dir = job.get_exec_dir()
self.unlock()
# execute the command line of the job, and wait until it has finished
try:
result = subprocess.call(command_line)
result = subprocess.call(command_line, cwd=exec_dir)
except Exception as e:
print("ERROR: The job with id '%d' could not be executed: %s" % (job_id, e), file=sys.stderr)
result = 69 # ASCII: 'E'
@@ -71,6 +71,7 @@ class Job(Base):
machine_name = Column(String(10)) # The name of the machine in which the job is run
grid_arguments = Column(String(255)) # The kwargs arguments for the job submission (e.g. in the grid)
id = Column(Integer) # The ID of the job as given from the grid
exec_dir = Column(String(255)) # The directory in which the command should be executed
log_dir = Column(String(255)) # The directory where the log files will be put to
array_string = Column(String(255)) # The array string (only needed for re-submission)
stop_on_failure = Column(Boolean) # An indicator whether to stop depending jobs when this job finishes with an error
@@ -78,13 +79,14 @@ class Job(Base):
status = Column(Enum(*Status))
result = Column(Integer)
def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
def __init__(self, command_line, name = None, exec_dir = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
"""Constructs a Job object without an ID (needs to be set later)."""
self.command_line = dumps(command_line)
self.name = name
self.queue_name = queue_name # will be set during the queue command later
self.machine_name = machine_name # will be set during the execute command later
self.grid_arguments = dumps(kwargs)
self.exec_dir = exec_dir
self.log_dir = log_dir
self.stop_on_failure = stop_on_failure
self.array_string = dumps(array_string)
@@ -207,6 +209,13 @@ class Job(Base):
"""Sets / overwrites the command line for the job."""
self.command_line = dumps(command_line)
def get_exec_dir(self):
"""Returns the command line for the job."""
# In python 2, the command line is unicode, which needs to be converted to string before pickling;
# In python 3, the command line is bytes, which can be pickled directly
return str(os.path.realpath(self.exec_dir)) if self.exec_dir is not None else None
def get_array(self):
"""Returns the array arguments for the job; usually a string."""
@@ -292,6 +301,8 @@ class Job(Base):
if grid_opt:
# add additional information about the job at the end
command_line = "<" + ",".join(["%s=%s" % (key,value) for key,value in grid_opt.iteritems()]) + ">: " + command_line
if self.exec_dir is not None:
command_line += "; [Executed in directory: '%s']" % self.exec_dir
if dependencies:
deps = str(sorted(list(set([dep.unique for dep in self.get_jobs_we_wait_for()]))))
@@ -321,9 +332,9 @@ class JobDependence(Base):
def add_job(session, command_line, name = 'job', dependencies = [], array = None, log_dir = None, stop_on_failure = False, **kwargs):
def add_job(session, command_line, name = 'job', dependencies = [], array = None, exec_dir=None, log_dir = None, stop_on_failure = False, **kwargs):
"""Helper function to create a job, add the dependencies and the array jobs."""
job = Job(command_line=command_line, name=name, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)
job = Job(command_line=command_line, name=name, exec_dir=exec_dir, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)
session.add(job)
session.flush()
@@ -120,6 +120,7 @@ def submit(args):
}
if args.array is not None: kwargs['array'] = get_array(args.array)
if args.exec_dir is not None: kwargs['exec_dir'] = args.exec_dir
if args.log_dir is not None: kwargs['log_dir'] = args.log_dir
if args.dependencies is not None: kwargs['dependencies'] = args.dependencies
if args.qname != 'all.q': kwargs['hvmem'] = args.memory
@@ -130,6 +131,7 @@ def submit(args):
kwargs['dry_run'] = args.dry_run
kwargs['stop_on_failure'] = args.stop_on_failure
# submit the job
job_id = jm.submit(args.job, **kwargs)
@@ -283,6 +285,7 @@ def main(command_line_options = None):
submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
submit_parser.add_argument('-d', '--exec-dir', metavar='DIR', help='Sets the directory in which the job should be executed. If not given, jobs will be executed in the current directory.')
submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.')
submit_parser.add_argument('-s', '--environment', metavar='KEY=VALUE', dest='env', nargs='*', default=[], help='Passes specific environment variables to the job.')
submit_parser.add_argument('-t', '--array', '--parametric', metavar='(first-)last(:step)', help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).")
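
For illustration, a command line submission using the new -d/--exec-dir flag might look as follows (a sketch only; the database, script and directory paths are placeholders), mirroring the calls added to the tests further down:

from gridtk.script import jman

# submit a job that will be executed inside /path/to/workdir
jman.main(['./bin/jman', '--local', '--database', 'submitted.sql3', 'submit',
           '--name', 'example', '--exec-dir', '/path/to/workdir', '--log-dir', 'logs',
           'bash', 'my_script.sh'])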
@@ -80,11 +80,11 @@ class JobManagerSGE(JobManager):
return job.unique
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed in the grid."""
# add job to database
self.lock()
job = add_job(self.session, command_line, name, dependencies, array, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
job = add_job(self.session, command_line, name, dependencies, array, exec_dir=exec_dir, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
logger.info("Added job '%s' to the database." % job)
if dry_run:
print("Would have added the Job")
@@ -47,10 +47,12 @@ class GridTKTest(unittest.TestCase):
# first, add some commands to the database
script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh')
script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh')
rdir = pkg_resources.resource_filename('gridtk', 'tests')
from gridtk.script import jman
# add a simple script that will write some information to the log files
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', bash, script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', bash, script_2])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_3', '--dependencies', '1', '2', '--exec-dir', rdir, bash, "test_array.sh"])
# check that the database was created successfully
self.assertTrue(os.path.exists(self.database))
@@ -64,20 +66,24 @@ class GridTKTest(unittest.TestCase):
job_manager = gridtk.local.JobManagerLocal(database=self.database)
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 2)
self.assertEqual(len(jobs), 3)
self.assertEqual(jobs[0].id, 1)
self.assertEqual(jobs[1].id, 2)
self.assertEqual(jobs[2].id, 3)
self.assertEqual(len(jobs[1].array), 4)
self.assertEqual(jobs[0].status, 'submitted')
self.assertEqual(jobs[1].status, 'submitted')
self.assertEqual(jobs[2].status, 'submitted')
# check that the job dependencies are correct
waiting = jobs[0].get_jobs_waiting_for_us()
self.assertEqual(len(waiting), 1)
self.assertEqual(len(waiting), 2)
self.assertEqual(waiting[0].id, 2)
waited = jobs[1].get_jobs_we_wait_for()
self.assertEqual(len(waited), 1)
self.assertEqual(waiting[1].id, 3)
waited = jobs[2].get_jobs_we_wait_for()
self.assertEqual(len(waited), 2)
self.assertEqual(waited[0].id, 1)
self.assertEqual(waited[1].id, 2)
job_manager.unlock()
@@ -93,13 +99,14 @@ class GridTKTest(unittest.TestCase):
# now, the first job needs to have status failure, and the second needs to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 2)
self.assertEqual(len(jobs), 3)
if jobs[0].status in ('submitted', 'queued', 'executing'):
# on slow machines, we don't want the tests to fail, so we just skip
job_manager.unlock()
raise nose.plugins.skip.SkipTest("This machine seems to be quite slow in processing parallel jobs.")
self.assertEqual(jobs[0].status, 'failure')
self.assertEqual(jobs[1].status, 'queued')
self.assertEqual(jobs[2].status, 'waiting')
# the result files should already be there
self.assertTrue(os.path.exists(jobs[0].std_out_file()))
self.assertTrue(os.path.exists(jobs[0].std_err_file()))
@@ -121,7 +128,7 @@ class GridTKTest(unittest.TestCase):
# Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 2)
self.assertEqual(len(jobs), 3)
if jobs[0].status in ('queued', 'executing') or jobs[1].status == 'queued':
# on slow machines, we don't want the tests to fail, so we just skip
job_manager.unlock()
@@ -169,7 +176,7 @@ class GridTKTest(unittest.TestCase):
# check that exactly four output and four error files have been created
files = os.listdir(self.log_dir)
self.assertEqual(len(files), 10)
self.assertEqual(len(files), 12)
for i in range(1,8,2):
self.assertTrue('test_2.o2.%d'%i in files)
self.assertTrue('test_2.e2.%d'%i in files)
@@ -177,13 +184,15 @@ class GridTKTest(unittest.TestCase):
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 2)
self.assertEqual(len(jobs), 3)
self.assertEqual(jobs[1].status, 'failure')
self.assertEqual(jobs[1].array[0].status, 'failure')
self.assertEqual(jobs[1].array[0].result, 1)
for i in range(1,4):
self.assertEqual(jobs[1].array[i].status, 'success')
self.assertEqual(jobs[1].array[i].result, 0)
self.assertEqual(jobs[2].status, 'success')
self.assertEqual(jobs[2].result, 0)
job_manager.unlock()
print()
@@ -195,7 +204,7 @@ class GridTKTest(unittest.TestCase):
jman.main(['./bin/jman', '--database', self.database, 'report'])
# clean-up
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete', '--job-ids', '1-2'])
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete', '--job-ids', '1-3'])
# check that the database and the log files are gone
self.assertEqual(len(os.listdir(self.temp_dir)), 0)
@@ -203,6 +212,7 @@ class GridTKTest(unittest.TestCase):
# add the scripts again, but this time with the --stop-on-failure option
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', bash, script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', bash, script_2])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_3', '--dependencies', '1', '2', '--exec-dir', rdir, '--stop-on-failure', bash, "test_array.sh"])
# and execute them, but without writing the log files
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files'])
@@ -218,15 +228,18 @@ class GridTKTest(unittest.TestCase):
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
self.assertEqual(len(jobs), 2)
self.assertEqual(len(jobs), 3)
self.assertEqual(jobs[0].status, 'failure')
self.assertEqual(jobs[0].result, 255)
self.assertEqual(jobs[1].status, 'failure')
self.assertTrue(jobs[1].result is None)
self.assertEqual(jobs[2].status, 'failure')
self.assertTrue(jobs[2].result is None)
job_manager.unlock()
# and clean up again
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
self.assertEqual(len(os.listdir(self.temp_dir)), 0)
except KeyboardInterrupt:
# make sure that the keyboard interrupt is captured and the mess is cleaned up (i.e. by calling tearDown)
1.2.5b0
\ No newline at end of file
1.3.0b0