test_worker.py 18.7 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1 2 3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
Philip ABBET's avatar
Philip ABBET committed
35 36 37 38 39 40


# Tests for experiment execution

import os
import unittest
41
import simplejson as json
42
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
43
import queue
44

Philip ABBET's avatar
Philip ABBET committed
45
from time import time
46
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
47

48 49 50
from ddt import ddt
from ddt import idata

Philip ABBET's avatar
Philip ABBET committed
51 52
from ..scripts import worker
from ..worker import WorkerController
53
from ..database import Database
54
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
55 56 57

from . import prefix, tmp_prefix

Samuel GAIST's avatar
Samuel GAIST committed
58

59
# ----------------------------------------------------------
60 61


62 63
WORKER1 = b"worker1"
WORKER2 = b"worker2"
64
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
65 66


67
# ----------------------------------------------------------
68

69 70
DATABASES = [f"integers_db/{i}" for i in range(1, 3)]

71

Philip ABBET's avatar
Philip ABBET committed
72
CONFIGURATION1 = {
73 74 75 76 77 78 79 80 81 82 83
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
84 85
        }
    },
86 87 88 89 90 91 92 93 94
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
95 96
        }
    },
97 98
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
99 100 101
}


102
# ----------------------------------------------------------
103 104


Philip ABBET's avatar
Philip ABBET committed
105
CONFIGURATION2 = {
106 107 108 109 110 111 112 113 114 115 116
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
117 118
        }
    },
119 120 121 122 123 124 125 126 127
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
128 129
        }
    },
130 131
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
132 133 134
}


135
# ----------------------------------------------------------
136

Philip ABBET's avatar
Philip ABBET committed
137

138 139 140 141 142 143 144 145 146 147 148 149 150
def prepare_database(db_name):
    CONFIGURATION1["inputs"]["in"]["database"] = db_name
    CONFIGURATION2["inputs"]["in"]["database"] = db_name

    for _, input_cfg in CONFIGURATION1["inputs"].items():
        database = Database(prefix, input_cfg["database"])
        view = database.view(input_cfg["protocol"], input_cfg["set"])
        view.index(os.path.join(tmp_prefix, input_cfg["path"]))


# ----------------------------------------------------------


151 152 153 154 155 156 157
class ControllerProcess(multiprocessing.Process):
    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue

    def run(self):
158
        self.queue.put("STARTED")
159 160

        def onWorkerReady(name):
161
            self.queue.put("READY %s" % name.decode())
162 163

        def onWorkerGone(name):
164
            self.queue.put("GONE %s" % name.decode())
165 166

        self.controller = WorkerController(
167
            "127.0.0.1",
168
            port=PORT,
169
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
170 171 172 173 174 175 176
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
177
                if command == "STOP":
178
                    break
Samuel GAIST's avatar
Samuel GAIST committed
179
            except queue.Empty:
180 181 182 183 184
                pass

        self.controller.destroy()


185
# ----------------------------------------------------------
186 187


188
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
189 190
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
191

Philip ABBET's avatar
Philip ABBET committed
192 193
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
194

Philip ABBET's avatar
Philip ABBET committed
195
    def run(self):
196
        self.queue.put("STARTED")
Philip ABBET's avatar
Philip ABBET committed
197
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
198 199


200
# ----------------------------------------------------------
201

Philip ABBET's avatar
Philip ABBET committed
202

203
class TestWorkerBase(unittest.TestCase):
204
    def __init__(self, methodName="runTest"):
205
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
206
        self.controller = None
207 208
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
209
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
210

Philip ABBET's avatar
Philip ABBET committed
211
    def setUp(self):
212 213 214 215 216 217 218 219 220 221 222
        self.shutdown_everything()  # In case another test failed badly during its setUp()

    def tearDown(self):
        self.shutdown_everything()

    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
223

224 225 226 227
        self.stop_controller()

    def start_controller(self, port=None):
        self.connected_workers = []
228

Philip ABBET's avatar
Philip ABBET committed
229
        def onWorkerReady(name):
230 231 232 233
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
234

Philip ABBET's avatar
Philip ABBET committed
235
        self.controller = WorkerController(
236
            "127.0.0.1",
237
            port=port,
238
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
Philip ABBET's avatar
Philip ABBET committed
239
        )
Philip ABBET's avatar
Philip ABBET committed
240

241 242 243 244 245 246 247 248
        self.controller.process(100)

    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None

    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
249
        args = [
250 251 252 253 254
            "--prefix=%s" % prefix,
            "--cache=%s" % tmp_prefix,
            "--name=%s" % name.decode(),
            # '-vv',
            self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
255
        ]
Philip ABBET's avatar
Philip ABBET committed
256

Philip ABBET's avatar
Philip ABBET committed
257
        if self.docker:
258
            args.insert(3, "--docker")
Philip ABBET's avatar
Philip ABBET committed
259

260 261 262 263 264 265
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
266

267 268 269 270 271 272
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]

273
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
274
        start = time()
275
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
276
            self.assertTrue(self.controller.process(100) is None)
277
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
278

279 280 281 282 283 284 285 286 287 288
        self.assertTrue(name in self.controller.workers)

    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)

289

290
# ----------------------------------------------------------
291 292 293 294 295 296 297 298 299 300 301 302 303 304


class TestConnection(TestWorkerBase):
    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
305
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
306

307 308 309
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
310

311
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
312

Philip ABBET's avatar
Philip ABBET committed
313 314
        sleep(1)

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)

    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)

    def test_scheduler_last(self):
335
        self.start_worker(WORKER1, address="tcp://127.0.0.1:%i" % PORT)
336 337
        sleep(1)

338
        self.start_controller(port=PORT)
339 340 341

        self.wait_for_worker_connection(WORKER1)

342 343 344 345 346
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
347
        self.assertEqual(message, "STARTED")
348

349
        self.start_worker(WORKER1, "tcp://127.0.0.1:%i" % PORT)
350 351

        message = controller.queue.get()
352
        self.assertEqual(message, "READY " + WORKER1.decode())
353

354
        controller.queue.put("STOP")
355 356 357 358 359 360 361

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
362
        self.assertEqual(message, "STARTED")
363 364

        message = controller.queue.get()
365
        self.assertEqual(message, "READY " + WORKER1.decode())
366

367
        controller.queue.put("STOP")
368 369


370
# ----------------------------------------------------------
371 372


373
@ddt
374 375 376 377 378 379
class TestOneWorker(TestWorkerBase):
    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
380

381
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
382

383
    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
384 385
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
386

Philip ABBET's avatar
Philip ABBET committed
387 388 389
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
390

391 392 393
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
394
        return message
Philip ABBET's avatar
Philip ABBET committed
395

Philip ABBET's avatar
Philip ABBET committed
396
    def _check_done(self, message, expected_worker, expected_job_id):
Samuel GAIST's avatar
Samuel GAIST committed
397
        self.assertIsNotNone(message)
Philip ABBET's avatar
Philip ABBET committed
398

Philip ABBET's avatar
Philip ABBET committed
399
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
400

Philip ABBET's avatar
Philip ABBET committed
401 402 403
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
404

405
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
406

407
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
408

409 410 411 412
    @idata(DATABASES)
    def test_success(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
413
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
414

Philip ABBET's avatar
Philip ABBET committed
415
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
416

Philip ABBET's avatar
Philip ABBET committed
417
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
418

419 420 421 422
    @idata(DATABASES)
    def test_processing_error(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
423
        config = dict(CONFIGURATION1)
424
        config["algorithm"] = "legacy/process_crash/1"
Philip ABBET's avatar
Philip ABBET committed
425

Philip ABBET's avatar
Philip ABBET committed
426
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
427

428 429 430
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
431

Philip ABBET's avatar
Philip ABBET committed
432
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
433
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
434
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
435

436
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
437

438 439
        self.assertEqual(result["status"], 1)
        self.assertTrue("a = b" in result["user_error"])
Philip ABBET's avatar
Philip ABBET committed
440

441 442 443 444
    @idata(DATABASES)
    def test_error_unknown_algorithm(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
445
        config = dict(CONFIGURATION1)
446
        config["algorithm"] = "user/unknown/1"
Philip ABBET's avatar
Philip ABBET committed
447

Philip ABBET's avatar
Philip ABBET committed
448
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
449

450 451 452
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
453

Philip ABBET's avatar
Philip ABBET committed
454 455 456 457
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
458

459 460 461 462
    @idata(DATABASES)
    def test_error_syntax_error(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
463
        config = dict(CONFIGURATION1)
464
        config["algorithm"] = "legacy/syntax_error/1"
Philip ABBET's avatar
Philip ABBET committed
465

Philip ABBET's avatar
Philip ABBET committed
466
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
467

468 469 470
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
471

Philip ABBET's avatar
Philip ABBET committed
472
        self.assertEqual(worker, WORKER1)
473
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
474 475
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
476

477 478 479 480
    @idata(DATABASES)
    def test_multiple_jobs(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
481
        config = dict(CONFIGURATION1)
482
        config["algorithm"] = "v1/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
483

Philip ABBET's avatar
Philip ABBET committed
484
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
485
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
486

Philip ABBET's avatar
Philip ABBET committed
487 488
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
489

Philip ABBET's avatar
Philip ABBET committed
490 491 492
        message = self._wait()
        self._check_done(message, WORKER1, 2)

493 494 495 496
    @idata(DATABASES)
    def test_reuse(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
497 498 499
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
500

Philip ABBET's avatar
Philip ABBET committed
501 502 503
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
504

505 506 507 508
    @idata(DATABASES)
    def test_cancel(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
509
        config = dict(CONFIGURATION1)
510
        config["algorithm"] = "v1/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
511

Philip ABBET's avatar
Philip ABBET committed
512
        self.controller.execute(WORKER1, 1, config)
513
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
514

515 516 517
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
518

Philip ABBET's avatar
Philip ABBET committed
519 520 521 522
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
523

524 525
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
526

527 528 529
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
530

Philip ABBET's avatar
Philip ABBET committed
531 532 533
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
André Anjos's avatar
André Anjos committed
534
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
535 536


537
# ----------------------------------------------------------
538

Philip ABBET's avatar
Philip ABBET committed
539

540
@ddt
541
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
542
    def setUp(self):
543
        self.tearDown()  # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
544

545
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
546

547 548 549 550 551
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
552

553 554 555
    def _test_success_one_worker(self, worker_name, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
556
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
557

Philip ABBET's avatar
Philip ABBET committed
558 559 560
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
561

562
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
563
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
564

Philip ABBET's avatar
Philip ABBET committed
565 566 567
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
568

569
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
570

571
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
572

573 574 575
    @idata(DATABASES)
    def test_success_worker1(self, db_name):
        self._test_success_one_worker(WORKER1, db_name)
Philip ABBET's avatar
Philip ABBET committed
576

577 578 579
    @idata(DATABASES)
    def test_success_worker2(self, db_name):
        self._test_success_one_worker(WORKER2, db_name)
Philip ABBET's avatar
Philip ABBET committed
580

581 582
    @idata(DATABASES)
    def test_success_both_workers(self, db_name):
Philip ABBET's avatar
Philip ABBET committed
583 584
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
585

Philip ABBET's avatar
Philip ABBET committed
586 587 588 589 590
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
591

592
            result = json.loads(data[0])
593
            self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
594

595 596
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
597 598
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
599

Philip ABBET's avatar
Philip ABBET committed
600 601 602
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
603

Philip ABBET's avatar
Philip ABBET committed
604 605
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
606

Philip ABBET's avatar
Philip ABBET committed
607 608 609
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
610

Philip ABBET's avatar
Philip ABBET committed
611 612
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
613

Philip ABBET's avatar
Philip ABBET committed
614
        self.assertNotEqual(worker1, worker2)