test_worker.py 18 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
Philip ABBET's avatar
Philip ABBET committed
35
36
37
38
39


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
40

Philip ABBET's avatar
Philip ABBET committed
41
import logging
Samuel GAIST's avatar
Samuel GAIST committed
42

Philip ABBET's avatar
Philip ABBET committed
43
44
45
46
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
47
import multiprocessing
Samuel GAIST's avatar
Samuel GAIST committed
48

Samuel GAIST's avatar
Samuel GAIST committed
49
50
51
52
try:
    import Queue
except ImportError:
    import queue as Queue
Philip ABBET's avatar
Philip ABBET committed
53
from time import time
54
from time import sleep
Philip ABBET's avatar
Philip ABBET committed
55
56
57

from ..scripts import worker
from ..worker import WorkerController
58
from ..database import Database
59
from ..utils import find_free_port
Philip ABBET's avatar
Philip ABBET committed
60
61
62

from . import prefix, tmp_prefix

Samuel GAIST's avatar
Samuel GAIST committed
63
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
64
65


Samuel GAIST's avatar
Samuel GAIST committed
66
67
WORKER1 = b"worker1"
WORKER2 = b"worker2"
68
PORT = find_free_port()
Philip ABBET's avatar
Philip ABBET committed
69
70


Samuel GAIST's avatar
Samuel GAIST committed
71
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
72
73


Philip ABBET's avatar
Philip ABBET committed
74
CONFIGURATION1 = {
Samuel GAIST's avatar
Samuel GAIST committed
75
76
77
78
79
80
81
82
83
84
85
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
86
87
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
88
89
90
91
92
93
94
95
96
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
97
98
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
99
100
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
101
102
103
}


Samuel GAIST's avatar
Samuel GAIST committed
104
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
105
106


Philip ABBET's avatar
Philip ABBET committed
107
CONFIGURATION2 = {
Samuel GAIST's avatar
Samuel GAIST committed
108
109
110
111
112
113
114
115
116
117
118
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
119
120
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
121
122
123
124
125
126
127
128
129
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
130
131
        }
    },
Samuel GAIST's avatar
Samuel GAIST committed
132
133
    "nb_slots": 1,
    "channel": "integers",
Philip ABBET's avatar
Philip ABBET committed
134
135
136
}


Samuel GAIST's avatar
Samuel GAIST committed
137
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
138

Philip ABBET's avatar
Philip ABBET committed
139

140
141
142
143
144
145
146
class ControllerProcess(multiprocessing.Process):
    def __init__(self, queue):
        super(ControllerProcess, self).__init__()

        self.queue = queue

    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
147
        self.queue.put("STARTED")
148
149

        def onWorkerReady(name):
Samuel GAIST's avatar
Samuel GAIST committed
150
            self.queue.put("READY %s" % name.decode())
151
152

        def onWorkerGone(name):
Samuel GAIST's avatar
Samuel GAIST committed
153
            self.queue.put("GONE %s" % name.decode())
154
155

        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
156
            "127.0.0.1",
157
            port=PORT,
Samuel GAIST's avatar
Samuel GAIST committed
158
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
159
160
161
162
163
164
165
        )

        while True:
            self.controller.process(100)

            try:
                command = self.queue.get_nowait()
Samuel GAIST's avatar
Samuel GAIST committed
166
                if command == "STOP":
167
168
169
170
171
172
173
                    break
            except Queue.Empty:
                pass

        self.controller.destroy()


Samuel GAIST's avatar
Samuel GAIST committed
174
# ----------------------------------------------------------
175
176


Philip ABBET's avatar
Philip ABBET committed
177
class WorkerProcess(multiprocessing.Process):
Philip ABBET's avatar
Philip ABBET committed
178
179
    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()
Philip ABBET's avatar
Philip ABBET committed
180

Philip ABBET's avatar
Philip ABBET committed
181
182
        self.queue = queue
        self.arguments = arguments
Philip ABBET's avatar
Philip ABBET committed
183

Philip ABBET's avatar
Philip ABBET committed
184
    def run(self):
Samuel GAIST's avatar
Samuel GAIST committed
185
        self.queue.put("STARTED")
Philip ABBET's avatar
Philip ABBET committed
186
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
187
188


Samuel GAIST's avatar
Samuel GAIST committed
189
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
190

Philip ABBET's avatar
Philip ABBET committed
191

192
class TestWorkerBase(unittest.TestCase):
Samuel GAIST's avatar
Samuel GAIST committed
193
    def __init__(self, methodName="runTest"):
194
        super(TestWorkerBase, self).__init__(methodName)
Philip ABBET's avatar
Philip ABBET committed
195
        self.controller = None
196
197
        self.connected_workers = []
        self.worker_processes = {}
Philip ABBET's avatar
Philip ABBET committed
198
        self.docker = False
Philip ABBET's avatar
Philip ABBET committed
199

Philip ABBET's avatar
Philip ABBET committed
200
    def setUp(self):
201
202
203
204
205
206
207
208
209
210
211
        self.shutdown_everything()  # In case another test failed badly during its setUp()

    def tearDown(self):
        self.shutdown_everything()

    def shutdown_everything(self):
        for name in list(self.worker_processes.keys()):
            self.stop_worker(name)

        self.worker_processes = {}
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
212

213
214
215
216
        self.stop_controller()

    def start_controller(self, port=None):
        self.connected_workers = []
Philip ABBET's avatar
Philip ABBET committed
217

Philip ABBET's avatar
Philip ABBET committed
218
        def onWorkerReady(name):
219
220
221
222
            self.connected_workers.append(name)

        def onWorkerGone(name):
            self.connected_workers.remove(name)
Philip ABBET's avatar
Philip ABBET committed
223

Philip ABBET's avatar
Philip ABBET committed
224
        self.controller = WorkerController(
Samuel GAIST's avatar
Samuel GAIST committed
225
            "127.0.0.1",
226
            port=port,
Samuel GAIST's avatar
Samuel GAIST committed
227
            callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
Philip ABBET's avatar
Philip ABBET committed
228
        )
Philip ABBET's avatar
Philip ABBET committed
229

230
231
232
233
234
235
236
237
        self.controller.process(100)

    def stop_controller(self):
        if self.controller is not None:
            self.controller.destroy()
            self.controller = None

    def start_worker(self, name, address=None):
Philip ABBET's avatar
Philip ABBET committed
238
        args = [
Samuel GAIST's avatar
Samuel GAIST committed
239
240
241
242
243
            "--prefix=%s" % prefix,
            "--cache=%s" % tmp_prefix,
            "--name=%s" % name.decode(),
            # '-vv',
            self.controller.address if address is None else address,
Philip ABBET's avatar
Philip ABBET committed
244
        ]
Philip ABBET's avatar
Philip ABBET committed
245

Philip ABBET's avatar
Philip ABBET committed
246
        if self.docker:
Samuel GAIST's avatar
Samuel GAIST committed
247
            args.insert(3, "--docker")
Philip ABBET's avatar
Philip ABBET committed
248

249
250
251
252
253
254
        worker_process = WorkerProcess(multiprocessing.Queue(), args)
        worker_process.start()

        worker_process.queue.get()

        self.worker_processes[name] = worker_process
Philip ABBET's avatar
Philip ABBET committed
255

256
257
258
259
260
261
    def stop_worker(self, name):
        if name in self.worker_processes:
            self.worker_processes[name].terminate()
            self.worker_processes[name].join()
            del self.worker_processes[name]

262
    def wait_for_worker_connection(self, name, timeout=10):
Philip ABBET's avatar
Philip ABBET committed
263
        start = time()
264
        while name not in self.connected_workers:
Philip ABBET's avatar
Philip ABBET committed
265
            self.assertTrue(self.controller.process(100) is None)
266
            self.assertTrue(time() - start < timeout)  # Exit after 'timeout' seconds
Philip ABBET's avatar
Philip ABBET committed
267

268
269
270
271
272
273
274
275
276
277
        self.assertTrue(name in self.controller.workers)

    def wait_for_worker_disconnection(self, name):
        start = time()
        while name in self.connected_workers:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(name not in self.controller.workers)

278
    def prepare_databases(self, configuration):
Samuel GAIST's avatar
Samuel GAIST committed
279
280
281
282
        for _, input_cfg in configuration["inputs"].items():
            database = Database(prefix, input_cfg["database"])
            view = database.view(input_cfg["protocol"], input_cfg["set"])
            view.index(os.path.join(tmp_prefix, input_cfg["path"]))
283
284


Samuel GAIST's avatar
Samuel GAIST committed
285
# ----------------------------------------------------------
286
287
288
289
290
291
292
293
294
295
296
297
298
299


class TestConnection(TestWorkerBase):
    def test_worker_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)

        self.wait_for_worker_connection(WORKER1)

        self.assertEqual(len(self.connected_workers), 1)
Philip ABBET's avatar
Philip ABBET committed
300
        self.assertEqual(len(self.controller.workers), 1)
Philip ABBET's avatar
Philip ABBET committed
301

302
303
304
    def test_worker_disconnection(self):
        self.start_controller()
        self.start_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
305

306
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
307

Philip ABBET's avatar
Philip ABBET committed
308
309
        sleep(1)

310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
        self.stop_worker(WORKER1)

        self.wait_for_worker_disconnection(WORKER1)

    def test_two_workers_connection(self):
        self.start_controller()

        self.assertEqual(len(self.connected_workers), 0)
        self.assertEqual(len(self.controller.workers), 0)

        self.start_worker(WORKER1)
        self.start_worker(WORKER2)

        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)

        self.assertEqual(len(self.connected_workers), 2)
        self.assertEqual(len(self.controller.workers), 2)

    def test_scheduler_last(self):
Samuel GAIST's avatar
Samuel GAIST committed
330
        self.start_worker(WORKER1, address="tcp://127.0.0.1:%i" % PORT)
331
332
        sleep(1)

333
        self.start_controller(port=PORT)
334
335
336

        self.wait_for_worker_connection(WORKER1)

337
338
339
340
341
    def test_scheduler_shutdown(self):
        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
342
        self.assertEqual(message, "STARTED")
343

Samuel GAIST's avatar
Samuel GAIST committed
344
        self.start_worker(WORKER1, "tcp://127.0.0.1:%i" % PORT)
345
346

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
347
        self.assertEqual(message, "READY " + WORKER1.decode())
348

Samuel GAIST's avatar
Samuel GAIST committed
349
        controller.queue.put("STOP")
350
351
352
353
354
355
356

        sleep(1)

        controller = ControllerProcess(multiprocessing.Queue())
        controller.start()

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
357
        self.assertEqual(message, "STARTED")
358
359

        message = controller.queue.get()
Samuel GAIST's avatar
Samuel GAIST committed
360
        self.assertEqual(message, "READY " + WORKER1.decode())
361

Samuel GAIST's avatar
Samuel GAIST committed
362
        controller.queue.put("STOP")
363
364


Samuel GAIST's avatar
Samuel GAIST committed
365
# ----------------------------------------------------------
366
367
368
369
370
371
372
373


class TestOneWorker(TestWorkerBase):
    def setUp(self):
        super(TestOneWorker, self).setUp()

        self.start_controller()
        self.start_worker(WORKER1)
374

375
        self.wait_for_worker_connection(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
376

377
378
379
        self.prepare_databases(CONFIGURATION1)

    def _wait(self, max=200):
Philip ABBET's avatar
Philip ABBET committed
380
381
        message = None
        nb = 0
Philip ABBET's avatar
Philip ABBET committed
382

Philip ABBET's avatar
Philip ABBET committed
383
384
385
        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1
Philip ABBET's avatar
Philip ABBET committed
386

387
388
389
        if message is None:
            print("Process failed with the allocate range: {}".format(max))

Philip ABBET's avatar
Philip ABBET committed
390
        return message
Philip ABBET's avatar
Philip ABBET committed
391

Philip ABBET's avatar
Philip ABBET committed
392
393
    def _check_done(self, message, expected_worker, expected_job_id):
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
394

Philip ABBET's avatar
Philip ABBET committed
395
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
396

Philip ABBET's avatar
Philip ABBET committed
397
398
399
        self.assertEqual(worker, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)
Philip ABBET's avatar
Philip ABBET committed
400

Philip ABBET's avatar
Philip ABBET committed
401
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
402

Samuel GAIST's avatar
Samuel GAIST committed
403
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
404

Philip ABBET's avatar
Philip ABBET committed
405
406
    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
407

Philip ABBET's avatar
Philip ABBET committed
408
        message = self._wait()
Philip ABBET's avatar
Philip ABBET committed
409

Philip ABBET's avatar
Philip ABBET committed
410
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
411

Philip ABBET's avatar
Philip ABBET committed
412
413
    def test_processing_error(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
414
        config["algorithm"] = "legacy/process_crash/1"
Philip ABBET's avatar
Philip ABBET committed
415

Philip ABBET's avatar
Philip ABBET committed
416
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
417

418
419
420
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
421

Philip ABBET's avatar
Philip ABBET committed
422
        self.assertEqual(worker, WORKER1)
Philip ABBET's avatar
Philip ABBET committed
423
        self.assertEqual(status, WorkerController.JOB_ERROR)
Philip ABBET's avatar
Philip ABBET committed
424
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
425

Philip ABBET's avatar
Philip ABBET committed
426
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
427

Samuel GAIST's avatar
Samuel GAIST committed
428
429
        self.assertEqual(result["status"], 1)
        self.assertTrue("a = b" in result["user_error"])
Philip ABBET's avatar
Philip ABBET committed
430

Philip ABBET's avatar
Philip ABBET committed
431
432
    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
433
        config["algorithm"] = "user/unknown/1"
Philip ABBET's avatar
Philip ABBET committed
434

Philip ABBET's avatar
Philip ABBET committed
435
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
436

437
438
439
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
440

Philip ABBET's avatar
Philip ABBET committed
441
442
443
444
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
445

Philip ABBET's avatar
Philip ABBET committed
446
447
    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
448
        config["algorithm"] = "legacy/syntax_error/1"
Philip ABBET's avatar
Philip ABBET committed
449

Philip ABBET's avatar
Philip ABBET committed
450
        self.controller.execute(WORKER1, 1, config)
Philip ABBET's avatar
Philip ABBET committed
451

452
453
454
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
455

Philip ABBET's avatar
Philip ABBET committed
456
        self.assertEqual(worker, WORKER1)
457
        self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
Philip ABBET's avatar
Philip ABBET committed
458
459
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)
Philip ABBET's avatar
Philip ABBET committed
460

Philip ABBET's avatar
Philip ABBET committed
461
    def test_multiple_jobs(self):
Philip ABBET's avatar
Philip ABBET committed
462
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
463
        config["algorithm"] = "user/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
464

Philip ABBET's avatar
Philip ABBET committed
465
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
466
        self.controller.execute(WORKER1, 2, config)
Philip ABBET's avatar
Philip ABBET committed
467

Philip ABBET's avatar
Philip ABBET committed
468
469
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
470

Philip ABBET's avatar
Philip ABBET committed
471
472
473
        message = self._wait()
        self._check_done(message, WORKER1, 2)

Philip ABBET's avatar
Philip ABBET committed
474
475
476
477
    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
478

Philip ABBET's avatar
Philip ABBET committed
479
480
481
        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)
Philip ABBET's avatar
Philip ABBET committed
482

Philip ABBET's avatar
Philip ABBET committed
483
484
    def test_cancel(self):
        config = dict(CONFIGURATION1)
Samuel GAIST's avatar
Samuel GAIST committed
485
        config["algorithm"] = "user/integers_echo_slow/1"
Philip ABBET's avatar
Philip ABBET committed
486

Philip ABBET's avatar
Philip ABBET committed
487
        self.controller.execute(WORKER1, 1, config)
488
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
489

490
491
492
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
493

Philip ABBET's avatar
Philip ABBET committed
494
495
496
497
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)
Philip ABBET's avatar
Philip ABBET committed
498

499
500
    def test_error_cancel_unknown_job(self):
        self.controller.cancel(WORKER1, 1)
Philip ABBET's avatar
Philip ABBET committed
501

502
503
504
        message = self._wait()
        self.assertTrue(message is not None)
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
505

Philip ABBET's avatar
Philip ABBET committed
506
507
508
        self.assertEqual(worker, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
André Anjos's avatar
André Anjos committed
509
        self.assertEqual(data[0].decode(), "Unknown job: 1")
Philip ABBET's avatar
Philip ABBET committed
510
511


Samuel GAIST's avatar
Samuel GAIST committed
512
# ----------------------------------------------------------
Philip ABBET's avatar
Philip ABBET committed
513

Philip ABBET's avatar
Philip ABBET committed
514

515
class TestTwoWorkers(TestWorkerBase):
Philip ABBET's avatar
Philip ABBET committed
516
    def setUp(self):
Samuel GAIST's avatar
Samuel GAIST committed
517
        self.tearDown()  # In case another test failed badly during its setUp()
Philip ABBET's avatar
Philip ABBET committed
518

519
        super(TestTwoWorkers, self).setUp()
Philip ABBET's avatar
Philip ABBET committed
520

521
522
523
524
525
        self.start_controller()
        self.start_worker(WORKER1)
        self.start_worker(WORKER2)
        self.wait_for_worker_connection(WORKER1)
        self.wait_for_worker_connection(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
526

Philip ABBET's avatar
Philip ABBET committed
527
528
    def _test_success_one_worker(self, worker_name):
        self.controller.execute(worker_name, 1, CONFIGURATION1)
Philip ABBET's avatar
Philip ABBET committed
529

Philip ABBET's avatar
Philip ABBET committed
530
531
532
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
533

534
        self.assertTrue(message is not None)
Philip ABBET's avatar
Philip ABBET committed
535
        (worker, status, job_id, data) = message
Philip ABBET's avatar
Philip ABBET committed
536

Philip ABBET's avatar
Philip ABBET committed
537
538
539
        self.assertEqual(worker, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)
Philip ABBET's avatar
Philip ABBET committed
540

Philip ABBET's avatar
Philip ABBET committed
541
        result = simplejson.loads(data[0])
Philip ABBET's avatar
Philip ABBET committed
542

Samuel GAIST's avatar
Samuel GAIST committed
543
        self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
544

Philip ABBET's avatar
Philip ABBET committed
545
546
    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)
Philip ABBET's avatar
Philip ABBET committed
547

Philip ABBET's avatar
Philip ABBET committed
548
549
    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)
Philip ABBET's avatar
Philip ABBET committed
550

Philip ABBET's avatar
Philip ABBET committed
551
552
553
    def test_success_both_workers(self):
        def _check(worker, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)
Philip ABBET's avatar
Philip ABBET committed
554

Philip ABBET's avatar
Philip ABBET committed
555
556
557
558
559
            if worker == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(worker, WORKER2)
                self.assertEqual(job_id, 2)
Philip ABBET's avatar
Philip ABBET committed
560

Philip ABBET's avatar
Philip ABBET committed
561
            result = simplejson.loads(data[0])
Samuel GAIST's avatar
Samuel GAIST committed
562
            self.assertEqual(result["status"], 0)
Philip ABBET's avatar
Philip ABBET committed
563

Philip ABBET's avatar
Philip ABBET committed
564
565
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)
Philip ABBET's avatar
Philip ABBET committed
566

Philip ABBET's avatar
Philip ABBET committed
567
568
569
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
570

Philip ABBET's avatar
Philip ABBET committed
571
572
        (worker1, status, job_id, data) = message
        _check(worker1, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
573

Philip ABBET's avatar
Philip ABBET committed
574
575
576
        message = None
        while message is None:
            message = self.controller.process(100)
Philip ABBET's avatar
Philip ABBET committed
577

Philip ABBET's avatar
Philip ABBET committed
578
579
        (worker2, status, job_id, data) = message
        _check(worker2, status, job_id, data)
Philip ABBET's avatar
Philip ABBET committed
580

Philip ABBET's avatar
Philip ABBET committed
581
        self.assertNotEqual(worker1, worker2)