helpers.py 11 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


29
import os
30
import errno
31
32
33
34

import logging
logger = logging.getLogger(__name__)

35
36
37
38
39
40
41
from . import data
from . import inputs
from . import outputs



def convert_experiment_configuration_to_container(config, proxy_mode):
    """Build the reduced configuration dictionary handed to a container.

    Only the entries copied below are forwarded; everything else found in
    ``config`` is dropped.

    Parameters:

      config (dict): Experiment configuration. Must provide the
        ``algorithm``, ``parameters``, ``channel`` and ``inputs`` keys, plus
        either ``outputs`` or ``result``. ``range`` is optional.

      proxy_mode: Value stored unchanged under the ``proxy_mode`` key.

    Returns:

      dict: The container configuration. Each input is reduced to its
      ``channel``, its ``path`` and a boolean ``database`` flag telling
      whether the original input declared a ``database`` key.

    Raises:

      KeyError: If one of the required entries is missing from ``config``.
    """

    container = {
        'proxy_mode': proxy_mode,
        'algorithm': config['algorithm'],
        'parameters': config['parameters'],
        'channel': config['channel'],
        'uid': os.getuid(),
    }

    if 'range' in config:
        container['range'] = config['range']

    # ``in`` replaces dict.has_key(), which no longer exists in Python 3
    container['inputs'] = {
        name: {
            'channel': details['channel'],
            'path': details['path'],
            'database': 'database' in details,
        }
        for name, details in config['inputs'].items()
    }

    if 'outputs' in config:
        container['outputs'] = {
            name: {'channel': details['channel'], 'path': details['path']}
            for name, details in config['outputs'].items()
        }
    else:
        # No outputs declared: a single result is written instead, on the
        # main channel of the configuration
        container['result'] = {
            'channel': config['channel'],
            'path': config['result']['path'],
        }

    return container
63
64
65
66
67


#----------------------------------------------------------


68
class AccessMode:
    """Constants selecting how the cache or a database is reached.

    The values are plain integers and are compared with ``==`` by the
    functions of this module.
    """

    # No access: the matching inputs/outputs are skipped
    NONE   = 0

    # Direct access from the current process (cache files, database views)
    LOCAL  = 1

    # Access through a socket, using remote inputs/outputs
    REMOTE = 2
72
73
74


def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
                                     cache_access=AccessMode.NONE,
                                     db_access=AccessMode.NONE,
                                     unpack=True, socket=None,
                                     databases=None):
    """Create the inputs (and their groups) described by a configuration.

    Each entry of ``config['inputs']`` is handled according to its kind:

      * entries with a truthy ``database`` value are served by a database
        view (``db_access == AccessMode.LOCAL``) or by a remote input
        (``db_access == AccessMode.REMOTE``);
      * all other entries are served by a cache file
        (``cache_access == AccessMode.LOCAL``) or by a remote input
        (``cache_access == AccessMode.REMOTE``).

    Entries whose access mode is neither ``LOCAL`` nor ``REMOTE`` are
    skipped. Inputs are grouped by their ``channel``; the group is created
    the first time one of its inputs is encountered.

    Parameters:

      config (dict): Configuration providing ``inputs`` and ``channel``,
        and optionally ``range`` as a ``(start_index, end_index)`` pair.

      algorithm: Object providing ``input_map`` (input name -> data format
        name) and ``dataformats`` (data format name -> data format).

      prefix: Forwarded to the setup of the cached data sources.

      cache_root (str): Directory containing the cache; the file of an
        input is ``<cache_root>/<path>.data``.

      cache_access: One of the ``AccessMode`` constants, for cached inputs.

      db_access: One of the ``AccessMode`` constants, for database inputs.

      unpack: Forwarded to the remote inputs only; cached data sources are
        always set up with ``unpack=True``.

      socket: Socket given to the remote inputs; required as soon as one
        input is accessed remotely.

      databases: Mapping of database name -> database object; required as
        soon as one database input is accessed locally.

    Returns:

      tuple: ``(input_list, data_sources)``, where ``data_sources`` only
      contains the cached data sources that were created.

    Raises:

      IOError: If the databases or the socket are needed but missing, if a
        database cannot be found or if a cache file cannot be loaded.
    """

    data_sources = []
    views = {}
    input_list = inputs.InputList()

    # This is used for parallelization purposes
    start_index, end_index = config.get('range', (None, None))

    for name, details in config['inputs'].items():

        if details.get('database', False):
            if db_access == AccessMode.LOCAL:
                if databases is None:
                    raise IOError("No databases provided")

                # Retrieve the database
                try:
                    db = databases[details['database']]
                except (LookupError, TypeError):
                    raise IOError("Database '%s' not found" % details['database'])

                # Create or retrieve the database view (one per channel)
                channel = details['channel']

                if channel not in views:
                    view = db.view(details['protocol'], details['set'])
                    view.prepare_outputs()
                    view.setup()
                    views[channel] = view

                    logger.debug("Database view '%s/%s/%s' created: group='%s'",
                                 details['database'], details['protocol'],
                                 details['set'], channel)
                else:
                    view = views[channel]

                # Creation of the input; its data source is fed by the
                # output of the view and is not tracked in data_sources
                data_source = data.MemoryDataSource(view.done, next_callback=view.next)

                output = view.outputs[details['output']]
                output.data_sink.data_sources.append(data_source)

                new_input = inputs.Input(name, algorithm.input_map[name], data_source)

                logger.debug("Input '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                             name, channel, algorithm.input_map[name],
                             details['database'], details['protocol'],
                             details['set'], details['output'])

            elif db_access == AccessMode.REMOTE:
                if socket is None:
                    raise IOError("No socket provided for remote inputs")

                new_input = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
                                               socket, unpack=unpack)

                logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s', connected to a database",
                             name, details['channel'], algorithm.input_map[name])

            else:
                # No database access requested: skip this input, like the
                # cached inputs below. Without this, the grouping code would
                # run with no input (or with the one of the previous
                # iteration).
                continue

        elif cache_access == AccessMode.LOCAL:
            data_source = data.CachedDataSource()
            data_sources.append(data_source)

            filename = os.path.join(cache_root, details['path'] + '.data')

            setup_kwargs = dict(filename=filename, prefix=prefix, unpack=True)

            # Only the inputs synchronized with the main channel are
            # restricted to the requested range of indices
            if details['channel'] == config['channel']:
                setup_kwargs['force_start_index'] = start_index
                setup_kwargs['force_end_index'] = end_index

            if not data_source.setup(**setup_kwargs):
                raise IOError("cannot load cache file `%s'" % details['path'])

            new_input = inputs.Input(name, algorithm.input_map[name], data_source)

            logger.debug("Input '%s' created: group='%s', dataformat='%s', filename='%s'",
                         name, details['channel'], algorithm.input_map[name], filename)

        elif cache_access == AccessMode.REMOTE:
            if socket is None:
                raise IOError("No socket provided for remote inputs")

            new_input = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
                                           socket, unpack=unpack)

            logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s'",
                         name, details['channel'], algorithm.input_map[name])

        else:
            continue

        # Synchronization bits
        group = input_list.group(details['channel'])
        if group is None:
            group = inputs.InputGroup(
                      details['channel'],
                      synchronization_listener=outputs.SynchronizationListener(),
                      restricted_access=(details['channel'] == config['channel'])
                    )
            input_list.add(group)
            logger.debug("Group '%s' created", details['channel'])

        group.add(new_input)

    return (input_list, data_sources)
193
194
195
196
197
198
199



#----------------------------------------------------------


