test_worker.py 18.7 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
Philip ABBET's avatar
Philip ABBET committed
35
36
37
38
39
40


# Tests for experiment execution

import os
import unittest
41
import simplejson as json
Philip ABBET's avatar
Philip ABBET committed
42
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
43
import queue
Samuel GAIST's avatar
Samuel GAIST committed
44

Philip ABBET's avatar
Philip ABBET committed
45
from time import time
46
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
47

48
49
50
from ddt import ddt
from ddt import idata

Philip ABBET's avatar
Philip ABBET committed
51
52
from ..scripts import worker
from ..worker import WorkerController
53
from ..database import Database
54
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
55
56
57

from . import prefix, tmp_prefix

Samuel GAIST's avatar
Samuel GAIST committed
58

Samuel GAIST's avatar
Samuel GAIST committed
59
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
60
61


Samuel GAIST's avatar
Samuel GAIST committed
62
63
WORKER1 = b"worker1"
WORKER2 = b"worker2"
64
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
65
66


Samuel GAIST's avatar
Samuel GAIST committed
67
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
68

69
70
DATABASES = [f"integers_db/{i}" for i in range(1, 3)]

Philip ABBET's avatar
Philip ABBET committed
71

Philip ABBET's avatar
Philip ABBET committed
72
CONFIGURATION1 = {
Samuel GAIST's avatar
Samuel GAIST committed
73
74
75
76
77
78
79
80
81
82
83
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
84
85
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
86
87
88
89
90
91
92
93
94
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
95
96
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
97
98
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
99
100
101
}


Samuel GAIST's avatar
Samuel GAIST committed
102
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
103
104


Philip ABBET's avatar
Philip ABBET committed
105
CONFIGURATION2 = {
Samuel GAIST's avatar
Samuel GAIST committed
106
107
108
109
110
111
112
113
114
115
116
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
117
118
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
119
120
121
122
123
124
125
126
127
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
128
129
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
130
131
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
132
133
134
}


Samuel GAIST's avatar
Samuel GAIST committed
135
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
136

Philip ABBET's avatar
Philip ABBET committed
137

138
139
140
141
142
143
144
145
146
147
148
149
150
def prepare_database(db_name):
    CONFIGURATION1["inputs"]["in"]["database"] = db_name
    CONFIGURATION2["inputs"]["in"]["database"] = db_name

    for _, input_cfg in CONFIGURATION1["inputs"].items():
        database = Database(prefix, input_cfg["database"])
        view = database.view(input_cfg["protocol"], input_cfg["set"])
        view.index(os.path.join(tmp_prefix, input_cfg["path"]))


# ----------------------------------------------------------


151
152
153
154
155
156
157
class ControllerProcess(multiprocessing.Process):
    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue

    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
158
        self.queue.put("STARTED")
159
160

        def onWorkerReady(name):
Samuel GAIST's avatar
Samuel GAIST committed
161
            self.queue.put("READY %s" % name.decode())
162
163

        def onWorkerGone(name):
Samuel GAIST's avatar
Samuel GAIST committed
164
            self.queue.put("GONE %s" % name.decode())
165
166

        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
167
            "127.0.0.1",
168
            port=PORT,
Samuel GAIST's avatar
Samuel GAIST committed
169
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
170
171
172
173
174
175
176
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
Samuel GAIST's avatar
Samuel GAIST committed
177
                if command == "STOP":
178
                    break
Samuel GAIST's avatar
Samuel GAIST committed
179
            except queue.Empty:
180
181
182
183
184
                pass

        self.controller.destroy()


Samuel GAIST's avatar
Samuel GAIST committed
185
# ----------------------------------------------------------
186
187


Philip ABBET's avatar
Philip ABBET committed
188
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
189
190
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
191

Philip ABBET's avatar
Philip ABBET committed
192
193
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
194

Philip ABBET's avatar
Philip ABBET committed
195
    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
196
        self.queue.put("STARTED")
Philip ABBET's avatar
Philip ABBET committed
197
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
198
199


Samuel GAIST's avatar
Samuel GAIST committed
200
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
201

Philip ABBET's avatar
Philip ABBET committed
202

203
class TestWorkerBase(unittest.TestCase):
Samuel GAIST's avatar
Samuel GAIST committed
204
    def __init__(self, methodName="runTest"):
205
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
206
        self.controller = None
207
208
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
209
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
210

Philip ABBET's avatar
Philip ABBET committed
211
    def setUp(self):
