test_worker.py 14.6 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Philip ABBET's avatar
Philip ABBET committed
39
40
41
42
43
44
45
46
47
from time import time

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
48
49
50
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
51
52
53
54
# Names under which the two test worker processes register themselves
# with the controller (passed to the worker script through ``--name``).
WORKER1, WORKER2 = 'worker1', 'worker2'


Philip ABBET's avatar
Philip ABBET committed
55
56
57
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Job description handed to ``WorkerController.execute()`` by the tests: it
# runs the ``user/integers_echo/1`` algorithm on the ``integers_db/1``
# database. The ``path`` entries are the matching ``hash`` values split into
# directory components.
CONFIGURATION1 = {
  'queue': 'queue',
  'inputs': {
    'in_data': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'a',
      'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'endpoint': 'a',
      'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'channel': 'integers'
    }
  },
  'algorithm': 'user/integers_echo/1',
  'parameters': {},
  'environment': {
    'name': 'Python 2.7',
    'version': '1.2.0'
  },
  'outputs': {
    'out_data': {
      'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'endpoint': 'out_data',
      'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'channel': 'integers'
    }
  },
  'nb_slots': 1,
  'channel': 'integers'
}


Philip ABBET's avatar
Philip ABBET committed
91
92
93
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Second job description, used when two workers run at the same time. It
# differs from the first configuration only by the ``path``/``hash`` of its
# output (``40...`` instead of ``20...``), so both jobs write to distinct
# output locations.
CONFIGURATION2 = {
  'queue': 'queue',
  'inputs': {
    'in_data': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'a',
      'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'endpoint': 'a',
      'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'channel': 'integers'
    }
  },
  'algorithm': 'user/integers_echo/1',
  'parameters': {},
  'environment': {
    'name': 'Python 2.7',
    'version': '1.2.0'
  },
  'outputs': {
    'out_data': {
      'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'endpoint': 'out_data',
      'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'channel': 'integers'
    }
  },
  'nb_slots': 1,
  'channel': 'integers'
}


Philip ABBET's avatar
Philip ABBET committed
127
128
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
129

Philip ABBET's avatar
Philip ABBET committed
130
class WorkerProcess(multiprocessing.Process):
    """Child process that runs the worker script's ``main()`` function.

    Parameters:

      queue: Object with a ``put()`` method (the tests pass a
        ``multiprocessing.Queue``) on which ``'STARTED'`` is posted as soon
        as the child process begins executing

      arguments (list): Command-line style arguments forwarded unchanged to
        ``worker.main()``
    """

    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()

        self.queue = queue
        self.arguments = arguments


    def run(self):
        """Signal the parent that the process is alive, then run the worker."""

        # Posted before entering the worker so that the parent, blocked on
        # ``queue.get()``, knows the child process is running
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
142
143


Philip ABBET's avatar
Philip ABBET committed
144
145
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
146
147
148

class TestOneWorker(unittest.TestCase):
    """Tests of a ``WorkerController`` driving a single worker process.

    ``setUp()`` creates a controller, starts one ``WorkerProcess`` named
    ``WORKER1`` and waits until the controller lists it. Each test then sends
    commands through the controller and checks the messages coming back.

    Subclasses may set ``self.docker`` to ``True`` to start the worker with
    the ``--docker`` flag.
    """

    def __init__(self, methodName='runTest'):
        super(TestOneWorker, self).__init__(methodName)
        self.controller = None
        self.worker_process = None
        self.docker = False


    def setUp(self):
        """Start a controller and one worker, and wait for the worker to register."""

        self.tearDown()   # In case another test failed badly during its setUp()

        connected_workers = []

        def onWorkerReady(name):
            connected_workers.append(name)

        self.controller = WorkerController(
          '127.0.0.1',
          port=None,
          callbacks=dict(
            onWorkerReady = onWorkerReady,
          )
        )

        args = [
          '--prefix=%s' % prefix,
          '--cache=%s' % tmp_prefix,
          '--name=%s' % WORKER1,
          # '-vvv',   # Uncomment when debugging a failing test
          self.controller.address,
        ]

        if self.docker:
            # Inserted before the controller address, which stays last
            args.insert(3, '--docker')

        self.worker_process = WorkerProcess(multiprocessing.Queue(), args)
        self.worker_process.start()

        # Blocks until the child process has posted 'STARTED'
        self.worker_process.queue.get()

        start = time()
        while len(self.controller.workers) == 0:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertEqual(len(self.controller.workers), 1)
        self.assertTrue(WORKER1 in self.controller.workers)

        self.assertEqual(len(connected_workers), 1)
        self.assertTrue(WORKER1 in connected_workers)


    def tearDown(self):
        """Stop the worker process (if any) and destroy the controller (if any)."""

        self.stop_worker()

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def _wait(self, max=100):
        """Poll the controller until it returns a message.

        Parameters:

          max (int): Maximum number of calls to ``controller.process(100)``

        Returns the first non-``None`` message, or ``None`` if ``max`` polls
        went by without one.
        """

        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        return message


    def _wait_message(self):
        """Like ``_wait()``, but fail the test if no message was received.

        Without this check, unpacking the ``None`` returned on timeout would
        raise an unrelated ``TypeError`` instead of a test failure.
        """

        message = self._wait()
        self.assertTrue(message is not None)
        return message


    def stop_worker(self):
        """Terminate the worker process and wait for it to exit (no-op if none)."""

        if self.worker_process is not None:
            self.worker_process.terminate()
            self.worker_process.join()
            self.worker_process = None


    def _check_done(self, message, expected_worker, expected_job_id):
        """Check that ``message`` reports the successful completion of a job.

        Parameters:

          message (tuple): ``(worker, status, job_id, data)`` as returned by
            the controller; must not be ``None``

          expected_worker (str): Name of the worker expected to answer

          expected_job_id: Identifier of the job expected to be done
        """

        self.assertTrue(message is not None)

        (worker, status, job_id, data) = message

        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)

        # The first data part is a JSON document describing the execution
        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)

        message = self._wait()

        self._check_done(message, WORKER1, 1)


    def test_processing_error(self):
        # Shallow copy: only the top-level 'algorithm' entry is replaced
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_crash/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])


    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/syntax_error/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_worker_shutdown(self):
        # A list is used because the callback cannot rebind a local variable
        # of the enclosing function (no ``nonlocal`` in Python 2). Starting
        # empty ensures the final check fails if the callback never runs.
        gone_workers = []

        def onWorkerGone(name):
            gone_workers.append(name)

        self.controller.callbacks.onWorkerGone = onWorkerGone

        self.stop_worker()

        self.assertTrue(self.controller.process(2000) is None)
        self.assertEqual(len(self.controller.workers), 0)
        self.assertTrue(WORKER1 in gone_workers)


    def test_multiple_jobs(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER1, 2, config)

        message = self._wait()
        self._check_done(message, WORKER1, 1)

        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)

        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, config)
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)


    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
        self.assertEqual(data[0], "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
357
358


Philip ABBET's avatar
Philip ABBET committed
359
360
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
361
362
363

class TestOneWorkerDocker(TestOneWorker):
    """Same tests as ``TestOneWorker``, with the worker started with ``--docker``."""

    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)
        self.docker = True


    @classmethod
    def setUpClass(cls):
        # One ``Host`` is created for the whole class and kept on ``cls.host``.
        # NOTE(review): presumably this discovers the available docker
        # environments up front -- confirm against ``..dock.Host``
        cls.host = Host(raise_on_errors=False)
