test_worker.py 17 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
39
40
41
42
try:
    import Queue
except ImportError:
    import queue as Queue
Philip ABBET's avatar
Philip ABBET committed
43
from time import time
44
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
45
46
47

from ..scripts import worker
from ..worker import WorkerController
48
from ..database import Database
Philip ABBET's avatar
Philip ABBET committed
49
50
51
52

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
53
54
55
#----------------------------------------------------------


56
57
# Names (as bytes) under which the two test workers are launched and looked up.
WORKER1, WORKER2 = b'worker1', b'worker2'
Philip ABBET's avatar
Philip ABBET committed
58
59


Philip ABBET's avatar
Philip ABBET committed
60
61
62
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
63
# Job configuration sent to a worker by the tests: runs 'legacy/echo/1' on the
# 'double' set of 'integers_db/1' and writes to a single 'out' output.
CONFIGURATION1 = {
    'queue': 'queue',
    'inputs': {
        'in': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'legacy/echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.3.0'
    },
    'outputs': {
        'out': {
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out',
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
}


Philip ABBET's avatar
Philip ABBET committed
96
97
98
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
99
# Second job configuration: same input and algorithm as the first one in this
# file, but with a different output path/hash so that two jobs can run side by
# side without writing to the same output.
CONFIGURATION2 = {
    'queue': 'queue',
    'inputs': {
        'in': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
    'algorithm': 'legacy/echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.3.0'
    },
    'outputs': {
        'out': {
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out',
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
}


Philip ABBET's avatar
Philip ABBET committed
132
133
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
134

135
136
137
138
139
140
141
142
143
144
145
146
class ControllerProcess(multiprocessing.Process):
    """Run a ``WorkerController`` in a child process.

    The process reports its progress by putting strings on ``queue``
    ('STARTED', then 'READY <name>' / 'GONE <name>' from the controller
    callbacks) and stops when it reads the string 'STOP' from that same queue.

    Parameters:

      queue: Queue-like object shared with the parent process

      port: Port handed to the ``WorkerController`` (defaults to the 50999
        that was previously hard-coded)
    """

    def __init__(self, queue, port=50999):
        super(ControllerProcess, self).__init__()

        self.queue = queue
        self.port = port


    def run(self):
        """Serve the controller until 'STOP' is received on the queue."""

        self.queue.put('STARTED')

        def onWorkerReady(name):
            self.queue.put('READY %s' % name)

        def onWorkerGone(name):
            self.queue.put('GONE %s' % name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=self.port,
            callbacks=dict(
                onWorkerReady=onWorkerReady,
                onWorkerGone=onWorkerGone,
            )
        )

        # NOTE(review): the same queue carries status messages to the parent
        # and commands from it, so the get_nowait() below can in principle
        # consume one of this process' own messages before the parent reads it.
        # Confirm whether a dedicated command queue is wanted.
        try:
            while True:
                # presumably 100 is a timeout in milliseconds -- TODO confirm
                self.controller.process(100)

                try:
                    command = self.queue.get_nowait()
                except Queue.Empty:
                    continue

                if command == 'STOP':
                    break
        finally:
            # Always release the controller, even if processing raised
            self.controller.destroy()


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
177
class WorkerProcess(multiprocessing.Process):
    """Run the worker script's ``main()`` in a child process.

    Parameters:

      queue: Queue-like object on which 'STARTED' is put once the child
        process is running

      arguments: Command-line style argument list passed to ``worker.main()``
    """

    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()

        self.queue = queue
        self.arguments = arguments


    def run(self):
        """Signal the parent, then hand control over to the worker script."""

        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
189
190


Philip ABBET's avatar
Philip ABBET committed
191
192
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
193

194
class TestWorkerBase(unittest.TestCase):
    """Common fixture for the worker tests.

    Keeps track of one in-process ``WorkerController`` (``self.controller``),
    of the worker child processes started by the test
    (``self.worker_processes``, keyed by worker name) and of the names
    reported as connected by the controller callbacks
    (``self.connected_workers``).
    """

    def __init__(self, methodName='runTest'):
        super(TestWorkerBase, self).__init__(methodName)
        self.controller = None
        self.connected_workers = []
        self.worker_processes = {}
        self.docker = False  # When True, workers are started with '--docker'


    def setUp(self):
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        """Stop every known worker process, then the controller."""

        # Iterate over a copy: stop_worker() removes entries from the dict
        for name in list(self.worker_processes):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []

        self.stop_controller()


    def start_controller(self, port=None):
        """Create the controller on 127.0.0.1 and run one processing step.

        Parameters:

          port: Port passed to ``WorkerController`` (``None`` leaves the
            choice to the controller)
        """

        self.connected_workers = []

        def onWorkerReady(name):
            self.connected_workers.append(name)

        def onWorkerGone(name):
            # Tolerate a 'gone' notification for a worker that was never (or is
            # no longer) registered instead of raising ValueError
            if name in self.connected_workers:
                self.connected_workers.remove(name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady=onWorkerReady,
                onWorkerGone=onWorkerGone,
            )
        )

        self.controller.process(100)


    def stop_controller(self):
        """Destroy the controller, if there is one."""

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
        """Start a worker child process and wait until it reports 'STARTED'.

        Parameters:

          name: Worker name, as bytes

          address: Address the worker connects to; defaults to the address of
            ``self.controller``
        """

        args = [
            '--prefix=%s' % prefix,
            '--cache=%s' % tmp_prefix,
            '--name=%s' % name.decode('utf-8'),
            # '-vv',  # presumably verbose worker output; left disabled
            self.controller.address if address is None else address,
        ]

        if self.docker:
            # Inserted before the trailing positional address argument
            args.insert(3, '--docker')

        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        # Blocks until the child process has put its 'STARTED' message
        worker_process.queue.get()

        self.worker_processes[name] = worker_process


    def stop_worker(self, name):
        """Terminate and forget the worker process called ``name``, if any."""

        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


    def wait_for_worker_connection(self, name, timeout=10):
        """Process controller events until ``name`` is reported as connected.

        Fails the test if the controller returns a message in the meantime or
        if ``timeout`` seconds elapse.
        """

        start = time()
        while name not in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)  # Exit after 'timeout' seconds

        self.assertIn(name, self.controller.workers)


    def wait_for_worker_disconnection(self, name, timeout=10):
        """Process controller events until ``name`` is reported as gone.

        Fails the test if the controller returns a message in the meantime or
        if ``timeout`` seconds elapse.
        """

        start = time()
        while name in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)  # Exit after 'timeout' seconds

        self.assertNotIn(name, self.controller.workers)


    def prepare_databases(self, configuration):
        """Index, under ``tmp_prefix``, each input database view of ``configuration``."""

        for input_cfg in configuration['inputs'].values():
            database = Database(prefix, input_cfg['database'])
            view = database.view(input_cfg['protocol'], input_cfg['set'])
            view.index(os.path.join(tmp_prefix, input_cfg['path']))


301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#----------------------------------------------------------


class TestConnection(TestWorkerBase):
    """Connection / disconnection handling between workers and the controller."""

    def test_worker_connection(self):
        # A single worker shows up in both the callbacks and controller.workers
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
        self.assertEqual(len(self.controller.workers), 1)


    def test_worker_disconnection(self):
        # Terminating the worker process is reported to the controller
        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        sleep(1)

        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        # Two workers can be connected to the same controller
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
        # The worker is started before the controller it connects to
        self.start_worker(WORKER1, address='tcp://127.0.0.1:50999')

        sleep(1)

        self.start_controller(port=50999)

        self.wait_for_worker_connection(WORKER1)


    def _spawn_controller(self):
        """Start a ``ControllerProcess`` and wait for its 'STARTED' message.

        A cleanup is registered so that the process never outlives the test,
        even when an assertion fails before 'STOP' is sent.
        """

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()
        self.addCleanup(self._reap_controller, controller, 0)

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

        return controller


    @staticmethod
    def _reap_controller(controller, timeout=5):
        """Wait up to ``timeout`` seconds for the process, then terminate it."""

        controller.join(timeout)
        if controller.is_alive():
            controller.terminate()
            controller.join()


    def test_scheduler_shutdown(self):
        # The worker connects to a controller process, the controller is
        # stopped, and the same worker is expected to be reported as ready
        # by a second controller process
        controller = self._spawn_controller()

        self.start_worker(WORKER1, 'tcp://127.0.0.1:50999')

        message = controller.queue.get()
        self.assertEqual(message, 'READY %s' % WORKER1)

        controller.queue.put('STOP')
        self._reap_controller(controller)

        # Same pause as before between stopping and restarting the controller
        sleep(1)

        controller = self._spawn_controller()

        message = controller.queue.get()
        self.assertEqual(message, 'READY %s' % WORKER1)

        controller.queue.put('STOP')
        self._reap_controller(controller)


386
387
388
389
390
391
392
393
394
395
396
#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):
    """Job execution tests against a single connected worker."""

    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.prepare_databases(CONFIGURATION1)


    def _wait(self, max=200):
        """Poll the controller until it returns a message.

        Parameters:

          max: Maximum number of ``process(100)`` calls to attempt

        Returns the message, or ``None`` if none arrived within ``max`` calls.
        """

        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        if message is None:
            print("Process failed with the allocate range: {}".format(max))

        return message


    def _wait_for_result(self):
        """Like ``_wait()``, but fail the test if no message was received.

        Returns the ``(worker, status, job_id, data)`` message.
        """

        message = self._wait()
        self.assertIsNotNone(message, 'No message received from the worker')
        return message


    def _check_done(self, message, expected_worker, expected_job_id):
        """Assert that ``message`` reports a successfully completed job."""

        self.assertIsNotNone(message)

        (worker, status, job_id, data) = message

        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)

        message = self._wait()

        self._check_done(message, WORKER1, 1)


    def test_processing_error(self):
        # Shallow copy is enough: only a top-level key is replaced
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'legacy/process_crash/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_result()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 1)
        self.assertIn('a = b', result['user_error'])


    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_result()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'legacy/syntax_error/1'

        self.controller.execute(WORKER1, 1, config)

        (worker, status, job_id, data) = self._wait_for_result()

        self.assertEqual(worker, WORKER1)
        self.assertIn(status, [WorkerController.ERROR, WorkerController.JOB_ERROR])
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_multiple_jobs(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER1, 2, config)

        message = self._wait()
        self._check_done(message, WORKER1, 1)

        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_reuse(self):
        # The same worker runs a second job after finishing the first one
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)

        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, config)
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_for_result()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)


    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)

        (worker, status, job_id, data) = self._wait_for_result()

        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertIsNone(job_id)
        self.assertEqual(data[0], b"Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
533
534


Philip ABBET's avatar
Philip ABBET committed
535
536
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
537

538
class TestTwoWorkers(TestWorkerBase):
    """Job execution tests against two workers connected at the same time."""

    def setUp(self):
        # The base setUp() already shuts down anything left running, so no
        # extra tearDown() call is needed here
        super(TestTwoWorkers, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        # Index the input data so that these tests do not rely on another test
        # class having done it first (CONFIGURATION2 reads the same input path)
        self.prepare_databases(CONFIGURATION1)


    def _wait_for_message(self, attempts=200):
        """Poll the controller until it returns a message.

        Fails the test, rather than looping forever, if nothing arrives within
        ``attempts`` calls to ``process(100)``.
        """

        message = None
        remaining = attempts

        while (message is None) and (remaining > 0):
            message = self.controller.process(100)
            remaining -= 1

        self.assertIsNotNone(message, 'No message received from the workers')
        return message


    def _test_success_one_worker(self, worker_name):
        """Run CONFIGURATION1 on ``worker_name`` and check that it succeeds."""

        self.controller.execute(worker_name, 1, CONFIGURATION1)

        (worker, status, job_id, data) = self._wait_for_message()

        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)


    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)


    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)

            # Job 1 was sent to WORKER1 and job 2 to WORKER2
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)

            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)


        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)

        # The two results may arrive in either order
        (worker1, status, job_id, data) = self._wait_for_message()
        _check(worker1, status, job_id, data)

        (worker2, status, job_id, data) = self._wait_for_message()
        _check(worker2, status, job_id, data)

        self.assertNotEqual(worker1, worker2)