test_worker.py 17.4 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
39
40
41
42
try:
    import Queue
except ImportError:
    import queue as Queue
Philip ABBET's avatar
Philip ABBET committed
43
from time import time
44
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
45
46
47

from ..scripts import worker
from ..worker import WorkerController
48
from ..database import Database
49
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
50
51
52

from . import prefix, tmp_prefix

Philip ABBET's avatar
Philip ABBET committed
53
54
55
#----------------------------------------------------------


56
57
WORKER1 = b'worker1'
WORKER2 = b'worker2'
58
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
59
60


Philip ABBET's avatar
Philip ABBET committed
61
62
63
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
64
CONFIGURATION1 = {
65
66
    'queue': 'queue',
    'inputs': {
67
        'in': {
68
69
70
71
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
72
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
73
74
75
76
77
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
78
    'algorithm': 'legacy/echo/1',
79
80
81
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
82
        'version': '1.3.0'
83
84
    },
    'outputs': {
85
        'out': {
86
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
87
            'endpoint': 'out',
88
89
90
91
92
93
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
94
95
96
}


Philip ABBET's avatar
Philip ABBET committed
97
98
99
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
100
CONFIGURATION2 = {
101
102
    'queue': 'queue',
    'inputs': {
103
        'in': {
104
105
106
107
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
108
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db',
109
110
111
112
113
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers'
        }
    },
114
    'algorithm': 'legacy/echo/1',
115
116
117
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
118
        'version': '1.3.0'
119
120
    },
    'outputs': {
121
        'out': {
122
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
123
            'endpoint': 'out',
124
125
126
127
128
129
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers'
        }
    },
    'nb_slots': 1,
    'channel': 'integers'
Philip ABBET's avatar
Philip ABBET committed
130
131
132
}


Philip ABBET's avatar
Philip ABBET committed
133
134
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
135

136
137
138
139
140
141
142
143
144
145
146
147
class ControllerProcess(multiprocessing.Process):

    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue


    def run(self):
        self.queue.put('STARTED')

        def onWorkerReady(name):
André Anjos's avatar
André Anjos committed
148
            self.queue.put('READY %s' % name.decode())
149
150

        def onWorkerGone(name):
André Anjos's avatar
André Anjos committed
151
            self.queue.put('GONE %s' % name.decode())
152
153
154

        self.controller = WorkerController(
            '127.0.0.1',
155
            port=PORT,
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
                if command == 'STOP':
                    break
            except Queue.Empty:
                pass

        self.controller.destroy()


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
178
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
179

Philip ABBET's avatar
Philip ABBET committed
180
181
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
182

Philip ABBET's avatar
Philip ABBET committed
183
184
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
185
186


Philip ABBET's avatar
Philip ABBET committed
187
188
189
    def run(self):
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
190
191


Philip ABBET's avatar
Philip ABBET committed
192
193
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
194

195
class TestWorkerBase(unittest.TestCase):
Philip ABBET's avatar
Philip ABBET committed
196

Philip ABBET's avatar
Philip ABBET committed
197
    def __init__(self, methodName='runTest'):
198
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
199
        self.controller = None
200
201
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
202
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
203
204


Philip ABBET's avatar
Philip ABBET committed
205
    def setUp(self):
206
207
208
209
210
211
212
213
214
215
216
217
218
        self.shutdown_everything()  # In case another test failed badly during its setUp()


    def tearDown(self):
        self.shutdown_everything()


    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
219

220
221
222
223
224
        self.stop_controller()


    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
225

Philip ABBET's avatar
Philip ABBET committed
226
        def onWorkerReady(name):
227
228
229
230
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
231

Philip ABBET's avatar
Philip ABBET committed
232
        self.controller = WorkerController(
233
234
235
236
237
238
            '127.0.0.1',
            port=port,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
Philip ABBET's avatar
Philip ABBET committed
239
        )
Philip ABBET's avatar
Philip ABBET committed
240

241
242
243
244
245
246
247
248
249
250
        self.controller.process(100)


    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
251
        args = [
252
          '--prefix=%s' % prefix,
Philip ABBET's avatar
Philip ABBET committed
253
          '--cache=%s' % tmp_prefix,
André Anjos's avatar
André Anjos committed
254
          '--name=%s' % name.decode(),
255
          # '-vv',
256
          self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
257
        ]
Philip ABBET's avatar
Philip ABBET committed
258

Philip ABBET's avatar
Philip ABBET committed
259
260
        if self.docker:
            args.insert(3, '--docker')
Philip ABBET's avatar
Philip ABBET committed
261

262
263
264
265
266
267
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
268
269


270
271
272
273
274
275
276
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]


277
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
278
        start = time()
279
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
280
            self.assertTrue(self.controller.process(100) is None)
281
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
282

283
284
285
286
287
288
289
290
291
292
293
294
        self.assertTrue(name in self.controller.workers)


    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)


295
296
297
298
299
300
301
    def prepare_databases(self, configuration):
        for _, input_cfg in configuration['inputs'].items():
            database = Database(prefix, input_cfg['database'])
            view = database.view(input_cfg['protocol'], input_cfg['set'])
            view.index(os.path.join(tmp_prefix, input_cfg['path']))


302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#----------------------------------------------------------


class TestConnection(TestWorkerBase):

    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
318
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
319

Philip ABBET's avatar
Philip ABBET committed
320

321
322
323
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
324

325
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
326

Philip ABBET's avatar
Philip ABBET committed
327
328
        sleep(1)

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)


    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)


    def test_scheduler_last(self):
