test_worker.py 14.7 KB
Newer Older
Philip ABBET's avatar
Philip ABBET committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
Philip ABBET's avatar
Philip ABBET committed
32

Philip ABBET's avatar
Philip ABBET committed
33
34
35
36
37
import logging
logger = logging.getLogger(__name__)

import unittest
import simplejson
Philip ABBET's avatar
Philip ABBET committed
38
import multiprocessing
Philip ABBET's avatar
Philip ABBET committed
39
40
41
42
43
44
45
46
47
from time import time

from ..scripts import worker
from ..worker import WorkerController
from ..dock import Host

from . import prefix, tmp_prefix


Philip ABBET's avatar
Philip ABBET committed
48
49
50
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
51
52
53
54
# Names passed to the worker processes through '--name' and used by the tests
# to address each worker through the controller
WORKER1, WORKER2 = 'worker1', 'worker2'


Philip ABBET's avatar
Philip ABBET committed
55
56
57
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# Job description handed to WorkerController.execute() by the tests: it runs
# the 'user/integers_echo/1' algorithm with one input ('in_data') and one
# output ('out_data'), both on the 'integers' channel.
CONFIGURATION1 = {
    'queue': 'queue',
    'inputs': {
        'in_data': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers',
        },
    },
    'algorithm': 'user/integers_echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.1.0',
    },
    'outputs': {
        'out_data': {
            'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out_data',
            'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers',
        },
    },
    'nb_slots': 1,
    'channel': 'integers',
}


Philip ABBET's avatar
Philip ABBET committed
91
92
93
#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Second job description used when two workers run at the same time: same
# algorithm and input as the first configuration, but the 'out_data' output
# has its own path and hash.
CONFIGURATION2 = {
    'queue': 'queue',
    'inputs': {
        'in_data': {
            'set': 'double',
            'protocol': 'double',
            'database': 'integers_db/1',
            'output': 'a',
            'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'endpoint': 'a',
            'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
            'channel': 'integers',
        },
    },
    'algorithm': 'user/integers_echo/1',
    'parameters': {},
    'environment': {
        'name': 'Python 2.7',
        'version': '1.1.0',
    },
    'outputs': {
        'out_data': {
            'path': '40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'endpoint': 'out_data',
            'hash': '4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
            'channel': 'integers',
        },
    },
    'nb_slots': 1,
    'channel': 'integers',
}


Philip ABBET's avatar
Philip ABBET committed
127
128
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
129

Philip ABBET's avatar
Philip ABBET committed
130
class WorkerProcess(multiprocessing.Process):
    """Child process that runs the worker script's main() function.

    Parameters:

      queue: Object with a put() method; the string 'STARTED' is put on it
        as soon as the child process begins running

      arguments: Argument list forwarded unchanged to worker.main()
    """

    def __init__(self, queue, arguments):
        super(WorkerProcess, self).__init__()

        self.queue, self.arguments = queue, arguments


    def run(self):
        # Signal the parent that the process is running before handing
        # control over to the worker script
        self.queue.put('STARTED')
        worker.main(self.arguments)
Philip ABBET's avatar
Philip ABBET committed
142
143


Philip ABBET's avatar
Philip ABBET committed
144
145
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
146
147
148

