From 5b40fbe56b5738d5ffe71bc9f52f7f6531fcefdc Mon Sep 17 00:00:00 2001
From: Manuel Gunther <>
Date: Tue, 15 Mar 2016 20:00:35 -0600
Subject: [PATCH] Added option to run submitted job in a given directory (not
 tested in grid mode)

 gridtk/          | 13 ++++++++-----
 gridtk/        |  3 ++-
 gridtk/         | 17 ++++++++++++++---
 gridtk/script/    |  3 +++
 gridtk/            |  4 ++--
 gridtk/tests/ | 33 +++++++++++++++++++++++----------
 version.txt              |  2 +-
 7 files changed, 53 insertions(+), 22 deletions(-)

diff --git a/gridtk/ b/gridtk/
index d6dea66..017d23b 100644
--- a/gridtk/
+++ b/gridtk/
@@ -38,7 +38,7 @@ class JobManagerLocal(JobManager):
     JobManager.__init__(self, **kwargs)
-  def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
+  def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = None, dry_run = False, stop_on_failure = False, **kwargs):
     """Submits a job that will be executed on the local machine during a call to "run".
     All kwargs will simply be ignored."""
     # remove duplicate dependencies
@@ -46,7 +46,7 @@ class JobManagerLocal(JobManager):
     # add job to database
-    job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir, stop_on_failure=stop_on_failure)
+    job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, exec_dir=exec_dir, log_dir=log_dir, stop_on_failure=stop_on_failure)"Added job '%s' to the database", job)
     if dry_run:
@@ -158,7 +158,7 @@ class JobManagerLocal(JobManager):
       return subprocess.Popen(command, env=environ, stdout=out, stderr=err, bufsize=1)
     except OSError as e:
-      logger.error("Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- command:\t%s",, self._format_log(job_id, array_id, len(job.array)), e, " ".join(job.get_command_line()), " ".join(command))
+      logger.error("Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",, self._format_log(job_id, array_id, len(job.array)), e, " ".join(job.get_command_line()), "." if job.exec_dir is None else job.exec_dir, " ".join(command))
       job.finish(117, array_id) # ASCII 'O'
       return None
@@ -262,8 +262,11 @@ class JobManagerLocal(JobManager):
         self.unlock()"Stopping task scheduler due to user interrupt.")
       for task in running_tasks:
-        logger.warn("Killing job '%s' that was still running." % self._format_log(task[1], task[2] if len(task) > 2 else None))
-        task[0].kill()
+        logger.warn("Killing job '%s' that was still running.", self._format_log(task[1], task[2] if len(task) > 2 else None))
+        try:
+          task[0].kill()
+        except OSError as e:
+          logger.error("Killing job '%s' was not successful: '%s'", self._format_log(task[1], task[2] if len(task) > 2 else None), e)
       # stop all jobs that are currently running or queued
diff --git a/gridtk/ b/gridtk/
index eadb491..7223162 100644
--- a/gridtk/
+++ b/gridtk/
@@ -147,11 +147,12 @@ class JobManager:
     job = self.get_jobs((job_id,))[0]
     command_line = job.get_command_line()
+    exec_dir = job.get_exec_dir()
     # execute the command line of the job, and wait until it has finished
-      result =
+      result =, cwd=exec_dir)
     except Exception as e:
       print("ERROR: The job with id '%d' could not be executed: %s" % (job_id, e), file=sys.stderr)
       result = 69 # ASCII: 'E'
diff --git a/gridtk/ b/gridtk/
index 92822fa..600a67e 100644
--- a/gridtk/
+++ b/gridtk/
@@ -71,6 +71,7 @@ class Job(Base):
   machine_name = Column(String(10))            # The name of the machine in which the job is run
   grid_arguments = Column(String(255))         # The kwargs arguments for the job submission (e.g. in the grid)
   id = Column(Integer)                         # The ID of the job as given from the grid
