# __init__.py -- unit tests for the gridtk
import unittest
import nose

import os
import pkg_resources

import gridtk
import subprocess, signal
import time
from gridtk.models import Job

class DatabaseTest(unittest.TestCase):
  # This class defines tests for the gridtk

  def setUp(self):
    # Create a temporary directory that will contain all outputs
    import tempfile
    self.temp_dir = tempfile.mkdtemp(prefix='gridtk_test')
    self.log_dir = os.path.join(self.temp_dir, 'logs')
21 22
    self.database = os.path.join(self.temp_dir, 'database.sql3')
    self.scheduler_job = None
23 24 25


  def tearDown(self):
26 27 28
    # make sure that all scheduler jobs are stopped after exiting
    if self.scheduler_job:
      self.scheduler_job.send_signal(signal.SIGINT)
29 30 31 32 33 34 35 36 37 38 39 40
    # Clean up the mess that we created
    import shutil
    shutil.rmtree(self.temp_dir)

  def test01_local(self):
    # This test executes all commands of the local grid manager and asserts that everything is fine

    # first, add some commands to the database
    script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh')
    script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh')
    from gridtk.script import jman
    # add a simple script that will write some information to the
41 42
    jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1])
    jman.main(['./bin/jman', '--local', '--database', self.database, 'submit', '--log-dir', self.log_dir, '--name', 'test_2',  '--dependencies', '1', '--parametric', '1-7:2', script_2])
43 44

    # check that the database was created successfully
45
    assert os.path.exists(self.database)
46

47
    print
48
    # test that the list command works (should also work with the "default" grid manager
49 50
    jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '1'])
    jman.main(['./bin/jman', '--database', self.database, 'list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies'])
51 52

    # get insight into the database
53
    job_manager = gridtk.local.JobManagerLocal(database=self.database)
54 55 56 57 58 59
    session = job_manager.lock()
    jobs = list(session.query(Job))
    assert len(jobs) == 2
    assert jobs[0].id == 1
    assert jobs[1].id == 2
    assert len(jobs[1].array) == 4
60 61
    assert jobs[0].status == 'submitted'
    assert jobs[1].status == 'submitted'
62 63 64 65 66 67 68 69 70 71 72

    # check that the job dependencies are correct
    waiting = jobs[0].get_jobs_waiting_for_us()
    assert len(waiting) == 1
    assert waiting[0].id == 2
    waited = jobs[1].get_jobs_we_wait_for()
    assert len(waited) == 1
    assert waited[0].id == 1

    job_manager.unlock()

73 74
    # now, start the local execution of the job in a parallel job
    self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '5', '--parallel', '2'])
75

76 77 78 79 80
    # sleep some time to assure that the scheduler was able to start the first job
    time.sleep(2)
    # ... and kill the scheduler
    self.scheduler_job.send_signal(signal.SIGINT)
    self.scheduler_job = None
81

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
    # now, the first job needs to have status failure, and the second needs to be queued
    session = job_manager.lock()
    jobs = list(session.query(Job))
    assert len(jobs) == 2
    assert jobs[0].status == 'failure'
    assert jobs[1].status == 'queued'
    # the result files should not be there yet
    assert not os.path.exists(jobs[0].std_out_file())
    assert not os.path.exists(jobs[0].std_err_file())
    job_manager.unlock()

    # reset the job 1
    jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--job-id', '1', '--running-jobs'])

    # now, start the local execution of the job in a parallel job
    self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '4', '--parallel', '2'])

    # sleep some time to assure that the scheduler was able to finish the first and start the second job
    time.sleep(6)
    # ... and kill the scheduler
    self.scheduler_job.send_signal(signal.SIGINT)
    self.scheduler_job = None

    # Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
    session = job_manager.lock()
    jobs = list(session.query(Job))
    assert len(jobs) == 2
    assert jobs[0].status == 'failure'
    assert jobs[1].status == 'executing'
    assert jobs[1].array[0].status == 'failure'
    assert jobs[1].array[0].result == 1
    assert jobs[1].array[1].status == 'success'
    assert jobs[1].array[1].result == 0
    assert len([a for a in jobs[1].array if a.status == 'queued']) == 2
    out_file = jobs[0].std_out_file()
    err_file = jobs[0].std_err_file()
    job_manager.unlock()

    # the result files of the first job should now be there
121 122 123 124 125
    assert os.path.isfile(out_file)
    assert os.path.isfile(err_file)
    assert open(out_file).read().rstrip() == 'This is a text message to std-out'
    assert open(err_file).read().rstrip() == 'This is a text message to std-err'

126 127 128 129 130 131
    # resubmit all jobs
    jman.main(['./bin/jman', '--local', '--database', self.database, 'resubmit', '--running-jobs'])
    # check that the log files have been cleaned
    assert not os.path.exists(out_file)
    assert not os.path.exists(err_file)
    # ... but the log dir still exists
132
    assert os.path.exists(self.log_dir)
133

134 135 136 137 138 139
    # now, let the scheduler run all jobs
    self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', '0.1', '--parallel', '2'])
    # ... and kill the scheduler
    time.sleep(3)
    self.scheduler_job.send_signal(signal.SIGINT)
    self.scheduler_job = None
140

141 142 143 144 145
    # check that all output files are generated again
    assert os.path.isfile(out_file)
    assert os.path.isfile(err_file)
    assert open(out_file).read().rstrip() == 'This is a text message to std-out'
    assert open(err_file).read().rstrip() == 'This is a text message to std-err'
146 147 148

    # check that exactly four output and four error files have been created
    files = os.listdir(self.log_dir)
149
    assert len(files) == 10
150 151 152 153
    for i in range(1,8,2):
      assert 'test_2.o2.%d'%i in files
      assert 'test_2.e2.%d'%i in files

154
    # check that all array jobs are finished now
155
    session = job_manager.lock()
156 157 158 159 160 161 162 163
    jobs = list(session.query(Job))
    assert len(jobs) == 2
    assert jobs[1].status == 'failure'
    assert jobs[1].array[0].status == 'failure'
    assert jobs[1].array[0].result == 1
    for i in range(1,4):
      assert jobs[1].array[i].status == 'success'
      assert jobs[1].array[i].result == 0
164 165
    job_manager.unlock()

166 167 168 169 170 171 172 173
    print
    # test that the list command still works
    jman.main(['./bin/jman', '--database', self.database, 'list', '--print-array-jobs'])

    print
    # test that the list command still works
    jman.main(['./bin/jman', '--database', self.database, 'report'])

174
    # clean-up
175
    jman.main(['./bin/jman', '--local', '--database', self.database, 'delete'])
176

177
    # check that the database and the log files are gone
178
    assert len(os.listdir(self.temp_dir)) == 0
179 180 181 182 183 184


  def test02_grid(self):
    # Tests the functionality of the grid toolkit in the grid
    raise nose.plugins.skip.SkipTest("This test is not yet implemented. If you find a proper ways to test the grid functionality, please go ahead and implement the test.")