Commit 5bfa0b76 authored by Samuel Gaist's avatar Samuel Gaist

[test][test_bcp] Refactor for Py38 on macOS

The default multiprocess instanciation has
changed from fork to spawn on macOS which
triggers different behaviour.
parent 96d86804
......@@ -37,6 +37,7 @@
# Tests for experiment execution
import multiprocessing
import multiprocessing.queues
import os
import queue
import unittest
......@@ -135,24 +136,37 @@ DEFAULT_MAX_ITERATION_COUNT = 30
class ZMQBrokerProcess(multiprocessing.Process):
def __init__(self, port, verbose, callbacks=None):
def __init__(self, port, verbose, queue=None):
super(ZMQBrokerProcess, self).__init__()
self.port = port
self.verbose = verbose
self.callbacks = callbacks
self.queue = queue
def run(self):
return broker.run(self.port, verbose=self.verbose, callbacks=self.callbacks)
callbacks = None
if self.queue:
callbacks = self.queue.callbacks()
return broker.run(self.port, verbose=self.verbose, callbacks=callbacks)
class ZMQWorkerProcess(multiprocessing.Process):
def __init__(
self, address, name, verbose, use_docker=False, docker_images_cache=None
self,
address,
name,
verbose,
prefix,
tmp_prefix,
use_docker=False,
docker_images_cache=None,
):
super(ZMQWorkerProcess, self).__init__()
self.broker_address = address
self.service_name = name
self.verbose = verbose
self.prefix = prefix
self.tmp_prefix = tmp_prefix
self.use_docker = use_docker
self.docker_images_cache = None
......@@ -161,8 +175,8 @@ class ZMQWorkerProcess(multiprocessing.Process):
self.broker_address,
service_name=self.service_name,
verbose=self.verbose,
prefix=prefix,
cache=tmp_prefix,
prefix=self.prefix,
cache=self.tmp_prefix,
use_docker=self.use_docker,
docker_images_cache=self.docker_images_cache,
)
......@@ -184,15 +198,27 @@ class ExecutionTestCase(unittest.TestCase):
view.index(os.path.join(tmp_prefix, input_cfg["path"]))
class TestBroker(unittest.TestCase):
class CallbackedQueue(multiprocessing.queues.Queue):
def __init__(self, *args, **kwargs):
from multiprocessing.context import BaseContext
ctx = BaseContext()
ctx._name = "callbacks"
super().__init__(*args, **kwargs, ctx=ctx)
def __on_ready(self, name):
self.queue.put("ready")
self.put("ready")
def __on_gone(self, name):
self.queue.put("gone")
self.put("gone")
def callbacks(self):
return (self.__on_ready, self.__on_gone)
class TestBroker(unittest.TestCase):
def setUp(self):
self.queue = multiprocessing.Queue()
self.queue = CallbackedQueue()
def test_callback(self):
worker_name = b"test_worker"
......@@ -200,12 +226,12 @@ class TestBroker(unittest.TestCase):
port = find_free_port()
broker_address = "tcp://localhost:{}".format(port)
broker_p = ZMQBrokerProcess(
port, VERBOSE_BCP_LOGGING, (self.__on_ready, self.__on_gone)
)
broker_p = ZMQBrokerProcess(port, VERBOSE_BCP_LOGGING, self.queue)
broker_p.start()
worker = ZMQWorkerProcess(broker_address, worker_name, VERBOSE_BCP_LOGGING)
worker = ZMQWorkerProcess(
broker_address, worker_name, VERBOSE_BCP_LOGGING, prefix, tmp_prefix
)
worker.start()
worker.join(2) # Give the worker enough time to announce itself
worker.terminate()
......@@ -249,6 +275,8 @@ class TestBCP(ExecutionTestCase):
broker_address,
self.worker_name,
VERBOSE_BCP_LOGGING,
prefix,
tmp_prefix,
self.use_docker,
self.docker_images_cache,
)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment