helpers.py 11.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


29
import os
30
import errno
31
32
33
34

import logging
logger = logging.getLogger(__name__)

35
36
37
38
39
40
41
from . import data
from . import inputs
from . import outputs



def convert_experiment_configuration_to_container(config, proxy_mode):
    """Convert a block configuration into the configuration sent to a container.

    Parameters:
      config (dict): block configuration. Reads ``algorithm``, ``parameters``,
        ``channel`` and ``inputs``, and optionally ``range``, ``outputs`` and
        ``result``.
      proxy_mode: stored unchanged under the ``proxy_mode`` key.

    Returns:
      dict: the container configuration:

        * ``uid`` is the user id of the current process (``os.getuid()``).
        * ``range`` is copied only when present in ``config``.
        * Each input keeps its ``channel`` and ``path``, plus a boolean
          ``database`` flag telling whether the original input had a
          ``database`` key.
        * Each output keeps only its ``channel`` and ``path``.
        * When ``config`` has no ``outputs`` (analyzer), a ``result`` entry is
          produced instead. It uses the block ``channel`` and
          ``config['result']['path']``.
    """
    container = {
        'proxy_mode': proxy_mode,
        'algorithm': config['algorithm'],
        'parameters': config['parameters'],
        'channel': config['channel'],
        'uid': os.getuid(),
    }

    if 'range' in config:
        container['range'] = config['range']

    # Only the presence of the "database" key is forwarded. The ``in`` operator
    # works on Python 2 and 3; ``dict.has_key`` no longer exists on Python 3.
    container['inputs'] = {
        name: {
            'channel': details['channel'],
            'path': details['path'],
            'database': 'database' in details,
        }
        for name, details in config['inputs'].items()
    }

    if 'outputs' in config:
        container['outputs'] = {
            name: {'channel': details['channel'], 'path': details['path']}
            for name, details in config['outputs'].items()
        }
    else:
        container['result'] = {
            'channel': config['channel'],
            'path': config['result']['path'],
        }

    return container
63
64
65
66
67


#----------------------------------------------------------


68
class AccessMode:
    """Integer constants selecting how inputs/outputs reach their data.

    NONE   -- no access. The ``create_*_from_configuration`` helpers of this
              module skip the corresponding inputs/outputs.
    LOCAL  -- direct access: cache files under ``cache_root``, or database
              objects instantiated in this process.
    REMOTE -- access through a socket.
    """

    NONE, LOCAL, REMOTE = 0, 1, 2
72
73
74


def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
                                     cache_access=AccessMode.NONE,
                                     db_access=AccessMode.NONE,
                                     unpack=True, socket=None,
                                     databases=None,
                                     no_synchronisation_listeners=False):
    """Build the algorithm's input list from an execution configuration.

    Each entry of ``config['inputs']`` becomes an input. The input is added to
    the input group of its channel; groups are created on demand. Where the
    data comes from depends on the input and on the access modes:

    * Database inputs (``details['database']`` is set) use either a locally
      instantiated database view (``db_access == AccessMode.LOCAL``) or
      ``socket`` (``db_access == AccessMode.REMOTE``). With any other
      ``db_access`` the input is skipped.
    * Other inputs read either the cache under ``cache_root``
      (``cache_access == AccessMode.LOCAL``) or ``socket``
      (``cache_access == AccessMode.REMOTE``). With any other
      ``cache_access`` the input is skipped.

    Parameters:
      config (dict): execution configuration. Reads ``inputs`` and
        ``channel``, and optionally ``range``. ``range`` is a
        ``(start, end)`` pair applied to the locally cached inputs of the
        synchronized channel (``config['channel']``).
      algorithm: object providing ``input_map`` (input name -> dataformat
        name) and ``dataformats`` (dataformat name -> dataformat).
      prefix: forwarded to ``CachedDataSource.setup``.
      cache_root (str): root directory of the cache files.
      cache_access (int): an ``AccessMode`` constant, used for non-database
        inputs.
      db_access (int): an ``AccessMode`` constant, used for database inputs.
      unpack (bool): forwarded to ``RemoteInput``.
      socket: connection used by remote inputs.
      databases (dict): database name -> database object. Required when
        ``db_access == AccessMode.LOCAL``.
      no_synchronisation_listeners (bool): when True, groups are created
        without a ``SynchronizationListener``.

    Returns:
      tuple: ``(input_list, data_sources)``. ``data_sources`` only holds the
      ``CachedDataSource`` objects created for locally cached inputs.

    Raises:
      IOError: when ``databases`` or ``socket`` is required but missing, when
        a database is not found, or when a cache file cannot be loaded.
    """
    data_sources = []
    views = {}
    input_list = inputs.InputList()

    # This is used for parallelization purposes
    start_index, end_index = config.get('range', (None, None))

    for name, details in config['inputs'].items():

        if details.get('database', False):
            if db_access == AccessMode.LOCAL:
                if databases is None:
                    raise IOError("No databases provided")

                # Retrieve the database
                try:
                    db = databases[details['database']]
                except KeyError:
                    raise IOError("Database '%s' not found" % details['database'])

                # Create or retrieve the database view. Views are cached per
                # channel. NOTE(review): this assumes every database input of
                # a channel uses the same database/protocol/set.
                channel = details['channel']
                if channel not in views:
                    view = db.view(details['protocol'], details['set'])
                    view.prepare_outputs()
                    view.setup()
                    views[channel] = view

                    logger.debug("Database view '%s/%s/%s' created: group='%s'",
                                 details['database'], details['protocol'],
                                 details['set'], channel)
                else:
                    view = views[channel]

                # Creation of the input: it is fed, in memory, by the
                # requested output of the view
                data_source = data.MemoryDataSource(view.done, next_callback=view.next)

                output = view.outputs[details['output']]
                output.data_sink.data_sources.append(data_source)

                input_ = inputs.Input(name, algorithm.input_map[name], data_source)

                logger.debug("Input '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                             name, channel, algorithm.input_map[name], details['database'],
                             details['protocol'], details['set'], details['output'])

            elif db_access == AccessMode.REMOTE:
                if socket is None:
                    raise IOError("No socket provided for remote inputs")

                input_ = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
                                            socket, unpack=unpack)

                logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s', connected to a database",
                             name, details['channel'], algorithm.input_map[name])

            else:
                # Database inputs are not handled in this access mode. Without
                # this skip, ``input_`` would be unbound (or would still refer
                # to the previous iteration's input) in the grouping code below.
                continue

        elif cache_access == AccessMode.LOCAL:
            data_source = data.CachedDataSource()
            data_sources.append(data_source)

            filename = os.path.join(cache_root, details['path'] + '.data')

            setup_kwargs = dict(filename=filename, prefix=prefix, unpack=True)
            if details['channel'] == config['channel']:  # synchronized
                # Only the synchronized channel is restricted to the
                # requested index range
                setup_kwargs.update(force_start_index=start_index,
                                    force_end_index=end_index)
            status = data_source.setup(**setup_kwargs)

            if not status:
                raise IOError("cannot load cache file `%s'" % details['path'])

            input_ = inputs.Input(name, algorithm.input_map[name], data_source)

            logger.debug("Input '%s' created: group='%s', dataformat='%s', filename='%s'",
                         name, details['channel'], algorithm.input_map[name], filename)

        elif cache_access == AccessMode.REMOTE:
            if socket is None:
                raise IOError("No socket provided for remote inputs")

            input_ = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
                                        socket, unpack=unpack)

            logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s'",
                         name, details['channel'], algorithm.input_map[name])

        else:
            continue

        # Synchronization bits: one group per channel. Only the group of the
        # synchronized channel gets restricted access.
        group = input_list.group(details['channel'])
        if group is None:
            synchronization_listener = None
            if not no_synchronisation_listeners:
                synchronization_listener = outputs.SynchronizationListener()

            group = inputs.InputGroup(
                details['channel'],
                synchronization_listener=synchronization_listener,
                restricted_access=(details['channel'] == config['channel'])
            )
            input_list.add(group)
            logger.debug("Group '%s' created", details['channel'])

        group.add(input_)

    return (input_list, data_sources)
198
199
200
201
202
203
204



#----------------------------------------------------------


