#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import json
import unittest
import multiprocessing
import queue

import zmq

from ..bcpapi import BCP
from ..bcpapi.client import BeatComputationClient
from ..bcpapi.execution import ExecutionProcess
from ..bcpapi.processor import BeatComputationProcessor

from ..bcp import worker
from ..bcp import broker

from ..database import Database
from ..utils import find_free_port
from ..dock import Host

from . import prefix, tmp_prefix
from . import VERBOSE_BCP_LOGGING

# ----------------------------------------------------------

CONFIGURATION1 = {
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
        }
    },
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
        }
    },
    "nb_slots": 1,
    "channel": "integers",
}

# ----------------------------------------------------------

CONFIGURATION2 = {
    "queue": "queue",
    "inputs": {
        "in": {
            "set": "double",
            "protocol": "double",
            "database": "integers_db/1",
            "output": "a",
            "path": "ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55.db",
            "endpoint": "a",
            "hash": "ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55",
            "channel": "integers",
        }
    },
    "algorithm": "legacy/echo/1",
    "parameters": {},
    "environment": {"name": "Python 2.7", "version": "1.3.0"},
    "outputs": {
        "out": {
            "path": "40/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "endpoint": "out",
            "hash": "4061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681",
            "channel": "integers",
        }
    },
    "nb_slots": 1,
    "channel": "integers",
}

# ----------------------------------------------------------


class ZMQBrokerProcess(multiprocessing.Process):
    """Child process whose ``run`` delegates to ``broker.run``.

    Parameters:
        port: Port forwarded to ``broker.run``.
        verbose: Verbosity flag forwarded to ``broker.run``.
        callbacks: Optional value forwarded as the ``callbacks`` keyword of
            ``broker.run`` (``TestBroker`` passes an ``(on_ready, on_gone)``
            pair).
    """

    def __init__(self, port, verbose, callbacks=None):
        super(ZMQBrokerProcess, self).__init__()

        self.port = port
        self.verbose = verbose
        self.callbacks = callbacks

    def run(self):
        return broker.run(self.port, verbose=self.verbose, callbacks=self.callbacks)


class ZMQWorkerProcess(multiprocessing.Process):
    """Child process whose ``run`` delegates to ``worker.run``.

    Parameters:
        address: Broker address the worker is given.
        name: Service name the worker is given.
        verbose: Verbosity flag forwarded to ``worker.run``.
        use_docker: Forwarded as the ``use_docker`` keyword of ``worker.run``.
        docker_images_cache: Forwarded as the ``docker_images_cache`` keyword
            of ``worker.run``.
    """

    def __init__(
        self, address, name, verbose, use_docker=False, docker_images_cache=None
    ):
        super(ZMQWorkerProcess, self).__init__()

        self.broker_address = address
        self.service_name = name
        self.verbose = verbose
        self.use_docker = use_docker
        # Keep the value supplied by the caller; it used to be overwritten
        # with None, so the cache path never reached worker.run.
        self.docker_images_cache = docker_images_cache

    def run(self):
        return worker.run(
            self.broker_address,
            service_name=self.service_name,
            verbose=self.verbose,
            prefix=prefix,
            cache=tmp_prefix,
            use_docker=self.use_docker,
            docker_images_cache=self.docker_images_cache,
        )


# ----------------------------------------------------------


class ExcecutionTestCase(unittest.TestCase):
    """Base test case providing database preparation for execution tests."""

    def prepare_databases(self, configuration):
        """Index the view of every input declared in ``configuration``.

        For each entry of ``configuration["inputs"]`` a ``Database`` is
        loaded from ``prefix``, the view for the entry's protocol/set is
        requested and indexed at the entry's ``path`` below ``tmp_prefix``.
        """

        for input_cfg in configuration["inputs"].values():
            database = Database(prefix, input_cfg["database"])
            view = database.view(input_cfg["protocol"], input_cfg["set"])
            view.index(os.path.join(tmp_prefix, input_cfg["path"]))


class TestBroker(unittest.TestCase):
    """Checks that the broker invokes its ready/gone callbacks."""

    def __on_ready(self, name):
        self.queue.put("ready")

    def __on_gone(self, name):
        self.queue.put("gone")

    def setUp(self):
        self.queue = multiprocessing.Queue()

    def test_callback(self):
        worker_name = b"test_worker"

        port = find_free_port()
        broker_address = "tcp://localhost:{}".format(port)

        broker_p = ZMQBrokerProcess(
            port, VERBOSE_BCP_LOGGING, (self.__on_ready, self.__on_gone)
        )
        broker_p.start()

        try:
            # Named worker_p so the imported ``worker`` module is not shadowed
            worker_p = ZMQWorkerProcess(
                broker_address, worker_name, VERBOSE_BCP_LOGGING
            )
            worker_p.start()

            worker_p.join(2)  # Give the worker enough time to announce itself
            worker_p.terminate()
            worker_p.join()

            queue_messages = []

            # At most 5 attempts of 5 seconds each to get both notifications
            for _ in range(5):
                if len(queue_messages) >= 2:
                    break

                try:
                    message = self.queue.get(block=True, timeout=5)
                except queue.Empty:
                    pass
                else:
                    queue_messages.append(message)
        finally:
            # Never leave the broker running, even if something above failed
            broker_p.terminate()
            broker_p.join()

        self.assertEqual(len(queue_messages), 2)
        self.assertEqual(queue_messages[0], "ready")
        self.assertEqual(queue_messages[1], "gone")


class TestBCP(ExcecutionTestCase):
    """Client/broker/worker round trips over BCP.

    ``use_docker`` and ``docker_images_cache`` are class attributes handed to
    the worker process so that subclasses can override them.
    """

    use_docker = False
    docker_images_cache = None

    def setUp(self):
        self.worker_name = b"test_worker"

        port = find_free_port()
        broker_address = "tcp://localhost:{}".format(port)

        self.broker_p = ZMQBrokerProcess(port, VERBOSE_BCP_LOGGING)
        self.broker_p.start()

        self.worker = ZMQWorkerProcess(
            broker_address,
            self.worker_name,
            VERBOSE_BCP_LOGGING,
            self.use_docker,
            self.docker_images_cache,
        )
        self.worker.start()

        self.client = BeatComputationClient(broker_address, VERBOSE_BCP_LOGGING)

    def tearDown(self):
        self.worker.terminate()
        self.worker.join()

        self.broker_p.terminate()
        self.broker_p.join()

        self.client = None

    def _receive_messages(self, count):
        """Collect non-empty replies from the client until ``count`` are read.

        Returns the list of replies; it is shorter than ``count`` only when
        the wait was interrupted with ``KeyboardInterrupt``.
        """

        messages = []

        while len(messages) < count:
            try:
                reply = self.client.recv()
            except KeyboardInterrupt:
                break
            else:
                if reply:
                    messages.append(reply)

        return messages

    def test_cancel_unknown(self):
        request = [BCP.BCPE_CANCEL, b"1"]
        self.client.send(self.worker_name, request)

        reply = None
        while reply is None:
            try:
                reply = self.client.recv()
            except KeyboardInterrupt:
                break

        # Fail with a clear message instead of a TypeError when interrupted
        self.assertIsNotNone(reply)
        self.assertEqual(reply[1], BCP.BCPP_ERROR)
        self.assertEqual(reply[2], b"Unknown job: 1")

    def test_cancel(self):
        self.prepare_databases(CONFIGURATION1)

        job_id = b"1"

        request = [BCP.BCPE_EXECUTE, job_id, json.dumps(CONFIGURATION1).encode("utf-8")]
        self.client.send(self.worker_name, request)

        request = [BCP.BCPE_CANCEL, job_id]
        self.client.send(self.worker_name, request)

        messages = self._receive_messages(3)

        self.assertEqual(len(messages), 3)
        self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
        self.assertEqual(messages[0][2], job_id)
        self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
        self.assertEqual(messages[1][2], job_id)
        self.assertEqual(messages[2][1], BCP.BCPP_JOB_CANCELLED)
        self.assertEqual(messages[2][2], job_id)

    def test_execute(self):
        self.prepare_databases(CONFIGURATION1)

        job_id = b"1"

        request = [BCP.BCPE_EXECUTE, job_id, json.dumps(CONFIGURATION1).encode("utf-8")]
        self.client.send(self.worker_name, request)

        messages = self._receive_messages(3)

        self.assertEqual(len(messages), 3)
        self.assertEqual(messages[0][1], BCP.BCPP_JOB_RECEIVED)
        self.assertEqual(messages[0][2], job_id)
        self.assertEqual(messages[1][1], BCP.BCPP_JOB_STARTED)
        self.assertEqual(messages[1][2], job_id)
        self.assertEqual(messages[2][1], BCP.BCPP_JOB_DONE)
        self.assertEqual(messages[2][2], job_id)
        self.assertEqual(len(messages[2]), 4)


class TestBCPDocker(TestBCP):
    """Same scenarios as ``TestBCP`` with ``use_docker`` enabled."""

    use_docker = True

    @classmethod
    def setUpClass(cls):
        # Must be stored under ``docker_images_cache``: that is the attribute
        # TestBCP.setUp passes to the worker and the one read just below.
        cls.docker_images_cache = os.path.join(tmp_prefix, "docker_images_cache.json")
        cls.host = Host(images_cache=cls.docker_images_cache, raise_on_errors=False)


class TestExcecutionProcess(ExcecutionTestCase):
    """Runs ``ExecutionProcess`` directly against an ipc endpoint."""

    address = "ipc://execution_feed"

    def tearDown(self):
        ipc_path = self.address.split("//")[1]

        # The file only exists once a socket was bound; do not hide the
        # original test failure behind a FileNotFoundError.
        if os.path.exists(ipc_path):
            os.remove(ipc_path)

    def setup_process(self):
        """Prepare the databases, then start and return an ExecutionProcess."""

        self.prepare_databases(CONFIGURATION1)
        process = ExecutionProcess(
            self.address, b"1", prefix, CONFIGURATION1, tmp_prefix, VERBOSE_BCP_LOGGING
        )

        process.start()

        return process

    def _poll_until_finished(self, poller, receive):
        """Poll until a terminal message is received.

        Parameters:
            poller: ``zmq.Poller`` polled with a 1000 ms timeout.
            receive: Callable without arguments returning the next message
                once the poller reports activity.

        Returns:
            ``(done, messages)`` where ``done`` is True only if a message
            whose second frame is ``BCP.BCPP_JOB_DONE`` was received, and
            ``messages`` lists everything received, in order.
        """

        done = False
        messages = []

        while True:
            try:
                items = poller.poll(1000)
            except KeyboardInterrupt:
                break

            if items:
                msg = receive()
                messages.append(msg)
                result = msg[1]

                if result == BCP.BCPP_JOB_DONE:
                    done = True
                    break
                elif result in [BCP.BCPP_JOB_ERROR, BCP.BCPP_ERROR]:
                    break

        return done, messages

    def test_execution_process(self):
        ctx = zmq.Context()

        try:
            socket = ctx.socket(zmq.ROUTER)
            socket.linger = 0
            socket.bind(self.address)

            poller = zmq.Poller()
            poller.register(socket, zmq.POLLIN)

            process = self.setup_process()

            try:
                done, messages = self._poll_until_finished(
                    poller, socket.recv_multipart
                )
            finally:
                process.terminate()
                process.join()
        finally:
            ctx.destroy()

        self.assertTrue(done)
        self.assertEqual(process.queue.get(), "started")
        self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)

    def test_processor(self):
        poller = zmq.Poller()

        processor = BeatComputationProcessor(poller, self.address, VERBOSE_BCP_LOGGING)

        process = self.setup_process()

        try:
            done, messages = self._poll_until_finished(poller, processor.process)
        finally:
            process.terminate()
            process.join()

        self.assertTrue(done)
        self.assertEqual(process.queue.get(), "started")
        self.assertEqual(messages[0][1], BCP.BCPP_JOB_DONE)