Philip ABBET's avatar
Philip ABBET committed
372
373


Philip ABBET's avatar
Philip ABBET committed
374
375
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
376
377
378

class TestTwoWorkers(unittest.TestCase):
    """Tests of a ``WorkerController`` driving two worker processes.

    ``setUp()`` creates a controller, starts two ``WorkerProcess`` instances
    named ``WORKER1`` and ``WORKER2`` and waits until the controller lists
    both of them.

    Subclasses may set ``self.docker`` to ``True`` to start the workers with
    the ``--docker`` flag.
    """

    def __init__(self, methodName='runTest'):
        super(TestTwoWorkers, self).__init__(methodName)
        self.controller = None
        self.worker_processes = None
        self.docker = False


    def setUp(self):
        """Start a controller and two workers, and wait for both to register."""

        self.tearDown()   # In case another test failed badly during its setUp()

        connected_workers = []

        def onWorkerReady(name):
            connected_workers.append(name)

        self.controller = WorkerController(
          '127.0.0.1',
          port=None,
          callbacks=dict(
            onWorkerReady = onWorkerReady,
          )
        )

        self.worker_processes = []

        for name in [ WORKER1, WORKER2 ]:
            args = [
                '--prefix=%s' % prefix,
                '--cache=%s' % tmp_prefix,
                '--name=%s' % name,
                self.controller.address,
            ]

            if self.docker:
                # Inserted before the controller address, which stays last
                args.insert(3, '--docker')

            worker_process = WorkerProcess(multiprocessing.Queue(), args)
            worker_process.start()

            # Blocks until the child process has posted 'STARTED'
            worker_process.queue.get()

            self.worker_processes.append(worker_process)

        start = time()
        while len(self.controller.workers) < 2:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(WORKER1 in self.controller.workers)
        self.assertTrue(WORKER2 in self.controller.workers)

        self.assertEqual(len(connected_workers), 2)
        self.assertTrue(WORKER1 in connected_workers)
        self.assertTrue(WORKER2 in connected_workers)


    def tearDown(self):
        """Stop all worker processes (if any) and destroy the controller (if any)."""

        if self.worker_processes is not None:
            for worker_process in self.worker_processes:
                worker_process.terminate()
                worker_process.join()
            self.worker_processes = None

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def _wait(self, max=100):
        """Poll the controller until it returns a message, failing on timeout.

        Bounded so that a worker which never answers makes the test fail
        instead of hanging the whole test run.

        Parameters:

          max (int): Maximum number of calls to ``controller.process(100)``

        Returns the first non-``None`` message.
        """

        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        self.assertTrue(message is not None)

        return message


    def _test_success_one_worker(self, worker_name):
        """Run one job on ``worker_name`` and check that it completes successfully."""

        self.controller.execute(worker_name, 1, CONFIGURATION1)

        (worker, status, job_id, data) = self._wait()

        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        # The first data part is a JSON document describing the execution
        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)


    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)


    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)

            # Job 1 was sent to WORKER1 and job 2 to WORKER2
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)

            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)


        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)

        # The two answers may arrive in any order
        (worker1, status, job_id, data) = self._wait()
        _check(worker1, status, job_id, data)

        (worker2, status, job_id, data) = self._wait()
        _check(worker2, status, job_id, data)

        self.assertNotEqual(worker1, worker2)
Philip ABBET's avatar
Philip ABBET committed
505
506


Philip ABBET's avatar
Philip ABBET committed
507
508
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
509
510
511

class TestTwoWorkersDocker(TestTwoWorkers):
    """Same tests as ``TestTwoWorkers``, with the workers started with ``--docker``."""

    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)
        self.docker = True


    @classmethod
    def setUpClass(cls):
        # One ``Host`` is created for the whole class and kept on ``cls.host``.
        # NOTE(review): presumably this discovers the available docker
        # environments up front -- confirm against ``..dock.Host``
        cls.host = Host(raise_on_errors=False)