Commit 773d7b52 authored by Manuel Günther's avatar Manuel Günther

Improved behavior of 'jman ls' and 'jman del' commands; added more informative...

Improved behavior of 'jman ls' and 'jman del' commands; added more informative output during scheduler run.
parent 3b5942f7
......@@ -121,13 +121,13 @@ class JobManagerLocal(JobManager):
# generate call to the wrapper script
command = [self.wrapper_script, '-ld', self._database, 'run-job']
logger.info("Started execution of Job '%s'" % self._format_log(job_id, array_id))
job = self.get_jobs((job_id,))[0]
logger.info("Starting execution of Job '%s': '%s'" % (self._format_log(job_id, array_id), job.name))
# return the subprocess pipe to the process
try:
return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
job = self.get_jobs((job_id,))[0]
logger.error("Could not execute job '%s' locally,\nreason:\t%s,\ncommand_line\t%s:" % (self._format_log(job_id, array_id), e, job.get_command_line()))
job.finish(117, array_id) # ASCII 'O'
return None
......
......@@ -3,7 +3,7 @@ from __future__ import print_function
import os
import subprocess
from .models import Base, Job, ArrayJob
from .models import Base, Job, ArrayJob, Status
from .tools import logger
import sqlalchemy
......@@ -201,12 +201,13 @@ class JobManager:
def _write_array_jobs(array_jobs):
for array_job in array_jobs:
if unfinished or array_job.status in ('success', 'failure'):
if unfinished or array_job.status in accepted_status:
print("Array Job", str(array_job.id), ":")
_write_contents(array_job)
self.lock()
accepted_status = ('failure',) if error and not output else ('success', 'failure')
# check if an array job should be reported
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
......@@ -219,20 +220,20 @@ class JobManager:
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.array:
if (unfinished or job.status in ('success', 'failure', 'executing')):
if unfinished or job.status in accepted_status or job.status == 'executing':
print(job)
_write_array_jobs(job.array)
else:
if unfinished or job.status in ('success', 'failure'):
if unfinished or job.status in accepted_status:
print(job)
_write_contents(job)
if job.log_dir is not None:
if job.log_dir is not None and job.status in accepted_status:
print("-"*60)
self.unlock()
def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, delete_jobs = True):
def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True):
"""Deletes the jobs with the given ids from the database."""
def _delete_dir_if_empty(log_dir):
if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
......@@ -264,11 +265,13 @@ class JobManager:
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
_delete(array_job)
if array_job.status in status:
logger.debug("Deleting array job '%d' of job '%d' from the database." % array_job.id, job.id)
_delete(array_job)
if not job.array:
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
if job.status in status:
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
else:
# iterate over all jobs
......@@ -277,11 +280,13 @@ class JobManager:
# delete all array jobs
if job.array:
for array_job in job.array:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
_delete(array_job)
if array_job.status in status:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.id))
_delete(array_job)
# delete this job
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
if job.status in status:
logger.info("Deleting job '%d' from the database." % job.id)
_delete(job, True)
self.session.commit()
self.unlock()
......@@ -21,6 +21,7 @@ import logging
from ..tools import make_shell, logger
from .. import local, sge
from ..models import Status
def setup(args):
"""Returns the JobManager and sets up the basic infrastructure"""
......@@ -149,10 +150,10 @@ def delete(args):
"""Deletes the jobs from the job manager. If the jobs are still running in the grid, they are stopped."""
jm = setup(args)
# first, stop the jobs if they are running in the grid
if not args.local:
if not args.local and 'executing' in args.status:
stop(args)
# then, delete them from the database
jm.delete(job_ids=args.job_ids, array_ids=args.array_ids, delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir)
jm.delete(job_ids=args.job_ids, array_ids=args.array_ids, delete_logs=not args.keep_logs, delete_log_dir=not args.keep_log_dir, status=args.status)
def run_job(args):
......@@ -275,6 +276,7 @@ def main(command_line_options = None):
delete_parser.add_argument('-a', '--array-ids', metavar='ID', nargs='*', type=int, help='Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.')
delete_parser.add_argument('-r', '--keep-logs', action='store_true', help='If set, the log files will NOT be removed.')
delete_parser.add_argument('-R', '--keep-log-dir', action='store_true', help='When removing the logs, keep the log directory.')
delete_parser.add_argument('-s', '--status', nargs='+', choices = Status, default = Status, help='Delete only jobs that have the given stati; by default all jobs are deleted.')
delete_parser.set_defaults(func=delete)
# subcommand 'run_scheduler'
......
......@@ -9,7 +9,6 @@ probing.
import os
import re
import sys
import six
import hashlib
import random
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment