Commit 9c25d8b1 authored by Samuel GAIST's avatar Samuel GAIST

Merge branch '90_subprocess_with_different_environment' into 'master'

Subprocess execution with different environment

Closes #90

See merge request !100
parents 43de84c3 3cc71593
Pipeline #34106 failed with stages
in 37 minutes and 37 seconds
...@@ -123,6 +123,12 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -123,6 +123,12 @@ class SubprocessExecutor(RemoteExecutor):
guarantee that the cache is refreshed as appropriate in case the guarantee that the cache is refreshed as appropriate in case the
underlying libraries change. underlying libraries change.
custom_root_folders (dict): A dictionary mapping databases name and
their location on disk
ip_address (str): IP address of the machine to connect to for the database
execution and message handlers.
python_path (str): Path to the python executable of the environment to use
for experiment execution.
Attributes: Attributes:
...@@ -172,8 +178,8 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -172,8 +178,8 @@ class SubprocessExecutor(RemoteExecutor):
library_cache=None, library_cache=None,
custom_root_folders=None, custom_root_folders=None,
ip_address="127.0.0.1", ip_address="127.0.0.1",
python_path=None,
): ):
super(SubprocessExecutor, self).__init__( super(SubprocessExecutor, self).__init__(
prefix, prefix,
data, data,
...@@ -186,14 +192,30 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -186,14 +192,30 @@ class SubprocessExecutor(RemoteExecutor):
custom_root_folders=custom_root_folders, custom_root_folders=custom_root_folders,
) )
if python_path is None:
base_path = os.path.dirname(sys.argv[0])
# We need three apps to run this function: databases_provider and execute # We need three apps to run this function: databases_provider and execute
self.EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]), "execute")) self.EXECUTE_BIN = _which(os.path.join(base_path, "execute"))
self.LOOP_EXECUTE_BIN = _which( self.LOOP_EXECUTE_BIN = _which(os.path.join(base_path, "loop_execute"))
os.path.join(os.path.dirname(sys.argv[0]), "loop_execute") self.DBPROVIDER_BIN = _which(os.path.join(base_path, "databases_provider"))
) else:
self.DBPROVIDER_BIN = _which( base_path = os.path.dirname(python_path)
os.path.join(os.path.dirname(sys.argv[0]), "databases_provider") self.EXECUTE_BIN = os.path.join(base_path, "execute")
) self.LOOP_EXECUTE_BIN = os.path.join(base_path, "loop_execute")
self.DBPROVIDER_BIN = os.path.join(base_path, "databases_provider")
if any(
[
not os.path.exists(executable)
for executable in [
self.EXECUTE_BIN,
self.LOOP_EXECUTE_BIN,
self.DBPROVIDER_BIN,
]
]
):
raise RuntimeError("Invalid environment")
def __create_db_process(self, configuration_name=None): def __create_db_process(self, configuration_name=None):
databases_process = None databases_process = None
......
...@@ -44,6 +44,8 @@ import zmq ...@@ -44,6 +44,8 @@ import zmq
import simplejson as json import simplejson as json
from flaky import flaky
from ..bcpapi import BCP from ..bcpapi import BCP
from ..bcpapi.client import BeatComputationClient from ..bcpapi.client import BeatComputationClient
from ..bcpapi.execution import ExecutionProcess from ..bcpapi.execution import ExecutionProcess
...@@ -171,7 +173,7 @@ class ZMQWorkerProcess(multiprocessing.Process): ...@@ -171,7 +173,7 @@ class ZMQWorkerProcess(multiprocessing.Process):
# ---------------------------------------------------------- # ----------------------------------------------------------
class ExcecutionTestCase(unittest.TestCase): class ExecutionTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.MAX_ITERATION_COUNT = int( self.MAX_ITERATION_COUNT = int(
os.environ.get("BPC_MAX_ITERATION_COUNT", DEFAULT_MAX_ITERATION_COUNT) os.environ.get("BPC_MAX_ITERATION_COUNT", DEFAULT_MAX_ITERATION_COUNT)
...@@ -230,7 +232,7 @@ class TestBroker(unittest.TestCase): ...@@ -230,7 +232,7 @@ class TestBroker(unittest.TestCase):
self.assertEqual(queue_messages[1], "gone") self.assertEqual(queue_messages[1], "gone")
class TestBCP(ExcecutionTestCase): class TestBCP(ExecutionTestCase):
use_docker = False use_docker = False
docker_images_cache = None docker_images_cache = None
...@@ -311,6 +313,7 @@ class TestBCP(ExcecutionTestCase): ...@@ -311,6 +313,7 @@ class TestBCP(ExcecutionTestCase):
self.assertEqual(messages[2][1], BCP.BCPP_JOB_CANCELLED) self.assertEqual(messages[2][1], BCP.BCPP_JOB_CANCELLED)
self.assertEqual(messages[2][2], job_id) self.assertEqual(messages[2][2], job_id)
@flaky(max_runs=3)
def test_execute(self): def test_execute(self):
self.prepare_databases(CONFIGURATION1) self.prepare_databases(CONFIGURATION1)
job_id = b"1" job_id = b"1"
...@@ -349,16 +352,21 @@ class TestBCPDocker(TestBCP): ...@@ -349,16 +352,21 @@ class TestBCPDocker(TestBCP):
cls.host = Host(images_cache=cls.docker_images_cache, raise_on_errors=False) cls.host = Host(images_cache=cls.docker_images_cache, raise_on_errors=False)
class TestExcecutionProcess(ExcecutionTestCase): class TestExecutionProcess(ExecutionTestCase):
address = "ipc://execution_feed" REMOTE_ADDRESS = "ipc://execution_feed"
def tearDown(self): def tearDown(self):
os.remove(self.address.split("//")[1]) os.remove(self.REMOTE_ADDRESS.split("//")[1])
def setup_process(self): def setup_process(self):
self.prepare_databases(CONFIGURATION1) self.prepare_databases(CONFIGURATION1)
process = ExecutionProcess( process = ExecutionProcess(
self.address, b"1", prefix, CONFIGURATION1, tmp_prefix, VERBOSE_BCP_LOGGING self.REMOTE_ADDRESS,
b"1",
prefix,
CONFIGURATION1,
tmp_prefix,
VERBOSE_BCP_LOGGING,
) )
process.start() process.start()
return process return process
...@@ -367,7 +375,7 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -367,7 +375,7 @@ class TestExcecutionProcess(ExcecutionTestCase):
ctx = zmq.Context() ctx = zmq.Context()
socket = ctx.socket(zmq.ROUTER) socket = ctx.socket(zmq.ROUTER)
socket.linger = 0 socket.linger = 0
socket.bind(self.address) socket.bind(self.REMOTE_ADDRESS)
poller = zmq.Poller() poller = zmq.Poller()
poller.register(socket, zmq.POLLIN) poller.register(socket, zmq.POLLIN)
...@@ -407,7 +415,9 @@ class TestExcecutionProcess(ExcecutionTestCase): ...@@ -407,7 +415,9 @@ class TestExcecutionProcess(ExcecutionTestCase):
def test_processor(self): def test_processor(self):
poller = zmq.Poller() poller = zmq.Poller()
processor = BeatComputationProcessor(poller, self.address, VERBOSE_BCP_LOGGING) processor = BeatComputationProcessor(
poller, self.REMOTE_ADDRESS, VERBOSE_BCP_LOGGING
)
process = self.setup_process() process = self.setup_process()
......
...@@ -40,6 +40,9 @@ import os ...@@ -40,6 +40,9 @@ import os
import glob import glob
import logging import logging
import nose.tools import nose.tools
import subprocess as sp # nosec
from shutil import rmtree
from ..experiment import Experiment from ..experiment import Experiment
from ..execution import LocalExecutor from ..execution import LocalExecutor
...@@ -106,6 +109,8 @@ class BaseExecutionMixIn(object): ...@@ -106,6 +109,8 @@ class BaseExecutionMixIn(object):
machine. It borrows some code from the package ``beat.cmdline``. machine. It borrows some code from the package ``beat.cmdline``.
""" """
executor_parameters = kwargs.pop("executor_parameters", {})
dataformat_cache = {} dataformat_cache = {}
database_cache = {} database_cache = {}
algorithm_cache = {} algorithm_cache = {}
...@@ -140,6 +145,7 @@ class BaseExecutionMixIn(object): ...@@ -140,6 +145,7 @@ class BaseExecutionMixIn(object):
dataformat_cache, dataformat_cache,
database_cache, database_cache,
algorithm_cache, algorithm_cache,
**executor_parameters
) )
nose.tools.assert_true( nose.tools.assert_true(
executor.valid, "\n * %s" % "\n * ".join(executor.errors) executor.valid, "\n * %s" % "\n * ".join(executor.errors)
...@@ -391,6 +397,7 @@ class TestLocalExecution(BaseExecutionMixIn): ...@@ -391,6 +397,7 @@ class TestLocalExecution(BaseExecutionMixIn):
dataformat_cache, dataformat_cache,
database_cache, database_cache,
algorithm_cache, algorithm_cache,
**kwargs
): ):
return LocalExecutor( return LocalExecutor(
prefix, prefix,
...@@ -426,14 +433,16 @@ class TestSubprocessExecution(BaseExecutionMixIn): ...@@ -426,14 +433,16 @@ class TestSubprocessExecution(BaseExecutionMixIn):
dataformat_cache, dataformat_cache,
database_cache, database_cache,
algorithm_cache, algorithm_cache,
python_path=None,
): ):
return SubprocessExecutor( return SubprocessExecutor(
prefix, prefix=prefix,
configuration, data=configuration,
tmp_prefix, cache=tmp_prefix,
dataformat_cache, dataformat_cache=dataformat_cache,
database_cache, database_cache=database_cache,
algorithm_cache, algorithm_cache=algorithm_cache,
python_path=python_path,
) )
@slow @slow
...@@ -453,3 +462,56 @@ class TestSubprocessExecution(BaseExecutionMixIn): ...@@ -453,3 +462,56 @@ class TestSubprocessExecution(BaseExecutionMixIn):
nose.tools.eq_( nose.tools.eq_(
result["user_error"], "'Could not setup algorithm (returned False)'" result["user_error"], "'Could not setup algorithm (returned False)'"
) )
def create_conda_environment(self, additional_packages=[]):
environment_name = "subprocess_environment"
environment_prefix = os.path.join(tmp_prefix, environment_name)
packages = ["python=3"] + additional_packages
sp.run(
[
"conda",
"create",
"-y",
"-c",
"defaults",
"-c",
"http://www.idiap.ch/software/bob/conda/",
"--prefix",
environment_prefix,
]
+ packages,
check=True,
stdout=sp.PIPE,
stderr=sp.PIPE,
)
return environment_prefix
def clear_conda_environment(self, environment_prefix):
rmtree(environment_prefix)
@slow
def test_different_environment(self):
environment_prefix = self.create_conda_environment(["beat.backend.python"])
result = self.execute(
"user/user/loop/1/loop",
[{"sum": 135, "nb": 9}, {"sum": 9, "nb": 9}],
executor_parameters={
"python_path": os.path.join(environment_prefix, "bin", "python")
},
)
self.clear_conda_environment(environment_prefix)
nose.tools.assert_is_none(result)
@slow
def test_wrong_different_environment(self):
environment_prefix = self.create_conda_environment()
with nose.tools.assert_raises(RuntimeError):
self.execute(
"user/user/loop/1/loop",
[{"sum": 135, "nb": 9}, {"sum": 9, "nb": 9}],
executor_parameters={
"python_path": os.path.join(environment_prefix, "bin", "python")
},
)
self.clear_conda_environment(environment_prefix)
...@@ -47,6 +47,7 @@ test: ...@@ -47,6 +47,7 @@ test:
- ddt - ddt
- nose - nose
- coverage - coverage
- flaky
- sphinx - sphinx
- sphinx_rtd_theme - sphinx_rtd_theme
# required for plotting related tests # required for plotting related tests
...@@ -60,7 +61,7 @@ test: ...@@ -60,7 +61,7 @@ test:
# pulls required images once before running the tests # pulls required images once before running the tests
- python -c "from beat.core.test.utils import pull_docker_test_images as f; f()" - python -c "from beat.core.test.utils import pull_docker_test_images as f; f()"
- worker --help - worker --help
- nosetests --with-coverage --cover-package={{ name }} -sv {{ name }} - nosetests --with-flaky --with-coverage --cover-package={{ name }} -sv {{ name }}
- sphinx-build -aEW ${PREFIX}/share/doc/{{ name }}/doc sphinx - sphinx-build -aEW ${PREFIX}/share/doc/{{ name }}/doc sphinx
- if [ -n "${CI_PROJECT_DIR}" ]; then mv sphinx "${CI_PROJECT_DIR}/"; fi - if [ -n "${CI_PROJECT_DIR}" ]; then mv sphinx "${CI_PROJECT_DIR}/"; fi
- sphinx-build -aEb doctest ${PREFIX}/share/doc/{{ name }}/doc sphinx - sphinx-build -aEb doctest ${PREFIX}/share/doc/{{ name }}/doc sphinx
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment