From ca3f45c5532e75137340a112ccdb22928ddfbab7 Mon Sep 17 00:00:00 2001 From: Manuel Guenther <manuel.guenther@idiap.ch> Date: Fri, 30 Aug 2013 11:59:57 +0200 Subject: [PATCH] Another improvement of python3 compatibility. --- README.rst | 2 +- gridtk/local.py | 4 +- gridtk/models.py | 5 +- gridtk/setshell.py | 15 +- gridtk/tests/__init__.py | 338 ++++++++++++++++++++------------------- gridtk/tools.py | 56 ++++++- setup.py | 8 +- 7 files changed, 242 insertions(+), 186 deletions(-) diff --git a/README.rst b/README.rst index 3bdc653..9fe539f 100644 --- a/README.rst +++ b/README.rst @@ -40,7 +40,7 @@ Make sure to have your shell environment setup to reach it w/o requiring to type The first task you may need to pursue is to submit jobs. Here is how:: - $ jman submit myscript.py --help + $ jman -vv submit myscript.py --help ... Added job '<Job: 1> : submitted -- /usr/bin/python myscript.py --help' to the database ... Submitted job '<Job: 6151645> : queued -- /usr/bin/python myscript.py --help' to the SGE grid. 
diff --git a/gridtk/local.py b/gridtk/local.py index e762941..e8eaa59 100644 --- a/gridtk/local.py +++ b/gridtk/local.py @@ -17,7 +17,7 @@ if sys.version_info[0] >= 3: else: from cPickle import dumps, loads -from .tools import makedirs_safe, logger +from .tools import makedirs_safe, logger, str_ from .manager import JobManager @@ -143,7 +143,7 @@ class JobManagerLocal(JobManager): """Finalizes the execution of the job by writing the stdout and stderr results into the according log files.""" def write(file, std, process): f = std if file is None else open(str(file), 'w') - f.write(process.read().decode('utf-8')) + f.write(str_(process.read())) self.lock() # get the files to write to diff --git a/gridtk/models.py b/gridtk/models.py index 91f76f5..5b9e864 100644 --- a/gridtk/models.py +++ b/gridtk/models.py @@ -1,7 +1,8 @@ import sqlalchemy -from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey, Enum -from sqlalchemy.orm import backref, relationship +from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey +from sqlalchemy.orm import backref from sqlalchemy.ext.declarative import declarative_base +from .tools import Enum, relationship import os import sys diff --git a/gridtk/setshell.py b/gridtk/setshell.py index 598c32d..6a84337 100644 --- a/gridtk/setshell.py +++ b/gridtk/setshell.py @@ -8,10 +8,9 @@ import os import sys -import six import signal import subprocess -from .tools import logger +from .tools import logger, str_ def environ(context): """Retrieves the environment for a particular SETSHELL context""" @@ -26,8 +25,7 @@ def environ(context): pi = subprocess.Popen(command, stdout = subprocess.PIPE) # overwrite the default environment for line in pi.stdout: - if isinstance(line, bytes) and not isinstance(line, str): - line = line.decode('utf8') + line = str_(line) (key, _, value) = line.partition("=") os.environ[key.strip()] = value.strip() except OSError as e: @@ -54,8 +52,7 @@ def environ(context): raise OSError("Error 
executing '%s': %s (%d)" % (' '.join(command), e.strerror, e.errno)) try: - source = p.communicate()[0] - source = source.strip() + source = str_(p.communicate()[0]).strip() except KeyboardInterrupt: # the user CTRL-C'ed os.kill(p.pid, signal.SIGTERM) sys.exit(signal.SIGTERM) @@ -72,6 +69,7 @@ def environ(context): new_environ = dict(os.environ) for line in p2.stdout: + line = str_(line) (key, _, value) = line.partition("=") new_environ[key.strip()] = value.strip() @@ -92,6 +90,7 @@ def environ(context): def sexec(context, command, error_on_nonzero=True): """Executes a command within a particular Idiap SETSHELL context""" + import six if isinstance(context, six.string_types): E = environ(context) else: E = context @@ -102,10 +101,10 @@ def sexec(context, command, error_on_nonzero=True): (stdout, stderr) = p.communicate() #note: stderr will be 'None' if p.returncode != 0: if error_on_nonzero: - raise RuntimeError("Execution of '%s' exited with status != 0 (%d): %s" % (' '.join(command), p.returncode, stdout)) + raise RuntimeError("Execution of '%s' exited with status != 0 (%d): %s" % (' '.join(command), p.returncode, str_(stdout))) else: logger.debug("Execution of '%s' exited with status != 0 (%d): %s" % \ - (' '.join(command), p.returncode, stdout)) + (' '.join(command), p.returncode, str_(stdout))) return stdout.strip() diff --git a/gridtk/tests/__init__.py b/gridtk/tests/__init__.py index bc66955..0ca8b7d 100644 --- a/gridtk/tests/__init__.py +++ b/gridtk/tests/__init__.py @@ -36,177 +36,183 @@ class GridTKTest(unittest.TestCase): def test01_local(self): # This test executes all commands of the local grid manager and asserts that everything is fine - # first, add some commands to the database - script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh') - script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh') - from gridtk.script import jman - # add a simple script that will write some information to the - 
jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1]) - jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', script_2]) - - # check that the database was created successfully - assert os.path.exists(self.database) - - print() - # test that the list command works (should also work with the "default" grid manager - jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '1']) - jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies']) - - # get insight into the database - job_manager = gridtk.local.JobManagerLocal(database=self.database) - session = job_manager.lock() - jobs = list(session.query(Job)) - assert len(jobs) == 2 - assert jobs[0].id == 1 - assert jobs[1].id == 2 - assert len(jobs[1].array) == 4 - assert jobs[0].status == 'submitted' - assert jobs[1].status == 'submitted' - - # check that the job dependencies are correct - waiting = jobs[0].get_jobs_waiting_for_us() - assert len(waiting) == 1 - assert waiting[0].id == 2 - waited = jobs[1].get_jobs_we_wait_for() - assert len(waited) == 1 - assert waited[0].id == 1 - - job_manager.unlock() - - # now, start the local execution of the job in a parallel job - self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2']) - - # sleep some time to assure that the scheduler was able to start the first job - time.sleep(4) - # ... 
and kill the scheduler - self.scheduler_job.kill() - self.scheduler_job = None - - # now, the first job needs to have status failure, and the second needs to be queued - session = job_manager.lock() - jobs = list(session.query(Job)) - assert len(jobs) == 2 - assert jobs[0].status == 'failure' - assert jobs[1].status == 'queued' - # the result files should not be there yet - assert not os.path.exists(jobs[0].std_out_file()) - assert not os.path.exists(jobs[0].std_err_file()) - job_manager.unlock() - - - # reset the job 1 - jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs']) - - # now, start the local execution of the job in a parallel job - self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2']) - - # sleep some time to assure that the scheduler was able to finish the first and start the second job - time.sleep(9) - # ... and kill the scheduler - self.scheduler_job.kill() - self.scheduler_job = None - - # Job 1 and two array jobs of job two should be finished now, the other two still need to be queued - session = job_manager.lock() - jobs = list(session.query(Job)) - assert len(jobs) == 2 - assert jobs[0].status == 'failure' - assert jobs[1].status == 'executing' - assert jobs[1].array[0].status == 'failure' - assert jobs[1].array[0].result == 1 - assert jobs[1].array[1].status == 'success' - assert jobs[1].array[1].result == 0 - assert len([a for a in jobs[1].array if a.status == 'queued']) == 2 - out_file = jobs[0].std_out_file() - err_file = jobs[0].std_err_file() - job_manager.unlock() - - # the result files of the first job should now be there - assert os.path.isfile(out_file) - assert os.path.isfile(err_file) - assert open(out_file).read().rstrip() == 'This is a text message to std-out' - assert open(err_file).read().rstrip() == 'This is a text message to std-err' - - # resubmit all jobs - 
jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--running-jobs']) - # check that the log files have been cleaned - assert not os.path.exists(out_file) - assert not os.path.exists(err_file) - # ... but the log dir still exists - assert os.path.exists(self.log_dir) - - # now, let the scheduler run all jobs - self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished']) - # and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards) - self.scheduler_job.wait() - self.scheduler_job = None + try: + + # first, add some commands to the database + script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh') + script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh') + from gridtk.script import jman + # add a simple script that will write some information to the + jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1]) + jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', script_2]) + + # check that the database was created successfully + assert os.path.exists(self.database) + + print() + # test that the list command works (should also work with the "default" grid manager + jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '1']) + jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies']) + + # get insight into the database + job_manager = gridtk.local.JobManagerLocal(database=self.database) + session = job_manager.lock() + jobs = list(session.query(Job)) + assert len(jobs) == 2 + assert jobs[0].id == 1 + assert jobs[1].id == 2 + assert len(jobs[1].array) == 4 + assert 
jobs[0].status == 'submitted' + assert jobs[1].status == 'submitted' + + # check that the job dependencies are correct + waiting = jobs[0].get_jobs_waiting_for_us() + assert len(waiting) == 1 + assert waiting[0].id == 2 + waited = jobs[1].get_jobs_we_wait_for() + assert len(waited) == 1 + assert waited[0].id == 1 + + job_manager.unlock() + + # now, start the local execution of the job in a parallel job + self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2']) + + # sleep some time to assure that the scheduler was able to start the first job + time.sleep(4) + # ... and kill the scheduler + self.scheduler_job.kill() + self.scheduler_job = None - # check that all output files are generated again - assert os.path.isfile(out_file) - assert os.path.isfile(err_file) - assert open(out_file).read().rstrip() == 'This is a text message to std-out' - assert open(err_file).read().rstrip() == 'This is a text message to std-err' - - # check that exactly four output and four error files have been created - files = os.listdir(self.log_dir) - assert len(files) == 10 - for i in range(1,8,2): - assert 'test_2.o2.%d'%i in files - assert 'test_2.e2.%d'%i in files - - # check that all array jobs are finished now - session = job_manager.lock() - jobs = list(session.query(Job)) - assert len(jobs) == 2 - assert jobs[1].status == 'failure' - assert jobs[1].array[0].status == 'failure' - assert jobs[1].array[0].result == 1 - for i in range(1,4): - assert jobs[1].array[i].status == 'success' - assert jobs[1].array[i].result == 0 - job_manager.unlock() - - print() - # test that the list command still works - jman.main(['./bin/jman', '--database', self.database, 'list', '--print-array-jobs']) - - print() - # test that the report command works - jman.main(['./bin/jman', '--database', self.database, 'report']) - - # clean-up - jman.main(['./bin/jman', '--local', '--database', self.database, 'delete']) - 
- # check that the database and the log files are gone - assert len(os.listdir(self.temp_dir)) == 0 - - # add the scripts again, but this time with the --stop-on-failure option - jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', '--stop-on-failure', script_1]) - jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', script_2]) - - # and execute them, but without writing the log files - self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files']) - # and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards) - self.scheduler_job.wait() - self.scheduler_job = None + # now, the first job needs to have status failure, and the second needs to be queued + session = job_manager.lock() + jobs = list(session.query(Job)) + assert len(jobs) == 2 + assert jobs[0].status == 'failure' + assert jobs[1].status == 'queued' + # the result files should not be there yet + assert not os.path.exists(jobs[0].std_out_file()) + assert not os.path.exists(jobs[0].std_err_file()) + job_manager.unlock() - # assert that the log files are not there - assert not os.path.isfile(out_file) - assert not os.path.isfile(err_file) + # reset the job 1 + jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs']) - # check that all array jobs are finished now - session = job_manager.lock() - jobs = list(session.query(Job)) - assert len(jobs) == 2 - assert jobs[0].status == 'failure' - assert jobs[0].result == 255 - assert jobs[1].status == 'failure' - assert jobs[1].result is None - job_manager.unlock() + # now, start the local execution of the job in a parallel job + 
self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2']) - # and clean up again - jman.main(['./bin/jman', '--local', '--database', self.database, 'delete']) + # sleep some time to assure that the scheduler was able to finish the first and start the second job + time.sleep(9) + # ... and kill the scheduler + self.scheduler_job.kill() + self.scheduler_job = None + + # Job 1 and two array jobs of job two should be finished now, the other two still need to be queued + session = job_manager.lock() + jobs = list(session.query(Job)) + assert len(jobs) == 2 + assert jobs[0].status == 'failure' + assert jobs[1].status == 'executing' + assert jobs[1].array[0].status == 'failure' + assert jobs[1].array[0].result == 1 + assert jobs[1].array[1].status == 'success' + assert jobs[1].array[1].result == 0 + assert len([a for a in jobs[1].array if a.status == 'queued']) == 2 + out_file = jobs[0].std_out_file() + err_file = jobs[0].std_err_file() + job_manager.unlock() + + # the result files of the first job should now be there + assert os.path.isfile(out_file) + assert os.path.isfile(err_file) + assert open(out_file).read().rstrip() == 'This is a text message to std-out' + assert open(err_file).read().rstrip() == 'This is a text message to std-err' + + # resubmit all jobs + jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--running-jobs']) + # check that the log files have been cleaned + assert not os.path.exists(out_file) + assert not os.path.exists(err_file) + # ... 
but the log dir still exists + assert os.path.exists(self.log_dir) + + # now, let the scheduler run all jobs + self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '1', '--parallel', '2', '--die-when-finished']) + # and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards) + self.scheduler_job.wait() + self.scheduler_job = None + + # check that all output files are generated again + assert os.path.isfile(out_file) + assert os.path.isfile(err_file) + assert open(out_file).read().rstrip() == 'This is a text message to std-out' + assert open(err_file).read().rstrip() == 'This is a text message to std-err' + + # check that exactly four output and four error files have been created + files = os.listdir(self.log_dir) + assert len(files) == 10 + for i in range(1,8,2): + assert 'test_2.o2.%d'%i in files + assert 'test_2.e2.%d'%i in files + + # check that all array jobs are finished now + session = job_manager.lock() + jobs = list(session.query(Job)) + assert len(jobs) == 2 + assert jobs[1].status == 'failure' + assert jobs[1].array[0].status == 'failure' + assert jobs[1].array[0].result == 1 + for i in range(1,4): + assert jobs[1].array[i].status == 'success' + assert jobs[1].array[i].result == 0 + job_manager.unlock() + + print() + # test that the list command still works + jman.main(['./bin/jman', '--database', self.database, 'list', '--print-array-jobs']) + + print() + # test that the report command works + jman.main(['./bin/jman', '--database', self.database, 'report']) + + # clean-up + jman.main(['./bin/jman', '--local', '--database', self.database, 'delete']) + + # check that the database and the log files are gone + assert len(os.listdir(self.temp_dir)) == 0 + + # add the scripts again, but this time with the --stop-on-failure option + jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 
'test_1', '--stop-on-failure', script_1]) + jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2', '--dependencies', '1', '--parametric', '1-7:2', '--stop-on-failure', script_2]) + + # and execute them, but without writing the log files + self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2', '--die-when-finished', '--no-log-files']) + # and wait for the job to finish (the timeout argument to Popen only exists from python 3.3 onwards) + self.scheduler_job.wait() + self.scheduler_job = None + + # assert that the log files are not there + assert not os.path.isfile(out_file) + assert not os.path.isfile(err_file) + + + # check that all array jobs are finished now + session = job_manager.lock() + jobs = list(session.query(Job)) + assert len(jobs) == 2 + assert jobs[0].status == 'failure' + assert jobs[0].result == 255 + assert jobs[1].status == 'failure' + assert jobs[1].result is None + job_manager.unlock() + + # and clean up again + jman.main(['./bin/jman', '--local', '--database', self.database, 'delete']) + + except KeyboardInterrupt: + # make sure that the keyboard interrupt is captured and the mess is cleaned up (i.e. by calling tearDown) + pass def test02_grid(self): diff --git a/gridtk/tools.py b/gridtk/tools.py index 162244d..571901f 100644 --- a/gridtk/tools.py +++ b/gridtk/tools.py @@ -9,10 +9,49 @@ probing. import os import re -import six import hashlib import random + +# sqlalchemy migration; copied from Bob +try: + from sqlalchemy import Enum +except ImportError: + from sqlalchemy import types + + class Enum(types.TypeDecorator): + impl = types.Unicode + + def __init__(self, *values): + """Emulates an Enum type. 
+ values: A list of valid values for this column """ + if values is None or len(values) is 0: raise AssertionError('Enum requires a list of values') self.values = values[:] + # The length of the string/unicode column should be the longest string + # in values size = max([len(v) for v in values if v is not None]) super(Enum, self).__init__(size) + def process_bind_param(self, value, dialect): if value not in self.values: raise AssertionError('"%s" not in Enum.values' % value) return value + def process_result_value(self, value, dialect): return value + +try: + from sqlalchemy.orm import relationship +except ImportError: + from sqlalchemy.orm import relation as relationship + + + # initialize the logging system import logging logger = logging.getLogger("gridtk") @@ -45,6 +84,16 @@ def makedirs_safe(fulldir): else: raise +def str_(name): + """Return the string representation of the given 'name'. + If it is a bytes object, it will be converted into str. + If it is a str object, it will simply be returned.""" + if isinstance(name, bytes) and not isinstance(name, str): + return name.decode('utf8') + else: + return name + + def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='', stderr='', env=[], array=None, context='grid', hostname=None, mem=None, memfree=None, hvmem=None, pe_opt=None, io_big=False): @@ -138,6 +187,7 @@ def qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='', scmd = ['qsub'] + import six if isinstance(queue, six.string_types) and queue not in ('all.q', 'default'): scmd += ['-l', queue]
.setshell import sexec - data = sexec(context, scmd, error_on_nonzero=False) + data = str_(sexec(context, scmd, error_on_nonzero=False)) # some parsing: retval = {} diff --git a/setup.py b/setup.py index a3242d1..bf0a275 100644 --- a/setup.py +++ b/setup.py @@ -9,14 +9,14 @@ if sys.version_info[:2] < (2, 7) or ((3,0) <= sys.version_info[:2] < (3,2)): setup( name='gridtk', - version='1.0.0.a0', - description='SGE Grid Submission and Monitoring Tools for Idiap', + version='1.0.0', + description='SGE Grid and Local Submission and Monitoring Tools for Idiap', url='https://github.com/idiap/gridtk', license='LICENSE.txt', - author='Andre Anjos', - author_email='andre.anjos@idiap.ch', + author='Manuel Guenther', + author_email='manuel.guenther@idiap.ch', packages=find_packages(), -- GitLab