351
        self.start_worker(WORKER1, address='tcp://127.0.0.1:%i' % PORT)
352
353
        sleep(1)

354
        self.start_controller(port=PORT)
355
356
357
358

        self.wait_for_worker_connection(WORKER1)


359
360
361
362
363
364
365
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

366
        self.start_worker(WORKER1, 'tcp://127.0.0.1:%i' % PORT)
367
368

        message = controller.queue.get()
André Anjos's avatar
André Anjos committed
369
        self.assertEqual(message, 'READY ' + WORKER1.decode())
370
371
372
373
374
375
376
377
378
379
380
381

        controller.queue.put('STOP')

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
        self.assertEqual(message, 'STARTED')

        message = controller.queue.get()
André Anjos's avatar
André Anjos committed
382
        self.assertEqual(message, 'READY ' + WORKER1.decode())
383
384
385
386

        controller.queue.put('STOP')


387
388
389
390
391
392
393
394
395
396
397
#----------------------------------------------------------


class TestOneWorker(TestWorkerBase):


    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
398

399
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
400

401
402
        self.prepare_databases(CONFIGURATION1)

Philip ABBET's avatar
Philip ABBET committed
403

404
    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
405
406
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
407

Philip ABBET's avatar
Philip ABBET committed
408
409
410
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
411

412
413
414
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
415
        return message
Philip ABBET's avatar
Philip ABBET committed
416
417


Philip ABBET's avatar
Philip ABBET committed
418
419
    def _check_done(self, message, expected_worker, expected_job_id):
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
420

Philip ABBET's avatar
Philip ABBET committed
421
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
422

Philip ABBET's avatar
Philip ABBET committed
423
424
425
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
426

Philip ABBET's avatar
Philip ABBET committed
427
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
428

Philip ABBET's avatar
Philip ABBET committed
429
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
430
431


Philip ABBET's avatar
Philip ABBET committed
432
433
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
434

Philip ABBET's avatar
Philip ABBET committed
435
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
436

Philip ABBET's avatar
Philip ABBET committed
437
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
438
439


Philip ABBET's avatar
Philip ABBET committed
440
441
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
442
        config['algorithm'] = 'legacy/process_crash/1'
Philip ABBET's avatar
Philip ABBET committed
443

Philip ABBET's avatar
Philip ABBET committed
444
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
445

446
447
448
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
449

Philip ABBET's avatar
Philip ABBET committed
450
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
451
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
452
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
453

Philip ABBET's avatar
Philip ABBET committed
454
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
455

Philip ABBET's avatar
Philip ABBET committed
456
457
        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])
Philip ABBET's avatar
Philip ABBET committed
458
459


Philip ABBET's avatar
Philip ABBET committed
460
461
462
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'
Philip ABBET's avatar
Philip ABBET committed
463

Philip ABBET's avatar
Philip ABBET committed
464
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
465

466
467
468
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
469

Philip ABBET's avatar
Philip ABBET committed
470
471
472
473
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
474
475


Philip ABBET's avatar
Philip ABBET committed
476
477
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
478
        config['algorithm'] = 'legacy/syntax_error/1'
Philip ABBET's avatar
Philip ABBET committed
479

Philip ABBET's avatar
Philip ABBET committed
480
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
481

482
483
484
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
485

Philip ABBET's avatar
Philip ABBET committed
486
        self.assertEqual(worker, WORKER1)
487
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
488
489
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
490
491


Philip ABBET's avatar
Philip ABBET committed
492
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
493
494
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
495

Philip ABBET's avatar
Philip ABBET committed
496
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
497
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
498

Philip ABBET's avatar
Philip ABBET committed
499
500
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
501

Philip ABBET's avatar
Philip ABBET committed
502
503
504
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
505

Philip ABBET's avatar
Philip ABBET committed
506
507
508
509
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
510

Philip ABBET's avatar
Philip ABBET committed
511
512
513
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
514
515


Philip ABBET's avatar
Philip ABBET committed
516
517
518
    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'
Philip ABBET's avatar
Philip ABBET committed
519

Philip ABBET's avatar
Philip ABBET committed
520
        self.controller.execute(WORKER1, 1, config)
521
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
522

523
524
525
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
526

Philip ABBET's avatar
Philip ABBET committed
527
528
529
530
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
531
532


533
534
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
535

536
537
538
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
539

Philip ABBET's avatar
Philip ABBET committed
540
541
542
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
André Anjos's avatar
André Anjos committed
543
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
544
545


Philip ABBET's avatar
Philip ABBET committed
546
547
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
548

549
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
550

Philip ABBET's avatar
Philip ABBET committed
551
552
    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
553

554
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
555

556
557
558
559
560
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
561
562


Philip ABBET's avatar
Philip ABBET committed
563
564
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
565

Philip ABBET's avatar
Philip ABBET committed
566
567
568
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
569

570
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
571
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
572

Philip ABBET's avatar
Philip ABBET committed
573
574
575
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
576

Philip ABBET's avatar
Philip ABBET committed
577
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
578

Philip ABBET's avatar
Philip ABBET committed
579
        self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
580
581


Philip ABBET's avatar
Philip ABBET committed
582
583
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
584
585


Philip ABBET's avatar
Philip ABBET committed
586
587
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
588
589


Philip ABBET's avatar
Philip ABBET committed
590
591
592
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
593

Philip ABBET's avatar
Philip ABBET committed
594
595
596
597
598
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
599

Philip ABBET's avatar
Philip ABBET committed
600
601
            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)
Philip ABBET's avatar
Philip ABBET committed
602
603


Philip ABBET's avatar
Philip ABBET committed
604
605
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
606

Philip ABBET's avatar
Philip ABBET committed
607
608
609
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
610

Philip ABBET's avatar
Philip ABBET committed
611
612
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
613

Philip ABBET's avatar
Philip ABBET committed
614
615
616
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
617

Philip ABBET's avatar
Philip ABBET committed
618
619
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
620

Philip ABBET's avatar
Philip ABBET committed
621
        self.assertNotEqual(worker1, worker2)