Skip to content
Snippets Groups Projects
Commit 3eae2db0 authored by Manuel Günther's avatar Manuel Günther
Browse files

Used a more clever way to guess the 'wrapper_script' parameter

parent 4a027fbe
Branches
Tags
No related merge requests found
......@@ -158,7 +158,7 @@ class JobManagerLocal(JobManager):
try:
return subprocess.Popen(command, env=environ, stdout=out, stderr=err, bufsize=1)
except OSError as e:
logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id, len(job.array)), e, job.get_command_line()))
logger.error("Could not execute job '%s' locally\n- reason:\t%s\n- command line:\t%s\n- command:\t%s", self._format_log(job_id, array_id, len(job.array)), e, " ".join(job.get_command_line()), " ".join(command))
job.finish(117, array_id) # ASCII 'O'
return None
......@@ -189,8 +189,11 @@ class JobManagerLocal(JobManager):
job, array_job = self._job_and_array(job_id, array_id)
if array_job: job = array_job
result = "%s (%d)" % (job.status, job.result) if job.result is not None else "%s (?)" % job.status
if job.status not in ('success', 'failure'):
logger.error("Job '%s' finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!", self._format_log(job_id, array_id), job.status)
raise StopIteration("Job did not finish correctly.")
self.unlock()
logger.info("Job '%s' finished execution with result %s" % (self._format_log(job_id, array_id), result))
logger.info("Job '%s' finished execution with result '%s'" % (self._format_log(job_id, array_id), result))
finished_tasks.add(job_id)
# in any case, remove the job from the list
del running_tasks[task_index]
......@@ -254,7 +257,7 @@ class JobManagerLocal(JobManager):
time.sleep(sleep_time)
# This is the only way to stop: you have to interrupt the scheduler
except KeyboardInterrupt:
except (KeyboardInterrupt, StopIteration):
if hasattr(self, 'session'):
self.unlock()
logger.info("Stopping task scheduler due to user interrupt.")
......
......@@ -7,6 +7,7 @@ import socket # to get the host name
from .models import Base, Job, ArrayJob, Status
from .tools import logger
import sqlalchemy
"""This file defines a minimum Job Manager interface."""
......@@ -15,12 +16,21 @@ sqlalchemy_version = [int(v) for v in sqlalchemy.__version__.split('.')]
class JobManager:
"""This job manager defines the basic interface for handling jobs in the SQL database."""
def __init__(self, database, wrapper_script = './bin/jman', debug = False):
def __init__(self, database, wrapper_script = None, debug = False):
self._database = os.path.realpath(database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, connect_args={'timeout': 600}, echo=debug)
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
# store the command that this job manager was called with
if wrapper_script is None:
# try to find the executable, search in the bin path first
import distutils.spawn
wrapper_script = os.path.realpath(distutils.spawn.find_executable('jman', '.' + os.pathsep + 'bin' + os.pathsep + os.environ['PATH']))
if wrapper_script is None:
raise IOError("Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager.")
if not os.path.exists(wrapper_script):
raise IOError("Your wrapper_script cannot be found. Jobs will not be executable.")
self.wrapper_script = wrapper_script
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment