test_worker.py 15.4 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Philip ABBET's avatar
Philip ABBET committed
39
from time import time
40
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
41
42
43
44
45
46
47
48

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
49
50
51
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
52
53
54
55
WORKER1 = 'worker1'
WORKER2 = 'worker2'


Philip ABBET's avatar
Philip ABBET committed
56
57
58
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
59
CONFIGURATION1 = {
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
    'queue': 'queue',
    'inputs': {
        'in_data': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'user/integers_echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.2.0'
    },
    'outputs': {
        'out_data': {
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out_data',
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
89
90
91
}


Philip ABBET's avatar
Philip ABBET committed
92
93
94
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
95
CONFIGURATION2 = {
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
    'queue': 'queue',
    'inputs': {
        'in_data': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'user/integers_echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.2.0'
    },
    'outputs': {
        'out_data': {
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out_data',
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
125
126
127
}


Philip ABBET's avatar
Philip ABBET committed
128
129
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
130

Philip ABBET's avatar
Philip ABBET committed
131
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
132

Philip ABBET's avatar
Philip ABBET committed
133
134
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
135

Philip ABBET's avatar
Philip ABBET committed
136
137
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
138
139


Philip ABBET's avatar
Philip ABBET committed
140
141
142
    def run(self):
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
143
144


Philip ABBET's avatar
Philip ABBET committed
145
146
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
147

148
class TestWorkerBase(unittest.TestCase):
Philip ABBET's avatar
Philip ABBET committed
149

Philip ABBET's avatar
Philip ABBET committed
150
    def __init__(self, methodName='runTest'):
151
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
152
        self.controller = None
153
154
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
155
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
156
157


Philip ABBET's avatar
Philip ABBET committed
158
    def setUp(self):
159
160
161
162
163
164
165
166
167
168
169
170
171
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
172

173
174
175
176
177
        self.stop_controller()


    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
178

Philip ABBET's avatar
Philip ABBET committed
179
        def onWorkerReady(name):
180
181
182
183
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
184

Philip ABBET's avatar
Philip ABBET committed
185
        self.controller = WorkerController(
186
187
188
189
190
191
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
Philip ABBET's avatar
Philip ABBET committed
192
        )
Philip ABBET's avatar
Philip ABBET committed
193

194
195
196
197
198
199
200
201
202
203
        self.controller.process(100)


    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
204
205
206
        args = [
          '--prefix=%s' % prefix,
          '--cache=%s' % tmp_prefix,
207
          '--name=%s' % name,
Philip ABBET's avatar
Philip ABBET committed
208
          # '-vvv',
209
          self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
210
        ]
Philip ABBET's avatar
Philip ABBET committed
211

Philip ABBET's avatar
Philip ABBET committed
212
213
        if self.docker:
            args.insert(3, '--docker')
Philip ABBET's avatar
Philip ABBET committed
214

215
216
217
218
219
220
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
221
222


223
224
225
226
227
228
229
230
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


    def wait_for_worker_connection(self, name):
Philip ABBET's avatar
Philip ABBET committed
231
        start = time()
232
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
233
234
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds
Philip ABBET's avatar
Philip ABBET committed
235

236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
        self.assertTrue(name in self.controller.workers)


    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)


#----------------------------------------------------------


class TestConnection(TestWorkerBase):

    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
264
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
265

Philip ABBET's avatar
Philip ABBET committed
266

267
268
269
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
270

271
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
272

273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
        self.start_worker(WORKER1, address='tcp://127.0.0.1:51000')
        sleep(1)

        self.start_controller(port=51000)

        self.wait_for_worker_connection(WORKER1)


#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):


    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
315
316


Philip ABBET's avatar
Philip ABBET committed
317
318
319
    def _wait(self, max=100):
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
320

Philip ABBET's avatar
Philip ABBET committed
321
322
323
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
324

Philip ABBET's avatar
Philip ABBET committed
325
        return message
Philip ABBET's avatar
Philip ABBET committed
326
327


Philip ABBET's avatar
Philip ABBET committed
328
329
    def _check_done(self, message, expected_worker, expected_job_id):
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
330

Philip ABBET's avatar
Philip ABBET committed
331
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
332

Philip ABBET's avatar
Philip ABBET committed
333
334
335
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
336

Philip ABBET's avatar
Philip ABBET committed
337
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
338

Philip ABBET's avatar
Philip ABBET committed
339
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
340
341


Philip ABBET's avatar
Philip ABBET committed
342
343
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
344

Philip ABBET's avatar
Philip ABBET committed
345
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
346

Philip ABBET's avatar
Philip ABBET committed
347
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
348
349


Philip ABBET's avatar
Philip ABBET committed
350
351
352
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_crash/1'
Philip ABBET's avatar
Philip ABBET committed
353

Philip ABBET's avatar
Philip ABBET committed
354
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
355

Philip ABBET's avatar
Philip ABBET committed
356
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
357

Philip ABBET's avatar
Philip ABBET committed
358
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
359
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
360
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
361

Philip ABBET's avatar
Philip ABBET committed
362
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
363

Philip ABBET's avatar
Philip ABBET committed
364
365
        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])
Philip ABBET's avatar
Philip ABBET committed
366
367


Philip ABBET's avatar
Philip ABBET committed
368
369
370
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'
Philip ABBET's avatar
Philip ABBET committed
371

Philip ABBET's avatar
Philip ABBET committed
372
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
373

Philip ABBET's avatar
Philip ABBET committed
374
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
375

Philip ABBET's avatar
Philip ABBET committed
376
377
378
379
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
380
381


Philip ABBET's avatar
Philip ABBET committed
382
383
384
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/syntax_error/1'
Philip ABBET's avatar
Philip ABBET committed
385

Philip ABBET's avatar
Philip ABBET committed
386
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
387

Philip ABBET's avatar
Philip ABBET committed
388
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
389

Philip ABBET's avatar
Philip ABBET committed
390
391
392
393
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
394
395


Philip ABBET's avatar
Philip ABBET committed
396
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
397
398
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
399

Philip ABBET's avatar
Philip ABBET committed
400
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
401
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
402

Philip ABBET's avatar
Philip ABBET committed
403
404
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
405

Philip ABBET's avatar
Philip ABBET committed
406
407
408
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
409

Philip ABBET's avatar
Philip ABBET committed
410
411
412
413
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
414

Philip ABBET's avatar
Philip ABBET committed
415
416
417
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
418
419


Philip ABBET's avatar
Philip ABBET committed
420
421
422
    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
423

Philip ABBET's avatar
Philip ABBET committed
424
        self.controller.execute(WORKER1, 1, config)
425
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
426

Philip ABBET's avatar
Philip ABBET committed
427
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
428

Philip ABBET's avatar
Philip ABBET committed
429
430
431
432
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
433
434


435
436
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
437

Philip ABBET's avatar
Philip ABBET committed
438
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
439

Philip ABBET's avatar
Philip ABBET committed
440
441
442
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
Philip ABBET's avatar
Philip ABBET committed
443
        self.assertEqual(data[0], "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
444
445


Philip ABBET's avatar
Philip ABBET committed
446
447
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
448
449
450

class TestOneWorkerDocker(TestOneWorker):

Philip ABBET's avatar
Philip ABBET committed
451
452
453
    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
454
455


Philip ABBET's avatar
Philip ABBET committed
456
457
458
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)
Philip ABBET's avatar
Philip ABBET committed
459
460


Philip ABBET's avatar
Philip ABBET committed
461
462
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
463

464
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
465

Philip ABBET's avatar
Philip ABBET committed
466
467
    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
468

469
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
470

471
472
473
474
475
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
476
477


Philip ABBET's avatar
Philip ABBET committed
478
479
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
480

Philip ABBET's avatar
Philip ABBET committed
481
482
483
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
484

Philip ABBET's avatar
Philip ABBET committed
485
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
486

Philip ABBET's avatar
Philip ABBET committed
487
488
489
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
490

Philip ABBET's avatar
Philip ABBET committed
491
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
492

Philip ABBET's avatar
Philip ABBET committed
493
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
494
495


Philip ABBET's avatar
Philip ABBET committed
496
497
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
498
499


Philip ABBET's avatar
Philip ABBET committed
500
501
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
502
503


Philip ABBET's avatar
Philip ABBET committed
504
505
506
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
507

Philip ABBET's avatar
Philip ABBET committed
508
509
510
511
512
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
513

Philip ABBET's avatar
Philip ABBET committed
514
515
            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
516
517


Philip ABBET's avatar
Philip ABBET committed
518
519
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
520

Philip ABBET's avatar
Philip ABBET committed
521
522
523
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
524

Philip ABBET's avatar
Philip ABBET committed
525
526
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
527

Philip ABBET's avatar
Philip ABBET committed
528
529
530
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
531

Philip ABBET's avatar
Philip ABBET committed
532
533
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
534

Philip ABBET's avatar
Philip ABBET committed
535
        self.assertNotEqual(worker1, worker2)
Philip ABBET's avatar
Philip ABBET committed
536
537


Philip ABBET's avatar
Philip ABBET committed
538
539
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
540
541
542

class TestTwoWorkersDocker(TestTwoWorkers):

Philip ABBET's avatar
Philip ABBET committed
543
544
545
    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
546
547


Philip ABBET's avatar
Philip ABBET committed
548
549
550
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)