Commit ca3f45c5 authored by Manuel Günther's avatar Manuel Günther
Browse files

Another improvement of python3 compatibility.

parent 6cc08646
......@@ -40,7 +40,7 @@ Make sure to have your shell environment setup to reach it w/o requiring to type
The first task you may need to pursue is to submit jobs.
Here is how::
$ jman submit myscript.py --help
$ jman -vv submit myscript.py --help
... Added job '<Job: 1> : submitted -- /usr/bin/python myscript.py --help' to the database
... Submitted job '<Job: 6151645> : queued -- /usr/bin/python myscript.py --help' to the SGE grid.
......
......@@ -17,7 +17,7 @@ if sys.version_info[0] >= 3:
else:
from cPickle import dumps, loads
from .tools import makedirs_safe, logger
from .tools import makedirs_safe, logger, str_
from .manager import JobManager
......@@ -143,7 +143,7 @@ class JobManagerLocal(JobManager):
"""Finalizes the execution of the job by writing the stdout and stderr results into the according log files."""
def write(file, std, process):
f = std if file is None else open(str(file), 'w')
f.write(process.read().decode('utf-8'))
f.write(str_(process.read()))
self.lock()
# get the files to write to
......
import sqlalchemy
from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey, Enum
from sqlalchemy.orm import backref, relationship
from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import backref
from sqlalchemy.ext.declarative import declarative_base
from .tools import Enum, relationship
import os
import sys
......
......@@ -8,10 +8,9 @@
import os
import sys
import six
import signal
import subprocess
from .tools import logger
from .tools import logger, str_
def environ(context):
"""Retrieves the environment for a particular SETSHELL context"""
......@@ -26,8 +25,7 @@ def environ(context):
pi = subprocess.Popen(command, stdout = subprocess.PIPE)
# overwrite the default environment
for line in pi.stdout:
if isinstance(line, bytes) and not isinstance(line, str):
line = line.decode('utf8')
line = str_(line)
(key, _, value) = line.partition("=")
os.environ[key.strip()] = value.strip()
except OSError as e:
......@@ -54,8 +52,7 @@ def environ(context):
raise OSError("Error executing '%s': %s (%d)" % (' '.join(command), e.strerror, e.errno))
try:
source = p.communicate()[0]
source = source.strip()
source = str_(p.communicate()[0]).strip()
except KeyboardInterrupt: # the user CTRL-C'ed
os.kill(p.pid, signal.SIGTERM)
sys.exit(signal.SIGTERM)
......@@ -72,6 +69,7 @@ def environ(context):
new_environ = dict(os.environ)
for line in p2.stdout:
line = str_(line)
(key, _, value) = line.partition("=")
new_environ[key.strip()] = value.strip()
......@@ -92,6 +90,7 @@ def environ(context):
def sexec(context, command, error_on_nonzero=True):
"""Executes a command within a particular Idiap SETSHELL context"""
import six
if isinstance(context, six.string_types): E = environ(context)
else: E = context
......@@ -102,10 +101,10 @@ def sexec(context, command, error_on_nonzero=True):
(stdout, stderr) = p.communicate() #note: stderr will be 'None'
if p.returncode != 0:
if error_on_nonzero:
raise RuntimeError("Execution of '%s' exited with status != 0 (%d): %s" % (' '.join(command), p.returncode, stdout))
raise RuntimeError("Execution of '%s' exited with status != 0 (%d): %s" % (' '.join(command), p.returncode, str_(stdout)))
else:
logger.debug("Execution of '%s' exited with status != 0 (%d): %s" % \
(' '.join(command), p.returncode, stdout))
(' '.join(command), p.returncode, str_(stdout)))
return stdout.strip()
......
......@@ -36,177 +36,183 @@ class GridTKTest(unittest.TestCase):
def test01_local(self):
# This test executes all commands of the local grid manager and asserts that everything is fine
# first, add some commands to the database
script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh')
script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh')
from gridtk.script import jman
# add a simple script that will write some information to the
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', script_2])
# check that the database was created successfully
assert os.path.exists(self.database)
print()
# test that the list command works (should also work with the "default" grid manager
jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '1'])
jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies'])
# get insight into the database
job_manager = gridtk.local.JobManagerLocal(database=self.database)
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].id == 1
assert jobs[1].id == 2
assert len(jobs[1].array) == 4
assert jobs[0].status == 'submitted'
assert jobs[1].status == 'submitted'
# check that the job dependencies are correct
waiting = jobs[0].get_jobs_waiting_for_us()
assert len(waiting) == 1
assert waiting[0].id == 2
waited = jobs[1].get_jobs_we_wait_for()
assert len(waited) == 1
assert waited[0].id == 1
job_manager.unlock()
# now, start the local execution of the job in a parallel job
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
# sleep some time to assure that the scheduler was able to start the first job
time.sleep(4)
# ... and kill the scheduler
self.scheduler_job.kill()
self.scheduler_job = None
# now, the first job needs to have status failure, and the second needs to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[1].status == 'queued'
# the result files should not be there yet
assert not os.path.exists(jobs[0].std_out_file())
assert not os.path.exists(jobs[0].std_err_file())
job_manager.unlock()
# reset the job 1
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs'])
# now, start the local execution of the job in a parallel job
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
# sleep some time to assure that the scheduler was able to finish the first and start the second job
time.sleep(9)
# ... and kill the scheduler
self.scheduler_job.kill()
self.scheduler_job = None
# Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[1].status == 'executing'
assert jobs[1].array[0].status == 'failure'
assert jobs[1].array[0].result == 1
assert jobs[1].array[1].status == 'success'
assert jobs[1].array[1].result == 0
assert len([a for a in jobs[1].array if a.status == 'queued']) == 2
out_file = jobs[0].std_out_file()
err_file = jobs[0].std_err_file()
job_manager.unlock()
# the result files of the first job should now be there
assert os.path.isfile(out_file)
assert os.path.isfile(err_file)
assert open(out_file).read().rstrip() == 'This is a text message to std-out'
assert open(err_file).read().rstrip() == 'This is a text message to std-err'
# resubmit all jobs
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--running-jobs'])
# check that the log files have been cleaned
assert not os.path.exists(out_file)
assert not os.path.exists(err_file)
# ... but the log dir still exists
assert os.path.exists(self.log_dir)
# now, let the scheduler run all jobs
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished'])
# and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards)
self.scheduler_job.wait()
self.scheduler_job = None
try:
# first, add some commands to the database
script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh')
script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh')
from gridtk.script import jman
# add a simple script that will write some information to the
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', script_2])
# check that the database was created successfully
assert os.path.exists(self.database)
print()
# test that the list command works (should also work with the "default" grid manager
jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '1'])
jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies'])
# get insight into the database
job_manager = gridtk.local.JobManagerLocal(database=self.database)
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].id == 1
assert jobs[1].id == 2
assert len(jobs[1].array) == 4
assert jobs[0].status == 'submitted'
assert jobs[1].status == 'submitted'
# check that the job dependencies are correct
waiting = jobs[0].get_jobs_waiting_for_us()
assert len(waiting) == 1
assert waiting[0].id == 2
waited = jobs[1].get_jobs_we_wait_for()
assert len(waited) == 1
assert waited[0].id == 1
job_manager.unlock()
# now, start the local execution of the job in a parallel job
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
# sleep some time to assure that the scheduler was able to start the first job
time.sleep(4)
# ... and kill the scheduler
self.scheduler_job.kill()
self.scheduler_job = None
# check that all output files are generated again
assert os.path.isfile(out_file)
assert os.path.isfile(err_file)
assert open(out_file).read().rstrip() == 'This is a text message to std-out'
assert open(err_file).read().rstrip() == 'This is a text message to std-err'
# check that exactly four output and four error files have been created
files = os.listdir(self.log_dir)
assert len(files) == 10
for i in range(1,8,2):
assert 'test_2.o2.%d'%i in files
assert 'test_2.e2.%d'%i in files
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[1].status == 'failure'
assert jobs[1].array[0].status == 'failure'
assert jobs[1].array[0].result == 1
for i in range(1,4):
assert jobs[1].array[i].status == 'success'
assert jobs[1].array[i].result == 0
job_manager.unlock()
print()
# test that the list command still works
jman.main(['./bin/jman', '--database', self.database, 'list', '--print-array-jobs'])
print()
# test that the report command works
jman.main(['./bin/jman', '--database', self.database, 'report'])
# clean-up
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
# check that the database and the log files are gone
assert len(os.listdir(self.temp_dir)) == 0
# add the scripts again, but this time with the --stop-on-failure option
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', script_2])
# and execute them, but without writing the log files
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files'])
# and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards)
self.scheduler_job.wait()
self.scheduler_job = None
# now, the first job needs to have status failure, and the second needs to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[1].status == 'queued'
# the result files should not be there yet
assert not os.path.exists(jobs[0].std_out_file())
assert not os.path.exists(jobs[0].std_err_file())
job_manager.unlock()
# assert that the log files are not there
assert not os.path.isfile(out_file)
assert not os.path.isfile(err_file)
# reset the job 1
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs'])
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[0].result == 255
assert jobs[1].status == 'failure'
assert jobs[1].result is None
job_manager.unlock()
# now, start the local execution of the job in a parallel job
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
# and clean up again
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
# sleep some time to assure that the scheduler was able to finish the first and start the second job
time.sleep(9)
# ... and kill the scheduler
self.scheduler_job.kill()
self.scheduler_job = None
# Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[1].status == 'executing'
assert jobs[1].array[0].status == 'failure'
assert jobs[1].array[0].result == 1
assert jobs[1].array[1].status == 'success'
assert jobs[1].array[1].result == 0
assert len([a for a in jobs[1].array if a.status == 'queued']) == 2
out_file = jobs[0].std_out_file()
err_file = jobs[0].std_err_file()
job_manager.unlock()
# the result files of the first job should now be there
assert os.path.isfile(out_file)
assert os.path.isfile(err_file)
assert open(out_file).read().rstrip() == 'This is a text message to std-out'
assert open(err_file).read().rstrip() == 'This is a text message to std-err'
# resubmit all jobs
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--running-jobs'])
# check that the log files have been cleaned
assert not os.path.exists(out_file)
assert not os.path.exists(err_file)
# ... but the log dir still exists
assert os.path.exists(self.log_dir)
# now, let the scheduler run all jobs
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished'])
# and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards)
self.scheduler_job.wait()
self.scheduler_job = None
# check that all output files are generated again
assert os.path.isfile(out_file)
assert os.path.isfile(err_file)
assert open(out_file).read().rstrip() == 'This is a text message to std-out'
assert open(err_file).read().rstrip() == 'This is a text message to std-err'
# check that exactly four output and four error files have been created
files = os.listdir(self.log_dir)
assert len(files) == 10
for i in range(1,8,2):
assert 'test_2.o2.%d'%i in files
assert 'test_2.e2.%d'%i in files
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[1].status == 'failure'
assert jobs[1].array[0].status == 'failure'
assert jobs[1].array[0].result == 1
for i in range(1,4):
assert jobs[1].array[i].status == 'success'
assert jobs[1].array[i].result == 0
job_manager.unlock()
print()
# test that the list command still works
jman.main(['./bin/jman', '--database', self.database, 'list', '--print-array-jobs'])
print()
# test that the report command works
jman.main(['./bin/jman', '--database', self.database, 'report'])
# clean-up
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
# check that the database and the log files are gone
assert len(os.listdir(self.temp_dir)) == 0
# add the scripts again, but this time with the --stop-on-failure option
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', script_1])
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', script_2])
# and execute them, but without writing the log files
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files'])
# and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards)
self.scheduler_job.wait()
self.scheduler_job = None
# assert that the log files are not there
assert not os.path.isfile(out_file)
assert not os.path.isfile(err_file)
# check that all array jobs are finished now
session = job_manager.lock()
jobs = list(session.query(Job))
assert len(jobs) == 2
assert jobs[0].status == 'failure'
assert jobs[0].result == 255
assert jobs[1].status == 'failure'
assert jobs[1].result is None
job_manager.unlock()
# and clean up again
jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
except KeyboardInterrupt:
# make sure that the keyboard interrupt is captured and the mess is cleaned up (i.e. by calling tearDown)
pass
def test02_grid(self):
......
......@@ -9,10 +9,49 @@ probing.
import os
import re
import six
import hashlib
import random
# sqlalchemy migration; copied from Bob
try:
from sqlalchemy import Enum
except ImportError:
from sqlalchemy import types
class Enum(types.TypeDecorator):
  """Fallback Enum emulation for SQLAlchemy versions that do not provide
  ``sqlalchemy.Enum``; values are stored in a Unicode column sized to the
  longest allowed value."""
  impl = types.Unicode

  def __init__(self, *values):
    """Emulates an Enum type.

    values:
      A list of valid values for this column
    """
    # BUG FIX: the original used 'len(values) is 0', which compares object
    # identity with an int literal and only works by CPython's small-int
    # caching accident; use truthiness instead ('values' is a tuple from
    # *args, so it can never be None).
    if not values:
      raise AssertionError('Enum requires a list of values')
    self.values = values[:]
    # The length of the string/unicode column should be the longest string
    # in values
    size = max(len(v) for v in values if v is not None)
    super(Enum, self).__init__(size)

  def process_bind_param(self, value, dialect):
    # Validate values on their way INTO the database.
    if value not in self.values:
      raise AssertionError('"%s" not in Enum.values' % value)
    return value

  def process_result_value(self, value, dialect):
    # Values are stored verbatim; nothing to convert on the way out.
    return value