def create_outputs_from_configuration(config, algorithm, prefix, cache_root, input_list,
                                      cache_access=AccessMode.NONE, socket=None):
    """Build the algorithm's output list from an execution configuration.

    For an analyzer (``'result' in config``) the only output is ``result``. It
    uses ``algorithm.result_dataformat()`` and no synchronization listener.
    Otherwise each entry of ``config['outputs']`` becomes an output. That
    output shares the synchronization listener of the input group on the same
    channel, if there is one.

    Parameters:
      config (dict): execution configuration. Reads ``inputs``, ``channel``
        and either ``result`` or ``outputs``, and optionally ``range``
        (``(start, end)``).
      algorithm: object providing ``name``, ``result_dataformat()``,
        ``output_map`` and ``dataformats``.
      prefix: unused here; kept for interface compatibility.
      cache_root (str): root directory of the cache files.
      input_list: input list whose groups provide the synchronization
        listeners.
      cache_access (int): an ``AccessMode`` constant. LOCAL writes cache
        files under ``cache_root``, REMOTE sends data through ``socket``, and
        any other value creates no output.
      socket: connection used by remote outputs.

    Returns:
      tuple: ``(output_list, data_sinks)``. ``data_sinks`` only holds the
      ``CachedDataSink`` objects created for local outputs.

    Raises:
      IOError: when ``socket`` is missing for remote outputs. Also raised for
        local outputs without an explicit range when no input lies on the
        synchronized channel or no index file is found for that input, and
        when a cache sink cannot be created.
    """
    data_sinks = []
    output_list = outputs.OutputList()

    # This is used for parallelization purposes
    start_index, end_index = config.get('range', (None, None))

    # If the algorithm is an analyser, its only output is the result
    is_analyzer = 'result' in config
    if is_analyzer:
        output_config = {
            'result': config['result']
        }
    else:
        output_config = config['outputs']

    for name, details in output_config.items():

        synchronization_listener = None

        if is_analyzer:
            dataformat_name = 'analysis:' + algorithm.name
            dataformat = algorithm.result_dataformat()
        else:
            dataformat_name = algorithm.output_map[name]
            dataformat = algorithm.dataformats[dataformat_name]

            input_group = input_list.group(details['channel'])
            if input_group is not None:
                synchronization_listener = input_group.synchronization_listener

        if cache_access == AccessMode.LOCAL:

            path = os.path.join(cache_root, details['path'] + '.data')
            dirname = os.path.dirname(path)
            # Make sure that the directory exists while taking care of race
            # conditions (it may be created concurrently). See:
            # http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
            if dirname:
                try:
                    os.makedirs(dirname)
                except OSError as exception:
                    if exception.errno != errno.EEXIST:
                        raise

            if start_index is None:
                # No explicit range: cover the whole input of the synchronized
                # channel. Its end index is parsed from the second-to-last
                # dot-separated field of each of its index file names.
                input_path = None
                for input_details in config['inputs'].values():
                    if input_details['channel'] == config['channel']:
                        input_path = os.path.join(cache_root, input_details['path'] + '.data')
                        break

                if input_path is None:
                    raise IOError("No input found on the synchronized channel '%s'" %
                                  config['channel'])

                (_, indices_filenames, _, _) = data.getAllFilenames(input_path)

                end_indices = [int(x.split('.')[-2]) for x in indices_filenames]
                if not end_indices:
                    raise IOError("No index file found for '%s'" % input_path)

                # Reassigning the function-level variables means the computed
                # range is reused by the following outputs
                start_index = 0
                end_index = max(end_indices)

            data_sink = data.CachedDataSink()
            data_sinks.append(data_sink)

            status = data_sink.setup(
                filename=path,
                dataformat=dataformat,
                start_index=start_index,
                end_index=end_index,
                encoding='binary'
            )

            if not status:
                raise IOError("Cannot create cache sink '%s'" % details['path'])

            output_list.add(outputs.Output(name, data_sink,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index)
            )

            if not is_analyzer:
                logger.debug("Output '%s' created: group='%s', dataformat='%s', filename='%s'",
                             name, details['channel'], dataformat_name, path)
            else:
                logger.debug("Output '%s' created: dataformat='%s', filename='%s'",
                             name, dataformat_name, path)

        elif cache_access == AccessMode.REMOTE:
            if socket is None:
                raise IOError("No socket provided for remote outputs")

            output_list.add(outputs.RemoteOutput(name, dataformat, socket,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index or 0)
            )

            logger.debug("RemoteOutput '%s' created: group='%s', dataformat='%s'",
                         name, details['channel'], dataformat_name)

        # With any other access mode, no output is created for this entry

    return (output_list, data_sinks)