helpers.py 17 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

36

37
38
39
40
41
42
43
"""
=======
helpers
=======

This module implements various helper methods and classes
"""
44

45
import os
46
import errno
47
48
import logging

49
from .data import CachedDataSource
50
from .data import RemoteDataSource
51
52
53
54
55
56
57
58
59
60
61
from .data import CachedDataSink
from .data import getAllFilenames
from .data_loaders import DataLoaderList
from .data_loaders import DataLoader
from .inputs import InputList
from .inputs import Input
from .inputs import InputGroup
from .outputs import SynchronizationListener
from .outputs import OutputList
from .outputs import Output
from .algorithm import Algorithm
62

63
64
logger = logging.getLogger(__name__)

65

66
# ----------------------------------------------------------
67

Samuel GAIST's avatar
Samuel GAIST committed
68

69
70
71
def parse_inputs(inputs):
    """Reduces input declarations to the fields forwarded to a container

    For every entry of ``inputs`` the ``channel`` and ``path`` values are
    copied. When the entry has a ``database`` key, the ``database``,
    ``protocol``, ``set`` and ``output`` values are copied as well. Any other
    key of the entry is dropped. ``inputs`` itself is not modified.

    Parameters:

      inputs (dict): Mapping of input names to their declaration (a dict
        providing at least ``channel`` and ``path``)

    Returns:

      dict: A new mapping of input names to the reduced declarations

    Raises:

      KeyError: if a declaration lacks ``channel`` or ``path``, or declares a
        ``database`` without ``protocol``, ``set`` or ``output``
    """

    database_fields = ("database", "protocol", "set", "output")

    data = {}
    for key, value in inputs.items():
        entry = {"channel": value["channel"], "path": value["path"]}

        # Database-related fields only make sense together, so they are
        # copied as a group as soon as a database is declared
        if "database" in value:
            for field in database_fields:
                entry[field] = value[field]

        data[key] = entry

    return data

Samuel GAIST's avatar
Samuel GAIST committed
83

84
85
def convert_loop_to_container(config):
    """Builds the container-side description of a loop configuration

    Parameters:

      config (dict): Loop configuration providing the ``algorithm``,
        ``parameters``, ``channel`` and ``inputs`` keys

    Returns:

      dict: A new dictionary holding the ``algorithm``, ``parameters`` and
      ``channel`` values of ``config``, the ``uid`` returned by
      :py:func:`os.getuid` and the ``inputs`` as reduced by
      :py:func:`parse_inputs`
    """

    return {
        "algorithm": config["algorithm"],
        "parameters": config["parameters"],
        "channel": config["channel"],
        "uid": os.getuid(),
        "inputs": parse_inputs(config["inputs"]),
    }


97
def convert_experiment_configuration_to_container(config):
    """Builds the container-side description of an experiment block

    Parameters:

      config (dict): Block configuration. ``algorithm``, ``parameters``,
        ``channel`` and ``inputs`` are required. ``range`` and ``loop`` are
        optional. Either ``outputs`` or ``result`` must be present.

    Returns:

      dict: A new dictionary with:

        * ``algorithm``, ``parameters``, ``channel``: copied from ``config``
        * ``uid``: the value returned by :py:func:`os.getuid`
        * ``range``: copied from ``config`` when present
        * ``inputs``: ``config["inputs"]`` reduced by :py:func:`parse_inputs`
        * ``outputs``: the ``channel`` and ``path`` of every declared output,
          when ``config`` has an ``outputs`` key
        * ``result``: the block channel and the result path, when ``config``
          has no ``outputs`` key
        * ``loop``: ``config["loop"]`` converted by
          :py:func:`convert_loop_to_container`, when present
    """

    data = {
        "algorithm": config["algorithm"],
        "parameters": config["parameters"],
        "channel": config["channel"],
        "uid": os.getuid(),
    }

    if "range" in config:
        data["range"] = config["range"]

    data["inputs"] = parse_inputs(config["inputs"])

    if "outputs" in config:
        data["outputs"] = {
            name: {"channel": details["channel"], "path": details["path"]}
            for name, details in config["outputs"].items()
        }
    else:
        # No outputs declared: a single result is expected instead, which is
        # attached to the channel of the block itself
        data["result"] = {
            "channel": config["channel"],
            "path": config["result"]["path"],
        }

    if "loop" in config:
        data["loop"] = convert_loop_to_container(config["loop"])

    return data
127
128


129
# ----------------------------------------------------------
130
131


132
class AccessMode:
    """Possible access modes"""

    # No access (default of the ``cache_access`` and ``db_access`` arguments
    # of ``create_inputs_from_configuration``)
    NONE = 0

    # Direct access: cache files are read from the cache root, databases are
    # accessed through the provided database objects
    LOCAL = 1

    # Access through a socket (only acted upon for ``db_access``)
    REMOTE = 2
138
139


Samuel GAIST's avatar
Samuel GAIST committed
140
141
142
143
144
145
146
147
148
149
150
151
def create_inputs_from_configuration(
    config,
    algorithm,
    prefix,
    cache_root,
    cache_access=AccessMode.NONE,
    db_access=AccessMode.NONE,
    unpack=True,
    socket=None,
    databases=None,
    no_synchronisation_listeners=False,
):
    """Creates the inputs and data loaders described by a configuration

    Each entry of ``config["inputs"]`` is turned into a data source. The data
    source is wrapped in an :py:class:`Input` (added to the input group of its
    channel) when the algorithm is of the legacy type, or when it is
    sequential and the input belongs to the channel of the block. Otherwise
    the data source is registered in the data loader of its channel.

    Parameters:

      config (dict): Block configuration; ``inputs`` and ``channel`` are
        read, as well as the optional ``range`` (pair of start/end indices)

      algorithm: The algorithm; its ``type``, ``is_sequential`` and
        ``input_map`` attributes are used

      prefix (str): Prefix forwarded to the data sources

      cache_root (str): Directory the input paths are relative to

      cache_access (int): One of the :py:class:`AccessMode` values. Inputs
        that do not come from a database are only created when this is
        ``AccessMode.LOCAL``; they are skipped otherwise.

      db_access (int): One of the :py:class:`AccessMode` values. Inputs coming
        from a database are read through ``databases`` when
        ``AccessMode.LOCAL`` and through ``socket`` when
        ``AccessMode.REMOTE``; nothing is created for them otherwise.

      unpack (bool): Not consulted by this function: every data source is set
        up with ``unpack=True``. Kept so that existing callers keep working.

      socket: Socket handed over to the remote data sources

      databases: Container of the available databases, indexed with the
        database name found in the input declaration

      no_synchronisation_listeners (bool): When ``True``, the input groups
        are created without synchronization listener

    Returns:

      tuple: ``(input_list, data_loader_list)``

    Raises:

      IOError: if a cache file cannot be loaded, if no databases (local
        access) or no socket (remote access) were provided, if a database
        cannot be found or if a remote data source cannot be set up
    """

    views = {}
    input_list = InputList()
    data_loader_list = DataLoaderList()

    # This is used for parallelization purposes
    start_index, end_index = config.get("range", (None, None))

    def _is_synchronized(details):
        # An input is synchronized when it lives on the channel of the block
        return details["channel"] == config["channel"]

    def _feeds_input_group(details):
        # Legacy algorithms get all their data through inputs, sequential
        # ones only the synchronized data; everything else goes through the
        # data loaders
        return (algorithm.type == Algorithm.LEGACY) or (
            algorithm.is_sequential and _is_synchronized(details)
        )

    def _create_local_input(name, details):
        # Creates an input reading the cache file of the given declaration
        data_source = CachedDataSource()

        filename = os.path.join(cache_root, details["path"] + ".data")

        if _is_synchronized(details):
            # Only synchronized inputs are restricted to the block's range
            status = data_source.setup(
                filename=filename,
                prefix=prefix,
                start_index=start_index,
                end_index=end_index,
                unpack=True,
            )
        else:
            status = data_source.setup(filename=filename, prefix=prefix, unpack=True)

        if not status:
            raise IOError("cannot load cache file `%s'" % details["path"])

        new_input = Input(name, algorithm.input_map[name], data_source)

        logger.debug(
            "Input '%s' created: group='%s', dataformat='%s', filename='%s'",
            name,
            details["channel"],
            algorithm.input_map[name],
            filename,
        )

        return new_input

    def _get_data_loader_for(details):
        # Retrieves the data loader of the channel, creating it on first use
        data_loader = data_loader_list[details["channel"]]
        if data_loader is None:
            data_loader = DataLoader(details["channel"])
            data_loader_list.add(data_loader)

            logger.debug("Data loader created: group='%s'", details["channel"])

        return data_loader

    def _create_data_source(name, details):
        # Registers the cache file of the given declaration in a data loader
        data_loader = _get_data_loader_for(details)

        filename = os.path.join(cache_root, details["path"] + ".data")

        data_source = CachedDataSource()
        result = data_source.setup(
            filename=filename,
            prefix=prefix,
            start_index=start_index,
            end_index=end_index,
            unpack=True,
        )

        if not result:
            raise IOError("cannot load cache file `%s'" % filename)

        data_loader.add(name, data_source)

        logger.debug(
            "Input '%s' added to data loader: group='%s', dataformat='%s', filename='%s'",
            name,
            details["channel"],
            algorithm.input_map[name],
            filename,
        )

    for name, details in config["inputs"].items():

        new_input = None

        if details.get("database", None) is not None:
            if db_access == AccessMode.LOCAL:
                if databases is None:
                    raise IOError("No databases provided")

                # Retrieve the database. A mapping signals a missing entry
                # with KeyError, a sequence with IndexError: both mean that
                # the database is not available.
                try:
                    db = databases[details["database"]]
                except (KeyError, IndexError):
                    raise IOError("Database '%s' not found" % details["database"])

                # Create or retrieve the database view
                channel = details["channel"]

                if channel not in views:
                    view = db.view(details["protocol"], details["set"])
                    view.setup(
                        os.path.join(cache_root, details["path"]),
                        pack=False,
                        start_index=start_index,
                        end_index=end_index,
                    )

                    views[channel] = view

                    logger.debug(
                        "Database view '%s/%s/%s' created: group='%s'",
                        details["database"],
                        details["protocol"],
                        details["set"],
                        channel,
                    )
                else:
                    view = views[channel]

                data_source = view.data_sources[details["output"]]

                if _feeds_input_group(details):
                    new_input = Input(name, algorithm.input_map[name], data_source)

                    logger.debug(
                        "Input '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                        details["database"],
                        details["protocol"],
                        details["set"],
                        details["output"],
                    )
                else:
                    data_loader = _get_data_loader_for(details)
                    data_loader.add(name, data_source)

                    logger.debug(
                        "DatabaseOutputDataSource '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                        name,
                        channel,
                        algorithm.input_map[name],
                        details["database"],
                        details["protocol"],
                        details["set"],
                        details["output"],
                    )

            elif db_access == AccessMode.REMOTE:
                if socket is None:
                    raise IOError("No socket provided for remote data sources")

                data_source = RemoteDataSource()
                result = data_source.setup(
                    socket=socket,
                    input_name=name,
                    dataformat_name=algorithm.input_map[name],
                    prefix=prefix,
                    unpack=True,
                )

                if not result:
                    raise IOError("cannot setup remote data source '%s'" % name)

                if _feeds_input_group(details):
                    new_input = Input(name, algorithm.input_map[name], data_source)

                    logger.debug(
                        "Input '%s' created: group='%s', dataformat='%s', database-file='%s'",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                        details["path"],
                    )
                else:
                    data_loader = _get_data_loader_for(details)
                    data_loader.add(name, data_source)

                    logger.debug(
                        "RemoteDataSource '%s' created: group='%s', dataformat='%s', connected to a database",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                    )

            # Any other database access mode: nothing is created for this
            # input

        elif cache_access == AccessMode.LOCAL:
            if _feeds_input_group(details):
                new_input = _create_local_input(name, details)
            else:
                _create_data_source(name, details)

        else:
            continue

        # Synchronization bits
        if new_input is not None:
            group = input_list.group(details["channel"])
            if group is None:
                synchronization_listener = None
                if not no_synchronisation_listeners:
                    synchronization_listener = SynchronizationListener()

                group = InputGroup(
                    details["channel"],
                    synchronization_listener=synchronization_listener,
                    restricted_access=_is_synchronized(details),
                )
                input_list.add(group)
                logger.debug("Group '%s' created", details["channel"])

            group.add(new_input)

    return (input_list, data_loader_list)
376
377


378
# ----------------------------------------------------------
379
380


Samuel GAIST's avatar
Samuel GAIST committed
381
382
383
def create_outputs_from_configuration(
    config, algorithm, prefix, cache_root, input_list=None, data_loaders=None
):
    """Creates the outputs and cache sinks described by a configuration

    One :py:class:`CachedDataSink` and one :py:class:`Output` are created per
    entry of ``config["outputs"]`` or, when ``config`` has a ``result`` key,
    for that single result. The directory of each cache file is created when
    missing.

    When ``config`` provides no ``range``, the range of indices is determined
    once from the inputs on the channel of the block. If one of them does not
    come from a database, the range goes from 0 to the highest end index
    found in the names of its cache index files. Otherwise the end index is
    asked to the matching entry of ``input_list`` or, failing that, to the
    main loader of ``data_loaders``.

    Parameters:

      config (dict): Block configuration; ``channel`` and ``inputs`` are
        read, as well as either ``result`` or ``outputs`` and the optional
        ``range``

      algorithm: The algorithm; ``name`` and ``result_dataformat()`` are used
        for results, ``output_map`` and ``dataformats`` for regular outputs

      prefix (str): Not consulted by this function. Kept so that existing
        callers keep working.

      cache_root (str): Directory the output (and input) paths are relative
        to

      input_list: Optional list of inputs, used to retrieve the
        synchronization listener of the block channel and, if needed, the
        last data index of an input

      data_loaders: Optional list of data loaders, whose main loader is used
        as a fallback to find out the end index

    Returns:

      tuple: ``(output_list, data_sinks)``

    Raises:

      IOError: if the cache of the input used to determine the range has no
        index file, or if a cache sink cannot be created

      OSError: if the directory of a cache file cannot be created
    """

    data_sinks = []
    output_list = OutputList()

    # This is used for parallelization purposes
    start_index, end_index = config.get("range", (None, None))

    # If the algorithm is an analyser
    is_analyzer = "result" in config
    if is_analyzer:
        output_config = {"result": config["result"]}
    else:
        output_config = config["outputs"]

    for name, details in output_config.items():

        synchronization_listener = None

        if is_analyzer:
            dataformat_name = "analysis:" + algorithm.name
            dataformat = algorithm.result_dataformat()
        else:
            dataformat_name = algorithm.output_map[name]
            dataformat = algorithm.dataformats[dataformat_name]

        if input_list is not None:
            input_group = input_list.group(config["channel"])
            if input_group is not None:
                synchronization_listener = input_group.synchronization_listener

        path = os.path.join(cache_root, details["path"] + ".data")
        dirname = os.path.dirname(path)
        # Make sure that the directory exists while taking care of race
        # conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
        try:
            if len(dirname) > 0:
                os.makedirs(dirname)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise

        # Only entered for the first output: the indices found here are
        # reused for the following ones
        if start_index is None:
            # Look for a synchronized input that is read from the cache
            input_path = None

            for input_name, input_details in config["inputs"].items():
                if input_details["channel"] != config["channel"]:
                    continue

                if "database" not in input_details:
                    input_path = os.path.join(
                        cache_root, input_details["path"] + ".data"
                    )
                    break

            if input_path is not None:
                _, indices_filenames, _, _ = getAllFilenames(input_path)

                # The end index is the second to last dot-separated component
                # of the name of an index file
                end_indices = [int(x.split(".")[-2]) for x in indices_filenames]

                if not end_indices:
                    raise IOError("No index file found for cache file '%s'" % input_path)

                start_index = 0
                end_index = max(end_indices)

            else:
                for input_name, input_details in config["inputs"].items():
                    if input_details["channel"] != config["channel"]:
                        continue

                    start_index = 0

                    if (input_list is not None) and (
                        input_list[input_name] is not None
                    ):
                        end_index = input_list[
                            input_name
                        ].data_source.last_data_index()
                        break
                    elif data_loaders is not None:
                        end_index = data_loaders.main_loader.data_index_end
                        break

        data_sink = CachedDataSink()
        data_sinks.append(data_sink)

        status = data_sink.setup(
            filename=path,
            dataformat=dataformat,
            start_index=start_index,
            end_index=end_index,
            encoding="binary",
        )

        if not status:
            raise IOError("Cannot create cache sink '%s'" % details["path"])

        output_list.add(
            Output(
                name,
                data_sink,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index,
            )
        )

        if not is_analyzer:
            logger.debug(
                "Output '%s' created: group='%s', dataformat='%s', filename='%s'",
                name,
                details["channel"],
                dataformat_name,
                path,
            )
        else:
            logger.debug(
                "Output '%s' created: dataformat='%s', filename='%s'",
                name,
                dataformat_name,
                path,
            )

    return (output_list, data_sinks)