+  exec_dir = Column(String(255))               # The directory in which the command should be executed
   log_dir = Column(String(255))                # The directory where the log files will be put to
   array_string = Column(String(255))           # The array string (only needed for re-submission)
   stop_on_failure = Column(Boolean)            # An indicator whether to stop depending jobs when this job finishes with an error
@@ -78,13 +79,14 @@ class Job(Base):
   status = Column(Enum(*Status))
   result = Column(Integer)
-  def __init__(self, command_line, name = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
+  def __init__(self, command_line, name = None, exec_dir = None, log_dir = None, array_string = None, queue_name = 'local', machine_name = None, stop_on_failure = False, **kwargs):
     """Constructs a Job object without an ID (needs to be set later)."""
     self.command_line = dumps(command_line) = name
     self.queue_name = queue_name   # will be set during the queue command later
     self.machine_name = machine_name   # will be set during the execute command later
     self.grid_arguments = dumps(kwargs)
+    self.exec_dir = exec_dir
     self.log_dir = log_dir
     self.stop_on_failure = stop_on_failure
     self.array_string = dumps(array_string)
@@ -207,6 +209,13 @@ class Job(Base):
     """Sets / overwrites the command line for the job."""
     self.command_line = dumps(command_line)
+  def get_exec_dir(self):
+    """Returns the command line for the job."""
+    # In python 2, the command line is unicode, which needs to be converted to string before pickling;
+    # In python 3, the command line is bytes, which can be pickled directly
+    return str(os.path.realpath(self.exec_dir)) if self.exec_dir is not None else None
   def get_array(self):
     """Returns the array arguments for the job; usually a string."""
@@ -292,6 +301,8 @@ class Job(Base):
       if grid_opt:
         # add additional information about the job at the end
         command_line = "<" + ",".join(["%s=%s" % (key,value) for key,value in grid_opt.iteritems()]) + ">: " + command_line
+      if self.exec_dir is not None:
+        command_line += "; [Executed in directory: '%s']" % self.exec_dir
     if dependencies:
       deps = str(sorted(list(set([dep.unique for dep in self.get_jobs_we_wait_for()]))))
@@ -321,9 +332,9 @@ class JobDependence(Base):
-def add_job(session, command_line, name = 'job', dependencies = [], array = None, log_dir = None, stop_on_failure = False, **kwargs):
+def add_job(session, command_line, name = 'job', dependencies = [], array = None, exec_dir=None, log_dir = None, stop_on_failure = False, **kwargs):
   """Helper function to create a job, add the dependencies and the array jobs."""
-  job = Job(command_line=command_line, name=name, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)
+  job = Job(command_line=command_line, name=name, exec_dir=exec_dir, log_dir=log_dir, array_string=array, stop_on_failure=stop_on_failure, kwargs=kwargs)
diff --git a/gridtk/script/ b/gridtk/script/
index 5734b24..11608b3 100644
--- a/gridtk/script/
+++ b/gridtk/script/
@@ -120,6 +120,7 @@ def submit(args):
   if args.array is not None:         kwargs['array'] = get_array(args.array)
+  if args.exec_dir is not None:      kwargs['exec_dir'] = args.exec_dir
   if args.log_dir is not None:       kwargs['log_dir'] = args.log_dir
   if args.dependencies is not None:  kwargs['dependencies'] = args.dependencies
   if args.qname != 'all.q':          kwargs['hvmem'] = args.memory
@@ -130,6 +131,7 @@ def submit(args):
   kwargs['dry_run'] = args.dry_run
   kwargs['stop_on_failure'] = args.stop_on_failure
   # submit the job
   job_id = jm.submit(args.job, **kwargs)
@@ -283,6 +285,7 @@ def main(command_line_options = None):
   submit_parser.add_argument('-n', '--name', dest='name', help='Gives the job a name')
   submit_parser.add_argument('-x', '--dependencies', type=int, default=[], metavar='ID', nargs='*', help='Set job dependencies to the list of job identifiers separated by spaces')
   submit_parser.add_argument('-k', '--stop-on-failure', action='store_true', help='Stop depending jobs when this job finished with an error.')
+  submit_parser.add_argument('-d', '--exec-dir', metavar='DIR', help='Sets the executing directory, where the script should be executed. If not given, jobs will be executed in the current directory')
   submit_parser.add_argument('-l', '--log-dir', metavar='DIR', help='Sets the log directory. By default, "logs" is selected for the SGE. If the jobs are executed locally, by default the result is written to console.')
   submit_parser.add_argument('-s', '--environment', metavar='KEY=VALUE', dest='env', nargs='*', default=[], help='Passes specific environment variables to the job.')
   submit_parser.add_argument('-t', '--array', '--parametric', metavar='(first-)last(:step)', help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).")
diff --git a/gridtk/ b/gridtk/
index be08706..ab6515b 100644
--- a/gridtk/
+++ b/gridtk/
@@ -80,11 +80,11 @@ class JobManagerSGE(JobManager):
     return job.unique
-  def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
+  def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
     """Submits a job that will be executed in the grid."""
     # add job to database
-    job = add_job(self.session, command_line, name, dependencies, array, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)
+    job = add_job(self.session, command_line, name, dependencies, array, exec_dir=exec_dir, log_dir=log_dir, stop_on_failure=stop_on_failure, context=self.context, **kwargs)"Added job '%s' to the database." % job)
     if dry_run:
       print("Would have added the Job")
diff --git a/gridtk/tests/ b/gridtk/tests/
index aa6149e..e0e3e54 100644
--- a/gridtk/tests/
+++ b/gridtk/tests/
@@ -47,10 +47,12 @@ class GridTKTest(unittest.TestCase):
       # first, add some commands to the database
       script_1 = pkg_resources.resource_filename('gridtk.tests', '')
       script_2 = pkg_resources.resource_filename('gridtk.tests', '')
+      rdir = pkg_resources.resource_filename('gridtk', 'tests')
       from gridtk.script import jman
       # add a simple script that will write some information to the
       jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', bash, script_1])
       jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2',  '--dependencies', '1', '--parametric', '1-7:2', bash, script_2])
+      jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_3',  '--dependencies', '1', '2', '--exec-dir', rdir, bash, ""])
       # check that the database was created successfully
@@ -64,20 +66,24 @@ class GridTKTest(unittest.TestCase):
       job_manager = gridtk.local.JobManagerLocal(database=self.database)
       session = job_manager.lock()
       jobs = list(session.query(Job))
-      self.assertEqual(len(jobs), 2)
+      self.assertEqual(len(jobs), 3)
       self.assertEqual(jobs[0].id, 1)
       self.assertEqual(jobs[1].id, 2)
+      self.assertEqual(jobs[2].id, 3)
       self.assertEqual(len(jobs[1].array), 4)
       self.assertEqual(jobs[0].status, 'submitted')
       self.assertEqual(jobs[1].status, 'submitted')
+      self.assertEqual(jobs[2].status, 'submitted')
       # check that the job dependencies are correct
       waiting = jobs[0].get_jobs_waiting_for_us()
-      self.assertEqual(len(waiting), 1)
+      self.assertEqual(len(waiting), 2)
       self.assertEqual(waiting[0].id, 2)
-      waited = jobs[1].get_jobs_we_wait_for()
-      self.assertEqual(len(waited), 1)
+      self.assertEqual(waiting[1].id, 3)
+      waited = jobs[2].get_jobs_we_wait_for()
+      self.assertEqual(len(waited), 2)
       self.assertEqual(waited[0].id, 1)
+      self.assertEqual(waited[1].id, 2)
@@ -93,13 +99,14 @@ class GridTKTest(unittest.TestCase):
       # now, the first job needs to have status failure, and the second needs to be queued
       session = job_manager.lock()
       jobs = list(session.query(Job))