try:
from sqlalchemy.orm import relationship
except ImportError:
from sqlalchemy.orm import relation as relationship
# initialize the logging system
import logging
logger = logging.getLogger("gridtk")
......@@ -45,6 +84,16 @@ def makedirs_safe(fulldir):
else: raise
def str_(name):
  """Return the string representation of the given ``name``.

  If ``name`` is a bytes object, it is decoded (UTF-8) into a str.
  If it is already a str object, it is simply returned unchanged.
  """
  # On Python 2, 'str' IS 'bytes', so the second isinstance check keeps
  # native Py2 strings untouched; only genuine byte data (Python 3) is
  # decoded here.
  if isinstance(name, bytes) and not isinstance(name, str):
    return name.decode('utf8')
  else:
    return name
def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
stderr='', env=[], array=None, context='grid', hostname=None,
mem=None, memfree=None, hvmem=None, pe_opt=None, io_big=False):
......@@ -138,6 +187,7 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
scmd = ['qsub']
import six
if isinstance(queue, six.string_types) and queue not in ('all.q', 'default'):
scmd += ['-l', queue]
......@@ -212,7 +262,7 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='',
logger.debug("Qsub command '%s'", ' '.join(scmd))
from .setshell import sexec
jobid = sexec(context, scmd)
jobid = str_(sexec(context, scmd))
return int(jobid.split('.',1)[0])
def make_shell(shell, command):
......@@ -253,7 +303,7 @@ def qstat(jobid, context='grid'):
logger.debug("Qstat command '%s'", ' '.join(scmd))
from .setshell import sexec
data = sexec(context, scmd, error_on_nonzero=False)
data = str_(sexec(context, scmd, error_on_nonzero=False))
# some parsing:
retval = {}
......
......@@ -9,14 +9,14 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)):
setup(
name='gridtk',
version='1.0.0.a0',
description='SGE Grid Submission and Monitoring Tools for Idiap',
version='1.0.0',
description='SGE Grid and Local Submission and Monitoring Tools for Idiap',
url='https://github.com/idiap/gridtk',
license='LICENSE.txt',
author='Andre Anjos',
author_email='andre.anjos@idiap.ch',
author='Manuel Guenther',
author_email='manuel.guenther@idiap.ch',
packages=find_packages(),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment