#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import logging
logger = logging.getLogger(__name__)

import unittest
import zmq

from ..dbexecution import DBExecutor
from ..inputs import RemoteInput
from ..inputs import InputGroup
from ..database import Database

from . import prefix


# Execution configuration shared by both sides of the test: it is handed to
# DBExecutor by ContainerSide, and HostSide reads its 'inputs' entries to
# look up the protocol/set of each database output it consumes.
# In every entry below, 'path' is the 'hash' value split into
# 2/2/2/remaining-character directory components.
CONFIGURATION = {
  'queue': 'queue',
  'inputs': {
    'a': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'a',
      'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'endpoint': 'a',
      'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'channel': 'integers'
    },
    'b': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'b',
      'path': '6f/b6/66/68e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
      'endpoint': 'b',
      'hash': '6fb66668e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
      'channel': 'integers'
    }
  },
  'algorithm': 'user/sum/1',
  'parameters': {},
  'environment': {
    'name': 'Python 2.7',
    'version': '1.2.0'
  },
  'outputs': {
    'sum': {
      'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'endpoint': 'sum',
      'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'channel': 'integers'
    }
  },
  'nb_slots': 1,
  'channel': 'integers'
}


class HostSide(object):
    """Host end of the test: owns the bound 0MQ socket and the remote inputs.

    Attributes:
      socket: ``zmq.PAIR`` socket bound to a random local TCP port.
      address: full ``tcp://127.0.0.1:<port>`` address the socket is bound
        to; the other side connects to it.
      input_a, input_b: ``RemoteInput`` objects named 'a' and 'b', both built
        on ``socket``.
      group: ``InputGroup`` named 'integers' containing both inputs.
    """

    def __init__(self, zmq_context):
        """Bind the socket and build the input group.

        Parameters:
          zmq_context: 0MQ context used to create the socket.
        """

        # 0MQ server
        self.socket = zmq_context.socket(zmq.PAIR)
        self.address = 'tcp://127.0.0.1'
        port = self.socket.bind_to_random_port(self.address, min_port=50000)
        self.address += ':%d' % port

        database = Database(prefix, 'integers_db/1')

        # Creation of the inputs
        self.input_a = self._remote_input(database, 'a')
        self.input_b = self._remote_input(database, 'b')

        self.group = InputGroup('integers', restricted_access=False)
        self.group.add(self.input_a)
        self.group.add(self.input_b)

    def _remote_input(self, database, name):
        """Build the RemoteInput for the CONFIGURATION input called ``name``.

        The data format name is looked up in the outputs of the database set
        selected by the input's 'protocol' and 'set' configuration entries.
        """

        conf = CONFIGURATION['inputs'][name]
        dataformat_name = database.set(conf['protocol'], conf['set'])['outputs'][name]
        return RemoteInput(name, database.dataformats[dataformat_name], self.socket)



class ContainerSide(object):
    """Container end of the test: runs a DBExecutor connected to the host.

    Attributes:
      dbexecutor: ``DBExecutor`` built from ``prefix`` and ``CONFIGURATION``.
      socket: ``zmq.PAIR`` socket connected to the host address.
    """

    def __init__(self, zmq_context, address):
        """Create the executor, connect to ``address`` and start processing.

        Parameters:
          zmq_context: 0MQ context used to create the socket; also handed to
            ``DBExecutor.process``.
          address: address of the host socket to connect to.

        Raises:
          AssertionError: if the executor reports itself as not valid; the
            message lists the executor's errors.
        """

        dataformat_cache = {}
        database_cache = {}

        self.dbexecutor = DBExecutor(prefix, CONFIGURATION,
            dataformat_cache, database_cache)
        assert self.dbexecutor.valid, '\n  * %s' % '\n  * '.join(self.dbexecutor.errors)

        self.socket = zmq_context.socket(zmq.PAIR)
        self.socket.connect(address)

        # NOTE(review): process() is presumably non-blocking (wait() exists to
        # join it later) -- confirm against DBExecutor.
        with self.dbexecutor:
            self.dbexecutor.process(zmq_context, self.socket)

    def wait(self):
        """Delegate to ``DBExecutor.wait()``."""
        self.dbexecutor.wait()



class TestExecution(unittest.TestCase):
    """Round trip between a HostSide and a ContainerSide over 0MQ."""

    def test_success(self):
        """Consume every data unit of the host group, then end the session."""

        context = zmq.Context()

        host = HostSide(context)
        container = ContainerSide(context, host.address)

        while host.group.hasMoreData():
            host.group.next()

        # Sent as bytes: pyzmq's Socket.send() rejects unicode strings on
        # Python 3, while on Python 2 b'don' is the same object type as 'don'.
        # NOTE(review): 'don' is presumably the "done" command of the
        # executor's message protocol -- confirm against DBExecutor.
        host.socket.send(b'don')

        container.wait()