Commit ff2d46ea authored by Manuel Günther

First working example that relies on bob.db.utils.

parent 33013f70
@@ -18,7 +18,11 @@ The script accepts buildout command-line options, so you can
use the -c option to specify an alternate configuration file.
"""
import os, shutil, sys, tempfile
import os
import shutil
import sys
import tempfile
from optparse import OptionParser
tmpeggs = tempfile.mkdtemp()
@@ -31,8 +35,8 @@ Bootstraps a buildout-based project.
Simply run this script in a directory containing a buildout.cfg, using the
Python that you want bin/buildout to use.
Note that by using --setup-source and --download-base to point to
local resources, you can keep this script from going over the network.
Note that by using --find-links to point to local resources, you can keep
this script from going over the network.
'''
parser = OptionParser(usage=usage)
@@ -48,23 +52,21 @@ parser.add_option("-t", "--accept-buildout-test-releases",
"bootstrap and buildout will get the newest releases "
"even if they are alphas or betas."))
parser.add_option("-c", "--config-file",
help=("Specify the path to the buildout configuration "
"file to be used."))
help=("Specify the path to the buildout configuration "
"file to be used."))
parser.add_option("-f", "--find-links",
help=("Specify a URL to search for buildout releases"))
help=("Specify a URL to search for buildout releases"))
options, args = parser.parse_args()
######################################################################
# load/install distribute
# load/install setuptools
to_reload = False
try:
import pkg_resources, setuptools
if not hasattr(pkg_resources, '_distribute'):
to_reload = True
raise ImportError
import pkg_resources
import setuptools
except ImportError:
ez = {}
@@ -73,8 +75,10 @@ except ImportError:
except ImportError:
from urllib2 import urlopen
exec(urlopen('http://python-distribute.org/distribute_setup.py').read(), ez)
setup_args = dict(to_dir=tmpeggs, download_delay=0, no_fake=True)
# XXX use a more permanent ez_setup.py URL when available.
exec(urlopen('https://bitbucket.org/pypa/setuptools/raw/0.7.2/ez_setup.py'
).read(), ez)
setup_args = dict(to_dir=tmpeggs, download_delay=0)
ez['use_setuptools'](**setup_args)
if to_reload:
@@ -86,10 +90,23 @@ except ImportError:
if path not in pkg_resources.working_set.entries:
pkg_resources.working_set.add_entry(path)
######################################################################
# Try to best guess the version of buildout given setuptools
if options.version is None:
try:
from distutils.version import LooseVersion
package = pkg_resources.require('setuptools')[0]
v = LooseVersion(package.version)
if v < LooseVersion('0.7'):
options.version = '2.1.1'
except:
pass
######################################################################
# Install buildout
ws = pkg_resources.working_set
ws = pkg_resources.working_set
cmd = [sys.executable, '-c',
'from setuptools.command.easy_install import main; main()',
@@ -104,8 +121,8 @@ find_links = os.environ.get(
if find_links:
cmd.extend(['-f', find_links])
distribute_path = ws.find(
pkg_resources.Requirement.parse('distribute')).location
setuptools_path = ws.find(
pkg_resources.Requirement.parse('setuptools')).location
requirement = 'zc.buildout'
version = options.version
@@ -113,13 +130,14 @@ if version is None and not options.accept_buildout_test_releases:
# Figure out the most recent final version of zc.buildout.
import setuptools.package_index
_final_parts = '*final-', '*final'
def _final_version(parsed_version):
for part in parsed_version:
if (part[:1] == '*') and (part not in _final_parts):
return False
return True
index = setuptools.package_index.PackageIndex(
search_path=[distribute_path])
search_path=[setuptools_path])
if find_links:
index.add_find_links((find_links,))
req = pkg_resources.Requirement.parse(requirement)
@@ -142,7 +160,7 @@ if version:
cmd.append(requirement)
import subprocess
if subprocess.call(cmd, env=dict(os.environ, PYTHONPATH=distribute_path)) != 0:
if subprocess.call(cmd, env=dict(os.environ, PYTHONPATH=setuptools_path)) != 0:
raise Exception(
"Failed to execute command:\n%s",
repr(cmd)[1:-1])
@@ -163,3 +181,4 @@ if options.config_file is not None:
zc.buildout.buildout.main(args)
shutil.rmtree(tmpeggs)
@@ -2,4 +2,5 @@ import setshell
import tools
import manager
import local
import sge
import easy
@@ -15,165 +15,13 @@ from cPickle import dumps, loads
from tools import makedirs_safe, logger, try_get_contents, try_remove_files
class Job:
"""Stores all information about a job that is run locally."""
def __init__(self, id, command_line, name, dependencies = [], array = None, stdout=None, stderr=None):
"""Initializes the job with the given values."""
self._id = id
self._command_line = command_line
self._name = name
self.dependencies = copy.deepcopy(dependencies)
self._array = array
self.stdout_dir = stdout
self.stderr_dir = stderr
self.status = "waiting"
def id(self):
return self._id
def name(self, *args):
return self._name if self._name else "%d" % self._id
def command_line(self):
return " ".join(self._command_line)
def array(self):
"""Creates a set of array job indices for the given array tuple."""
if not self._array:
return None
else:
start, stop, step = self._array
return set(range(start, stop+1, step))
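# Illustrative sketch (not part of the commit, hypothetical values): the array
# tuple is interpreted SGE-style as (start, stop, step) with an inclusive stop,
# so a job constructed with array=(1, 10, 2) expands to five task indices:
#
#   >>> job = Job(42, ["echo", "hello"], "demo", array=(1, 10, 2))
#   >>> sorted(job.array())
#   [1, 3, 5, 7, 9]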
def __str__(self):
"""Returns information about this job as a string."""
return "%d" % self.id() +\
("\tName: " + self.name() if self._name else "") +\
("\tDependencies: " + str(self.dependencies) if self.dependencies else "") +\
("\tarray: " + str(self.array()) if self._array else "") +\
"\tStatus: " + self.status
def row(self, fmt, maxcmd=0):
"""Returns a string containing the job description suitable for a table."""
id = str(self.id())
if self._array:
id += ".%d-%d.%d"% self._array
cmd = self.command_line()
if maxcmd and len(cmd) > maxcmd:
cmd = cmd[:(maxcmd-3)] + '...'
return fmt % (id, self.name(), self.status, cmd)
def execute(self, array_index = None):
"""Executes the code for this job on the local machine."""
environ = copy.deepcopy(os.environ)
environ['JOB_ID'] = str(self._id)
if array_index:
environ['SGE_TASK_ID'] = str(array_index)
self.status = "executing"
# return the subprocess pipe to the process
try:
return subprocess.Popen(self._command_line, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError:
self.status = "finished"
raise
def filename(self, out_or_err, array_index = None):
"""Returns the file name of the output or error log file of this job."""
assert out_or_err in 'oe'
dir = {'o':self.stdout_dir, 'e':self.stderr_dir}[out_or_err]
if dir is None:
return None
else:
return os.path.join(dir, self._name + "." + out_or_err + str(self._id) + ("." + str(array_index) if array_index is not None else "") )
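# Naming sketch (illustration only, hypothetical values): with both log
# directories set to 'logs', a job named 'demo' with id 42 gets SGE-style
# log file names:
#
#   >>> job = Job(42, ["echo", "hello"], "demo", stdout="logs", stderr="logs")
#   >>> job.filename('o', 3)
#   'logs/demo.o42.3'
#   >>> job.filename('e')
#   'logs/demo.e42'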
def stdout(self, array_index):
if self.stdout_dir is None: return ""
if self._array and not array_index:
return "------------\n".join([f for f in [try_get_contents(self.filename('o', i)) for i in self.array()] if f])
return try_get_contents(self.filename('o', array_index))
def stderr(self, array_index):
if self.stderr_dir is None: return ""
if self._array and not array_index:
return "------------\n".join([f for f in [try_get_contents(self.filename('e', i)) for i in self.array()] if f])
return try_get_contents(self.filename('e', array_index))
def finalize(self, process, array_index = None):
"""Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
ofn = self.filename('o', array_index)
if ofn:
makedirs_safe(self.stdout_dir)
with open(ofn, 'w') as f: f.write(process.stdout.read())
else:
sys.stdout.write(process.stdout.read())
efn = self.filename('e', array_index)
if efn:
makedirs_safe(self.stderr_dir)
with open(efn, 'w') as f: f.write(process.stderr.read())
else:
sys.stderr.write(process.stderr.read())
if not array_index:
self.status = "finished"
def check(self, ignore_warnings=False):
"""Checks if the job is in error state. If this job is a parametric job, it
will return an error state if **any** of the parametrized jobs are in error
state."""
def check_file(name):
try:
if os.stat(name).st_size != 0:
logger.debug("Job %s has a stderr file with size != 0" % self._name)
if not ignore_warnings:
return False
# read the contents of the log file to ignore the annoying warning messages
is_error = False
f = open(name,'r')
for line in f:
is_error = is_error or (line and 'WARNING' not in line and 'INFO' not in line)
return not is_error
except OSError, e:
logger.warn("Could not find error file '%s'" % name)
return True
if not self.stderr_dir:
return True
if self._array:
error_files = [self.filename('e',array_index) for array_index in self.array()]
return False not in [check_file(array_file) for array_file in error_files]
else:
return check_file(self.filename('e'))
def rm_stdout(self, instance=None, recurse=True, verbose=False):
"""Removes the log files for the stdout, if available."""
if self._array:
files = [self.filename('o', array_index) for array_index in self.array()]
else:
files = [self.filename('o')]
try_remove_files(files, recurse, verbose)
def rm_stderr(self, instance=None, recurse=True, verbose=False):
if self._array:
files = [self.filename('e', array_index) for array_index in self.array()]
else:
files = [self.filename('e')]
try_remove_files(files, recurse, verbose)
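# End-to-end sketch of this (old) Job API for a non-array job (hypothetical
# values, not part of the commit): execute() returns a subprocess.Popen with
# piped stdout/stderr, and finalize() flushes those pipes into the log files
# and marks the job as finished:
#
#   >>> job = Job(42, ["echo", "hello"], "demo", stdout="logs", stderr="logs")
#   >>> process = job.execute()
#   >>> process.wait()
#   0
#   >>> job.finalize(process)
#   >>> job.status
#   'finished'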
from .manager import JobManager
from .models import add_job, Job
class JobManager:
class JobManagerLocal(JobManager):
"""Manages jobs run in parallel on the local machine."""
def __init__(self, statefile='submitted.db'):
def __init__(self, database='submitted.sql3', sleep_time = 0.1):
"""Initializes this object with a state file and a method for qsub'bing.
Keyword parameters:
......@@ -183,206 +31,166 @@ class JobManager:
does not exist it is initialized. If it exists, it is loaded.
"""
self._state_file = statefile
self._jobs = {}
import random
self._job_id = random.randint(0, 65000)
if os.path.exists(self._state_file):
try:
db = gdbm.open(self._state_file, 'r')
except:
db = anydbm.open(self._state_file, 'r')
logger.debug("Loading previous state...")
for ks in db.keys():
ki = loads(ks)
self._jobs[ki] = loads(db[ks])
logger.debug("Job %d loaded" % ki)
db.close()
def save(self):
"""Saves the current status of the Job Manager into the database file."""
try:
db = gdbm.open(self._state_file, 'c')
except:
db = anydbm.open(self._state_file, 'c')
# synchronize jobs
for ks in sorted(db.keys()):
ki = loads(ks)
if ki not in self._jobs:
del db[ks]
logger.debug("Job %d deleted from database" % ki)
for ki in sorted(self._jobs.keys()):
ks = dumps(ki)
db[ks] = dumps(self._jobs[ki])
logger.debug("Job %d added or updated in database" % ki)
db.close()
def __del__(self):
"""Safely terminates the JobManager by updating writing the state file"""
self.save()
if not self._jobs:
logger.debug("Removing file %s because there are no more jobs to store" % self._state_file)
os.unlink(self._state_file)
JobManager.__init__(self, database)
self._sleep_time = sleep_time
def submit(self, command_line, name, array = None, deps = [], stdout=None, stderr=None, *args, **kwars):
"""Submits a job that will be executed on the local machine during a call to "run"."""
self._job_id += 1
job = Job(self._job_id, command_line[1:] if command_line[0] == '-S' else command_line, name, deps, array, stdout, stderr)
self._jobs[self._job_id] = job
return self._jobs[self._job_id]
def submit(self, command_line, name = None, array = None, dependencies = [], log_dir = None, **kwargs):
"""Submits a job that will be executed on the local machine during a call to "run".
All kwargs will simply be ignored."""
# add job to database
self.lock()
job = add_job(self.session, command_line=command_line, name=name, dependencies=dependencies, array=array, log_dir=log_dir)
# return the new job id
job_id = job.id
self.unlock()
return job_id
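# Hedged usage sketch of the new SQL-backed submission (assumes the JobManager
# base class and the models module introduced by this commit are importable;
# names and paths below are hypothetical):
#
#   >>> jm = JobManagerLocal(database='submitted.sql3')
#   >>> job_id = jm.submit(['echo', 'hello'], name='demo', log_dir='logs')
#   >>> dependent_id = jm.submit(['echo', 'done'], dependencies=[job_id])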
def keys(self):
"""Returns the list of keys stored in this Job Manager."""
return self._jobs.keys()
def has_key(self, key):
"""Checks id the given key is registered in this Job Manager."""
return self._jobs.has_key(key)
def __getitem__(self, key):
"""Returns the Job for the given key."""
return self._jobs[key]
def __delitem__(self, key):
"""Removes the given job from the list."""
if not self._jobs.has_key(key): raise KeyError, key
del self._jobs[key]
def _run_parallel_job(self, job_id, array_id = None):
"""Executes the code for this job on the local machine."""
environ = copy.deepcopy(os.environ)
environ['JOB_ID'] = str(job_id)
if array_id:
environ['SGE_TASK_ID'] = str(array_id)
else:
environ['SGE_TASK_ID'] = 'undefined'
def __str__(self):
"""Returns the status of each job still being tracked"""
return self.table(43)
# get the name of the file that was called originally
jman = os.path.realpath(sys.argv[0])
# generate call to the wrapper script
command = [jman, '-l', 'run-job', self.database]
# return the subprocess pipe to the process
try:
return subprocess.Popen(command, env=environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
logger.error("Could not execute job '%s' locally, reason:\n\t%s" % ("(%d:%d)"%(job_id, array_id) if array_id else str(job_id)), e)
return None
def table(self, maxcmdline=0):
"""Returns the status of each job still being tracked"""
# configuration
fields = ("job-id", "job-name", "status", "arguments")
lengths = (20, 20, 15, 43)
marker = '='
def _report(self, process, job_id, array_id = None):
"""Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
def write(file, process, std):
f = std if file is None else open(str(file), 'w')
f.write(process.read())
self.lock()
# get the files to write to
job, array_job = self._job_and_array(job_id, array_id)
if array_job:
out, err = array_job.std_out_file(), array_job.std_err_file()
else:
out, err = job.std_out_file(), job.std_err_file()
# work
fmt = "%%%ds %%%ds %%%ds %%-%ds" % lengths
delimiter = fmt % tuple([k*marker for k in lengths])
header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
header = ' '.join(header)
log_dir = job.log_dir
self.unlock()
return '\n'.join([header] + [delimiter] + \
[job.row(fmt, maxcmdline) for job in [self._jobs[k] for k in sorted(self._jobs.keys())]])
if log_dir: makedirs_safe(log_dir)
# write stdout
write(out, process.stdout, sys.stdout)
# write stderr
write(err, process.stderr, sys.stderr)
def clear(self):
"""Clear the whole job queue"""
for k in self.keys(): del self[k]
def run(self, parallel_jobs = 1, job_ids = None):
"""Runs the jobs stored in this job manager on the local machine."""
self.lock()
query = self.session.query(Job).filter(Job.status != 'finished')
if job_ids:
query = query.filter(Job.id.in_(job_ids))
def stdout(self, key, array_index=None):
"""Gets the output of a certain job"""
return self[key].filename('o', array_index)
jobs = list(query)
# collect the jobs to execute
unfinished_jobs = [job.id for job in jobs]
def stderr(self, key, array_index=None):
"""Gets the error output of a certain job"""
return self[key].filename('e', array_index)
# collect the array jobs
unfinished_array_jobs = {}
for job in jobs:
if job.array:
unfinished_array_jobs[job.id] = [array.id for array in job.array if array.status != 'finished']
# collect the dependencies for the jobs
dependencies = {}
for job in jobs:
dependencies[job.id] = [dependent.id for dependent in job.dependent_jobs]
def refresh(self, ignore_warnings=False):
"""Conducts a qstat over all jobs in the cache. If the job is not present
anymore check the logs directory for output and error files. If the size of
the error file is different than zero, warn the user.
self.unlock()
Returns two lists: jobs that work and jobs that require attention
(error file does not have size 0).
"""
success = []
error = []
for k in self._jobs.keys():
if self._jobs[k].status == "finished": #job has finished
status = self._jobs[k].check(ignore_warnings)
if status:
success.append(self._jobs[k])
del self._jobs[k]
logger.debug("Job %d completed successfully" % k)
else:
error.append(self._jobs[k])
del self._jobs[k]
logger.debug("Job %d probably did not complete successfully" % k)
return success, error
def run(self, parallel_jobs = 1, external_dependencies = []):
"""Runs the jobs stored in this job manager on the local machine."""
unfinished_jobs = [j for j in self._jobs.itervalues()]
finished_job_ids = []
# start the jobs
finished_array_jobs = {}
running_jobs = []
running_array_jobs = {}
while len(unfinished_jobs) > 0 or len(running_jobs) > 0:
# check if some of the jobs finished
# FIRST: check if some of the jobs finished
for task in running_jobs:
# check if the job is still running
process = task[0]
if process.poll() is not None:
# process ended
job = task[1]
if job.array():
job_id = task[1]
if len(task) > 2:
# we have an array job
array_id = task[2]
if job.id() in finished_array_jobs:
finished_array_jobs[job.id()].add(array_id)
else:
finished_array_jobs[job.id()] = set([array_id])
running_array_jobs[job.id()].remove(array_id)
job.finalize(process, array_id)
if finished_array_jobs[job.id()] == job.array():
finished_job_ids.append(job.id())
unfinished_jobs.remove(job)
job.status = "finished"
else: # not array
finished_job_ids.append(job.id())
job.finalize(process)
unfinished_jobs.remove(job)
# report the result
self._report(process, job_id, array_id)
# remove from running and unfinished jobs
running_array_jobs[job_id].remove(array_id)
unfinished_array_jobs[job_id].remove(array_id)
if len(unfinished_array_jobs[job_id]) == 0:
del unfinished_array_jobs[job_id]
unfinished_jobs.remove(job_id)
else:
# non-array job
self._report(process, job_id)
unfinished_jobs.remove(job_id)
# in any case, remove the job from the list
running_jobs.remove(task)
self.save()
# run as many parallel jobs as desired
# SECOND: run as many parallel jobs as desired
if len(running_jobs) < parallel_jobs:
# start new jobs
for job in unfinished_jobs:
for job_id in unfinished_jobs:
# check if there are unsatisfied dependencies for this job
unsatisfied_dependencies = False
if job.dependencies:
for dep in job.dependencies:
if dep not in finished_job_ids:
unsatisfied_dependencies = True
break
# all dependencies are met
if not unsatisfied_dependencies:
if job.array():
unsatisfied_dependencies = [dep for dep in dependencies[job_id] if dep in unfinished_jobs]
if len(unsatisfied_dependencies) == 0:
# all dependencies are met
if job_id in unfinished_array_jobs:
# execute one of the array jobs
for array_id in job.array():
if job.id() not in finished_array_jobs or array_id not in finished_array_jobs[job.id()]:
if job.id() not in running_array_jobs or array_id not in running_array_jobs[job.id()]:
running_jobs.append((job.execute(array_id), job, array_id))
if job.id() in running_array_jobs:
running_array_jobs[job.id()].add(array_id)
for array_id in unfinished_array_jobs[job_id]:
# check if the current array_id still needs to run
if job_id not in running_array_jobs or array_id not in running_array_jobs[job_id]:
# execute parallel job
process = self._run_parallel_job(job_id, array_id)
if process is not None:
# remember that we currently run this job
running_jobs.append((process, job_id, array_id))
if job_id in running_array_jobs:
running_array_jobs[job_id].add(array_id)
else:
running_array_jobs[job.id()] = set([array_id])
running_array_jobs[job_id] = set([array_id])
else:
# remove the job from the list since it could not run
unfinished_array_jobs[job_id].remove(array_id)
# check if more jobs can be executed
if len(running_jobs) == parallel_jobs:
break
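# Finally, a hedged sketch of driving the local scheduler (hypothetical values,
# not part of the commit): run() polls the database, honors dependencies and
# array indices, and keeps at most `parallel_jobs` subprocesses alive at a time:
#
#   >>> jm = JobManagerLocal(database='submitted.sql3')
#   >>> jm.run(parallel_jobs=4)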