Commit 05b2b084 authored by Samuel GAIST's avatar Samuel GAIST

Merge branch 'improve_two_loops_test' into 'master'

Improve two_loops test experiment

See merge request !94
parents da776b0c a4d0c96b
Pipeline #33481 passed with stages
in 45 minutes and 9 seconds
......@@ -32,6 +32,9 @@
"processor_outputs": {
"out": "out"
},
"processor_parameters": {
"step": 1
},
"evaluator_algorithm": "user/db_input_loop_evaluator/1",
"evaluator_parameters": {
......@@ -98,7 +101,7 @@
},
"queue": "queue",
"user/db_input_loop_processor/1" : {
"step": 1
"step": 2
},
"user/db_input_loop_evaluator/1" : {
"threshold": 2
......
......@@ -128,6 +128,12 @@ CONFIGURATION2 = {
# ----------------------------------------------------------
DEFAULT_MAX_ITERATION_COUNT = 30
# ----------------------------------------------------------
class ZMQBrokerProcess(multiprocessing.Process):
def __init__(self, port, verbose, callbacks=None):
super(ZMQBrokerProcess, self).__init__()
......@@ -166,6 +172,11 @@ class ZMQWorkerProcess(multiprocessing.Process):
class ExcecutionTestCase(unittest.TestCase):
def setUp(self):
self.MAX_ITERATION_COUNT = int(
os.environ.get("BPC_MAX_ITERATION_COUNT", DEFAULT_MAX_ITERATION_COUNT)
)
def prepare_databases(self, configuration):
for _, input_cfg in configuration["inputs"].items():
database = Database(prefix, input_cfg["database"])
......@@ -224,6 +235,8 @@ class TestBCP(ExcecutionTestCase):
docker_images_cache = None
def setUp(self):
super().setUp()
self.worker_name = b"test_worker"
port = find_free_port()
......@@ -255,12 +268,16 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request)
reply = None
while reply is None:
iterations = 0
while reply is None and iterations < self.MAX_ITERATION_COUNT:
try:
reply = self.client.recv()
except KeyboardInterrupt:
break
else:
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(reply[1], BCP.BCPP_ERROR)
self.assertEqual(reply[2], b"Unknown job: 1")
......@@ -275,7 +292,8 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request)
messages = []
while len(messages) < 3:
iterations = 0
while len(messages) < 3 and iterations < self.MAX_ITERATION_COUNT:
try:
reply = self.client.recv()
except KeyboardInterrupt:
......@@ -283,7 +301,9 @@ class TestBCP(ExcecutionTestCase):
else:
if reply:
messages.append(reply)
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
self.assertEqual(messages[0][2], job_id)
self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
......@@ -299,7 +319,8 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request)
messages = []
while len(messages) < 3:
iterations = 0
while len(messages) < 3 and iterations < self.MAX_ITERATION_COUNT:
try:
reply = self.client.recv()
except KeyboardInterrupt:
......@@ -307,7 +328,9 @@ class TestBCP(ExcecutionTestCase):
else:
if reply:
messages.append(reply)
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
self.assertEqual(messages[0][2], job_id)
self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
......@@ -350,10 +373,12 @@ class TestExcecutionProcess(ExcecutionTestCase):
poller.register(socket, zmq.POLLIN)
process = self.setup_process()
done = False
done = False
iterations = 0
messages = []
while True:
while True and iterations < self.MAX_ITERATION_COUNT:
try:
items = poller.poll(1000)
except KeyboardInterrupt:
......@@ -369,11 +394,13 @@ class TestExcecutionProcess(ExcecutionTestCase):
break
elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]:
break
iterations += 1
process.terminate()
process.join()
ctx.destroy()
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertTrue(done)
self.assertEqual(process.queue.get(), "started")
self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)
......@@ -385,9 +412,11 @@ class TestExcecutionProcess(ExcecutionTestCase):
process = self.setup_process()
done = False
iterations = 0
messages = []
while True:
while True and iterations < self.MAX_ITERATION_COUNT:
try:
items = poller.poll(1000)
except KeyboardInterrupt:
......@@ -403,10 +432,12 @@ class TestExcecutionProcess(ExcecutionTestCase):
break
elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]:
break
iterations += 1
process.terminate()
process.join()
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertTrue(done)
self.assertEqual(process.queue.get(), "started")
self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment