#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# in case you want to see the printouts dynamically, set to ``True``
if False:
    # Rebind ``logger`` to the root logger and attach a stream handler so that
    # every DEBUG (and above) record is printed as "<LEVEL>: <message>".
    logger = logging.getLogger()  # root logger
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
    logger.addHandler(ch)

import unittest
import zmq

from ..dbexecution import DBExecutor
from ..inputs import RemoteInput
from ..inputs import InputGroup
from ..database import Database

from . import prefix


# Execution configuration shared by both sides of the test: ``ContainerSide``
# hands it to ``DBExecutor`` and ``HostSide`` reads the 'protocol'/'set' of the
# inputs 'a' and 'b' from it.
#
# For every input/output entry, 'path' is the 'hash' value with '/' inserted
# after each of its first three character pairs.
CONFIGURATION = {
  'queue': 'queue',
  'inputs': {
    'a': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'a',
      'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'endpoint': 'a',
      'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
      'channel': 'integers'
    },
    'b': {
      'set': 'double',
      'protocol': 'double',
      'database': 'integers_db/1',
      'output': 'b',
      'path': '6f/b6/66/68e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
      'endpoint': 'b',
      'hash': '6fb66668e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
      'channel': 'integers'
    }
  },
  'algorithm': 'user/sum/1',
  'parameters': {},
  'environment': {
    'name': 'Python 2.7',
    'version': '1.1.0'
  },
  'outputs': {
    'sum': {
      'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'endpoint': 'sum',
      'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
      'channel': 'integers'
    }
  },
  'nb_slots': 1,
  'channel': 'integers'
}


class HostSide(object):
    """Host end of the test: binds a 0MQ socket and builds the remote inputs.

    Attributes set by the constructor:

      socket: ``zmq.PAIR`` socket bound to a random local TCP port
      address: full ``tcp://127.0.0.1:<port>`` address the socket is bound to
      input_a, input_b: ``RemoteInput`` objects for the inputs 'a' and 'b',
        both created on top of ``socket``
      group: ``InputGroup`` named 'integers' containing both inputs
    """

    def __init__(self, zmq_context):
        """Binds the socket and creates the inputs and their group.

        Parameters:

          zmq_context: 0MQ context used to create the ``PAIR`` socket
        """

        # 0MQ server
        self.socket = zmq_context.socket(zmq.PAIR)
        self.address = 'tcp://127.0.0.1'
        port = self.socket.bind_to_random_port(self.address, min_port=50000)
        self.address += ':%d' % port

        database = Database(prefix, 'integers_db/1')

        # Creation of the inputs
        self.input_a = self._create_input(database, 'a')
        self.input_b = self._create_input(database, 'b')

        self.group = InputGroup('integers', restricted_access=False)
        self.group.add(self.input_a)
        self.group.add(self.input_b)


    def _create_input(self, database, name):
        """Creates the ``RemoteInput`` called ``name`` on top of ``self.socket``.

        The data format is looked up in the 'outputs' of the database set
        selected by the 'protocol' and 'set' entries of
        ``CONFIGURATION['inputs'][name]``.
        """

        conf = CONFIGURATION['inputs'][name]
        dataformat_name = database.set(conf['protocol'], conf['set'])['outputs'][name]
        return RemoteInput(name, database.dataformats[dataformat_name], self.socket)



class ContainerSide(object):
    """Container end of the test: runs a ``DBExecutor`` connected to the host.

    Attributes set by the constructor:

      dbexecutor: ``DBExecutor`` built from ``prefix`` and ``CONFIGURATION``
      socket: ``zmq.PAIR`` socket connected to the address given at construction
    """

    def __init__(self, zmq_context, address):
        """Creates the executor, connects the socket and calls ``process()``.

        Parameters:

          zmq_context: 0MQ context used to create the socket; also passed to
            ``DBExecutor.process()``
          address: address to connect to (the test passes ``HostSide.address``)

        Raises:

          AssertionError: if the executor reports itself as not valid; the
            message lists the executor's errors, one per line
        """

        # Fresh, empty caches handed to the executor
        dataformat_cache = {}
        database_cache = {}

        self.dbexecutor = DBExecutor(prefix, CONFIGURATION,
            dataformat_cache, database_cache)
        assert self.dbexecutor.valid, '\n  * %s' % '\n  * '.join(self.dbexecutor.errors)

        self.socket = zmq_context.socket(zmq.PAIR)
        self.socket.connect(address)

        # NOTE(review): process() presumably returns before the messages are
        # handled, since the test drives the host side afterwards and only then
        # calls wait() -- confirm against DBExecutor
        with self.dbexecutor:
            self.dbexecutor.process(zmq_context, self.socket)


    def wait(self):
        """Delegates to ``DBExecutor.wait()``."""
        self.dbexecutor.wait()



class TestExecution(unittest.TestCase):
    """Runs a ``DBExecutor`` against remote inputs over a 0MQ ``PAIR`` link."""

    def test_success(self):
        """Iterates over all the data of the host-side input group, then tells
        the container side to stop and waits for it."""

        context = zmq.Context()

        host = HostSide(context)
        container = ContainerSide(context, host.address)

        # Consume every block of data offered by the group
        while host.group.hasMoreData():
            host.group.next()

        # Sent as bytes: pyzmq's send() refuses text strings on Python 3, and
        # b'don' is the very same object type as 'don' on Python 2.
        # NOTE(review): 'don' is presumably the "done" message that ends the
        # executor's message loop -- confirm against DBExecutor
        host.socket.send(b'don')

        container.wait()