212
213
214
215
216
217
218
219
220
221
222
        self.shutdown_everything()  # In case another test failed badly during its setUp()

    def tearDown(self):
        self.shutdown_everything()

    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
223

224
225
226
227
        self.stop_controller()

    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
228

Philip ABBET's avatar
Philip ABBET committed
229
        def onWorkerReady(name):
230
231
232
233
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
234

Philip ABBET's avatar
Philip ABBET committed
235
        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
236
            "127.0.0.1",
237
            port=port,
Samuel GAIST's avatar
Samuel GAIST committed
238
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
Philip ABBET's avatar
Philip ABBET committed
239
        )
Philip ABBET's avatar
Philip ABBET committed
240

241
242
243
244
245
246
247
248
        self.controller.process(100)

    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None

    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
249
        args = [
Samuel GAIST's avatar
Samuel GAIST committed
250
251
252
253
254
            "--prefix=%s" % prefix,
            "--cache=%s" % tmp_prefix,
            "--name=%s" % name.decode(),
            # '-vv',
            self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
255
        ]
Philip ABBET's avatar
Philip ABBET committed
256

Philip ABBET's avatar
Philip ABBET committed
257
        if self.docker:
Samuel GAIST's avatar
Samuel GAIST committed
258
            args.insert(3, "--docker")
Philip ABBET's avatar
Philip ABBET committed
259

260
261
262
263
264
265
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
266

267
268
269
270
271
272
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]

273
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
274
        start = time()
275
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
276
            self.assertTrue(self.controller.process(100) is None)
277
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
278

279
280
281
282
283
284
285
286
287
288
        self.assertTrue(name in self.controller.workers)

    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)

289

Samuel GAIST's avatar
Samuel GAIST committed
290
# ----------------------------------------------------------
291
292
293
294
295
296
297
298
299
300
301
302
303
304


class TestConnection(TestWorkerBase):
    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
305
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
306

307
308
309
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
310

311
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
312

Philip ABBET's avatar
Philip ABBET committed
313
314
        sleep(1)

315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)

    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)

    def test_scheduler_last(self):
Samuel GAIST's avatar
Samuel GAIST committed
335
        self.start_worker(WORKER1, address="tcp://127.0.0.1:%i" % PORT)
336
337
        sleep(1)

338
        self.start_controller(port=PORT)
339
340
341

        self.wait_for_worker_connection(WORKER1)

342
343
344
345
346
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
347
        self.assertEqual(message, "STARTED")
348

Samuel GAIST's avatar
Samuel GAIST committed
349
        self.start_worker(WORKER1, "tcp://127.0.0.1:%i" % PORT)
350
351

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
352
        self.assertEqual(message, "READY " + WORKER1.decode())
353

Samuel GAIST's avatar
Samuel GAIST committed
354
        controller.queue.put("STOP")
355
356
357
358
359
360
361

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
362
        self.assertEqual(message, "STARTED")
363
364

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
365
        self.assertEqual(message, "READY " + WORKER1.decode())
366

Samuel GAIST's avatar
Samuel GAIST committed
367
        controller.queue.put("STOP")
368
369


Samuel GAIST's avatar
Samuel GAIST committed
370
# ----------------------------------------------------------
371
372


373
@ddt
374
375
376
377
378
379
class TestOneWorker(TestWorkerBase):
    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
380

381
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
382

383
    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
384
385
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
386

Philip ABBET's avatar
Philip ABBET committed
387
388
389
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
390

391
392
393
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
394
        return message
Philip ABBET's avatar
Philip ABBET committed
395

Philip ABBET's avatar
Philip ABBET committed
396
    def _check_done(self, message, expected_worker, expected_job_id):
Samuel GAIST's avatar
Samuel GAIST committed
397
        self.assertIsNotNone(message)
Philip ABBET's avatar
Philip ABBET committed
398

Philip ABBET's avatar
Philip ABBET committed
399
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
400

Philip ABBET's avatar
Philip ABBET committed
401
402
403
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
404

405
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
406

Samuel GAIST's avatar
Samuel GAIST committed
407
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
408

409
410
411
412
    @idata(DATABASES)
    def test_success(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
413
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
414

Philip ABBET's avatar
Philip ABBET committed
415
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
416

Philip ABBET's avatar
Philip ABBET committed
417
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
418

419
420
421
422
    @idata(DATABASES)
    def test_processing_error(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
423
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
424
        config["algorithm"] = "legacy/process_crash/1"
Philip ABBET's avatar
Philip ABBET committed
425

Philip ABBET's avatar
Philip ABBET committed
426
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
427

428
429
430
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
431

Philip ABBET's avatar
Philip ABBET committed
432
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
433
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
434
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
435

436
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
437

Samuel GAIST's avatar
Samuel GAIST committed
438
439
        self.assertEqual(result["status"], 1)
        self.assertTrue("a = b" in result["user_error"])
Philip ABBET's avatar
Philip ABBET committed
440

441
442
443
444
    @idata(DATABASES)
    def test_error_unknown_algorithm(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
445
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
446
        config["algorithm"] = "user/unknown/1"
Philip ABBET's avatar
Philip ABBET committed
447

Philip ABBET's avatar
Philip ABBET committed
448
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
449

450
451
452
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
453

Philip ABBET's avatar
Philip ABBET committed
454
455
456
457
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
458

459
460
461
462
    @idata(DATABASES)
    def test_error_syntax_error(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
463
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
464
        config["algorithm"] = "legacy/syntax_error/1"
Philip ABBET's avatar
Philip ABBET committed
465

Philip ABBET's avatar
Philip ABBET committed
466
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
467

468
469
470
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
471

Philip ABBET's avatar
Philip ABBET committed
472
        self.assertEqual(worker, WORKER1)
473
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
474
475
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
476

477
478
479
480
    @idata(DATABASES)
    def test_multiple_jobs(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
481
        config = dict(CONFIGURATION1)
482
        config["algorithm"] = "v1/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
483

Philip ABBET's avatar
Philip ABBET committed
484
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
485
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
486

Philip ABBET's avatar
Philip ABBET committed
487
488
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
489

Philip ABBET's avatar
Philip ABBET committed
490
491
492
        message = self._wait()
        self._check_done(message, WORKER1, 2)

493
494
495
496
    @idata(DATABASES)
    def test_reuse(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
497
498
499
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
500

Philip ABBET's avatar
Philip ABBET committed
501
502
503
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
504

505
506
507
508
    @idata(DATABASES)
    def test_cancel(self, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
509
        config = dict(CONFIGURATION1)
510
        config["algorithm"] = "v1/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
511

Philip ABBET's avatar
Philip ABBET committed
512
        self.controller.execute(WORKER1, 1, config)
513
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
514

515
516
517
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
518

Philip ABBET's avatar
Philip ABBET committed
519
520
521
522
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
523

524
525
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
526

527
528
529
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
530

Philip ABBET's avatar
Philip ABBET committed
531
532
533
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
André Anjos's avatar
André Anjos committed
534
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
535
536


Samuel GAIST's avatar
Samuel GAIST committed
537
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
538

Philip ABBET's avatar
Philip ABBET committed
539

540
@ddt
541
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
542
    def setUp(self):
Samuel GAIST's avatar
Samuel GAIST committed
543
        self.tearDown()  # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
544

545
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
546

547
548
549
550
551
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
552

553
554
555
    def _test_success_one_worker(self, worker_name, db_name):
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
556
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
557

Philip ABBET's avatar
Philip ABBET committed
558
559
560
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
561

562
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
563
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
564

Philip ABBET's avatar
Philip ABBET committed
565
566
567
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
568

569
        result = json.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
570

Samuel GAIST's avatar
Samuel GAIST committed
571
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
572

573
574
575
    @idata(DATABASES)
    def test_success_worker1(self, db_name):
        self._test_success_one_worker(WORKER1, db_name)
Philip ABBET's avatar
Philip ABBET committed
576

577
578
579
    @idata(DATABASES)
    def test_success_worker2(self, db_name):
        self._test_success_one_worker(WORKER2, db_name)
Philip ABBET's avatar
Philip ABBET committed
580

581
582
    @idata(DATABASES)
    def test_success_both_workers(self, db_name):
Philip ABBET's avatar
Philip ABBET committed
583
584
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
585

Philip ABBET's avatar
Philip ABBET committed
586
587
588
589
590
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
591

592
            result = json.loads(data[0])
Samuel GAIST's avatar
Samuel GAIST committed
593
            self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
594

595
596
        prepare_database(db_name)

Philip ABBET's avatar
Philip ABBET committed
597
598
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
599

Philip ABBET's avatar
Philip ABBET committed
600
601
602
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
603

Philip ABBET's avatar
Philip ABBET committed
604
605
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
606

Philip ABBET's avatar
Philip ABBET committed
607
608
609
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
610

Philip ABBET's avatar
Philip ABBET committed
611
612
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
613

Philip ABBET's avatar
Philip ABBET committed
614
        self.assertNotEqual(worker1, worker2)