test_message_handler.py 7.68 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import logging
logger = logging.getLogger(__name__)

# in case you want to see the printouts dynamically, set to ``True``
if False:
Philip ABBET's avatar
Philip ABBET committed
37
38
39
40
41
42
    logger = logging.getLogger() #root logger
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
    logger.addHandler(ch)
43
44

import unittest
Philip ABBET's avatar
Philip ABBET committed
45
import zmq
46
47
48
49
50
import nose.tools

from ..agent import MessageHandler
from ..dataformat import DataFormat
from ..inputs import RemoteInput
51
from ..inputs import RemoteException
52
53
54
55
56
from ..inputs import Input
from ..inputs import InputGroup
from ..inputs import InputList

from .mocks import MockDataSource
57
from .mocks import MockDataSource_Crash
58
59
60
61
62
63
64

from . import prefix



class TestMessageHandler(unittest.TestCase):

Philip ABBET's avatar
Philip ABBET committed
65
66
    def setUp(self):
        dataformat = DataFormat(prefix, 'user/single_integer/1')
67

Philip ABBET's avatar
Philip ABBET committed
68
69
70
71
72
73
74
75
76
        data_source_a = MockDataSource([
            dataformat.type(value=10),
            dataformat.type(value=20),
          ],
          [
            (0, 0),
            (1, 1),
          ]
        )
77

Philip ABBET's avatar
Philip ABBET committed
78
        input_a = Input('a', 'user/single_integer/1', data_source_a)
79

Philip ABBET's avatar
Philip ABBET committed
80
81
82
83
84
85
86
87
88
        data_source_b = MockDataSource([
            dataformat.type(value=100),
            dataformat.type(value=200),
          ],
          [
            (0, 0),
            (1, 1),
          ]
        )
89

Philip ABBET's avatar
Philip ABBET committed
90
        input_b = Input('b', 'user/single_integer/1', data_source_b)
91

Philip ABBET's avatar
Philip ABBET committed
92
93
94
        group = InputGroup('channel')
        group.add(input_a)
        group.add(input_b)
95

Philip ABBET's avatar
Philip ABBET committed
96
97
        self.input_list = InputList()
        self.input_list.add(group)
98
99


Philip ABBET's avatar
Philip ABBET committed
100
101
102
103
104
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port
105

Philip ABBET's avatar
Philip ABBET committed
106
        self.message_handler = MessageHandler(self.input_list, self.server_context, server_socket)
107
108


Philip ABBET's avatar
Philip ABBET committed
109
110
111
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)
112

Philip ABBET's avatar
Philip ABBET committed
113
114
        self.remote_input_a = RemoteInput('a', dataformat, client_socket)
        self.remote_input_b = RemoteInput('b', dataformat, client_socket)
115

Philip ABBET's avatar
Philip ABBET committed
116
117
118
        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input_a)
        self.remote_group.add(self.remote_input_b)
119

Philip ABBET's avatar
Philip ABBET committed
120
121
        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)
122

Philip ABBET's avatar
Philip ABBET committed
123
        self.message_handler.start()
124
125


Philip ABBET's avatar
Philip ABBET committed
126
127
128
129
    def tearDown(self):
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None
Philip ABBET's avatar
Philip ABBET committed
130
131


Philip ABBET's avatar
Philip ABBET committed
132
133
    def test_input_has_more_data(self):
        assert self.remote_input_a.hasMoreData()
134
135


Philip ABBET's avatar
Philip ABBET committed
136
137
138
    def test_input_next(self):
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
139
140


Philip ABBET's avatar
Philip ABBET committed
141
142
143
144
    def test_input_full_cycle(self):
        assert self.remote_input_a.hasMoreData()
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
145

Philip ABBET's avatar
Philip ABBET committed
146
147
148
        assert self.remote_input_a.hasMoreData()
        self.remote_input_a.next()
        nose.tools.eq_(self.remote_input_a.data.value, 20)
149

Philip ABBET's avatar
Philip ABBET committed
150
        assert not self.remote_input_a.hasMoreData()
