Commit c13f2ec4 authored by Manuel Günther's avatar Manuel Günther

Improved the logging system (added own 'gridtk' logger).

parent c81cf516
......@@ -8,10 +8,9 @@
import os
import time
import logging
import anydbm
from cPickle import dumps, loads
from .tools import qsub, qstat, qdel
from .tools import qsub, qstat, qdel, logger
from .setshell import environ
import re
......@@ -23,7 +22,7 @@ def try_get_contents(filename):
try:
return open(filename, 'rt').read()
except OSError, e:
logging.warn("Could not find file '%s'" % filename)
logger.warn("Could not find file '%s'" % filename)
return ''
......@@ -224,7 +223,7 @@ class Job:
def check_file(name, jobname):
try:
if os.stat(name).st_size != 0:
logging.debug("Job %s has a stderr file with size != 0" % jobname)
logger.debug("Job %s has a stderr file with size != 0" % jobname)
if not ignore_warnings:
return False
......@@ -235,7 +234,7 @@ class Job:
is_error = is_error or (line and 'WARNING' not in line)
return not is_error
except OSError, e:
logging.warn("Could not find error file '%s'" % name)
logger.warn("Could not find error file '%s'" % name)
return True
if self.array:
......@@ -256,10 +255,10 @@ class Job:
def check_file(name, jobname):
try:
if os.stat(name).st_size != 0:
logging.debug("Job %s has a stderr file with size != 0" % jobname)
logger.debug("Job %s has a stderr file with size != 0" % jobname)
return False
except OSError, e:
logging.warn("Could not find error file '%s'" % f)
logger.warn("Could not find error file '%s'" % f)
return True
start, stop, step = self.array
......@@ -325,11 +324,11 @@ class JobManager:
self.state_file = statefile
self.state_db = anydbm.open(self.state_file, 'c')
self.job = {}
logging.debug("Loading previous state...")
logger.debug("Loading previous state...")
for k in self.state_db.keys():
ki = loads(k)
self.job[ki] = loads(self.state_db[k])
logging.debug("Job %d loaded" % ki)
logger.debug("Job %d loaded" % ki)
self.context = environ(context)
def __del__(self):
......@@ -338,7 +337,7 @@ class JobManager:
db = anydbm.open(self.state_file, 'n') # erase previously recorded jobs
for k in sorted(self.job.keys()): db[dumps(k)] = dumps(self.job[k])
if not self.job:
logging.debug("Removing file %s because there are no more jobs to store" \
logger.debug("Removing file %s because there are no more jobs to store" \
% self.state_file)
os.unlink(self.state_file)
......@@ -438,10 +437,10 @@ class JobManager:
if status:
success.append(self.job[k])
del self.job[k]
logging.debug("Job %d completed successfuly" % k)
logger.debug("Job %d completed successfuly" % k)
else:
error.append(self.job[k])
del self.job[k]
logging.debug("Job %d probably did not complete successfuly" % k)
logger.debug("Job %d probably did not complete successfuly" % k)
return success, error
File mode changed from 100644 to 100755
......@@ -21,7 +21,7 @@ from cPickle import dumps
import argparse
from ..manager import JobManager
from ..tools import make_shell, random_logdir
from ..tools import make_shell, random_logdir, logger
def setup(args):
"""Returns the JobManager and sets up the basic infrastructure"""
......@@ -33,7 +33,8 @@ def setup(args):
# set-up logging
if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
return jm
......@@ -347,7 +348,7 @@ def main():
dest='array', help='Creates a parametric (array) job. You must specify the starting range "n" (>=1), the stopping (inclusive) range "m" and the step "s". Read the qsub command man page for details')
subparser.add_argument('-p', '--py', '--python', dest='python', default=False,
action='store_true', help='Wrap execution of your command using the current python interpreter')
subparser.add_argument('-z', '--dry-run', dest='dry_run', default=False,
subparser.add_argument('-z', '--dry-run',
action='store_true', help='Do not really submit anything, just print out what would submit in this case')
subparser.add_argument('job', metavar='command', nargs=argparse.REMAINDER)
subparser.set_defaults(func=submit)
......
......@@ -10,7 +10,7 @@ import os
import sys
import signal
import subprocess
import logging
from .tools import logger
def environ(context):
"""Retrieves the environment for a particular SETSHELL context"""
......@@ -18,7 +18,7 @@ def environ(context):
# It seems that we are in a hostile environment
# try to source the Idiap-wide shell
idiap_source = "/idiap/resource/software/initfiles/shrc"
logging.debug("Sourcing: '%s'"%idiap_source)
logger.debug("Sourcing: '%s'"%idiap_source)
try:
command = ['bash', '-c', 'source %s && env' % idiap_source]
pi = subprocess.Popen(command, stdout = subprocess.PIPE)
......@@ -41,7 +41,7 @@ def environ(context):
# First things first, we get the path to the temp file created by dosetshell
try:
logging.debug("Executing: '%s'", ' '.join(command))
logger.debug("Executing: '%s'", ' '.join(command))
p = subprocess.Popen(command, stdout = subprocess.PIPE)
except OSError as e:
# occurs when the file is not executable or not found
......@@ -59,7 +59,7 @@ def environ(context):
command2 = ['bash', '-c', 'source %s && env' % source]
try:
logging.debug("Executing: '%s'", ' '.join(command2))
logger.debug("Executing: '%s'", ' '.join(command2))
p2 = subprocess.Popen(command2, stdout = subprocess.PIPE)
except OSError as e:
# occurs when the file is not executable or not found
......@@ -79,10 +79,9 @@ def environ(context):
if os.path.exists(source): os.unlink(source)
logging.debug("Discovered environment for context '%s':", context)
if logging.getLogger().isEnabledFor(logging.DEBUG):
for k in sorted(new_environ.keys()):
logging.debug(" %s = %s", k, new_environ[k])
logger.debug("Discovered environment for context '%s':", context)
for k in sorted(new_environ.keys()):
logger.debug(" %s = %s", k, new_environ[k])
return new_environ
......@@ -93,7 +92,7 @@ def sexec(context, command, error_on_nonzero=True):
else: E = context
try:
logging.debug("Executing: '%s'", ' '.join(command))
logger.debug("Executing: '%s'", ' '.join(command))
p = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=E)
(stdout, stderr) = p.communicate() #note: stderr will be 'None'
......@@ -103,7 +102,7 @@ def sexec(context, command, error_on_nonzero=True):
"Execution of '%s' exited with status != 0 (%d): %s" % \
(' '.join(command), p.returncode, stdout)
else:
logging.debug("Execution of '%s' exited with status != 0 (%d): %s" % \
logger.debug("Execution of '%s' exited with status != 0 (%d): %s" % \
(' '.join(command), p.returncode, stdout))
return stdout.strip()
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 09:26:46 CEST
# Wed 24 Aug 2011 09:26:46 CEST
"""Functions that replace the shell based utilities for the grid submission and
probing.
......@@ -9,10 +9,20 @@ probing.
import os
import re
import logging
import hashlib
import random
# initialize the logging system
import logging
class NullHandler (logging.Handler):
def __init__(self,*args,**kwargs): logging.Handler.__init__(self,**kwargs)
def emit(self,*args,**kwargs): pass
def handle(self,*args,**kwargs): pass
def createLock(self,*args,**kwargs): pass
logger = logging.getLogger("gridtk")
# .. register null handler (has to be done in python 2.6)
logger.addHandler(NullHandler())
# Constant regular expressions
QSTAT_FIELD_SEPARATOR = re.compile(':\s+')
......@@ -34,10 +44,10 @@ def makedirs_safe(fulldir):
else: raise
def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
stderr='', env=[], array=None, context='grid', hostname=None,
stderr='', env=[], array=None, context='grid', hostname=None,
mem=None, memfree=None, hvmem=None, pe_opt=None):
"""Submits a shell job to a given grid queue
Keyword parameters:
command
......@@ -70,9 +80,9 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
array
If set should be either:
1. a string in the form m[-n[:s]] which indicates the starting range 'm',
the closing range 'n' and the step 's'.
the closing range 'n' and the step 's'.
2. an integer value indicating the total number of jobs to be submitted.
This is equivalent ot set the parameter to a string "1-k:1" where "k" is
the passed integer value
......@@ -80,7 +90,7 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
end and step arguments ("m", "n", "s").
The minimum value for "m" is 1. Giving "0" is an error.
If submitted with this option, the job to be created will be an SGE
parametric job. In this mode SGE does not allow individual control of each
job. The environment variable SGE_TASK_ID will be set on the executing
......@@ -100,12 +110,12 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
(cf. qsub -l mem_free=<...> -l h_vmem=<...>)
memfree
If set, it asks the queue for a node with a minimum amount of memory
If set, it asks the queue for a node with a minimum amount of memory
Used only if mem is not set
(cf. qsub -l mem_free=<...>)
hvmem
If set, it asks the queue for a node with a minimum amount of memory
If set, it asks the queue for a node with a minimum amount of memory
Used only if mem is not set
(cf. qsub -l h_vmem=<...>)
......@@ -125,7 +135,7 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
if isinstance(queue, str) and queue not in ('all.q', 'default'):
scmd += ['-l', queue]
if mem:
if mem:
scmd += ['-l', 'mem_free=%s' % mem]
scmd += ['-l', 'h_vmem=%s' % mem]
else:
......@@ -143,12 +153,12 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
if deps: scmd += ['-hold_jid', ','.join(['%d' % k for k in deps])]
if stdout:
if not cwd:
# pivot, temporarily, to home directory
curdir = os.path.realpath(os.curdir)
os.chdir(os.environ['HOME'])
if not os.path.exists(stdout): makedirs_safe(stdout)
if not cwd:
......@@ -191,15 +201,15 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
if not isinstance(command, (list, tuple)): command = [command]
scmd += command
logging.debug("Qsub command '%s'", ' '.join(scmd))
logger.debug("Qsub command '%s'", ' '.join(scmd))
from .setshell import sexec
jobid = sexec(context, scmd)
return int(jobid.split('.',1)[0])
def make_shell(shell, command):
"""Returns a single command given a shell and a command to be qsub'ed
Keyword parameters:
shell
......@@ -215,12 +225,12 @@ def make_shell(shell, command):
def qstat(jobid, context='grid'):
"""Queries status of a given job.
Keyword parameters:
jobid
The job identifier as returned by qsub()
context
The setshell context in which we should try a 'qsub'. Normally you don't
need to change the default. This variable can also be set to a context
......@@ -232,7 +242,7 @@ def qstat(jobid, context='grid'):
scmd = ['qstat', '-j', '%d' % jobid, '-f']
logging.debug("Qstat command '%s'", ' '.join(scmd))
logger.debug("Qstat command '%s'", ' '.join(scmd))
from .setshell import sexec
data = sexec(context, scmd, error_on_nonzero=False)
......@@ -250,12 +260,12 @@ def qstat(jobid, context='grid'):
def qdel(jobid, context='grid'):
"""Halts a given job.
Keyword parameters:
jobid
The job identifier as returned by qsub()
context
The setshell context in which we should try a 'qsub'. Normally you don't
need to change the default. This variable can also be set to a context
......@@ -265,7 +275,7 @@ def qdel(jobid, context='grid'):
scmd = ['qdel', '%d' % jobid]
logging.debug("Qdel command '%s'", ' '.join(scmd))
logger.debug("Qdel command '%s'", ' '.join(scmd))
from .setshell import sexec
sexec(context, scmd, error_on_nonzero=False)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment