#!/usr/bin/env python # vim: set fileencoding=utf-8 : ############################################################################### # # # Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # # Contact: beat.support@idiap.ch # # # # This file is part of the beat.core module of the BEAT platform. # # # # Commercial License Usage # # Licensees holding valid commercial BEAT licenses may use this file in # # accordance with the terms contained in a written agreement between you # # and Idiap. For further information contact tto@idiap.ch # # # # Alternatively, this file may be used under the terms of the GNU Affero # # Public License version 3 as published by the Free Software and appearing # # in the file LICENSE.AGPL included in the packaging of this file. # # The BEAT platform is distributed in the hope that it will be useful, but # # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # # or FITNESS FOR A PARTICULAR PURPOSE. # # # # You should have received a copy of the GNU Affero Public License along # # with the BEAT platform. If not, see http://www.gnu.org/licenses/. # # # ############################################################################### # Tests for experiment execution import os import logging logger = logging.getLogger(__name__) # in case you want to see the printouts dynamically, set to ``True`` if False: logger = logging.getLogger() #root logger logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) logger.addHandler(ch) import unittest import zmq from ..dbexecution import DBExecutor from ..inputs import RemoteInput from ..inputs import InputGroup from ..database import Database from . import prefix CONFIGURATION = { 'queue': 'queue', 'inputs': { 'a': { 'set': 'double', 'protocol': 'double', 'database': 'integers_db/1', 'output': 'a', 'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55', 'endpoint': 'a', 'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55', 'channel': 'integers' }, 'b': { 'set': 'double', 'protocol': 'double', 'database': 'integers_db/1', 'output': 'b', 'path': '6f/b6/66/68e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8', 'endpoint': 'b', 'hash': '6fb66668e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8', 'channel': 'integers' } }, 'algorithm': 'user/sum/1', 'parameters': {}, 'environment': { 'name': 'Python 2.7', 'version': '1.1.0' }, 'outputs': { 'sum': { 'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681', 'endpoint': 'sum', 'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681', 'channel': 'integers' } }, 'nb_slots': 1, 'channel': 'integers' } class HostSide(object): def __init__(self, zmq_context): # 0MQ server self.socket = zmq_context.socket(zmq.PAIR) self.address = 'tcp://127.0.0.1' port = self.socket.bind_to_random_port(self.address, min_port=50000) self.address += ':%d' % port database = Database(prefix, 'integers_db/1') # Creation of the inputs input_a_conf = CONFIGURATION['inputs']['a'] dataformat_name_a = database.set(input_a_conf['protocol'], input_a_conf['set'])['outputs']['a'] self.input_a = RemoteInput('a', database.dataformats[dataformat_name_a], self.socket) input_b_conf = CONFIGURATION['inputs']['b'] dataformat_name_b = database.set(input_b_conf['protocol'], input_b_conf['set'])['outputs']['b'] self.input_b = RemoteInput('b', database.dataformats[dataformat_name_b], self.socket) self.group = InputGroup('integers', restricted_access=False) self.group.add(self.input_a) self.group.add(self.input_b) class ContainerSide(object): def __init__(self, zmq_context, address): dataformat_cache = {} database_cache = {} self.dbexecutor = DBExecutor(prefix, CONFIGURATION, dataformat_cache, database_cache) assert self.dbexecutor.valid, '\n * %s' % '\n * '.join(self.dbexecutor.errors) self.socket = zmq_context.socket(zmq.PAIR) self.socket.connect(address) with self.dbexecutor: self.dbexecutor.process(zmq_context, self.socket) def wait(self): self.dbexecutor.wait() class TestExecution(unittest.TestCase): def test_success(self): context = zmq.Context() host = HostSide(context) container = ContainerSide(context, host.address) while host.group.hasMoreData(): host.group.next() host.socket.send('don') container.wait()