#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import logging
import multiprocessing
import os
import unittest
from time import sleep
from time import time

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3: the module was renamed

import simplejson

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host
from ..database import Database

from . import prefix, tmp_prefix

logger = logging.getLogger(__name__)


#----------------------------------------------------------


# Names given to the worker processes started by the tests (forwarded to the
# worker script through its '--name' option by start_worker())
WORKER1 = 'worker1'
WORKER2 = 'worker2'


#----------------------------------------------------------


# Job configuration handed to WorkerController.execute(): runs the
# 'legacy/echo/1' algorithm on the 'double' set of the 'integers_db/1'
# database. The input/output 'path' entries are the corresponding 'hash'
# values split into sub-directories.
CONFIGURATION1 = {
    'queue': 'queue',
    'inputs': {
        'in': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'legacy/echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.3.0'
    },
    'outputs': {
        'out': {
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out',
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
}


#----------------------------------------------------------


# Second job configuration handed to WorkerController.execute(): same input,
# algorithm and environment as CONFIGURATION1, but with a different output
# hash/path so that both jobs can be run side by side on two workers.
CONFIGURATION2 = {
    'queue': 'queue',
    'inputs': {
        'in': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'legacy/echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.3.0'
    },
    'outputs': {
        'out': {
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out',
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
}


#----------------------------------------------------------


class ControllerProcess(multiprocessing.Process):
    """Runs a ``WorkerController`` in a separate process.

    The process talks to its parent through a single queue:

      * it puts ``'STARTED'`` once running, then ``'READY <name>'`` and
        ``'GONE <name>'`` whenever the controller reports that a worker
        appeared or disappeared;
      * it stops (and destroys the controller) when it reads ``'STOP'`` from
        the queue.

    Parameters:

      queue: Queue shared with the parent process (the tests use a
        ``multiprocessing.Queue``)

      port (int): Port given to the ``WorkerController``. Defaults to 50999,
        the value that used to be hard-coded here.
    """

    def __init__(self, queue, port=50999):
        super(ControllerProcess, self).__init__()

        self.queue = queue
        self.port = port


    def run(self):
        """Body of the child process: serve the controller until 'STOP'"""

        self.queue.put('STARTED')

        def onWorkerReady(name):
            self.queue.put('READY ' + name)

        def onWorkerGone(name):
            self.queue.put('GONE ' + name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=self.port,
            callbacks=dict(
                onWorkerReady=onWorkerReady,
                onWorkerGone=onWorkerGone,
            )
        )

        try:
            while True:
                self.controller.process(100)

                try:
                    command = self.queue.get_nowait()
                except Queue.Empty:
                    continue

                if command == 'STOP':
                    break

                # The queue is used in both directions, so what was just read
                # may be one of our own notifications that the parent has not
                # consumed yet: give it back instead of silently dropping it
                self.queue.put(command)
        finally:
            # Release the controller even if process() raised
            self.controller.destroy()


#----------------------------------------------------------


class WorkerProcess(multiprocessing.Process):
    """Runs the worker script (``worker.main()``) in a separate process.

    Parameters:

      queue: Queue on which the process puts ``'STARTED'`` as soon as it runs,
        so that the parent can wait for the child to be up

      arguments (list): Command-line arguments forwarded to ``worker.main()``
    """

    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()

        self.queue = queue
        self.arguments = arguments


    def run(self):
        """Body of the child process: signal the start, then run the worker"""

        self.queue.put('STARTED')
        worker.main(self.arguments)


#----------------------------------------------------------


class TestWorkerBase(unittest.TestCase):
    """Common fixture for the worker tests.

    Keeps track of one in-process ``WorkerController`` (``self.controller``),
    of the worker processes started by the test (``self.worker_processes``,
    indexed by worker name) and of the names of the workers the controller
    reported as ready (``self.connected_workers``).

    Subclasses set ``self.docker`` to True to start the workers with the
    ``--docker`` option.
    """

    def __init__(self, methodName='runTest'):
        super(TestWorkerBase, self).__init__(methodName)
        self.controller = None
        self.connected_workers = []
        self.worker_processes = {}
        self.docker = False


    def setUp(self):
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        """Stops every known worker process, then the controller"""

        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []

        self.stop_controller()


    def start_controller(self, port=None):
        """Creates the controller on 127.0.0.1 and processes it once

        Parameters:

          port: Port forwarded to ``WorkerController`` (None by default)
        """

        self.connected_workers = []

        def onWorkerReady(name):
            self.connected_workers.append(name)

        def onWorkerGone(name):
            # list.remove() raises ValueError for an unknown name: ignore a
            # 'gone' notification for a worker that is not in the list
            if name in self.connected_workers:
                self.connected_workers.remove(name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady=onWorkerReady,
                onWorkerGone=onWorkerGone,
            )
        )

        self.controller.process(100)


    def stop_controller(self):
        """Destroys the controller, if any"""

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
        """Starts a worker process and waits until it reports 'STARTED'

        Parameters:

          name (str): Name of the worker (``--name`` option)

          address (str): Address of the controller to connect to. Defaults
            to the address of ``self.controller``.
        """

        if address is None:
            address = self.controller.address

        args = [
            '--prefix=%s' % prefix,
            '--cache=%s' % tmp_prefix,
            '--name=%s' % name,
            # '-vv',
        ]

        if self.docker:
            args.append('--docker')

        # The controller address is the last argument
        args.append(address)

        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        # Register the process before blocking on its queue, so that
        # shutdown_everything() terminates it even if the wait is interrupted
        self.worker_processes[name] = worker_process

        # First message put by WorkerProcess.run()
        worker_process.queue.get()


    def stop_worker(self, name):
        """Terminates the worker process called 'name' (no-op if unknown)"""

        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


    def wait_for_worker_connection(self, name, timeout=10):
        """Processes the controller until worker 'name' is reported ready

        Fails the test if the controller returns a message in the meantime or
        if the worker is still not connected after 'timeout' seconds.
        """

        start = time()
        while name not in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)  # Exit after 'timeout' seconds

        self.assertIn(name, self.controller.workers)


    def wait_for_worker_disconnection(self, name, timeout=10):
        """Processes the controller until worker 'name' is reported gone

        Fails the test if the controller returns a message in the meantime or
        if the worker is still connected after 'timeout' seconds.
        """

        start = time()
        while name in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)  # Exit after 'timeout' seconds

        self.assertNotIn(name, self.controller.workers)


    def prepare_databases(self, configuration):
        """Indexes, in the temporary prefix, each database view used as input

        Parameters:

          configuration (dict): Job configuration (see CONFIGURATION1)
        """

        for input_cfg in configuration['inputs'].values():
            database = Database(prefix, input_cfg['database'])
            view = database.view(input_cfg['protocol'], input_cfg['set'])
            view.index(os.path.join(tmp_prefix, input_cfg['path']))


#----------------------------------------------------------


class TestConnection(TestWorkerBase):
    """Tests the connection/disconnection of workers and controllers"""

    # Fixed endpoint used when the worker must know the address of the
    # controller before the latter exists (matches the default port of
    # ControllerProcess)
    PORT = 50999
    ADDRESS = 'tcp://127.0.0.1:%d' % PORT


    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
        self.assertEqual(len(self.controller.workers), 1)


    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        sleep(1)

        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
        # The worker is started before the controller it must connect to
        self.start_worker(WORKER1, address=self.ADDRESS)

        sleep(1)

        self.start_controller(port=self.PORT)

        self.wait_for_worker_connection(WORKER1)


    def _run_controller_until_worker_ready(self, start_worker=False):
        """Runs a ControllerProcess until it reports WORKER1 as ready

        The controller is then asked to stop. Whatever happens (including a
        failed assertion), the controller process is not left running when
        this method returns, so that it cannot keep the port busy for the
        following tests.

        Parameters:

          start_worker (bool): If True, WORKER1 is started once the
            controller reported 'STARTED'. Otherwise an already running
            WORKER1 is expected to connect by itself.
        """

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        try:
            message = controller.queue.get()
            self.assertEqual(message, 'STARTED')

            if start_worker:
                self.start_worker(WORKER1, self.ADDRESS)

            message = controller.queue.get()
            self.assertEqual(message, 'READY ' + WORKER1)

            controller.queue.put('STOP')

            # Give the controller a chance to shut down cleanly
            controller.join(10)
        finally:
            if controller.is_alive():
                controller.terminate()
                controller.join()


    def test_scheduler_shutdown(self):
        # First controller: the worker connects to it, then it is stopped
        self._run_controller_until_worker_ready(start_worker=True)

        sleep(1)

        # Second controller on the same port: the worker, which is still
        # running, must connect to it
        self._run_controller_until_worker_ready()


#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):
    """Tests the execution of jobs on a single connected worker"""


    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.prepare_databases(CONFIGURATION1)


    def _wait(self, max=200):
        """Processes the controller until it returns a message

        Parameters:

          max (int): Maximum number of calls to ``controller.process(100)``

        Returns the message, or None if none was received after 'max' calls.
        """

        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        if message is None:
            print("Process failed with the allocate range: {}".format(max))

        return message


    def _wait_for_message(self):
        """Like _wait(), but fails the test if no message was received

        Without this check, unpacking the result of _wait() raises a
        TypeError on timeout instead of reporting a test failure.
        """

        message = self._wait()
        self.assertIsNotNone(message)
        return message


    def _check_done(self, message, expected_worker, expected_job_id):
        """Checks that 'message' reports the successful end of a job"""

        self.assertIsNotNone(message)

        (worker, status, job_id, data) = message

        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)

        message = self._wait()

        self._check_done(message, WORKER1, 1)


    def test_processing_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'legacy/process_crash/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 1)
        self.assertIn('a = b', result['user_error'])


    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'legacy/syntax_error/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, WORKER1)
        self.assertIn(status, [WorkerController.ERROR, WorkerController.JOB_ERROR])
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_multiple_jobs(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER1, 2, config)

        message = self._wait()
        self._check_done(message, WORKER1, 1)

        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)

        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, config)
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)


    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertIsNone(job_id)
        self.assertEqual(data[0], "Unknown job: 1")


