test_message_handler.py 7.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import logging
logger = logging.getLogger(__name__)

import unittest
36
import zmq
37 38
import nose.tools

Philip ABBET's avatar
Philip ABBET committed
39
from ..message_handler import MessageHandler
40 41
from ..dataformat import DataFormat
from ..inputs import RemoteInput
42
from ..inputs import RemoteException
43 44 45 46 47
from ..inputs import Input
from ..inputs import InputGroup
from ..inputs import InputList

from .mocks import MockDataSource
48
from .mocks import MockDataSource_Crash
49 50 51 52 53 54 55

from . import prefix



class TestMessageHandler(unittest.TestCase):
    """Drive ``RemoteInput`` objects against a running ``MessageHandler``.

    ``setUp`` builds two local inputs in a group named 'channel' ('a'
    yielding 10 then 20, 'b' yielding 100 then 200), hands them to a
    ``MessageHandler`` whose ``zmq.PAIR`` socket is bound to a random local
    TCP port, and connects remote counterparts of both inputs to that port.
    Each test then checks what the remote side observes.
    """

    def setUp(self):
        """Create the server-side inputs, the handler and the remote inputs."""
        dataformat = DataFormat(prefix, 'user/single_integer/1')

        # Server side: two mock data sources with two data units each.
        # NOTE(review): the tuples are presumably (start, end) indices of
        # each data unit -- confirm against MockDataSource.
        data_source_a = MockDataSource(
            [
                dataformat.type(value=10),
                dataformat.type(value=20),
            ],
            [
                (0, 0),
                (1, 1),
            ]
        )
        input_a = Input('a', 'user/single_integer/1', data_source_a)

        data_source_b = MockDataSource(
            [
                dataformat.type(value=100),
                dataformat.type(value=200),
            ],
            [
                (0, 0),
                (1, 1),
            ]
        )
        input_b = Input('b', 'user/single_integer/1', data_source_b)

        group = InputGroup('channel', restricted_access=False)
        group.add(input_a)
        group.add(input_b)

        self.input_list = InputList()
        self.input_list.add(group)

        # Bind the server socket to a free port so that concurrent test runs
        # do not collide on a fixed port number.
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port

        self.message_handler = MessageHandler(
            self.input_list, self.server_context, server_socket
        )

        # Client side: both remote inputs share a single PAIR socket.
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)

        self.remote_input_a = RemoteInput('a', dataformat, client_socket)
        self.remote_input_b = RemoteInput('b', dataformat, client_socket)

        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input_a)
        self.remote_group.add(self.remote_input_b)

        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)

        self.message_handler.start()

    def tearDown(self):
        """Stop the handler and release the client-side ZeroMQ resources."""
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None

        # Close the client socket and terminate its context; without this
        # every test leaks one socket and one context.  linger=0 discards any
        # unsent message so termination cannot block.
        self.client_context.destroy(linger=0)
        self.client_context = None

        # NOTE(review): the server context/socket were handed over to the
        # MessageHandler; whether it closes them is not visible from this
        # file, so they are deliberately left untouched here -- confirm.

    def test_input_has_more_data(self):
        """A fresh remote input reports that data is available."""
        self.assertTrue(self.remote_input_a.hasMoreData())

    def test_input_next(self):
        """The first ``next()`` on input 'a' exposes the first value."""
        self.remote_input_a.next()
        self.assertEqual(self.remote_input_a.data.value, 10)

    def test_input_full_cycle(self):
        """Walk input 'a' through both data units and check its state flags."""
        self.assertTrue(self.remote_input_a.hasMoreData())
        self.remote_input_a.next()
        self.assertEqual(self.remote_input_a.data.value, 10)

        self.assertTrue(self.remote_input_a.hasDataChanged())
        self.assertTrue(self.remote_input_a.hasMoreData())
        self.assertTrue(self.remote_input_a.isDataUnitDone())
        self.remote_input_a.next()
        self.assertEqual(self.remote_input_a.data.value, 20)

        self.assertTrue(self.remote_input_a.hasDataChanged())
        self.assertFalse(self.remote_input_a.hasMoreData())
        self.assertTrue(self.remote_input_a.isDataUnitDone())

    def test_group_has_more_data(self):
        """A fresh remote group reports that data is available."""
        self.assertTrue(self.remote_group.hasMoreData())

    def test_group_next(self):
        """``next()`` on the group advances both of its inputs together."""
        self.remote_group.next()
        self.assertEqual(self.remote_input_a.data.value, 10)
        self.assertEqual(self.remote_input_b.data.value, 100)

    def test_group_full_cycle(self):
        """Walk the group through both data units until it is exhausted."""
        self.assertTrue(self.remote_group.hasMoreData())
        self.remote_group.next()
        self.assertEqual(self.remote_input_a.data.value, 10)
        self.assertEqual(self.remote_input_b.data.value, 100)

        self.assertTrue(self.remote_group.hasMoreData())
        self.remote_group.next()
        self.assertEqual(self.remote_input_a.data.value, 20)
        self.assertEqual(self.remote_input_b.data.value, 200)

        self.assertFalse(self.remote_group.hasMoreData())
170 171 172 173 174 175 176


#----------------------------------------------------------


class TestMessageHandlerErrorHandling(unittest.TestCase):
    """Check that server-side failures surface as ``RemoteException``.

    ``setUp`` mirrors the nominal test case but backs the single server-side
    input 'in' with a ``MockDataSource_Crash``.  Every test asserts that the
    corresponding remote call raises ``RemoteException``.
    """

    def setUp(self):
        """Create the crashing server-side input, the handler and the client."""
        dataformat = DataFormat(prefix, 'user/single_integer/1')

        # NOTE(review): presumably this data source raises on access, which
        # is what the tests below rely on -- confirm against the mocks module.
        data_source = MockDataSource_Crash()

        # Named so that the builtin ``input`` is not shadowed.
        crashing_input = Input('in', 'user/single_integer/1', data_source)

        group = InputGroup('channel')
        group.add(crashing_input)

        self.input_list = InputList()
        self.input_list.add(group)

        # Bind the server socket to a free port so that concurrent test runs
        # do not collide on a fixed port number.
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port

        self.message_handler = MessageHandler(
            self.input_list, self.server_context, server_socket
        )

        # Client side of the PAIR connection.
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)

        self.remote_input = RemoteInput('in', dataformat, client_socket)

        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input)

        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)

        self.message_handler.start()

    def tearDown(self):
        """Stop the handler and release the client-side ZeroMQ resources."""
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None

        # Close the client socket and terminate its context; without this
        # every test leaks one socket and one context.  linger=0 discards any
        # unsent message so termination cannot block, which matters here
        # because the server side is expected to have failed.
        self.client_context.destroy(linger=0)
        self.client_context = None

        # NOTE(review): the server context/socket were handed over to the
        # MessageHandler; whether it closes them is not visible from this
        # file, so they are deliberately left untouched here -- confirm.

    def test_input_has_more_data(self):
        """``hasMoreData()`` on the remote input raises ``RemoteException``."""
        self.assertRaises(RemoteException, self.remote_input.hasMoreData)

    def test_input_next(self):
        """``next()`` on the remote input raises ``RemoteException``."""
        self.assertRaises(RemoteException, self.remote_input.next)

    def test_group_has_more_data(self):
        """``hasMoreData()`` on the remote group raises ``RemoteException``."""
        self.assertRaises(RemoteException, self.remote_group.hasMoreData)

    def test_group_next(self):
        """``next()`` on the remote group raises ``RemoteException``."""
        self.assertRaises(RemoteException, self.remote_group.next)