def create_outputs_from_configuration(config, algorithm, prefix, cache_root, input_list,
                                      cache_access=AccessMode.NONE, socket=None):
    """Create the outputs described by a configuration.

    When ``config`` contains a ``result`` entry (the algorithm is an
    analyser), a single output named ``result`` is created from it.
    Otherwise one output is created per entry of ``config['outputs']``.

    With ``cache_access == AccessMode.LOCAL`` the outputs write to
    ``<cache_root>/<path>.data`` (missing directories are created); with
    ``AccessMode.REMOTE`` remote outputs using ``socket`` are created; with
    any other value nothing is created.

    Parameters:

      config (dict): Configuration providing either ``result`` or
        ``outputs``, and optionally ``range`` as a
        ``(start_index, end_index)`` pair.

      algorithm: Object providing ``name``, ``result_dataformat()``,
        ``output_map`` (output name -> data format name) and
        ``dataformats`` (data format name -> data format).

      prefix: Not used by this function.

      cache_root (str): Directory containing the cache.

      input_list: List of input groups, queried with ``group(channel)`` to
        find the synchronization listener shared with an output.

      cache_access: One of the ``AccessMode`` constants.

      socket: Socket given to the remote outputs; required when
        ``cache_access`` is ``AccessMode.REMOTE``.

    Returns:

      tuple: ``(output_list, data_sinks)``, where ``data_sinks`` only
      contains the cached data sinks that were created.

    Raises:

      IOError: If a cache sink cannot be created or if the socket is needed
        but missing.

      OSError: If a cache directory cannot be created.
    """

    data_sinks = []
    output_list = outputs.OutputList()

    # This is used for parallelization purposes (only the start matters here)
    start_index, _end_index = config.get('range', (None, None))

    # If the algorithm is an analyser
    is_analyser = 'result' in config

    if is_analyser:
        output_config = {
            'result': config['result']
        }
    else:
        output_config = config['outputs']

    for name, details in output_config.items():

        if is_analyser:
            dataformat_name = 'analysis:' + algorithm.name
            dataformat = algorithm.result_dataformat()
        else:
            dataformat_name = algorithm.output_map[name]
            dataformat = algorithm.dataformats[dataformat_name]

        if cache_access == AccessMode.LOCAL:

            path = os.path.join(cache_root, details['path'] + '.data')
            dirname = os.path.dirname(path)

            # Make sure that the directory exists while taking care of race
            # conditions: another process may create it at the same time, so
            # an already existing directory is not an error
            if dirname:
                try:
                    os.makedirs(dirname)
                except OSError as exception:
                    if exception.errno != errno.EEXIST:
                        raise

            data_sink = data.CachedDataSink()
            data_sinks.append(data_sink)

            status = data_sink.setup(
                filename=path,
                dataformat=dataformat,
                encoding='binary',
                max_size=0, # in bytes, for individual file chunks
            )

            if not status:
                raise IOError("Cannot create cache sink '%s'" % details['path'])

            # An output shares the synchronization listener of the input
            # group of its channel, when there is one; results never do
            synchronization_listener = None

            if not is_analyser:
                input_group = input_list.group(details['channel'])
                if (input_group is not None) and hasattr(input_group, 'synchronization_listener'):
                    synchronization_listener = input_group.synchronization_listener

            output_list.add(outputs.Output(name, data_sink,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index or 0)
            )

            if not is_analyser:
                logger.debug("Output '%s' created: group='%s', dataformat='%s', filename='%s'",
                             name, details['channel'], dataformat_name, path)
            else:
                logger.debug("Output '%s' created: dataformat='%s', filename='%s'",
                             name, dataformat_name, path)

        elif cache_access == AccessMode.REMOTE:
            if socket is None:
                raise IOError("No socket provided for remote outputs")

            output_list.add(outputs.RemoteOutput(name, dataformat, socket))

            # The 'result' entry of an analyser may come without a channel:
            # logging must not fail because of it
            logger.debug("RemoteOutput '%s' created: group='%s', dataformat='%s'",
                         name, details.get('channel'), dataformat_name)

        else:
            continue

    return (output_list, data_sinks)