diff --git a/beat/core/dbexecution.py b/beat/core/dbexecution.py old mode 100644 new mode 100755 index 5f13a7efcf101cc5a068702d0a1ecd5845ab95ca..ff73ae41eaba4be5b16993262bf3ee113beaa974 --- a/beat/core/dbexecution.py +++ b/beat/core/dbexecution.py @@ -47,7 +47,7 @@ from . import inputs from . import outputs from . import data from . import stats -from . import agent +from .message_handler import MessageHandler class DBExecutor(object): @@ -259,7 +259,7 @@ class DBExecutor(object): def process(self, zmq_context, zmq_socket): - self.handler = agent.MessageHandler(self.input_list, zmq_context, zmq_socket) + self.handler = MessageHandler(self.input_list, zmq_context, zmq_socket) self.handler.start() diff --git a/beat/core/scripts/worker.py b/beat/core/scripts/worker.py index 770786dd2da2e081de4e5503fb6561bb24e3eb65..f2b1656d1a4b8b1fa288007e02b7f5d80b89c62f 100755 --- a/beat/core/scripts/worker.py +++ b/beat/core/scripts/worker.py @@ -234,6 +234,8 @@ def main(user_input=None): break + logger.info("The scheduler answered") + # Process the requests execution_processes = [] @@ -328,7 +330,7 @@ def main(user_input=None): job_id = parts[1] try: - execution_process = [ p for p in execution_processes if p.job_id == job_id ] + execution_process = [ p for p in execution_processes if p.job_id == job_id ][0] except: socket.send_multipart([ WorkerController.ERROR, diff --git a/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json index bb12429ae0c513b42b0ff456adb4433380fe4ac4..b055f4714777555707bbba24f8cee262adcbe7c9 100644 --- a/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json +++ b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json @@ -46,7 +46,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/double/1/double.json b/beat/core/test/prefix/experiments/user/user/double/1/double.json index ecbde7e0490333cc39b6fef520304ed98ef438a2..4820a823fad7ed6528abbbcf672e840696998cca 100644 --- a/beat/core/test/prefix/experiments/user/user/double/1/double.json +++ b/beat/core/test/prefix/experiments/user/user/double/1/double.json @@ -38,7 +38,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/double/1/large.json b/beat/core/test/prefix/experiments/user/user/double/1/large.json index ea20697e4fcc7f092fe2f7a7888bd2f1279864b3..5275b0a7273be2b904540065ead1d0de2d514bbb 100644 --- a/beat/core/test/prefix/experiments/user/user/double/1/large.json +++ b/beat/core/test/prefix/experiments/user/user/double/1/large.json @@ -38,7 +38,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/double/1/large2.json b/beat/core/test/prefix/experiments/user/user/double/1/large2.json index ade7e4e6360e7b8b04ed4723c2d494a6f0049c2f..4f9b7e4f5916708189e477e98fd94d87a6484b90 100644 --- a/beat/core/test/prefix/experiments/user/user/double/1/large2.json +++ b/beat/core/test/prefix/experiments/user/user/double/1/large2.json @@ -38,7 +38,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/double_triangle/1/double_triangle.json b/beat/core/test/prefix/experiments/user/user/double_triangle/1/double_triangle.json index c903417027c9d701ebd8e9430609ee0f47149370..607ba54d790f1ab47b66ed3ccea74f7c3385f836 100644 --- a/beat/core/test/prefix/experiments/user/user/double_triangle/1/double_triangle.json +++ b/beat/core/test/prefix/experiments/user/user/double_triangle/1/double_triangle.json @@ -71,7 +71,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/1/test.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/1/test.json index f770f4002c7c6aa3a265efe15f237dc33be28753..f02b8ca009f16ae4fb4ada2e397f1f87aff1f91b 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/1/test.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/1/test.json @@ -44,7 +44,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/2/test.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/2/test.json index 832aa2a115dfe757c8b075ab25d76af87cca7b9b..202e35d0b7bc7f36aa477cd013d5e41ae699d14a 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/2/test.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/2/test.json @@ -49,7 +49,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test.json index e24864d741ab4278b351d628685a90e480783606..a145dee3ef98224ad924cc0874ef450f087f61c9 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test.json @@ -50,7 +50,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test2.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test2.json index 8add54bebce8aa1c83f6d6ab7ef90568bc58d330..ee50c17fdc36bb8a9c58ffde75a1093ae05587ab 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test2.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/3/test2.json @@ -50,7 +50,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test.json index ed505bd6b7a1e2760c5afb2f1671440ff4d21cba..0dab39d49b8a4bd1918151fe33ef97e0191f07ac 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test.json @@ -62,7 +62,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test2.json b/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test2.json index 397cb65c85bdc484c7b362d5f77707cdd8456a31..8a67503a051e0937c00cd9aaa0a5e3eaa9b263e1 100644 --- a/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test2.json +++ b/beat/core/test/prefix/experiments/user/user/inputs_mix/4/test2.json @@ -62,7 +62,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/integers_addition/1/integers_addition.json b/beat/core/test/prefix/experiments/user/user/integers_addition/1/integers_addition.json index a707f223ef82d2bf12492c6edffd9e06043a64fb..cdd49190f34733f1efa780bfd7cbca32b6eea1b5 100644 --- a/beat/core/test/prefix/experiments/user/user/integers_addition/1/integers_addition.json +++ b/beat/core/test/prefix/experiments/user/user/integers_addition/1/integers_addition.json @@ -35,7 +35,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/integers_addition/2/integers_addition.json b/beat/core/test/prefix/experiments/user/user/integers_addition/2/integers_addition.json index 16c5be38ecbde35b49024cd97097feed508d832a..dc92e5e2973c600399e94b8e010c298936b94608 100644 --- a/beat/core/test/prefix/experiments/user/user/integers_addition/2/integers_addition.json +++ b/beat/core/test/prefix/experiments/user/user/integers_addition/2/integers_addition.json @@ -45,7 +45,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/integers_addition/3/integers_addition.json b/beat/core/test/prefix/experiments/user/user/integers_addition/3/integers_addition.json index da67fb632e99d77b6361f955bbc35d02b3962d5e..5e3c6a96bd46c3acf3091e1f9449adb0aac63863 100644 --- a/beat/core/test/prefix/experiments/user/user/integers_addition/3/integers_addition.json +++ b/beat/core/test/prefix/experiments/user/user/integers_addition/3/integers_addition.json @@ -62,7 +62,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/integers_echo/1/integers_echo.json b/beat/core/test/prefix/experiments/user/user/integers_echo/1/integers_echo.json index 5714d3481e8e2bf200aac005f1a5395e5723e8fa..5695c82f7724c9c7552326edbce3166928346f7b 100644 --- a/beat/core/test/prefix/experiments/user/user/integers_echo/1/integers_echo.json +++ b/beat/core/test/prefix/experiments/user/user/integers_echo/1/integers_echo.json @@ -22,7 +22,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/integers_labelled/1/test.json b/beat/core/test/prefix/experiments/user/user/integers_labelled/1/test.json index 6720d3e7460d12e129c547a557f29bbe0133a46b..521ea8a6e0e5cbf7400ccdfa68b0693f7ef2c154 100644 --- a/beat/core/test/prefix/experiments/user/user/integers_labelled/1/test.json +++ b/beat/core/test/prefix/experiments/user/user/integers_labelled/1/test.json @@ -33,7 +33,7 @@ "globals": { "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "queue": "queue" } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single.json b/beat/core/test/prefix/experiments/user/user/single/1/single.json index 6071986ecbeb433d9c29dccb03b7e39e09128616..1e656e68222ca482927402a9a8e30579deec5f38 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_add.json b/beat/core/test/prefix/experiments/user/user/single/1/single_add.json index 5db0154e02087eb3d27d730a9b923bf01a6c5c70..2cc9cc5b72d1435a83cfe73f60cf1170162100a5 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_add.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_add.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_add2.json b/beat/core/test/prefix/experiments/user/user/single/1/single_add2.json index 23406907743fc79c4c58260f42100fcd187f5169..397400ef6edfc9a48c46993be31a4126aea267cf 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_add2.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_add2.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" }, "user/integers_add/1": { "offset": 2 diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_crash.json b/beat/core/test/prefix/experiments/user/user/single/1/single_crash.json index c393dc9be65f4d2c1dcabfcd00d9dce564de3d54..7c066751ec8a551c3fddfae5918bd855454396b0 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_crash.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_crash.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_done.json b/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_done.json index bda651df86f71e689243a70cf2319a1c30325f17..903a8d884ae3a9625d79a728d4042557035853bd 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_done.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_done.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_next.json b/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_next.json index dc52640d60c66d9605a01a07d39290cbc818efab..a636df633457d6d176ea1f30f8213d787fe0c98a 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_next.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_db_crash_next.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_error.json b/beat/core/test/prefix/experiments/user/user/single/1/single_error.json index bd0ef6b3532f02ed79e79933b14eef23d81f4069..e8007e8ec932826cbcd898c5f82b541619636a78 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_error.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_error.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_error2.json b/beat/core/test/prefix/experiments/user/user/single/1/single_error2.json index bd0ef6b3532f02ed79e79933b14eef23d81f4069..e8007e8ec932826cbcd898c5f82b541619636a78 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_error2.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_error2.json @@ -29,7 +29,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/single/1/single_large.json b/beat/core/test/prefix/experiments/user/user/single/1/single_large.json index 898dc168561e7a1e552cd7f634adfbc9aaa3b87a..7fbdaad66a726a86e7104aecd243258d78943b40 100644 --- a/beat/core/test/prefix/experiments/user/user/single/1/single_large.json +++ b/beat/core/test/prefix/experiments/user/user/single/1/single_large.json @@ -34,7 +34,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/triangle/1/too_many_nexts.json b/beat/core/test/prefix/experiments/user/user/triangle/1/too_many_nexts.json index 0aaa325fcc63a8b16e647cd62f8689b7d16cb634..b75fc76ba5e0cd6a595b86922b65eca283464d56 100644 --- a/beat/core/test/prefix/experiments/user/user/triangle/1/too_many_nexts.json +++ b/beat/core/test/prefix/experiments/user/user/triangle/1/too_many_nexts.json @@ -53,7 +53,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/prefix/experiments/user/user/triangle/1/triangle.json b/beat/core/test/prefix/experiments/user/user/triangle/1/triangle.json index 99f283d07603bd07d97c878d244084cf6faf3220..608c4882641d68b100ff65c86a7cebada9f1c181 100644 --- a/beat/core/test/prefix/experiments/user/user/triangle/1/triangle.json +++ b/beat/core/test/prefix/experiments/user/user/triangle/1/triangle.json @@ -53,7 +53,7 @@ "queue": "queue", "environment": { "name": "Python 2.7", - "version": "1.1.0" + "version": "1.2.0" } } } diff --git a/beat/core/test/test_dbexecution.py b/beat/core/test/test_dbexecution.py index 4542db175e414804cfb706e2f6a45426a1a308ef..c2a3198597f727191bc8fdc9e16a8923ef71d592 100644 --- a/beat/core/test/test_dbexecution.py +++ b/beat/core/test/test_dbexecution.py @@ -80,7 +80,7 @@ CONFIGURATION = { 'parameters': {}, 'environment': { 'name': 'Python 2.7', - 'version': '1.1.0' + 'version': '1.2.0' }, 'outputs': { 'sum': { diff --git a/beat/core/test/test_message_handler.py b/beat/core/test/test_message_handler.py index 0b7eb024c85c1cbcbc93c63fdc2c3eea54bec0cf..7ec49cddf603d586f706c2eeb09b00b876c6f8ef 100644 --- a/beat/core/test/test_message_handler.py +++ b/beat/core/test/test_message_handler.py @@ -45,7 +45,7 @@ import unittest import zmq import nose.tools -from ..agent import MessageHandler +from ..message_handler import MessageHandler from ..dataformat import DataFormat from ..inputs import RemoteInput from ..inputs import RemoteException diff --git a/beat/core/test/test_worker.py b/beat/core/test/test_worker.py index 408f056957f756e3f97d899a4a3c8b6a08aff4c7..e5b73e58dcaa9d39e7043f8580c621c400b7adb5 100644 --- a/beat/core/test/test_worker.py +++ b/beat/core/test/test_worker.py @@ -73,7 +73,7 @@ CONFIGURATION1 = { 'parameters': {}, 'environment': { 'name': 'Python 2.7', - 'version': '1.1.0' + 'version': '1.2.0' }, 'outputs': { 'out_data': { @@ -109,7 +109,7 @@ CONFIGURATION2 = { 'parameters': {}, 'environment': { 'name': 'Python 2.7', - 'version': '1.1.0' + 'version': '1.2.0' }, 'outputs': { 'out_data': { @@ -173,6 +173,7 @@ class TestOneWorker(unittest.TestCase): '--prefix=%s' % prefix, '--cache=%s' % tmp_prefix, '--name=%s' % WORKER1, + # '-vvv', self.controller.address, ] @@ -253,7 +254,7 @@ class TestOneWorker(unittest.TestCase): (worker, status, job_id, data) = self._wait() self.assertEqual(worker, WORKER1) - self.assertEqual(status, WorkerController.DONE) + self.assertEqual(status, WorkerController.JOB_ERROR) self.assertEqual(job_id, 1) result = simplejson.loads(data[0]) @@ -305,24 +306,19 @@ class TestOneWorker(unittest.TestCase): self.assertTrue(did_shutdown) - def test_error_busy(self): + def test_multiple_jobs(self): config = dict(CONFIGURATION1) config['algorithm'] = 'user/integers_echo_slow/1' - self.controller.execute(WORKER1, 1, config) + self.controller.execute(WORKER1, 1, CONFIGURATION1) self.controller.execute(WORKER1, 2, config) - (worker, status, job_id, data) = self._wait() - - self.assertEqual(worker, WORKER1) - self.assertEqual(status, WorkerController.JOB_ERROR) - self.assertEqual(job_id, 2) - self.assertEqual(data[0], 'Worker is already busy') - message = self._wait() - self._check_done(message, WORKER1, 1) + message = self._wait() + self._check_done(message, WORKER1, 2) + def test_reuse(self): self.controller.execute(WORKER1, 1, CONFIGURATION1) @@ -357,7 +353,7 @@ class TestOneWorker(unittest.TestCase): self.assertEqual(worker, WORKER1) self.assertEqual(status, WorkerController.ERROR) self.assertTrue(job_id is None) - self.assertEqual(data[0], "Worker isn't busy") + self.assertEqual(data[0], "Unknown job: 1") #---------------------------------------------------------- diff --git a/beat/core/worker.py b/beat/core/worker.py index 2e85379d242b28a9322ab8ff88d787df28435163..3b44ab448bb37db847403ccba79e687ec895e76f 100755 --- a/beat/core/worker.py +++ b/beat/core/worker.py @@ -96,8 +96,8 @@ class WorkerController(object): def cancel(self, worker, job_id): self.socket.send_multipart([ str(worker), - str(job_id), - WorkerController.CANCEL + WorkerController.CANCEL, + str(job_id) ])