Commit c748a4f8 authored by André Anjos's avatar André Anjos 💬

Directory hashing implemented; Recursive remotion of log directory structure

parent edf44a1a
......@@ -27,18 +27,27 @@ def try_get_contents(filename):
return ''
def try_remove_files(filename, verbose):
def try_remove_files(filename, recurse, verbose):
"""Safely removes files from the filesystem"""
if isinstance(filename, (tuple, list)):
for k in filename:
if os.path.exists(k):
os.unlink(k)
if verbose: print verbose + ("removed `%f'" % k)
if verbose: print verbose + ("removed `%s'" % k)
d = os.path.dirname(k)
if recurse and not os.listdir(d):
os.removedirs(d)
if verbose: print verbose + ("recursively removed `%s'" % d)
else:
if os.path.exists(filename):
os.unlink(filename)
if verbose: print verbose + ("removed `%f'" % filename)
if verbose: print verbose + ("removed `%s'" % filename)
d = os.path.dirname(filename)
if recurse and not os.listdir(d):
os.removedirs(d)
if verbose: print verbose + ("recursively removed `%s'" % d)
class Job:
"""The job class describes a job"""
......@@ -169,9 +178,9 @@ class Job:
else:
return try_get_contents(self.stdout_filename(instance))
def rm_stdout(self, instance=None, verbose=False):
def rm_stdout(self, instance=None, recurse=True, verbose=False):
try_remove_files(self.stdout_filename(instance), verbose)
try_remove_files(self.stdout_filename(instance), recurse, verbose)
def stderr_filename(self, instance=None):
"""Returns the stderr filename for this job, with the full path"""
......@@ -186,9 +195,9 @@ class Job:
else:
return try_get_contents(self.stderr_filename(instance))
def rm_stderr(self, instance=None, verbose=False):
def rm_stderr(self, instance=None, recurse=True, verbose=False):
try_remove_files(self.stderr_filename(instance), verbose)
try_remove_files(self.stderr_filename(instance), recurse, verbose)
def check(self):
"""Checks if the job is in error state. If this job is a parametric job, it
......
......@@ -21,7 +21,7 @@ from cPickle import dumps
import argparse
from ..manager import JobManager
from ..tools import make_python_wrapper, make_torch_wrapper
from ..tools import make_python_wrapper, make_torch_wrapper, random_logdir
def setup(args):
"""Returns the JobManager and sets up the basic infrastructure"""
......@@ -84,7 +84,6 @@ def delete(args):
for k in jobs:
if jm.has_key(k):
J = jm[k]
del jm[k]
if args.also_logs:
if args.verbose:
J.rm_stdout(verbose=' ')
......@@ -92,6 +91,7 @@ def delete(args):
else:
J.rm_stdout()
J.rm_stderr()
del jm[k]
if args.verbose: print "Deleted job %s" % J
else: print "Deleted job", J.name()
......@@ -99,6 +99,10 @@ def delete(args):
print "Ignored job %d (not found on manager)" % k
def submit(args):
"""Normal submission"""
if args.stdout is None: args.stdout = os.path.join('logs', random_logdir())
if args.stderr is None: args.stderr = args.stdout
jm = setup(args)
kwargs = {
......@@ -116,42 +120,17 @@ def submit(args):
else: print 'Job', job.name(), 'submitted'
def wsubmit(args):
"""Wrapped submission"""
jm = setup(args)
kwargs = {
'queue': args.qname,
'cwd': True,
'name': args.name,
'deps': args.deps,
'stdout': args.stdout,
'stderr': args.stderr,
'env': args.env,
'array': args.array,
}
command = make_python_wrapper(args.wrapper, args.job)
job = jm.submit(command, **kwargs)
job = jm.submit(args.wrapper, args.job, **kwargs)
if args.verbose: print 'Submitted (wrapped)', job
else: print 'Job', job.name(), 'submitted'
args.job = make_python_wrapper(args.wrapper, args.job)
submit(args)
def tsubmit(args):
"""Torch5spro-based submission"""
jm = setup(args)
kwargs = {
'queue': args.qname,
'cwd': True,
'name': args.name,
'deps': args.deps,
'stdout': args.stdout,
'stderr': args.stderr,
'env': args.env,
'array': args.array,
}
command, kwargs = make_torch_wrapper(args.torch, args.torch_debug,
args.job, kwargs)
job = jm.submit(command, **kwargs)
if args.verbose: print 'Submitted (wrapped)', job
else: print 'Job', job.name(), 'submitted'
args.job, env = make_torch_wrapper(args.torch, args.torch_debug, args.job)
args.env.insert(0, env)
submit(args)
def explain(args):
"""Explain action"""
......@@ -219,8 +198,8 @@ def add_submission_options(parser):
parser.add_argument('-n', '--name', dest='name', help='Sets the jobname')
parser.add_argument('-x', '--dependencies', '--deps', dest='deps', type=int,
default=[], metavar='ID', nargs='*', help='set job dependencies by giving this option an a list of job identifiers separated by spaces')
parser.add_argument('-o', '--stdout', '--out', metavar='DIR', dest='stdout', default='logs', help='Set the standard output of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to "%(default)s")')
parser.add_argument('-e', '--stderr', '--err', metavar='DIR', dest='stderr', default='logs', help='Set the standard error of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to "%(default)s")')
parser.add_argument('-o', '--stdout', '--out', metavar='DIR', dest='stdout', help='Set the standard output of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to a randomly generated hashed directory structure)')
parser.add_argument('-e', '--stderr', '--err', metavar='DIR', dest='stderr', help='Set the standard error of the job to be placed in the given directory - relative paths are interpreted according to the currently working directory (defaults to what stdout will be set to)')
parser.add_argument('-s', '--environment', '--env', metavar='KEY=VALUE',
dest='env', nargs='*', default=[],
help='Passes specific environment variables to the job')
......@@ -344,6 +323,7 @@ def main():
resubparser.set_defaults(func=resubmit)
args = parser.parse_args()
args.func(args)
sys.exit(0)
......@@ -10,10 +10,18 @@ probing.
import os
import re
import logging
import hashlib
import random
# Constant regular expressions
QSTAT_FIELD_SEPARATOR = re.compile(':\s+')
def random_logdir():
"""Generates a random log directory for placing the command output"""
x = hashlib.md5(str(random.randint(100000,999999))).hexdigest()
return os.path.join(x[:2], x[2:4], x[4:6])
def makedirs_safe(fulldir):
"""Creates a directory if it does not exists. Takes into consideration
concurrent access support. Works like the shell's 'mkdir -p'.
......@@ -111,7 +119,7 @@ def qsub(command, queue='all.q', cwd=True, name=None, deps=[], stdout='',
scmd += ['-o', stdout]
if stderr:
if not os.path.exists(stdout): makedirs_safe(stdout)
if not os.path.exists(stderr): makedirs_safe(stderr)
scmd += ['-e', stderr]
elif stdout: #just re-use the stdout settings
scmd += ['-e', stdout]
......@@ -179,7 +187,7 @@ def make_python_wrapper(wrapper, command):
if not isinstance(command, (list, tuple)): command = [command]
return make_shell('/usr/bin/python', wrapper + ['--'] + command)
def make_torch_wrapper(torch, debug, command, kwargs):
def make_torch_wrapper(torch, debug, command):
"""Submits a command using the Torch python wrapper so the **command**
executes in a valid Torch context.
......@@ -198,11 +206,9 @@ def make_torch_wrapper(torch, debug, command, kwargs):
command
The script path to be submitted
kwargs
The set of parameters to be sent to qsub(), as a python dictionary
Returns the command and kwargs parameters to be supplied to qsub()
Returns the command and environment parameters to be supplied to qsub()
"""
binroot = os.path.join(torch, 'bin')
shell = os.path.join(binroot, 'shell.py')
if not os.path.exists(shell):
......@@ -212,11 +218,9 @@ def make_torch_wrapper(torch, debug, command, kwargs):
if debug: wrapper += ['--debug']
# adds OVERWRITE_TORCH5SPRO_ROOT to the execution environment
if not kwargs.has_key('env'): kwargs['env'] = {}
kwargs['env'].append('OVERWRITE_TORCH5SPRO_BINROOT=%s' % binroot)
env = 'OVERWRITE_TORCH5SPRO_BINROOT=%s' % binroot
return make_python_wrapper(wrapper, command), kwargs
return make_python_wrapper(wrapper, command), env
def qstat(jobid, context='grid'):
"""Queries status of a given job.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment