test_worker.py 18.8 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
Philip ABBET's avatar
Philip ABBET committed
35
36
37
38
39
40
41


# Tests for experiment execution

import os
import logging
import unittest
42
import simplejson as json
Philip ABBET's avatar
Philip ABBET committed
43
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
44
import queue
Samuel GAIST's avatar
Samuel GAIST committed
45

Philip ABBET's avatar
Philip ABBET committed
46
from time import time
47
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
48

49
50
51
from ddt import ddt
from ddt import idata

Philip ABBET's avatar
Philip ABBET committed
52
53
from ..scripts import worker
from ..worker import WorkerController
54
from ..database import Database
55
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
56
57
58

from . import prefix, tmp_prefix

Samuel GAIST's avatar
Samuel GAIST committed
59
60
61

logger = logging.getLogger(__name__)

Samuel GAIST's avatar
Samuel GAIST committed
62
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
63
64


Samuel GAIST's avatar
Samuel GAIST committed
65
66
# Names of the two test workers, kept as bytes: they are decoded when building
# the worker's ``--name`` argument and compared as-is against the names that the
# controller callbacks report.
WORKER1 = b"worker1"
WORKER2 = b"worker2"
67
# Port chosen once at import time; used by the tests that must know the
# controller address before the controller itself exists.
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
68
69


Samuel GAIST's avatar
Samuel GAIST committed
70
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
71

72
73
# Database names fed to the data-driven tests: "integers_db/1" and "integers_db/2"
DATABASES = [f"integers_db/{version}" for version in (1, 2)]

Philip ABBET's avatar
Philip ABBET committed
74

Philip ABBET's avatar
Philip ABBET committed
75
# Job description passed to ``WorkerController.execute()`` by the tests.
# ``prepare_database()`` overwrites ``inputs.in.database`` before each
# data-driven test, so the value below is only the initial one.
CONFIGURATION1 = {
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            # Relative to the cache directory; indexed by prepare_database()
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
        },
    },
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {
        "name": "Python 2.7",
        "version": "1.3.0",
    },
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
        },
    },
    "nb_slots": 1,
    "channel": "integers",
}


Samuel GAIST's avatar
Samuel GAIST committed
105
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
106
107


Philip ABBET's avatar
Philip ABBET committed
108
# Second job description, used when two workers run at the same time. It has
# the same input, algorithm and environment as CONFIGURATION1 and differs only
# in the output path/hash. ``prepare_database()`` overwrites
# ``inputs.in.database`` here as well.
CONFIGURATION2 = {
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
        },
    },
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {
        "name": "Python 2.7",
        "version": "1.3.0",
    },
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
        },
    },
    "nb_slots": 1,
    "channel": "integers",
}


Samuel GAIST's avatar
Samuel GAIST committed
138
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
139

Philip ABBET's avatar
Philip ABBET committed
140

141
142
143
144
145
146
147
148
149
150
151
152
153
def prepare_database(db_name):
    """Select *db_name* as the input database and index the input data.

    Both module-level configurations (``CONFIGURATION1`` and
    ``CONFIGURATION2``) are modified in place. An index file is then written
    under ``tmp_prefix`` for every input declared in ``CONFIGURATION1``
    (``CONFIGURATION2`` inputs are not iterated).
    """
    for configuration in (CONFIGURATION1, CONFIGURATION2):
        configuration["inputs"]["in"]["database"] = db_name

    for settings in CONFIGURATION1["inputs"].values():
        index_path = os.path.join(tmp_prefix, settings["path"])
        source = Database(prefix, settings["database"])
        source.view(settings["protocol"], settings["set"]).index(index_path)


# ----------------------------------------------------------


154
155
156
157
158
159
160
class ControllerProcess(multiprocessing.Process):
    """Run a ``WorkerController`` in a child process.

    A single queue carries traffic in both directions: the child reports
    ``"STARTED"``, ``"READY <name>"`` and ``"GONE <name>"`` on it, and the
    parent asks the child to exit by putting ``"STOP"`` on it.
    """

    def __init__(self, queue):
        """Store *queue*, the channel shared with the parent process."""
        super().__init__()

        # The parameter only shadows the ``queue`` module inside __init__;
        # run() still sees the module when it refers to ``queue.Empty``.
        self.queue = queue

    def run(self):
        """Create the controller on ``PORT`` and serve it until ``"STOP"``."""
        self.queue.put("STARTED")

        def onWorkerReady(name):
            self.queue.put("READY %s" % name.decode())

        def onWorkerGone(name):
            self.queue.put("GONE %s" % name.decode())

        self.controller = WorkerController(
            "127.0.0.1",
            port=PORT,
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
        )

        try:
            while True:
                self.controller.process(100)

                try:
                    command = self.queue.get_nowait()
                except queue.Empty:
                    continue

                if command == "STOP":
                    break

                # Anything else is one of our own status messages, read back
                # before the parent got to it. Put it back instead of dropping
                # it, otherwise the parent would wait for it forever.
                self.queue.put(command)
        finally:
            # Release the controller even when processing raised
            self.controller.destroy()


Samuel GAIST's avatar
Samuel GAIST committed
188
# ----------------------------------------------------------
189
190


Philip ABBET's avatar
Philip ABBET committed
191
class WorkerProcess(multiprocessing.Process):
    """Child process that runs the worker script's ``main()``.

    ``"STARTED"`` is put on the queue as soon as the child begins running,
    before the worker script takes over.
    """

    def __init__(self, queue, arguments):
        """Keep the notification *queue* and the command-line *arguments*."""
        super().__init__()

        self.arguments = arguments
        self.queue = queue

    def run(self):
        """Announce the start, then hand control to ``worker.main``."""
        self.queue.put("STARTED")
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
201
202


Samuel GAIST's avatar
Samuel GAIST committed
203
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
204

Philip ABBET's avatar
Philip ABBET committed
205

206
class TestWorkerBase(unittest.TestCase):
    """Common fixture for the worker tests.

    Holds an in-process ``WorkerController`` plus the worker child processes
    started by the test, and provides helpers to start and stop them and to
    wait for (dis)connections.
    """

    def __init__(self, methodName="runTest"):
        super().__init__(methodName)
        self.controller = None
        self.connected_workers = []  # names reported by onWorkerReady
        self.worker_processes = {}  # worker name -> WorkerProcess
        self.docker = False  # when True, workers get the --docker flag

    def setUp(self):
        # unittest does not call tearDown() when setUp() raises, and
        # subclasses start processes in their setUp(). Registering the
        # shutdown as a cleanup makes sure those processes are reaped anyway.
        self.addCleanup(self.shutdown_everything)

        self.shutdown_everything()

    def tearDown(self):
        self.shutdown_everything()

    def shutdown_everything(self):
        """Stop every worker process, then the controller. Safe to repeat."""
        for name in list(self.worker_processes):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []

        self.stop_controller()

    def start_controller(self, port=None):
        """Create the controller on 127.0.0.1 (*port* is passed through)."""
        self.connected_workers = []

        def onWorkerReady(name):
            self.connected_workers.append(name)

        def onWorkerGone(name):
            # Tolerate a notification for a worker we never saw as ready
            if name in self.connected_workers:
                self.connected_workers.remove(name)

        self.controller = WorkerController(
            "127.0.0.1",
            port=port,
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
        )

        self.controller.process(100)

    def stop_controller(self):
        """Destroy the controller, if there is one."""
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None

    def start_worker(self, name, address=None):
        """Start a worker process called *name* (bytes).

        The worker connects to *address*, or to the address of the running
        controller when *address* is None.
        """
        args = [
            "--prefix=%s" % prefix,
            "--cache=%s" % tmp_prefix,
            "--name=%s" % name.decode(),
            # '-vv',
            self.controller.address if address is None else address,
        ]

        if self.docker:
            args.insert(3, "--docker")

        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        # Register first, so the process is terminated by the cleanup even if
        # waiting for its start notification is interrupted.
        self.worker_processes[name] = worker_process

        worker_process.queue.get()

    def stop_worker(self, name):
        """Terminate and join the worker called *name*; unknown names are ignored."""
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]

    def wait_for_worker_connection(self, name, timeout=10):
        """Drive the controller until *name* is ready; fail after *timeout* seconds."""
        start = time()
        while name not in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)

        self.assertIn(name, self.controller.workers)

    def wait_for_worker_disconnection(self, name, timeout=10):
        """Drive the controller until *name* is gone; fail after *timeout* seconds."""
        start = time()
        while name in self.connected_workers:
            self.assertIsNone(self.controller.process(100))
            self.assertLess(time() - start, timeout)

        self.assertNotIn(name, self.controller.workers)

