test_worker.py 17.5 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
39
import Queue
Philip ABBET's avatar
Philip ABBET committed
40
from time import time
41
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
42
43
44
45

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host
46
from ..database import Database
Philip ABBET's avatar
Philip ABBET committed
47
48
49
50

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
51
52
53
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
54
55
56
57
WORKER1 = 'worker1'
WORKER2 = 'worker2'


Philip ABBET's avatar
Philip ABBET committed
58
59
60
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
61
CONFIGURATION1 = {
62
63
    'queue': 'queue',
    'inputs': {
64
        'in': {
65
66
67
68
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
69
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
70
71
72
73
74
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
75
    'algorithm': 'legacy/echo/1',
76
77
78
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
79
        'version': '1.3.0'
80
81
    },
    'outputs': {
82
        'out': {
83
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
84
            'endpoint': 'out',
85
86
87
88
89
90
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
91
92
93
}


Philip ABBET's avatar
Philip ABBET committed
94
95
96
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
97
CONFIGURATION2 = {
98
99
    'queue': 'queue',
    'inputs': {
100
        'in': {
101
102
103
104
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
105
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
106
107
108
109
110
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
111
    'algorithm': 'legacy/echo/1',
112
113
114
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
115
        'version': '1.3.0'
116
117
    },
    'outputs': {
118
        'out': {
119
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
120
            'endpoint': 'out',
121
122
123
124
125
126
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
127
128
129
}


Philip ABBET's avatar
Philip ABBET committed
130
131
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
132

133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class ControllerProcess(multiprocessing.Process):

    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue


    def run(self):
        self.queue.put('STARTED')

        def onWorkerReady(name):
            self.queue.put('READY ' + name)

        def onWorkerGone(name):
            self.queue.put('GONE ' + name)

        self.controller = WorkerController(
            '127.0.0.1',
152
            port=50999,
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
                if command == 'STOP':
                    break
            except Queue.Empty:
                pass

        self.controller.destroy()


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
175
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
176

Philip ABBET's avatar
Philip ABBET committed
177
178
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
179

Philip ABBET's avatar
Philip ABBET committed
180
181
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
182
183


Philip ABBET's avatar
Philip ABBET committed
184
185
186
    def run(self):
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
187
188


Philip ABBET's avatar
Philip ABBET committed
189
190
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
191

192
class TestWorkerBase(unittest.TestCase):
Philip ABBET's avatar
Philip ABBET committed
193

Philip ABBET's avatar
Philip ABBET committed
194
    def __init__(self, methodName='runTest'):
195
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
196
        self.controller = None
197
198
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
199
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
200
201


Philip ABBET's avatar
Philip ABBET committed
202
    def setUp(self):
203
204
205
206
207
208
209
210
211
212
213
214
215
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
216

217
218
219
220
221
        self.stop_controller()


    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
222

Philip ABBET's avatar
Philip ABBET committed
223
        def onWorkerReady(name):
224
225
226
227
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
228

Philip ABBET's avatar
Philip ABBET committed
229
        self.controller = WorkerController(
230
231
232
233
234
235
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
Philip ABBET's avatar
Philip ABBET committed
236
        )
Philip ABBET's avatar
Philip ABBET committed
237

238
239
240
241
242
243
244
245
246
247
        self.controller.process(100)


    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
248
        args = [
249
          '--prefix=%s' % ','.join(prefix.paths),
Philip ABBET's avatar
Philip ABBET committed
250
          '--cache=%s' % tmp_prefix,
251
          '--name=%s' % name,
252
          # '-vv',
253
          self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
254
        ]
Philip ABBET's avatar
Philip ABBET committed
255

Philip ABBET's avatar
Philip ABBET committed
256
257
        if self.docker:
            args.insert(3, '--docker')
Philip ABBET's avatar
Philip ABBET committed
258

259
260
261
262
263
264
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
265
266


267
268
269
270
271
272
273
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


274
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
275
        start = time()
276
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
277
            self.assertTrue(self.controller.process(100) is None)
278
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
279

280
281
282
283
284
285
286
287
288
289
290
291
        self.assertTrue(name in self.controller.workers)


    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)


292
293
294
295
296
297
298
    def prepare_databases(self, configuration):
        for _, input_cfg in configuration['inputs'].items():
            database = Database(prefix, input_cfg['database'])
            view = database.view(input_cfg['protocol'], input_cfg['set'])
            view.index(os.path.join(tmp_prefix, input_cfg['path']))


299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#----------------------------------------------------------


class TestConnection(TestWorkerBase):

    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
315
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
316

Philip ABBET's avatar
Philip ABBET committed
317

318
319
320
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
321

322
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
323

Philip ABBET's avatar
Philip ABBET committed
324
325
        sleep(1)

326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
348
        self.start_worker(WORKER1, address='tcp://127.0.0.1:50999')
349
350
        sleep(1)

351
        self.start_controller(port=50999)
352
353
354
355

        self.wait_for_worker_connection(WORKER1)


356
357
358
359
360
361
362
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

363
        self.start_worker(WORKER1, 'tcp://127.0.0.1:50999')
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383

        message = controller.queue.get()
        self.assertEqual(message, 'READY ' + WORKER1)

        controller.queue.put('STOP')

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

        message = controller.queue.get()
        self.assertEqual(message, 'READY ' + WORKER1)

        controller.queue.put('STOP')


384
385
386
387
388
389
390
391
392
393
394
#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):


    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
395

396
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
397

398
399
        self.prepare_databases(CONFIGURATION1)

Philip ABBET's avatar
Philip ABBET committed
400

401
    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
402
403
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
404

Philip ABBET's avatar
Philip ABBET committed
405
406
407
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
408

Philip ABBET's avatar
Philip ABBET committed
409
        return message
Philip ABBET's avatar
Philip ABBET committed
410
411


Philip ABBET's avatar
Philip ABBET committed
412
413
    def _check_done(self, message, expected_worker, expected_job_id):
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
414

Philip ABBET's avatar
Philip ABBET committed
415
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
416

Philip ABBET's avatar
Philip ABBET committed
417
418
419
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
420

Philip ABBET's avatar
Philip ABBET committed
421
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
422

Philip ABBET's avatar
Philip ABBET committed
423
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
424
425


Philip ABBET's avatar
Philip ABBET committed
426
427
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
428

Philip ABBET's avatar
Philip ABBET committed
429
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
430

Philip ABBET's avatar
Philip ABBET committed
431
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
432
433


Philip ABBET's avatar
Philip ABBET committed
434
435
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
436
        config['algorithm'] = 'legacy/process_crash/1'
Philip ABBET's avatar
Philip ABBET committed
437

Philip ABBET's avatar
Philip ABBET committed
438
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
439

Philip ABBET's avatar
Philip ABBET committed
440
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
441

Philip ABBET's avatar
Philip ABBET committed
442
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
443
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
444
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
445

Philip ABBET's avatar
Philip ABBET committed
446
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
447

Philip ABBET's avatar
Philip ABBET committed
448
449
        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])
Philip ABBET's avatar
Philip ABBET committed
450
451


Philip ABBET's avatar
Philip ABBET committed
452
453
454
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'
Philip ABBET's avatar
Philip ABBET committed
455

Philip ABBET's avatar
Philip ABBET committed
456
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
457

Philip ABBET's avatar
Philip ABBET committed
458
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
459

Philip ABBET's avatar
Philip ABBET committed
460
461
462
463
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
464
465


Philip ABBET's avatar
Philip ABBET committed
466
467
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
468
        config['algorithm'] = 'legacy/syntax_error/1'
Philip ABBET's avatar
Philip ABBET committed
469

Philip ABBET's avatar
Philip ABBET committed
470
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
471

Philip ABBET's avatar
Philip ABBET committed
472
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
473

Philip ABBET's avatar
Philip ABBET committed
474
        self.assertEqual(worker, WORKER1)
475
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
476
477
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
478
479


Philip ABBET's avatar
Philip ABBET committed
480
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
481
482
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
483

Philip ABBET's avatar
Philip ABBET committed
484
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
485
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
486

Philip ABBET's avatar
Philip ABBET committed
487
488
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
489

Philip ABBET's avatar
Philip ABBET committed
490
491
492
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
493

Philip ABBET's avatar
Philip ABBET committed
494
495
496
497
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
498

Philip ABBET's avatar
Philip ABBET committed
499
500
501
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
502
503


Philip ABBET's avatar
Philip ABBET committed
504
505
506
    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
507

Philip ABBET's avatar
Philip ABBET committed
508
        self.controller.execute(WORKER1, 1, config)
509
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
510

Philip ABBET's avatar
Philip ABBET committed
511
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
512

Philip ABBET's avatar
Philip ABBET committed
513
514
515
516
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
517
518


519
520
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
521

Philip ABBET's avatar
Philip ABBET committed
522
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
523

Philip ABBET's avatar
Philip ABBET committed
524
525
526
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
Philip ABBET's avatar
Philip ABBET committed
527
        self.assertEqual(data[0], "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
528
529


Philip ABBET's avatar
Philip ABBET committed
530
531
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
532
533
534

class TestOneWorkerDocker(TestOneWorker):

Philip ABBET's avatar
Philip ABBET committed
535
536
537
    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
538
539


Philip ABBET's avatar
Philip ABBET committed
540
541
542
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)
Philip ABBET's avatar
Philip ABBET committed
543
544


Philip ABBET's avatar
Philip ABBET committed
545
546
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
547

548
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
549

Philip ABBET's avatar
Philip ABBET committed
550
551
    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
552

553
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
554

555
556
557
558
559
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
560
561


Philip ABBET's avatar
Philip ABBET committed
562
563
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
564

Philip ABBET's avatar
Philip ABBET committed
565
566
567
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
568

Philip ABBET's avatar
Philip ABBET committed
569
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
570

Philip ABBET's avatar
Philip ABBET committed
571
572
573
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
574

Philip ABBET's avatar
Philip ABBET committed
575
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
576

Philip ABBET's avatar
Philip ABBET committed
577
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
578
579


Philip ABBET's avatar
Philip ABBET committed
580
581
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
582
583


Philip ABBET's avatar
Philip ABBET committed
584
585
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
586
587


Philip ABBET's avatar
Philip ABBET committed
588
589
590
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
591

Philip ABBET's avatar
Philip ABBET committed
592
593
594
595
596
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
597

Philip ABBET's avatar
Philip ABBET committed
598
599
            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
600
601


Philip ABBET's avatar
Philip ABBET committed
602
603
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
604

Philip ABBET's avatar
Philip ABBET committed
605
606
607
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
608

Philip ABBET's avatar
Philip ABBET committed
609
610
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
611

Philip ABBET's avatar
Philip ABBET committed
612
613
614
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
615

Philip ABBET's avatar
Philip ABBET committed
616
617
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
618

Philip ABBET's avatar
Philip ABBET committed
619
        self.assertNotEqual(worker1, worker2)
Philip ABBET's avatar
Philip ABBET committed
620
621


Philip ABBET's avatar
Philip ABBET committed
622
623
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
624
625
626

class TestTwoWorkersDocker(TestTwoWorkers):

Philip ABBET's avatar
Philip ABBET committed
627
628
629
    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
630
631


Philip ABBET's avatar
Philip ABBET committed
632
633
634
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)