#----------------------------------------------------------


class TestOneWorkerDocker(TestOneWorker):
    """Same tests as TestOneWorker, with the worker started with '--docker'"""

    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)
        self.docker = True  # start_worker() adds '--docker' to the arguments


    @classmethod
    def setUpClass(cls):
        # NOTE(review): the Host instance is only stored on the class here;
        # presumably instantiating it sets up the docker environments needed
        # by the workers - confirm against ..dock.Host
        cls.host = Host(raise_on_errors=False)


#----------------------------------------------------------


class TestTwoWorkers(TestWorkerBase):
    """Tests the execution of jobs with two connected workers"""

    def setUp(self):
        # The inherited setUp() already shuts down whatever a previous,
        # badly failed test left behind
        super(TestTwoWorkers, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        # Index the input databases here too, so that these tests do not rely
        # on another test class having done it before
        self.prepare_databases(CONFIGURATION1)
        self.prepare_databases(CONFIGURATION2)


    def _wait_for_message(self, max_iterations=200):
        """Processes the controller until it returns a message

        The number of calls to ``controller.process(100)`` is bounded by
        'max_iterations', so that a worker that never answers makes the test
        fail instead of blocking the whole test suite.
        """

        message = None
        nb = 0

        while (message is None) and (nb < max_iterations):
            message = self.controller.process(100)
            nb += 1

        self.assertIsNotNone(message)

        return message


    def _test_success_one_worker(self, worker_name):
        """Runs CONFIGURATION1 on 'worker_name' and checks that it succeeds"""

        self.controller.execute(worker_name, 1, CONFIGURATION1)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)


    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)


    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)

            # Job 1 was given to WORKER1, job 2 to WORKER2
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)

            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)


        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)

        # The two answers can arrive in any order
        (worker1, status, job_id, data) = self._wait_for_message()
        _check(worker1, status, job_id, data)

        (worker2, status, job_id, data) = self._wait_for_message()
        _check(worker2, status, job_id, data)

        self.assertNotEqual(worker1, worker2)


#----------------------------------------------------------


class TestTwoWorkersDocker(TestTwoWorkers):
    """Same tests as TestTwoWorkers, with the workers started with '--docker'"""

    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)
        self.docker = True  # start_worker() adds '--docker' to the arguments


    @classmethod
    def setUpClass(cls):
        # NOTE(review): the Host instance is only stored on the class here;
        # presumably instantiating it sets up the docker environments needed
        # by the workers - confirm against ..dock.Host
        cls.host = Host(raise_on_errors=False)