test_message_handler.py 7.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import logging
logger = logging.getLogger(__name__)

# in case you want to see the printouts dynamically, set to ``True``
if False:
Philip ABBET's avatar
Philip ABBET committed
37
38
39
40
41
42
    logger = logging.getLogger() #root logger
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
    logger.addHandler(ch)
43
44

import unittest
Philip ABBET's avatar
Philip ABBET committed
45
import zmq
46
47
48
49
50
import nose.tools

from ..agent import MessageHandler
from ..dataformat import DataFormat
from ..inputs import RemoteInput
51
from ..inputs import RemoteException
52
53
54
55
56
from ..inputs import Input
from ..inputs import InputGroup
from ..inputs import InputList

from .mocks import MockDataSource
57
from .mocks import MockDataSource_Crash
58
59
60
61
62
63
64

from . import prefix



class TestMessageHandler(unittest.TestCase):

Philip ABBET's avatar
Philip ABBET committed
65
66
    def setUp(self):
        dataformat = DataFormat(prefix, 'user/single_integer/1')
67

Philip ABBET's avatar
Philip ABBET committed
68
69
70
71
72
73
74
75
76
        data_source_a = MockDataSource([
            dataformat.type(value=10),
            dataformat.type(value=20),
          ],
          [
            (0, 0),
            (1, 1),
          ]
        )
77

Philip ABBET's avatar
Philip ABBET committed
78
        input_a = Input('a', 'user/single_integer/1', data_source_a)
79

Philip ABBET's avatar
Philip ABBET committed
80
81
82
83
84
85
86
87
88
        data_source_b = MockDataSource([
            dataformat.type(value=100),
            dataformat.type(value=200),
          ],
          [
            (0, 0),
            (1, 1),
          ]
        )
89

Philip ABBET's avatar
Philip ABBET committed
90
        input_b = Input('b', 'user/single_integer/1', data_source_b)
91

92
        group = InputGroup('channel', restricted_access=False)
Philip ABBET's avatar
Philip ABBET committed
93
94
        group.add(input_a)
        group.add(input_b)
95

Philip ABBET's avatar
Philip ABBET committed
96
97
        self.input_list = InputList()
        self.input_list.add(group)
98
99


Philip ABBET's avatar
Philip ABBET committed
100
101
102
103
104
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port
105

Philip ABBET's avatar
Philip ABBET committed
106
        self.message_handler = MessageHandler(self.input_list, self.server_context, server_socket)
107
108


Philip ABBET's avatar
Philip ABBET committed
109
110
111
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)
112

Philip ABBET's avatar
Philip ABBET committed
113
114
        self.remote_input_a = RemoteInput('a', dataformat, client_socket)
        self.remote_input_b = RemoteInput('b', dataformat, client_socket)
115

Philip ABBET's avatar
Philip ABBET committed
116
117
118
        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input_a)
        self.remote_group.add(self.remote_input_b)
119

Philip ABBET's avatar
Philip ABBET committed
120
121
        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)
122

Philip ABBET's avatar
Philip ABBET committed
123
        self.message_handler.start()
124
125


Philip ABBET's avatar
Philip ABBET committed
126
127
128
129
    def tearDown(self):
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None
Philip ABBET's avatar
Philip ABBET committed
130
131


Philip ABBET's avatar
Philip ABBET committed
132
133
    def test_input_has_more_data(self):
        assert self.remote_input_a.hasMoreData()
134
135


Philip ABBET's avatar
Philip ABBET committed
136
137
138
    def test_input_next(self):
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
139
140


Philip ABBET's avatar
Philip ABBET committed
141
142
143
144
    def test_input_full_cycle(self):
        assert self.remote_input_a.hasMoreData()
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
145

146
        assert self.remote_input_a.hasDataChanged()
Philip ABBET's avatar
Philip ABBET committed
147
        assert self.remote_input_a.hasMoreData()
148
        assert self.remote_input_a.isDataUnitDone()
Philip ABBET's avatar
Philip ABBET committed
149
150
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 20)
151

152
        assert self.remote_input_a.hasDataChanged()
Philip ABBET's avatar
Philip ABBET committed
153
        assert not self.remote_input_a.hasMoreData()
154
        assert self.remote_input_a.isDataUnitDone()
155
156


Philip ABBET's avatar
Philip ABBET committed
157
158
    def test_group_has_more_data(self):
        assert self.remote_group.hasMoreData()
159
160


Philip ABBET's avatar
Philip ABBET committed
161
162
163
164
    def test_group_next(self):
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
        nose.tools.eq_(self.remote_input_b.data.value, 100)
165
166


Philip ABBET's avatar
Philip ABBET committed
167
168
169
170
171
    def test_group_full_cycle(self):
        assert self.remote_group.hasMoreData()
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
        nose.tools.eq_(self.remote_input_b.data.value, 100)
172

Philip ABBET's avatar
Philip ABBET committed
173
174
175
176
        assert self.remote_group.hasMoreData()
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 20)
        nose.tools.eq_(self.remote_input_b.data.value, 200)
177

Philip ABBET's avatar
Philip ABBET committed
178
        assert not self.remote_group.hasMoreData()
179
180
181
182
183
184
185


#----------------------------------------------------------


class TestMessageHandlerErrorHandling(unittest.TestCase):

Philip ABBET's avatar
Philip ABBET committed
186
187
    def setUp(self):
        dataformat = DataFormat(prefix, 'user/single_integer/1')
188

Philip ABBET's avatar
Philip ABBET committed
189
        data_source = MockDataSource_Crash()
190

Philip ABBET's avatar
Philip ABBET committed
191
        input = Input('in', 'user/single_integer/1', data_source)
192

Philip ABBET's avatar
Philip ABBET committed
193
194
        group = InputGroup('channel')
        group.add(input)
195

Philip ABBET's avatar
Philip ABBET committed
196
197
        self.input_list = InputList()
        self.input_list.add(group)
198
199


Philip ABBET's avatar
Philip ABBET committed
200
201
202
203
204
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port
205

Philip ABBET's avatar
Philip ABBET committed
206
        self.message_handler = MessageHandler(self.input_list, self.server_context, server_socket)
207
208


Philip ABBET's avatar
Philip ABBET committed
209
210
211
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)
212

Philip ABBET's avatar
Philip ABBET committed
213
        self.remote_input = RemoteInput('in', dataformat, client_socket)
214

Philip ABBET's avatar
Philip ABBET committed
215
216
        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input)
217

Philip ABBET's avatar
Philip ABBET committed
218
219
        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)
220

Philip ABBET's avatar
Philip ABBET committed
221
        self.message_handler.start()
222
223


Philip ABBET's avatar
Philip ABBET committed
224
225
226
227
    def tearDown(self):
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None
Philip ABBET's avatar
Philip ABBET committed
228
229


Philip ABBET's avatar
Philip ABBET committed
230
231
    def test_input_has_more_data(self):
        self.assertRaises(RemoteException, self.remote_input.hasMoreData)
232
233


Philip ABBET's avatar
Philip ABBET committed
234
235
    def test_input_next(self):
        self.assertRaises(RemoteException, self.remote_input.next)
236
237


Philip ABBET's avatar
Philip ABBET committed
238
239
    def test_group_has_more_data(self):
        self.assertRaises(RemoteException, self.remote_group.hasMoreData)
240
241


Philip ABBET's avatar
Philip ABBET committed
242
243
    def test_group_next(self):
        self.assertRaises(RemoteException, self.remote_group.next)