-      self.assertEqual(len(jobs), 2)
+      self.assertEqual(len(jobs), 3)
       if jobs[0].status in ('submitted', 'queued', 'executing'):
         # on slow machines, we don0t want the tests to fail, so we just skip
         raise nose.plugins.skip.SkipTest("This machine seems to be quite slow in processing parallel jobs.")
       self.assertEqual(jobs[0].status, 'failure')
       self.assertEqual(jobs[1].status, 'queued')
+      self.assertEqual(jobs[2].status, 'waiting')
       # the result files should already be there
@@ -121,7 +128,7 @@ class GridTKTest(unittest.TestCase):
       # Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
       session = job_manager.lock()
       jobs = list(session.query(Job))
-      self.assertEqual(len(jobs), 2)
+      self.assertEqual(len(jobs), 3)
       if jobs[0].status in ('queued', 'executing') or jobs[1].status == 'queued':
         # on slow machines, we don0t want the tests to fail, so we just skip
@@ -169,7 +176,7 @@ class GridTKTest(unittest.TestCase):
       # check that exactly four output and four error files have been created
       files = os.listdir(self.log_dir)
-      self.assertEqual(len(files), 10)
+      self.assertEqual(len(files), 12)
       for i in range(1,8,2):
         self.assertTrue('test_2.o2.%d'%i in files)
         self.assertTrue('test_2.e2.%d'%i in files)
@@ -177,13 +184,15 @@ class GridTKTest(unittest.TestCase):
       # check that all array jobs are finished now
       session = job_manager.lock()
       jobs = list(session.query(Job))
-      self.assertEqual(len(jobs), 2)
+      self.assertEqual(len(jobs), 3)
       self.assertEqual(jobs[1].status, 'failure')
       self.assertEqual(jobs[1].array[0].status, 'failure')
       self.assertEqual(jobs[1].array[0].result, 1)
       for i in range(1,4):
         self.assertEqual(jobs[1].array[i].status, 'success')
         self.assertEqual(jobs[1].array[i].result, 0)
+      self.assertEqual(jobs[2].status, 'success')
+      self.assertEqual(jobs[2].result, 0)
@@ -195,7 +204,7 @@ class GridTKTest(unittest.TestCase):
       jman.main(['./bin/jman', '--database', self.database, 'report'])
       # clean-up
-      jman.main(['./bin/jman', '--local', '--database', self.database, 'delete', '--job-ids', '1-2'])
+      jman.main(['./bin/jman', '--local', '--database', self.database, 'delete', '--job-ids', '1-3'])
       # check that the database and the log files are gone
       self.assertEqual(len(os.listdir(self.temp_dir)), 0)
@@ -203,6 +212,7 @@ class GridTKTest(unittest.TestCase):
       # add the scripts again, but this time with the --stop-on-failure option
       jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', bash, script_1])
       jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2',  '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', bash, script_2])
+      jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_3',  '--dependencies', '1', '2', '--exec-dir', rdir, '--stop-on-failure', bash, ""])
       # and execute them, but without writing the log files
       self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files'])
@@ -218,15 +228,18 @@ class GridTKTest(unittest.TestCase):
       # check that all array jobs are finished now
       session = job_manager.lock()
       jobs = list(session.query(Job))
-      self.assertEqual(len(jobs), 2)
+      self.assertEqual(len(jobs), 3)
       self.assertEqual(jobs[0].status, 'failure')
       self.assertEqual(jobs[0].result, 255)
       self.assertEqual(jobs[1].status, 'failure')
       self.assertTrue(jobs[1].result is None)
+      self.assertEqual(jobs[2].status, 'failure')
+      self.assertTrue(jobs[2].result is None)
       # and clean up again
       jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
+      self.assertEqual(len(os.listdir(self.temp_dir)), 0)
     except KeyboardInterrupt:
       # make sure that the keyboard interrupt is captured and the mess is cleaned up (i.e. by calling tearDown)
diff --git a/version.txt b/version.txt
index b118efb..e447d7d 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
\ No newline at end of file