Commit a4d0c96b authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[test][bcp] Add max iteration count as watchdog system

Without that some tests might stale indefinitely.
parent 9679e736
Pipeline #33478 passed with stage
in 22 minutes and 23 seconds
...@@ -128,6 +128,12 @@ CONFIGURATION2 = { ...@@ -128,6 +128,12 @@ CONFIGURATION2 = {
# ---------------------------------------------------------- # ----------------------------------------------------------
DEFAULT_MAX_ITERATION_COUNT = 30
# ----------------------------------------------------------
class ZMQBrokerProcess(multiprocessing.Process): class ZMQBrokerProcess(multiprocessing.Process):
def __init__(self, port, verbose, callbacks=None): def __init__(self, port, verbose, callbacks=None):
super(ZMQBrokerProcess, self).__init__() super(ZMQBrokerProcess, self).__init__()
...@@ -166,6 +172,11 @@ class ZMQWorkerProcess(multiprocessing.Process): ...@@ -166,6 +172,11 @@ class ZMQWorkerProcess(multiprocessing.Process):
class ExcecutionTestCase(unittest.TestCase): class ExcecutionTestCase(unittest.TestCase):
def setUp(self):
self.MAX_ITERATION_COUNT = int(
os.environ.get("BPC_MAX_ITERATION_COUNT", DEFAULT_MAX_ITERATION_COUNT)
)
def prepare_databases(self, configuration): def prepare_databases(self, configuration):
for _, input_cfg in configuration["inputs"].items(): for _, input_cfg in configuration["inputs"].items():
database = Database(prefix, input_cfg["database"]) database = Database(prefix, input_cfg["database"])
...@@ -224,6 +235,8 @@ class TestBCP(ExcecutionTestCase): ...@@ -224,6 +235,8 @@ class TestBCP(ExcecutionTestCase):
docker_images_cache = None docker_images_cache = None
def setUp(self): def setUp(self):
super().setUp()
self.worker_name = b"test_worker" self.worker_name = b"test_worker"
port = find_free_port() port = find_free_port()
...@@ -255,12 +268,16 @@ class TestBCP(ExcecutionTestCase): ...@@ -255,12 +268,16 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request) self.client.send(self.worker_name, request)
reply = None reply = None
while reply is None: iterations = 0
while reply is None and iterations < self.MAX_ITERATION_COUNT:
try: try:
reply = self.client.recv() reply = self.client.recv()
except KeyboardInterrupt: except KeyboardInterrupt:
break break
else:
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(reply[1], BCP.BCPP_ERROR) self.assertEqual(reply[1], BCP.BCPP_ERROR)
self.assertEqual(reply[2], b"Unknown job: 1") self.assertEqual(reply[2], b"Unknown job: 1")
...@@ -275,7 +292,8 @@ class TestBCP(ExcecutionTestCase): ...@@ -275,7 +292,8 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request) self.client.send(self.worker_name, request)
messages = [] messages = []
while len(messages) < 3: iterations = 0
while len(messages) < 3 and iterations < self.MAX_ITERATION_COUNT:
try: try:
reply = self.client.recv() reply = self.client.recv()
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -283,7 +301,9 @@ class TestBCP(ExcecutionTestCase): ...@@ -283,7 +301,9 @@ class TestBCP(ExcecutionTestCase):
else: else:
if reply: if reply:
messages.append(reply) messages.append(reply)
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED) self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
self.assertEqual(messages[0][2], job_id) self.assertEqual(messages[0][2], job_id)
self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED) self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
...@@ -299,7 +319,8 @@ class TestBCP(ExcecutionTestCase): ...@@ -299,7 +319,8 @@ class TestBCP(ExcecutionTestCase):
self.client.send(self.worker_name, request) self.client.send(self.worker_name, request)
messages = [] messages = []
while len(messages) < 3: iterations = 0
while len(messages) < 3 and iterations < self.MAX_ITERATION_COUNT:
try: try:
reply = self.client.recv() reply = self.client.recv()
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -307,7 +328,9 @@ class TestBCP(ExcecutionTestCase): ...@@ -307,7 +328,9 @@ class TestBCP(ExcecutionTestCase):
else: else:
if reply: if reply:
messages.append(reply) messages.append(reply)
iterations += 1
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED) self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
self.assertEqual(messages[0][2], job_id) self.assertEqual(messages[0][2], job_id)
self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED) self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
...@@ -350,10 +373,12 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -350,10 +373,12 @@ class TestExcecutionProcess(ExcecutionTestCase):
poller.register(socket, zmq.POLLIN) poller.register(socket, zmq.POLLIN)
process = self.setup_process() process = self.setup_process()
done = False
done = False
iterations = 0
messages = [] messages = []
while True:
while True and iterations < self.MAX_ITERATION_COUNT:
try: try:
items = poller.poll(1000) items = poller.poll(1000)
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -369,11 +394,13 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -369,11 +394,13 @@ class TestExcecutionProcess(ExcecutionTestCase):
break break
elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]: elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]:
break break
iterations += 1
process.terminate() process.terminate()
process.join() process.join()
ctx.destroy() ctx.destroy()
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertTrue(done) self.assertTrue(done)
self.assertEqual(process.queue.get(), "started") self.assertEqual(process.queue.get(), "started")
self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE) self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)
...@@ -385,9 +412,11 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -385,9 +412,11 @@ class TestExcecutionProcess(ExcecutionTestCase):
process = self.setup_process() process = self.setup_process()
done = False done = False
iterations = 0
messages = [] messages = []
while True:
while True and iterations < self.MAX_ITERATION_COUNT:
try: try:
items = poller.poll(1000) items = poller.poll(1000)
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -403,10 +432,12 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -403,10 +432,12 @@ class TestExcecutionProcess(ExcecutionTestCase):
break break
elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]: elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]:
break break
iterations += 1
process.terminate() process.terminate()
process.join() process.join()
self.assertTrue(iterations < self.MAX_ITERATION_COUNT)
self.assertTrue(done) self.assertTrue(done)
self.assertEqual(process.queue.get(), "started") self.assertEqual(process.queue.get(), "started")
self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE) self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment