test_worker.py 17.6 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
39
40
41
42
try:
    import Queue
except ImportError:
    import queue as Queue
Philip ABBET's avatar
Philip ABBET committed
43
from time import time
44
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
45
46
47
48

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host
49
from ..database import Database
Philip ABBET's avatar
Philip ABBET committed
50
51
52
53

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
54
55
56
#----------------------------------------------------------


57
58
WORKER1 = b'worker1'
WORKER2 = b'worker2'
Philip ABBET's avatar
Philip ABBET committed
59
60


Philip ABBET's avatar
Philip ABBET committed
61
62
63
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
64
CONFIGURATION1 = {
65
66
    'queue': 'queue',
    'inputs': {
67
        'in': {
68
69
70
71
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
72
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
73
74
75
76
77
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
78
    'algorithm': 'legacy/echo/1',
79
80
81
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
82
        'version': '1.3.0'
83
84
    },
    'outputs': {
85
        'out': {
86
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
87
            'endpoint': 'out',
88
89
90
91
92
93
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
94
95
96
}


Philip ABBET's avatar
Philip ABBET committed
97
98
99
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
100
CONFIGURATION2 = {
101
102
    'queue': 'queue',
    'inputs': {
103
        'in': {
104
105
106
107
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
108
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
109
110
111
112
113
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
114
    'algorithm': 'legacy/echo/1',
115
116
117
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
118
        'version': '1.3.0'
119
120
    },
    'outputs': {
121
        'out': {
122
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
123
            'endpoint': 'out',
124
125
126
127
128
129
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
130
131
132
}


Philip ABBET's avatar
Philip ABBET committed
133
134
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
135

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class ControllerProcess(multiprocessing.Process):

    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue


    def run(self):
        self.queue.put('STARTED')

        def onWorkerReady(name):
            self.queue.put('READY ' + name)

        def onWorkerGone(name):
            self.queue.put('GONE ' + name)

        self.controller = WorkerController(
            '127.0.0.1',
155
            port=50999,
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
                if command == 'STOP':
                    break
            except Queue.Empty:
                pass

        self.controller.destroy()


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
178
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
179

Philip ABBET's avatar
Philip ABBET committed
180
181
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
182

Philip ABBET's avatar
Philip ABBET committed
183
184
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
185
186


Philip ABBET's avatar
Philip ABBET committed
187
188
189
    def run(self):
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
190
191


Philip ABBET's avatar
Philip ABBET committed
192
193
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
194

195
class TestWorkerBase(unittest.TestCase):
Philip ABBET's avatar
Philip ABBET committed
196

Philip ABBET's avatar
Philip ABBET committed
197
    def __init__(self, methodName='runTest'):
198
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
199
        self.controller = None
200
201
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
202
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
203
204


Philip ABBET's avatar
Philip ABBET committed
205
    def setUp(self):
206
207
208
209
210
211
212
213
214
215
216
217
218
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
219

220
221
222
223
224
        self.stop_controller()


    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
225

Philip ABBET's avatar
Philip ABBET committed
226
        def onWorkerReady(name):
227
228
229
230
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
231

Philip ABBET's avatar
Philip ABBET committed
232
        self.controller = WorkerController(
233
234
235
236
237
238
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
Philip ABBET's avatar
Philip ABBET committed
239
        )
Philip ABBET's avatar
Philip ABBET committed
240

241
242
243
244
245
246
247
248
249
250
        self.controller.process(100)


    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
251
        args = [
252
          '--prefix=%s' % prefix,
Philip ABBET's avatar
Philip ABBET committed
253
          '--cache=%s' % tmp_prefix,
254
          '--name=%s' % name,
255
          # '-vv',
256
          self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
257
        ]
Philip ABBET's avatar
Philip ABBET committed
258

Philip ABBET's avatar
Philip ABBET committed
259
260
        if self.docker:
            args.insert(3, '--docker')
Philip ABBET's avatar
Philip ABBET committed
261

262
263
264
265
266
267
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
268
269


270
271
272
273
274
275
276
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


277
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
278
        start = time()
279
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
280
            self.assertTrue(self.controller.process(100) is None)
281
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
282

283
284
285
286
287
288
289
290
291
292
293
294
        self.assertTrue(name in self.controller.workers)


    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)


295
296
297
298
299
300
301
    def prepare_databases(self, configuration):
        for _, input_cfg in configuration['inputs'].items():
            database = Database(prefix, input_cfg['database'])
            view = database.view(input_cfg['protocol'], input_cfg['set'])
            view.index(os.path.join(tmp_prefix, input_cfg['path']))


302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#----------------------------------------------------------


class TestConnection(TestWorkerBase):

    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
318
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
319

Philip ABBET's avatar
Philip ABBET committed
320

321
322
323
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
324

325
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
326

Philip ABBET's avatar
Philip ABBET committed
327
328
        sleep(1)

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
351
        self.start_worker(WORKER1, address='tcp://127.0.0.1:50999')
352
353
        sleep(1)

354
        self.start_controller(port=50999)
355
356
357
358

        self.wait_for_worker_connection(WORKER1)


359
360
361
362
363
364
365
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

366
        self.start_worker(WORKER1, 'tcp://127.0.0.1:50999')
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386

        message = controller.queue.get()
        self.assertEqual(message, 'READY ' + WORKER1)

        controller.queue.put('STOP')

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

        message = controller.queue.get()
        self.assertEqual(message, 'READY ' + WORKER1)

        controller.queue.put('STOP')


387
388
389
390
391
392
393
394
395
396
397
#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):


    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
398

399
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
400

401
402
        self.prepare_databases(CONFIGURATION1)

Philip ABBET's avatar
Philip ABBET committed
403

404
    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
405
406
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
407

Philip ABBET's avatar
Philip ABBET committed
408
409
410
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
411

412
413
414
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
415
        return message
Philip ABBET's avatar
Philip ABBET committed
416
417


Philip ABBET's avatar
Philip ABBET committed
418
419
    def _check_done(self, message, expected_worker, expected_job_id):
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
420

Philip ABBET's avatar
Philip ABBET committed
421
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
422

Philip ABBET's avatar
Philip ABBET committed
423
424
425
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
426

Philip ABBET's avatar
Philip ABBET committed
427
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
428

Philip ABBET's avatar
Philip ABBET committed
429
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
430
431


Philip ABBET's avatar
Philip ABBET committed
432
433
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
434

Philip ABBET's avatar
Philip ABBET committed
435
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
436

Philip ABBET's avatar
Philip ABBET committed
437
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
438
439


Philip ABBET's avatar
Philip ABBET committed
440
441
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
442
        config['algorithm'] = 'legacy/process_crash/1'
Philip ABBET's avatar
Philip ABBET committed
443

Philip ABBET's avatar
Philip ABBET committed
444
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
445

Philip ABBET's avatar
Philip ABBET committed
446
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
447

Philip ABBET's avatar
Philip ABBET committed
448
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
449
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
450
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
451

Philip ABBET's avatar
Philip ABBET committed
452
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
453

Philip ABBET's avatar
Philip ABBET committed
454
455
        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])
Philip ABBET's avatar
Philip ABBET committed
456
457


Philip ABBET's avatar
Philip ABBET committed
458
459
460
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'
Philip ABBET's avatar
Philip ABBET committed
461

Philip ABBET's avatar
Philip ABBET committed
462
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
463

Philip ABBET's avatar
Philip ABBET committed
464
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
465

Philip ABBET's avatar
Philip ABBET committed
466
467
468
469
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
470
471


Philip ABBET's avatar
Philip ABBET committed
472
473
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
474
        config['algorithm'] = 'legacy/syntax_error/1'
Philip ABBET's avatar
Philip ABBET committed
475

Philip ABBET's avatar
Philip ABBET committed
476
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
477

Philip ABBET's avatar
Philip ABBET committed
478
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
479

Philip ABBET's avatar
Philip ABBET committed
480
        self.assertEqual(worker, WORKER1)
481
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
482
483
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
484
485


Philip ABBET's avatar
Philip ABBET committed
486
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
487
488
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
489

Philip ABBET's avatar
Philip ABBET committed
490
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
491
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
492

Philip ABBET's avatar
Philip ABBET committed
493
494
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
495

Philip ABBET's avatar
Philip ABBET committed
496
497
498
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
499

Philip ABBET's avatar
Philip ABBET committed
500
501
502
503
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
504

Philip ABBET's avatar
Philip ABBET committed
505
506
507
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
508
509


Philip ABBET's avatar
Philip ABBET committed
510
511
512
    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
513

Philip ABBET's avatar
Philip ABBET committed
514
        self.controller.execute(WORKER1, 1, config)
515
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
516

Philip ABBET's avatar
Philip ABBET committed
517
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
518

Philip ABBET's avatar
Philip ABBET committed
519
520
521
522
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
523
524


525
526
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
527

Philip ABBET's avatar
Philip ABBET committed
528
        (worker, status, job_id, data) = self._wait()
Philip ABBET's avatar
Philip ABBET committed
529

Philip ABBET's avatar
Philip ABBET committed
530
531
532
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
Philip ABBET's avatar
Philip ABBET committed
533
        self.assertEqual(data[0], "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
534
535


Philip ABBET's avatar
Philip ABBET committed
536
537
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
538
539
540

class TestOneWorkerDocker(TestOneWorker):

Philip ABBET's avatar
Philip ABBET committed
541
542
543
    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
544
545


Philip ABBET's avatar
Philip ABBET committed
546
547
548
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)
Philip ABBET's avatar
Philip ABBET committed
549
550


Philip ABBET's avatar
Philip ABBET committed
551
552
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
553

554
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
555

Philip ABBET's avatar
Philip ABBET committed
556
557
    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
558

559
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
560

561
562
563
564
565
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
566
567


Philip ABBET's avatar
Philip ABBET committed
568
569
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
570

Philip ABBET's avatar
Philip ABBET committed
571
572
573
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
574

Philip ABBET's avatar
Philip ABBET committed
575
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
576

Philip ABBET's avatar
Philip ABBET committed
577
578
579
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
580

Philip ABBET's avatar
Philip ABBET committed
581
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
582

Philip ABBET's avatar
Philip ABBET committed
583
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
584
585


Philip ABBET's avatar
Philip ABBET committed
586
587
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
588
589


Philip ABBET's avatar
Philip ABBET committed
590
591
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
592
593


Philip ABBET's avatar
Philip ABBET committed
594
595
596
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
597

Philip ABBET's avatar
Philip ABBET committed
598
599
600
601
602
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
603

Philip ABBET's avatar
Philip ABBET committed
604
605
            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
606
607


Philip ABBET's avatar
Philip ABBET committed
608
609
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
610

Philip ABBET's avatar
Philip ABBET committed
611
612
613
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
614

Philip ABBET's avatar
Philip ABBET committed
615
616
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
617

Philip ABBET's avatar
Philip ABBET committed
618
619
620
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
621

Philip ABBET's avatar
Philip ABBET committed
622
623
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
624

Philip ABBET's avatar
Philip ABBET committed
625
        self.assertNotEqual(worker1, worker2)
Philip ABBET's avatar
Philip ABBET committed
626
627


Philip ABBET's avatar
Philip ABBET committed
628
629
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
630
631
632

class TestTwoWorkersDocker(TestTwoWorkers):

Philip ABBET's avatar
Philip ABBET committed
633
634
635
    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
636
637


Philip ABBET's avatar
Philip ABBET committed
638
639
640
    @classmethod
    def setUpClass(cls):
        cls.host = Host(raise_on_errors=False)