151
152


Philip ABBET's avatar
Philip ABBET committed
153
154
    def test_group_has_more_data(self):
        assert self.remote_group.hasMoreData()
155
156


Philip ABBET's avatar
Philip ABBET committed
157
158
159
160
    def test_group_next(self):
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
        nose.tools.eq_(self.remote_input_b.data.value, 100)
161
162


Philip ABBET's avatar
Philip ABBET committed
163
164
165
166
167
    def test_group_full_cycle(self):
        assert self.remote_group.hasMoreData()
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 10)
        nose.tools.eq_(self.remote_input_b.data.value, 100)
168

Philip ABBET's avatar
Philip ABBET committed
169
170
171
172
        assert self.remote_group.hasMoreData()
        self.remote_group.next()
        nose.tools.eq_(self.remote_input_a.data.value, 20)
        nose.tools.eq_(self.remote_input_b.data.value, 200)
173

Philip ABBET's avatar
Philip ABBET committed
174
        assert not self.remote_group.hasMoreData()
175
176
177
178
179
180
181


#----------------------------------------------------------


class TestMessageHandlerErrorHandling(unittest.TestCase):

Philip ABBET's avatar
Philip ABBET committed
182
183
    def setUp(self):
        dataformat = DataFormat(prefix, 'user/single_integer/1')
184

Philip ABBET's avatar
Philip ABBET committed
185
        data_source = MockDataSource_Crash()
186

Philip ABBET's avatar
Philip ABBET committed
187
        input = Input('in', 'user/single_integer/1', data_source)
188

Philip ABBET's avatar
Philip ABBET committed
189
190
        group = InputGroup('channel')
        group.add(input)
191

Philip ABBET's avatar
Philip ABBET committed
192
193
        self.input_list = InputList()
        self.input_list.add(group)
194
195


Philip ABBET's avatar
Philip ABBET committed
196
197
198
199
200
        self.server_context = zmq.Context()
        server_socket = self.server_context.socket(zmq.PAIR)
        address = 'tcp://127.0.0.1'
        port = server_socket.bind_to_random_port(address, min_port=50000)
        address += ':%d' % port
201

Philip ABBET's avatar
Philip ABBET committed
202
        self.message_handler = MessageHandler(self.input_list, self.server_context, server_socket)
203
204


Philip ABBET's avatar
Philip ABBET committed
205
206
207
        self.client_context = zmq.Context()
        client_socket = self.client_context.socket(zmq.PAIR)
        client_socket.connect(address)
208

Philip ABBET's avatar
Philip ABBET committed
209
        self.remote_input = RemoteInput('in', dataformat, client_socket)
210

Philip ABBET's avatar
Philip ABBET committed
211
212
        self.remote_group = InputGroup('channel', restricted_access=False)
        self.remote_group.add(self.remote_input)
213

Philip ABBET's avatar
Philip ABBET committed
214
215
        self.remote_input_list = InputList()
        self.remote_input_list.add(self.remote_group)
216

Philip ABBET's avatar
Philip ABBET committed
217
        self.message_handler.start()
218
219


Philip ABBET's avatar
Philip ABBET committed
220
221
222
223
    def tearDown(self):
        self.message_handler.kill()
        self.message_handler.join()
        self.message_handler = None
Philip ABBET's avatar
Philip ABBET committed
224
225


Philip ABBET's avatar
Philip ABBET committed
226
227
    def test_input_has_more_data(self):
        self.assertRaises(RemoteException, self.remote_input.hasMoreData)
228
229


Philip ABBET's avatar
Philip ABBET committed
230
231
    def test_input_next(self):
        self.assertRaises(RemoteException, self.remote_input.next)
232
233


Philip ABBET's avatar
Philip ABBET committed
234
235
    def test_group_has_more_data(self):
        self.assertRaises(RemoteException, self.remote_group.hasMoreData)
236
237


Philip ABBET's avatar
Philip ABBET committed
238
239
    def test_group_next(self):
        self.assertRaises(RemoteException, self.remote_group.next)