class TestOneWorker(unittest.TestCase):
    """Tests of a WorkerController talking to a single worker process.

    setUp() creates a fresh controller and starts one WorkerProcess
    registered under the name WORKER1; tearDown() stops both.

    Test methods are documented with comments rather than docstrings so that
    the names reported by the test runner are left unchanged.
    """

    def __init__(self, methodName='runTest'):
        super(TestOneWorker, self).__init__(methodName)
        self.controller = None
        self.worker_process = None
        self.docker = False   # When True, setUp() passes '--docker' to the worker


    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()

        connected_workers = []

        def onWorkerReady(name):
            connected_workers.append(name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=None,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
            )
        )

        args = [
            '--prefix=%s' % prefix,
            '--cache=%s' % tmp_prefix,
            '--name=%s' % WORKER1,
            self.controller.address,
        ]

        if self.docker:
            # Inserted before the controller address, which stays last
            args.insert(3, '--docker')

        self.worker_process = WorkerProcess(multiprocessing.Queue(), args)
        self.worker_process.start()

        # Blocks until the child process reports that it is running
        self.worker_process.queue.get()

        # Let the controller process messages until the worker is registered
        start = time()
        while len(self.controller.workers) == 0:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertEqual(len(self.controller.workers), 1)
        self.assertTrue(WORKER1 in self.controller.workers)

        self.assertEqual(len(connected_workers), 1)
        self.assertTrue(WORKER1 in connected_workers)


    def tearDown(self):
        self.stop_worker()

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def _wait(self, max=100):
        """Polls the controller until it returns a message.

        Calls self.controller.process(100) at most 'max' times and returns
        the first result that is not None, or None if no message arrived.
        """
        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        return message


    def _wait_message(self):
        """Same as _wait(), but fails the test if no message was received.

        Turns a timeout into an explicit assertion failure instead of a
        TypeError when the caller unpacks the result.
        """
        message = self._wait()
        self.assertTrue(message is not None)
        return message


    def stop_worker(self):
        """Terminates and joins the worker process, if one is running."""
        if self.worker_process is not None:
            self.worker_process.terminate()
            self.worker_process.join()
            self.worker_process = None


    def _check_done(self, message, expected_worker, expected_job_id):
        """Checks that 'message' reports a successfully completed job.

        'message' must be a (worker, status, job_id, data) tuple with status
        WorkerController.DONE, and data[0] must be a JSON document whose
        'status' entry is 0.
        """
        self.assertTrue(message is not None)

        (worker_name, status, job_id, data) = message

        self.assertEqual(worker_name, expected_worker)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, expected_job_id)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)

        message = self._wait()

        self._check_done(message, WORKER1, 1)


    def test_processing_error(self):
        # The job runs to completion (DONE), but its result reports an error
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_crash/1'

        self.controller.execute(WORKER1, 1, config)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 1)
        self.assertTrue('a = b' in result['user_error'])


    def test_error_unknown_algorithm(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/unknown/1'

        self.controller.execute(WORKER1, 1, config)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_error_syntax_error(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/syntax_error/1'

        self.controller.execute(WORKER1, 1, config)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 1)
        self.assertTrue(len(data) > 0)


    def test_worker_shutdown(self):
        # The callback records its calls in a list: a plain assignment inside
        # the nested function would only create a new local variable there,
        # and the final check could then never fail.
        gone_workers = []

        def onWorkerGone(name):
            gone_workers.append(name)

        self.controller.callbacks.onWorkerGone = onWorkerGone

        self.stop_worker()

        self.assertTrue(self.controller.process(2000) is None)
        self.assertEqual(len(self.controller.workers), 0)
        self.assertTrue(WORKER1 in gone_workers)


    def test_error_busy(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        # The second job is submitted while the first one is still running
        self.controller.execute(WORKER1, 1, config)
        self.controller.execute(WORKER1, 2, config)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.JOB_ERROR)
        self.assertEqual(job_id, 2)
        self.assertEqual(data[0], 'Worker is already busy')

        # The first job must still complete normally
        message = self._wait()

        self._check_done(message, WORKER1, 1)


    def test_reuse(self):
        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 1)

        self.controller.execute(WORKER1, 2, CONFIGURATION1)
        message = self._wait()
        self._check_done(message, WORKER1, 2)


    def test_cancel(self):
        config = dict(CONFIGURATION1)
        config['algorithm'] = 'user/integers_echo_slow/1'

        self.controller.execute(WORKER1, 1, config)
        self.controller.cancel(WORKER1)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.CANCELLED)
        self.assertEqual(job_id, 1)
        self.assertEqual(len(data), 0)


    def test_error_cancel_free_worker(self):
        self.controller.cancel(WORKER1)

        (worker_name, status, job_id, data) = self._wait_message()

        self.assertEqual(worker_name, WORKER1)
        self.assertEqual(status, WorkerController.ERROR)
        self.assertTrue(job_id is None)
        self.assertEqual(data[0], "Worker isn't busy")
Philip ABBET's avatar
Philip ABBET committed
361
362


Philip ABBET's avatar
Philip ABBET committed
363
364
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
365
366
367

class TestOneWorkerDocker(TestOneWorker):
    """Same tests as TestOneWorker, run with the 'docker' flag enabled."""

    @classmethod
    def setUpClass(cls):
        # NOTE(review): presumably probes the docker host once for the whole
        # class without raising when it is unavailable -- confirm against
        # the Host implementation
        cls.host = Host(raise_on_errors=False)


    def __init__(self, methodName='runTest'):
        super(TestOneWorkerDocker, self).__init__(methodName)

        # Overrides the default set by the parent constructor
        self.docker = True
Philip ABBET's avatar
Philip ABBET committed
376
377


Philip ABBET's avatar
Philip ABBET committed
378
379
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
380
381
382

class TestTwoWorkers(unittest.TestCase):
    """Tests of a WorkerController talking to two worker processes.

    setUp() creates a fresh controller and starts two WorkerProcess
    instances registered as WORKER1 and WORKER2; tearDown() stops them all.

    Test methods are documented with comments rather than docstrings so that
    the names reported by the test runner are left unchanged.
    """

    def __init__(self, methodName='runTest'):
        super(TestTwoWorkers, self).__init__(methodName)
        self.controller = None
        self.worker_processes = None
        self.docker = False   # When True, setUp() passes '--docker' to the workers


    def setUp(self):
        self.tearDown()   # In case another test failed badly during its setUp()

        connected_workers = []

        def onWorkerReady(name):
            connected_workers.append(name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=None,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
            )
        )

        self.worker_processes = []

        for name in [ WORKER1, WORKER2 ]:
            args = [
                '--prefix=%s' % prefix,
                '--cache=%s' % tmp_prefix,
                '--name=%s' % name,
                self.controller.address,
            ]

            if self.docker:
                # Inserted before the controller address, which stays last
                args.insert(3, '--docker')

            worker_process = WorkerProcess(multiprocessing.Queue(), args)
            worker_process.start()

            # Blocks until the child process reports that it is running
            worker_process.queue.get()

            self.worker_processes.append(worker_process)

        # Let the controller process messages until both workers are registered
        start = time()
        while len(self.controller.workers) < 2:
            self.assertTrue(self.controller.process(100) is None)
            self.assertTrue(time() - start < 10)  # Exit after 10 seconds

        self.assertTrue(WORKER1 in self.controller.workers)
        self.assertTrue(WORKER2 in self.controller.workers)

        self.assertEqual(len(connected_workers), 2)
        self.assertTrue(WORKER1 in connected_workers)
        self.assertTrue(WORKER2 in connected_workers)


    def tearDown(self):
        if self.worker_processes is not None:
            for worker_process in self.worker_processes:
                worker_process.terminate()
                worker_process.join()
            self.worker_processes = None

        if self.controller is not None:
            self.controller.destroy()
            self.controller = None


    def _wait(self, max=100):
        """Polls the controller until it returns a message.

        Calls self.controller.process(100) at most 'max' times and returns
        the first result that is not None, or None if no message arrived.
        The bound keeps a test from hanging forever when a worker never
        answers.
        """
        message = None
        nb = 0

        while (message is None) and (nb < max):
            message = self.controller.process(100)
            nb += 1

        return message


    def _wait_message(self):
        """Same as _wait(), but fails the test if no message was received."""
        message = self._wait()
        self.assertTrue(message is not None)
        return message


    def _test_success_one_worker(self, worker_name):
        """Runs CONFIGURATION1 as job 1 on 'worker_name' and checks it succeeds."""
        self.controller.execute(worker_name, 1, CONFIGURATION1)

        (name, status, job_id, data) = self._wait_message()

        self.assertEqual(name, worker_name)
        self.assertEqual(status, WorkerController.DONE)
        self.assertEqual(job_id, 1)

        result = simplejson.loads(data[0])

        self.assertEqual(result['status'], 0)


    def test_success_worker1(self):
        self._test_success_one_worker(WORKER1)


    def test_success_worker2(self):
        self._test_success_one_worker(WORKER2)


    def test_success_both_workers(self):
        def _check(name, status, job_id, data):
            self.assertEqual(status, WorkerController.DONE)

            # Job 1 was given to WORKER1, job 2 to WORKER2
            if name == WORKER1:
                self.assertEqual(job_id, 1)
            else:
                self.assertEqual(name, WORKER2)
                self.assertEqual(job_id, 2)

            result = simplejson.loads(data[0])
            self.assertEqual(result['status'], 0)


        self.controller.execute(WORKER1, 1, CONFIGURATION1)
        self.controller.execute(WORKER2, 2, CONFIGURATION2)

        # The two answers may arrive in any order
        (worker1, status, job_id, data) = self._wait_message()
        _check(worker1, status, job_id, data)

        (worker2, status, job_id, data) = self._wait_message()
        _check(worker2, status, job_id, data)

        self.assertNotEqual(worker1, worker2)
Philip ABBET's avatar
Philip ABBET committed
509
510


Philip ABBET's avatar
Philip ABBET committed
511
512
#----------------------------------------------------------

Philip ABBET's avatar
Philip ABBET committed
513
514
515

class TestTwoWorkersDocker(TestTwoWorkers):
    """Same tests as TestTwoWorkers, run with the 'docker' flag enabled."""

    @classmethod
    def setUpClass(cls):
        # NOTE(review): presumably probes the docker host once for the whole
        # class without raising when it is unavailable -- confirm against
        # the Host implementation
        cls.host = Host(raise_on_errors=False)


    def __init__(self, methodName='runTest'):
        super(TestTwoWorkersDocker, self).__init__(methodName)

        # Overrides the default set by the parent constructor
        self.docker = True