292

Samuel GAIST's avatar
Samuel GAIST committed
293
# ----------------------------------------------------------
294
295
296
297
298
299
300
301
302
303
304
305
306
307


class TestConnection(TestWorkerBase):
    """Connection and disconnection between workers and the controller."""

    # Upper bound, in seconds, on any wait for a controller-process message,
    # so a missing message fails the test instead of hanging the whole run.
    QUEUE_TIMEOUT = 30

    def _spawn_controller(self):
        """Start a ``ControllerProcess`` and wait for its "STARTED" message.

        The process is registered for cleanup so it cannot outlive the test
        (and keep ``PORT`` bound) when an assertion fails.
        """
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()
        self.addCleanup(self._reap, controller)

        message = controller.queue.get(timeout=self.QUEUE_TIMEOUT)
        self.assertEqual(message, "STARTED")

        return controller

    def _reap(self, process):
        """Give *process* a chance to exit on its own, then force it."""
        process.join(self.QUEUE_TIMEOUT)
        if process.is_alive():
            process.terminate()
            process.join()

    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
        self.assertEqual(len(self.controller.workers), 1)

    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        sleep(1)

        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)

    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)

    def test_scheduler_last(self):
        # The worker is started before any controller listens on PORT
        self.start_worker(WORKER1, address="tcp://127.0.0.1:%i" % PORT)

        sleep(1)

        self.start_controller(port=PORT)

        self.wait_for_worker_connection(WORKER1)

    def test_scheduler_shutdown(self):
        # First controller: the worker connects to it
        controller = self._spawn_controller()

        self.start_worker(WORKER1, "tcp://127.0.0.1:%i" % PORT)

        message = controller.queue.get(timeout=self.QUEUE_TIMEOUT)
        self.assertEqual(message, "READY " + WORKER1.decode())

        controller.queue.put("STOP")
        # Make sure the first controller is gone before a second one is
        # started on the same port.
        self._reap(controller)

        sleep(1)

        # Second controller: the same worker must report ready again
        controller = self._spawn_controller()

        message = controller.queue.get(timeout=self.QUEUE_TIMEOUT)
        self.assertEqual(message, "READY " + WORKER1.decode())

        controller.queue.put("STOP")
        self._reap(controller)
371
372


Samuel GAIST's avatar
Samuel GAIST committed
373
# ----------------------------------------------------------
374
375


376
@ddt
class TestOneWorker(TestWorkerBase):
    """Job execution, failure and cancellation with one worker (``WORKER1``)."""

    def setUp(self):
        super().setUp()

        self.start_controller()
        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

    def _wait(self, max=200):
        """Poll the controller until it returns a message.

        At most *max* polls are made (the name shadows the builtin but is kept
        for callers that pass it by keyword). Returns the message, or None
        when the polls are used up.
        """
        for _ in range(max):
            message = self.controller.process(100)
            if message is not None:
                return message

        print("Process failed with the allocate range: {}".format(max))
        return None

    def _unpack(self, message, expected_worker, expected_job_id):
        """Check the sender and job id of *message*; return (status, data)."""
        self.assertIsNotNone(message)

        (worker, status, job_id, data) = message

        self.assertEqual(worker, expected_worker)
        self.assertEqual(job_id, expected_job_id)

        return status, data

    def _check_done(self, message, expected_worker, expected_job_id):
        """Assert that *message* reports a job that finished with status 0."""
        status, data = self._unpack(message, expected_worker, expected_job_id)

        self.assertEqual(status, WorkerController.DONE)

        result = json.loads(data[0])

        self.assertEqual(result["status"], 0)

    @staticmethod
    def _configuration_for(algorithm):
        """Return a copy of ``CONFIGURATION1`` that runs *algorithm*.

        The copy is shallow; only the top-level "algorithm" key is replaced.
        """
        config = dict(CONFIGURATION1)
        config["algorithm"] = algorithm
        return config

    @idata(DATABASES)
    def test_success(self, db_name):
        prepare_database(db_name)

        self.controller.execute(WORKER1, 1, CONFIGURATION1)

        message = self._wait()

        self._check_done(message, WORKER1, 1)

    @idata(DATABASES)
    def test_processing_error(self, db_name):
        prepare_database(db_name)

        config = self._configuration_for("legacy/process_crash/1")

        self.controller.execute(WORKER1, 1, config)

        status, data = self._unpack(self._wait(), WORKER1, 1)

        self.assertEqual(status, WorkerController.JOB_ERROR)

        result = json.loads(data[0])

        self.assertEqual(result["status"], 1)
        self.assertIn("a = b", result["user_error"])

    @idata(DATABASES)
    def test_error_unknown_algorithm(self, db_name):
        prepare_database(db_name)

        config = self._configuration_for("user/unknown/1")

        self.controller.execute(WORKER1, 1, config)

        status, data = self._unpack(self._wait(), WORKER1, 1)

        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertGreater(len(data), 0)

    @idata(DATABASES)
    def test_error_syntax_error(self, db_name):
        prepare_database(db_name)

        config = self._configuration_for("legacy/syntax_error/1")

        self.controller.execute(WORKER1, 1, config)

        status, data = self._unpack(self._wait(), WORKER1, 1)

        # Either status is accepted here
        self.assertIn(status, [WorkerController.ERROR, WorkerController.JOB_ERROR])
        self.assertGreater(len(data), 0)

    @idata(DATABASES)
    def test_multiple_jobs(self, db_name):
        prepare_database(db_name)

        config = self._configuration_for("user/integers_echo_slow/1")

        # Both jobs are submitted before any answer is read
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER1, 2, config)

        message = self._wait()
        self._check_done(message, WORKER1, 1)

        message = self._wait()
        self._check_done(message, WORKER1, 2)

    @idata(DATABASES)
    def test_reuse(self, db_name):
        prepare_database(db_name)

        # Same worker, same configuration, one job after the other
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)

        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)

    @idata(DATABASES)
    def test_cancel(self, db_name):
        prepare_database(db_name)

        config = self._configuration_for("user/integers_echo_slow/1")

        self.controller.execute(WORKER1, 1, config)
        self.controller.cancel(WORKER1, 1)

        status, data = self._unpack(self._wait(), WORKER1, 1)

        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(len(data), 0)

    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)

        # The error message carries no job id
        status, data = self._unpack(self._wait(), WORKER1, None)

        self.assertEqual(status, WorkerController.ERROR)
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
538
539


Samuel GAIST's avatar
Samuel GAIST committed
540
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
541

Philip ABBET's avatar
Philip ABBET committed
542

543
@ddt
class TestTwoWorkers(TestWorkerBase):
    """Job execution with two workers connected to the same controller."""

    def setUp(self):
        # The base setUp() already shuts down anything left running
        super().setUp()

        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

    def _wait(self, max_polls=200):
        """Poll the controller until it returns a message.

        The test fails after *max_polls* polls without an answer, rather than
        looping forever when a worker never replies.
        """
        for _ in range(max_polls):
            message = self.controller.process(100)
            if message is not None:
                return message

        self.fail("No message from the controller after %d polls" % max_polls)

    def _check_result_ok(self, data):
        """Assert that the JSON payload in *data* reports status 0."""
        result = json.loads(data[0])
        self.assertEqual(result["status"], 0)

    def _test_success_one_worker(self, worker_name, db_name):
        prepare_database(db_name)

        self.controller.execute(worker_name, 1, CONFIGURATION1)

        (worker, status, job_id, data) = self._wait()

        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        self._check_result_ok(data)

    @idata(DATABASES)
    def test_success_worker1(self, db_name):
        self._test_success_one_worker(WORKER1, db_name)

    @idata(DATABASES)
    def test_success_worker2(self, db_name):
        self._test_success_one_worker(WORKER2, db_name)

    @idata(DATABASES)
    def test_success_both_workers(self, db_name):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)

            # Job 1 was given to WORKER1 and job 2 to WORKER2
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)

            self._check_result_ok(data)

        prepare_database(db_name)

        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)

        # The two answers may arrive in either order
        (worker1, status, job_id, data) = self._wait()
        _check(worker1, status, job_id, data)

        (worker2, status, job_id, data) = self._wait()
        _check(worker2, status, job_id, data)

        self.assertNotEqual(worker1, worker2)