Commit d7a151ed authored by Manuel Günther

Added time stamps to log files (using logging module, when -vv flag is enabled)

parent e4368044
@@ -192,6 +192,20 @@ This will clean up the old log files (if you didn't specify the ``--keep-logs``
If the submission is done in the grid, the job id(s) will change during this process.
Note about verbosity and time stamps
------------------------------------
For some jobs, it might be interesting to know the time stamps of when the job started and when it finished.
These time stamps are automatically added to the log files (usually the error log file) when you use the ``-vv`` option: one when the process starts and one when it finishes.
However, there is a difference between the ``SGE`` operation and the ``--local`` operation.
For the ``SGE`` operation, you need to use the ``-vv`` option during the submission or re-submission of a job.
In ``--local`` mode, the ``-vv`` flag given during execution (i.e., when using ``--run-local-scheduler``) is used instead.
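For example, a minimal sketch (the actual ``submit`` arguments are elided here, and the ``run-scheduler`` call mirrors the one used in the test suite)::

   # SGE mode: pass -vv when submitting (or re-submitting) the job
   bin/jman -vv submit ...

   # --local mode: pass -vv when running the local scheduler
   bin/jman --local -vv run-scheduler --parallel 2 --die-when-finished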
.. note::
   Why are the info logs written to the error log file, and not to the default output log file?
   This is the default behavior of Python's ``logging`` module:
   all log messages, independent of whether they are error, warning, info or debug messages, are written to ``sys.stderr``, which in turn is redirected into the error log files.
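   A minimal sketch of this default behavior (the logger name is arbitrary, and the formatter is only there to show a time stamp; it is not necessarily the exact format used here)::

      import logging

      logger = logging.getLogger("example")
      # Without an explicit stream, StreamHandler writes to sys.stderr by default,
      # so even info-level messages end up in the error log file.
      handler = logging.StreamHandler()
      handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
      logger.addHandler(handler)
      logger.setLevel(logging.INFO)
      logger.info("this message goes to sys.stderr, i.e., into the error log")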
Cleaning up
-----------
After the job has finished (successfully or not), you should clean up the database using the ``bin/jman delete`` command.
@@ -217,4 +231,3 @@ These tools are:
- ``bin/qsub.py``: submits a job to the SGE grid without logging it into the database
- ``bin/qdel.py``: deletes a job from the SGE grid without logging it into the database
- ``bin/grid``: executes the command in a grid environment (i.e., as if a ``SETSHELL grid`` command had been issued before)
@@ -126,7 +126,7 @@ class JobManagerLocal(JobManager):
#####################################################################
###### Methods to run the jobs in parallel on the local machine #####
def _run_parallel_job(self, job_id, array_id = None, no_log = False, nice = None):
def _run_parallel_job(self, job_id, array_id = None, no_log = False, nice = None, verbosity = 0):
"""Executes the code for this job on the local machine."""
environ = copy.deepcopy(os.environ)
environ['JOB_ID'] = str(job_id)
@@ -136,7 +136,7 @@ class JobManagerLocal(JobManager):
environ['SGE_TASK_ID'] = 'undefined'
# generate call to the wrapper script
command = [self.wrapper_script, '-ld', self._database, 'run-job']
command = [self.wrapper_script, '-l%sd'%("v"*verbosity), self._database, 'run-job']
if nice is not None:
command = ['nice', '-n%d'%nice] + command
@@ -166,7 +166,7 @@ class JobManagerLocal(JobManager):
def _format_log(self, job_id, array_id = None, array_count = 0):
return ("%d (%d/%d)" % (job_id, array_id, array_count)) if array_id is not None and array_count else ("%d (%d)" % (job_id, array_id)) if array_id is not None else ("%d" % job_id)
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False, nice = None):
def run_scheduler(self, parallel_jobs = 1, job_ids = None, sleep_time = 0.1, die_when_finished = False, no_log = False, nice = None, verbosity = 0):
"""Starts the scheduler, which is constantly checking for jobs that should be ran."""
running_tasks = []
finished_tasks = set()
@@ -222,7 +222,7 @@ class JobManagerLocal(JobManager):
for i in range(min(parallel_jobs - len(running_tasks), len(queued_array_jobs))):
array_job = queued_array_jobs[i]
# start a new job from the array
process = self._run_parallel_job(job.unique, array_job.id, no_log=no_log, nice=nice)
process = self._run_parallel_job(job.unique, array_job.id, no_log=no_log, nice=nice, verbosity=verbosity)
if process is None:
continue
running_tasks.append((process, job.unique, array_job.id))
@@ -235,7 +235,7 @@ class JobManagerLocal(JobManager):
else:
if job.status == 'queued':
# start a new job
process = self._run_parallel_job(job.unique, no_log=no_log, nice=nice)
process = self._run_parallel_job(job.unique, no_log=no_log, nice=nice, verbosity=verbosity)
if process is None:
continue
running_tasks.append((process, job.unique))
@@ -137,7 +137,8 @@ class JobManager:
job.execute(array_id, machine_name)
self.session.commit()
except Exception:
except Exception as e:
logger.error("Caught exception '%s'", e)
pass
finally:
self.unlock()
@@ -149,21 +150,23 @@ class JobManager:
exec_dir = job.get_exec_dir()
self.unlock()
logger.info("Starting job %d: %s", job_id, " ".join(command_line))
# execute the command line of the job, and wait until it has finished
try:
result = subprocess.call(command_line, cwd=exec_dir)
logger.info("Job %d finished with result %s", job_id, str(result))
except Exception as e:
print("ERROR: The job with id '%d' could not be executed: %s" % (job_id, e), file=sys.stderr)
logger.error("The job with id '%d' could not be executed: %s", job_id, e)
result = 69 # ASCII: 'E'
# set a new status and the results of the job
try:
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
print("ERROR: The job with id '%d' could not be found in the database!" % job_id, file=sys.stderr)
logger.error("The job with id '%d' could not be found in the database!", job_id)
self.unlock()
return
@@ -187,18 +190,16 @@ class JobManager:
self.unlock()
deps = sorted(list(dependent_job_ids))
self.stop_jobs(deps)
print ("WARNING: Stopped dependent jobs '%s' since this job failed." % str(deps), file=sys.stderr)
logger.warn ("Stopped dependent jobs '%s' since this job failed.", str(deps))
except Exception as e:
print ("ERROR: Caught exception '%s'" % e, file=sys.stderr)
logger.error("Caught exception '%s'", e)
pass
finally:
if hasattr(self, 'session'):
self.unlock()
def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, status=Status, names=None, ids_only=False):
"""Lists the jobs currently added to the database."""
# configuration for jobs
@@ -113,6 +113,7 @@ def submit(args):
kwargs = {
'queue': args.qname,
'cwd': True,
'verbosity' : args.verbose,
'name': args.name,
'env': args.env,
'memfree': args.memory,
@@ -146,7 +147,8 @@ def resubmit(args):
jm.delete(job_ids=get_ids(args.job_ids), delete_jobs=False)
kwargs = {
'cwd': True
'cwd': True,
'verbosity' : args.verbose
}
if args.qname is not None:
kwargs['queue'] = args.qname
@@ -171,7 +173,7 @@ def run_scheduler(args):
if not args.local:
raise ValueError("The execute command can only be used with the '--local' command line option")
jm = setup(args)
jm.run_scheduler(parallel_jobs=args.parallel, job_ids=get_ids(args.job_ids), sleep_time=args.sleep_time, die_when_finished=args.die_when_finished, no_log=args.no_log_files, nice=args.nice)
jm.run_scheduler(parallel_jobs=args.parallel, job_ids=get_ids(args.job_ids), sleep_time=args.sleep_time, die_when_finished=args.die_when_finished, no_log=args.no_log_files, nice=args.nice, verbosity=args.verbose)
def list(args):
@@ -48,7 +48,7 @@ class JobManagerSGE(JobManager):
return 'all.q'
def _submit_to_grid(self, job, name, array, dependencies, log_dir, **kwargs):
def _submit_to_grid(self, job, name, array, dependencies, log_dir, verbosity, **kwargs):
# ... what we will actually submit to the grid is a wrapper script that will call the desired command...
# get the name of the file that was called originally
jman = self.wrapper_script
@@ -59,7 +59,7 @@ class JobManagerSGE(JobManager):
deps = sorted(list(set([j.id for j in dependent_jobs])))
# generate call to the wrapper script
command = make_shell(python, [jman, '-d', self._database, 'run-job'])
command = make_shell(python, [jman, '-d%s' % ('v'*verbosity), self._database, 'run-job'])
q_array = "%d-%d:%d" % array if array else None
grid_id = qsub(command, context=self.context, name=name, deps=deps, array=q_array, stdout=log_dir, stderr=log_dir, **kwargs)
@@ -80,7 +80,7 @@ class JobManagerSGE(JobManager):
return job.unique
def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = "logs", dry_run = False, stop_on_failure = False, **kwargs):
def submit(self, command_line, name = None, array = None, dependencies = [], exec_dir = None, log_dir = "logs", dry_run = False, verbosity = 0, stop_on_failure = False, **kwargs):
"""Submits a job that will be executed in the grid."""
# add job to database
self.lock()
@@ -95,7 +95,7 @@ class JobManagerSGE(JobManager):
job_id = None
else:
job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, **kwargs)
job_id = self._submit_to_grid(job, name, array, dependencies, log_dir, verbosity, **kwargs)
self.session.commit()
self.unlock()
@@ -126,7 +126,7 @@ class JobManagerSGE(JobManager):
self.unlock()
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, **kwargs):
def resubmit(self, job_ids = None, also_success = False, running_jobs = False, new_command=None, verbosity=0, **kwargs):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
@@ -159,7 +159,7 @@ class JobManagerSGE(JobManager):
else:
deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
logger.debug("Re-submitting job '%s' with dependencies '%s' to the grid." % (job, deps))
self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, **arguments)
self._submit_to_grid(job, job.name, job.get_array(), deps, job.log_dir, verbosity, **arguments)
# commit after each job to avoid failures of not finding the job during execution in the grid
self.session.commit()
@@ -162,8 +162,8 @@ class GridTKTest(unittest.TestCase):
# ... but the log dir still exists
self.assertTrue(os.path.exists(self.log_dir))
# now, let the scheduler run all jobs
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished'])
# now, let the scheduler run all jobs, but this time in verbose mode
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '-vv', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished'])
# and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards)
self.scheduler_job.wait()
self.scheduler_job = None
@@ -172,7 +172,7 @@ class GridTKTest(unittest.TestCase):
self.assertTrue(os.path.isfile(out_file))
self.assertTrue(os.path.isfile(err_file))
self.assertEqual(open(out_file).read().rstrip(), 'This is a text message to std-out')
self.assertEqual(open(err_file).read().split('\n')[0], 'This is a text message to std-err')
self.assertEqual(open(err_file).read().split('\n')[1], 'This is a text message to std-err')
# check that exactly four output and four error files have been created
files = os.listdir(self.log_dir)