# __init__.py -- tests for the gridtk job manager
import unittest
import nose

import os
import pkg_resources

import gridtk
# process control and timing for the scheduler that the tests run in the background
import subprocess, signal
import time
# database model that the tests query directly

from gridtk.models import Job

class DatabaseTest(unittest.TestCase):
  # This class defines the tests for the gridtk job manager.
  # Every test works on its own temporary directory, which holds the job database and the log files.

  def setUp(self):
    # Create a temporary directory that will contain all outputs
    import tempfile
    self.temp_dir = tempfile.mkdtemp(prefix='gridtk_test')
    self.log_dir = os.path.join(self.temp_dir, 'logs')
    self.database = os.path.join(self.temp_dir, 'database.sql3')
    # handle of the scheduler sub-process while one is running, None otherwise
    self.scheduler_job = None


  def tearDown(self):
    # make sure that the scheduler is stopped AND gone after exiting;
    # otherwise it could still be writing into the directory that is deleted below
    self._interrupt_scheduler(wait=True)

    # Clean up the mess that we created
    import shutil
    shutil.rmtree(self.temp_dir)


  def _start_scheduler(self, sleep_time):
    # Starts the local scheduler in a sub-process, running at most two jobs in parallel;
    # sleep_time is the value of the --sleep-time option, given as a string
    self.scheduler_job = subprocess.Popen(['./bin/jman', '--local', '--database', self.database, 'run-scheduler', '--sleep-time', sleep_time, '--parallel', '2'])


  def _interrupt_scheduler(self, wait=False, timeout=10.):
    # Sends SIGINT to the running scheduler (if any) and forgets its handle.
    # With wait=True, additionally waits up to `timeout` seconds for the process to exit
    # and kills it if it is still alive afterwards, so that no scheduler is left behind.
    process = self.scheduler_job
    if process is None:
      return

    # only signal a process that is still alive
    if process.poll() is None:
      process.send_signal(signal.SIGINT)
    self.scheduler_job = None

    if not wait:
      return

    # poll instead of a blocking wait(), so that a scheduler ignoring the signal cannot hang the tests
    deadline = time.time() + timeout
    while process.poll() is None and time.time() < deadline:
      time.sleep(0.1)
    if process.poll() is None:
      process.kill()
      process.wait()


  def _read_log(self, path):
    # Returns the content of the given file without trailing white space;
    # the file is closed again before returning
    with open(path) as log_file:
      return log_file.read().rstrip()


  def test01_local(self):
    # This test executes all commands of the local grid manager and asserts that everything is fine

    # first, add some commands to the database
    script_1 = pkg_resources.resource_filename('gridtk.tests', 'test_script.sh')
    script_2 = pkg_resources.resource_filename('gridtk.tests', 'test_array.sh')
    from gridtk.script import jman

    # command line prefixes for the local and for the "default" grid manager
    local = ['./bin/jman', '--local', '--database', self.database]
    default = ['./bin/jman', '--database', self.database]

    # add a simple script (job 1); its std-out and std-err messages are checked further below
    jman.main(local + ['submit', '--log-dir', self.log_dir, '--name', 'test_1', script_1])
    # add an array job (job 2) with the four array ids 1, 3, 5 and 7 that depends on job 1
    jman.main(local + ['submit', '--log-dir', self.log_dir, '--name', 'test_2',  '--dependencies', '1', '--parametric', '1-7:2', script_2])

    # check that the database was created successfully
    self.assertTrue(os.path.exists(self.database))

    # print an empty line; print('') does so on Python 2 and 3 alike (a bare `print` is a no-op on Python 3)
    print('')
    # test that the list command works (should also work with the "default" grid manager)
    jman.main(default + ['list', '--job-ids', '1'])
    jman.main(default + ['list', '--job-ids', '2', '--print-array-jobs', '--print-dependencies'])

    # get insight into the database
    job_manager = gridtk.local.JobManagerLocal(database=self.database)
    session = job_manager.lock()
    jobs = list(session.query(Job))
    self.assertEqual(len(jobs), 2)
    self.assertEqual(jobs[0].id, 1)
    self.assertEqual(jobs[1].id, 2)
    self.assertEqual(len(jobs[1].array), 4)
    self.assertEqual(jobs[0].status, 'submitted')
    self.assertEqual(jobs[1].status, 'submitted')

    # check that the job dependencies are correct
    waiting = jobs[0].get_jobs_waiting_for_us()
    self.assertEqual(len(waiting), 1)
    self.assertEqual(waiting[0].id, 2)
    waited = jobs[1].get_jobs_we_wait_for()
    self.assertEqual(len(waited), 1)
    self.assertEqual(waited[0].id, 1)

    job_manager.unlock()

    # now, start the local execution of the job in a parallel job
    self._start_scheduler('5')

    # sleep some time to assure that the scheduler was able to start the first job
    time.sleep(2)
    # ... and kill the scheduler
    self._interrupt_scheduler()

    # now, the first job needs to have status failure, and the second needs to be queued
    session = job_manager.lock()
    jobs = list(session.query(Job))
    self.assertEqual(len(jobs), 2)
    self.assertEqual(jobs[0].status, 'failure')
    self.assertEqual(jobs[1].status, 'queued')
    # the result files should not be there yet
    self.assertFalse(os.path.exists(jobs[0].std_out_file()))
    self.assertFalse(os.path.exists(jobs[0].std_err_file()))
    job_manager.unlock()

    # reset the job 1
    jman.main(local + ['resubmit', '--job-id', '1', '--running-jobs'])

    # now, start the local execution of the job in a parallel job
    self._start_scheduler('4')

    # sleep some time to assure that the scheduler was able to finish the first and start the second job
    time.sleep(6)
    # ... and kill the scheduler
    self._interrupt_scheduler()

    # Job 1 and two array jobs of job two should be finished now, the other two still need to be queued
    session = job_manager.lock()
    jobs = list(session.query(Job))
    self.assertEqual(len(jobs), 2)
    self.assertEqual(jobs[0].status, 'failure')
    self.assertEqual(jobs[1].status, 'executing')
    self.assertEqual(jobs[1].array[0].status, 'failure')
    self.assertEqual(jobs[1].array[0].result, 1)
    self.assertEqual(jobs[1].array[1].status, 'success')
    self.assertEqual(jobs[1].array[1].result, 0)
    self.assertEqual(len([a for a in jobs[1].array if a.status == 'queued']), 2)
    # remember the log file names of job 1 while the database is still locked
    out_file = jobs[0].std_out_file()
    err_file = jobs[0].std_err_file()
    job_manager.unlock()

    # the result files of the first job should now be there
    self.assertTrue(os.path.isfile(out_file))
    self.assertTrue(os.path.isfile(err_file))
    self.assertEqual(self._read_log(out_file), 'This is a text message to std-out')
    self.assertEqual(self._read_log(err_file), 'This is a text message to std-err')

    # resubmit all jobs
    jman.main(local + ['resubmit', '--running-jobs'])
    # check that the log files have been cleaned
    self.assertFalse(os.path.exists(out_file))
    self.assertFalse(os.path.exists(err_file))
    # ... but the log dir still exists
    self.assertTrue(os.path.exists(self.log_dir))

    # now, let the scheduler run all jobs
    self._start_scheduler('0.1')
    # ... and kill the scheduler
    time.sleep(3)
    self._interrupt_scheduler()

    # check that all output files are generated again
    self.assertTrue(os.path.isfile(out_file))
    self.assertTrue(os.path.isfile(err_file))
    self.assertEqual(self._read_log(out_file), 'This is a text message to std-out')
    self.assertEqual(self._read_log(err_file), 'This is a text message to std-err')

    # check that exactly ten log files have been created:
    # the two files of job 1 plus one output and one error file for each of the four array jobs
    files = os.listdir(self.log_dir)
    self.assertEqual(len(files), 10)
    for i in range(1,8,2):
      self.assertTrue('test_2.o2.%d'%i in files)
      self.assertTrue('test_2.e2.%d'%i in files)

    # check that all array jobs are finished now
    session = job_manager.lock()
    jobs = list(session.query(Job))
    self.assertEqual(len(jobs), 2)
    self.assertEqual(jobs[1].status, 'failure')
    self.assertEqual(jobs[1].array[0].status, 'failure')
    self.assertEqual(jobs[1].array[0].result, 1)
    for i in range(1,4):
      self.assertEqual(jobs[1].array[i].status, 'success')
      self.assertEqual(jobs[1].array[i].result, 0)
    job_manager.unlock()

    print('')
    # test that the list command still works
    jman.main(default + ['list', '--print-array-jobs'])

    print('')
    # test that the report command works
    jman.main(default + ['report'])

    # clean-up
    jman.main(local + ['delete'])

    # check that the database and the log files are gone
    self.assertEqual(len(os.listdir(self.temp_dir)), 0)


  def test02_grid(self):
    # Tests the functionality of the grid toolkit in the grid
    raise nose.plugins.skip.SkipTest("This test is not yet implemented. If you find a proper ways to test the grid functionality, please go ahead and implement the test.")