test_worker.py 18 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
Philip ABBET's avatar
Philip ABBET committed
35
36
37
38
39
40
41
42


# Tests for experiment execution

import os
import logging
import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
43
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
44
import queue
Samuel GAIST's avatar
Samuel GAIST committed
45

Philip ABBET's avatar
Philip ABBET committed
46
from time import time
47
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
48
49
50

from ..scripts import worker
from ..worker import WorkerController
51
from ..database import Database
52
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
53
54
55

from . import prefix, tmp_prefix

Samuel GAIST's avatar
Samuel GAIST committed
56
57
58

logger = logging.getLogger(__name__)

Samuel GAIST's avatar
Samuel GAIST committed
59
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
60
61


Samuel GAIST's avatar
Samuel GAIST committed
62
63
WORKER1 = b"worker1"
WORKER2 = b"worker2"
64
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
65
66


Samuel GAIST's avatar
Samuel GAIST committed
67
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
68
69


Philip ABBET's avatar
Philip ABBET committed
70
CONFIGURATION1 = {
Samuel GAIST's avatar
Samuel GAIST committed
71
72
73
74
75
76
77
78
79
80
81
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
82
83
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
84
85
86
87
88
89
90
91
92
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
93
94
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
95
96
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
97
98
99
}


Samuel GAIST's avatar
Samuel GAIST committed
100
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
101
102


Philip ABBET's avatar
Philip ABBET committed
103
CONFIGURATION2 = {
Samuel GAIST's avatar
Samuel GAIST committed
104
105
106
107
108
109
110
111
112
113
114
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
115
116
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
117
118
119
120
121
122
123
124
125
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
126
127
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
128
129
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
130
131
132
}


Samuel GAIST's avatar
Samuel GAIST committed
133
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
134

Philip ABBET's avatar
Philip ABBET committed
135

136
137
138
139
140
141
142
class ControllerProcess(multiprocessing.Process):
    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue

    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
143
        self.queue.put("STARTED")
144
145

        def onWorkerReady(name):
Samuel GAIST's avatar
Samuel GAIST committed
146
            self.queue.put("READY %s" % name.decode())
147
148

        def onWorkerGone(name):
Samuel GAIST's avatar
Samuel GAIST committed
149
            self.queue.put("GONE %s" % name.decode())
150
151

        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
152
            "127.0.0.1",
153
            port=PORT,
Samuel GAIST's avatar
Samuel GAIST committed
154
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
155
156
157
158
159
160
161
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
Samuel GAIST's avatar
Samuel GAIST committed
162
                if command == "STOP":
163
                    break
Samuel GAIST's avatar
Samuel GAIST committed
164
            except queue.Empty:
165
166
167
168
169
                pass

        self.controller.destroy()


Samuel GAIST's avatar
Samuel GAIST committed
170
# ----------------------------------------------------------
171
172


Philip ABBET's avatar
Philip ABBET committed
173
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
174
175
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
176

Philip ABBET's avatar
Philip ABBET committed
177
178
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
179

Philip ABBET's avatar
Philip ABBET committed
180
    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
181
        self.queue.put("STARTED")
Philip ABBET's avatar
Philip ABBET committed
182
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
183
184


Samuel GAIST's avatar
Samuel GAIST committed
185
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
186

Philip ABBET's avatar
Philip ABBET committed
187

188
class TestWorkerBase(unittest.TestCase):
Samuel GAIST's avatar
Samuel GAIST committed
189
    def __init__(self, methodName="runTest"):
190
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
191
        self.controller = None
192
193
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
194
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
195

Philip ABBET's avatar
Philip ABBET committed
196
    def setUp(self):
197
198
199
200
201
202
203
204
205
206
207
        self.shutdown_everything()  # In case another test failed badly during its setUp()

    def tearDown(self):
        self.shutdown_everything()

    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
208

209
210
211
212
        self.stop_controller()

    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
213

Philip ABBET's avatar
Philip ABBET committed
214
        def onWorkerReady(name):
215
216
217
218
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
219

Philip ABBET's avatar
Philip ABBET committed
220
        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
221
            "127.0.0.1",
222
            port=port,
Samuel GAIST's avatar
Samuel GAIST committed
223
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
Philip ABBET's avatar
Philip ABBET committed
224
        )
Philip ABBET's avatar
Philip ABBET committed
225

226
227
228
229
230
231
232
233
        self.controller.process(100)

    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None

    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
234
        args = [
Samuel GAIST's avatar
Samuel GAIST committed
235
236
237
238
239
            "--prefix=%s" % prefix,
            "--cache=%s" % tmp_prefix,
            "--name=%s" % name.decode(),
            # '-vv',
            self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
240
        ]
Philip ABBET's avatar
Philip ABBET committed
241

Philip ABBET's avatar
Philip ABBET committed
242
        if self.docker:
Samuel GAIST's avatar
Samuel GAIST committed
243
            args.insert(3, "--docker")
Philip ABBET's avatar
Philip ABBET committed
244

245
246
247
248
249
250
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
251

252
253
254
255
256
257
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]

258
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
259
        start = time()
260
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
261
            self.assertTrue(self.controller.process(100) is None)
262
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
263

264
265
266
267
268
269
270
271
272
273
        self.assertTrue(name in self.controller.workers)

    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)

274
    def prepare_databases(self, configuration):
Samuel GAIST's avatar
Samuel GAIST committed
275
276
277
278
        for _, input_cfg in configuration["inputs"].items():
            database = Database(prefix, input_cfg["database"])
            view = database.view(input_cfg["protocol"], input_cfg["set"])
            view.index(os.path.join(tmp_prefix, input_cfg["path"]))
279
280


Samuel GAIST's avatar
Samuel GAIST committed
281
# ----------------------------------------------------------
282
283
284
285
286
287
288
289
290
291
292
293
294
295


class TestConnection(TestWorkerBase):
    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
296
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
297

298
299
300
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
301

302
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
303

Philip ABBET's avatar
Philip ABBET committed
304
305
        sleep(1)

306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)

    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)

    def test_scheduler_last(self):
Samuel GAIST's avatar
Samuel GAIST committed
326
        self.start_worker(WORKER1, address="tcp://127.0.0.1:%i" % PORT)
327
328
        sleep(1)

329
        self.start_controller(port=PORT)
330
331
332

        self.wait_for_worker_connection(WORKER1)

333
334
335
336
337
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
338
        self.assertEqual(message, "STARTED")
339

Samuel GAIST's avatar
Samuel GAIST committed
340
        self.start_worker(WORKER1, "tcp://127.0.0.1:%i" % PORT)
341
342

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
343
        self.assertEqual(message, "READY " + WORKER1.decode())
344

Samuel GAIST's avatar
Samuel GAIST committed
345
        controller.queue.put("STOP")
346
347
348
349
350
351
352

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
353
        self.assertEqual(message, "STARTED")
354
355

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
356
        self.assertEqual(message, "READY " + WORKER1.decode())
357

Samuel GAIST's avatar
Samuel GAIST committed
358
        controller.queue.put("STOP")
359
360


Samuel GAIST's avatar
Samuel GAIST committed
361
# ----------------------------------------------------------
362
363
364
365
366
367
368
369


class TestOneWorker(TestWorkerBase):
    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
370

371
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
372

373
374
375
        self.prepare_databases(CONFIGURATION1)

    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
376
377
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
378

Philip ABBET's avatar
Philip ABBET committed
379
380
381
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
382

383
384
385
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
386
        return message
Philip ABBET's avatar
Philip ABBET committed
387

Philip ABBET's avatar
Philip ABBET committed
388
    def _check_done(self, message, expected_worker, expected_job_id):
Samuel GAIST's avatar
Samuel GAIST committed
389
        self.assertIsNotNone(message)
Philip ABBET's avatar
Philip ABBET committed
390

Philip ABBET's avatar
Philip ABBET committed
391
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
392

Philip ABBET's avatar
Philip ABBET committed
393
394
395
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
396

Philip ABBET's avatar
Philip ABBET committed
397
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
398

Samuel GAIST's avatar
Samuel GAIST committed
399
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
400

Philip ABBET's avatar
Philip ABBET committed
401
402
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
403

Philip ABBET's avatar
Philip ABBET committed
404
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
405

Philip ABBET's avatar
Philip ABBET committed
406
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
407

Philip ABBET's avatar
Philip ABBET committed
408
409
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
410
        config["algorithm"] = "legacy/process_crash/1"
Philip ABBET's avatar
Philip ABBET committed
411

Philip ABBET's avatar
Philip ABBET committed
412
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
413

414
415
416
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
417

Philip ABBET's avatar
Philip ABBET committed
418
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
419
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
420
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
421

Philip ABBET's avatar
Philip ABBET committed
422
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
423

Samuel GAIST's avatar
Samuel GAIST committed
424
425
        self.assertEqual(result["status"], 1)
        self.assertTrue("a = b" in result["user_error"])
Philip ABBET's avatar
Philip ABBET committed
426

Philip ABBET's avatar
Philip ABBET committed
427
428
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
429
        config["algorithm"] = "user/unknown/1"
Philip ABBET's avatar
Philip ABBET committed
430

Philip ABBET's avatar
Philip ABBET committed
431
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
432

433
434
435
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
436

Philip ABBET's avatar
Philip ABBET committed
437
438
439
440
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
441

Philip ABBET's avatar
Philip ABBET committed
442
443
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
444
        config["algorithm"] = "legacy/syntax_error/1"
Philip ABBET's avatar
Philip ABBET committed
445

Philip ABBET's avatar
Philip ABBET committed
446
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
447

448
449
450
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
451

Philip ABBET's avatar
Philip ABBET committed
452
        self.assertEqual(worker, WORKER1)
453
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
454
455
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
456

Philip ABBET's avatar
Philip ABBET committed
457
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
458
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
459
        config["algorithm"] = "user/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
460

Philip ABBET's avatar
Philip ABBET committed
461
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
462
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
463

Philip ABBET's avatar
Philip ABBET committed
464
465
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
466

Philip ABBET's avatar
Philip ABBET committed
467
468
469
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
470
471
472
473
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
474

Philip ABBET's avatar
Philip ABBET committed
475
476
477
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
478

Philip ABBET's avatar
Philip ABBET committed
479
480
    def test_cancel(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
481
        config["algorithm"] = "user/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
482

Philip ABBET's avatar
Philip ABBET committed
483
        self.controller.execute(WORKER1, 1, config)
484
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
485

486
487
488
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
489

Philip ABBET's avatar
Philip ABBET committed
490
491
492
493
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
494

495
496
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
497

498
499
500
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
501

Philip ABBET's avatar
Philip ABBET committed
502
503
504
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
André Anjos's avatar
André Anjos committed
505
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
506
507


Samuel GAIST's avatar
Samuel GAIST committed
508
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
509

Philip ABBET's avatar
Philip ABBET committed
510

511
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
512
    def setUp(self):
Samuel GAIST's avatar
Samuel GAIST committed
513
        self.tearDown()  # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
514

515
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
516

517
518
519
520
521
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
522

Philip ABBET's avatar
Philip ABBET committed
523
524
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
525

Philip ABBET's avatar
Philip ABBET committed
526
527
528
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
529

530
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
531
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
532

Philip ABBET's avatar
Philip ABBET committed
533
534
535
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
536

Philip ABBET's avatar
Philip ABBET committed
537
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
538

Samuel GAIST's avatar
Samuel GAIST committed
539
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
540

Philip ABBET's avatar
Philip ABBET committed
541
542
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
543

Philip ABBET's avatar
Philip ABBET committed
544
545
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
546

Philip ABBET's avatar
Philip ABBET committed
547
548
549
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
550

Philip ABBET's avatar
Philip ABBET committed
551
552
553
554
555
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
556

Philip ABBET's avatar
Philip ABBET committed
557
            result = simplejson.loads(data[0])
Samuel GAIST's avatar
Samuel GAIST committed
558
            self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
559

Philip ABBET's avatar
Philip ABBET committed
560
561
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
562

Philip ABBET's avatar
Philip ABBET committed
563
564
565
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
566

Philip ABBET's avatar
Philip ABBET committed
567
568
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
569

Philip ABBET's avatar
Philip ABBET committed
570
571
572
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
573

Philip ABBET's avatar
Philip ABBET committed
574
575
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
576

Philip ABBET's avatar
Philip ABBET committed
577
        self.assertNotEqual(worker1, worker2)