helpers.py 17.5 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################

36

37
38
39
40
41
42
43
"""
=======
helpers
=======

This module implements various helper functions and classes
"""
44

45
import os
46
import errno
47
48
import logging

49
from .data import CachedDataSource
50
from .data import RemoteDataSource
51
52
53
54
55
56
57
58
59
60
from .data import CachedDataSink
from .data import getAllFilenames
from .data_loaders import DataLoaderList
from .data_loaders import DataLoader
from .inputs import InputList
from .inputs import Input
from .inputs import InputGroup
from .outputs import SynchronizationListener
from .outputs import OutputList
from .outputs import Output
61
from .outputs import RemotelySyncedOutput
62
from .algorithm import Algorithm
63

64
65
logger = logging.getLogger(__name__)

66

67
# ----------------------------------------------------------
68

Samuel GAIST's avatar
Samuel GAIST committed
69

70
71
72
def parse_inputs(inputs):
    """Reduce each input configuration to the fields a container needs.

    Every input keeps its ``channel`` and ``path``. Inputs that declare a
    ``database`` additionally keep ``database``, ``protocol``, ``set`` and
    ``output`` (all four must then be present, otherwise ``KeyError``).

    Parameters:
        inputs (dict): input configurations, indexed by input name

    Returns:
        dict: the reduced configurations, indexed by input name
    """
    parsed = {}
    for input_name, settings in inputs.items():
        entry = {"channel": settings["channel"], "path": settings["path"]}
        if "database" in settings:
            # Database-backed inputs also carry their database coordinates
            for field in ("database", "protocol", "set", "output"):
                entry[field] = settings[field]
        parsed[input_name] = entry
    return parsed

Samuel GAIST's avatar
Samuel GAIST committed
84

85
86
def convert_loop_to_container(config):
    """Build the container configuration of a loop algorithm.

    Parameters:
        config (dict): loop configuration; reads ``algorithm``,
            ``parameters``, ``channel`` and ``inputs``

    Returns:
        dict: the container configuration, which also records the uid of
        the current process
    """
    container = dict(
        algorithm=config["algorithm"],
        parameters=config["parameters"],
        channel=config["channel"],
        uid=os.getuid(),
    )
    container["inputs"] = parse_inputs(config["inputs"])
    return container


98
def convert_experiment_configuration_to_container(config):
    """Build the container configuration of an experiment block.

    Parameters:
        config (dict): block configuration; reads ``algorithm``,
            ``parameters``, ``channel``, ``inputs`` and, when present,
            ``range``, ``outputs`` (otherwise ``result``) and ``loop``

    Returns:
        dict: the container configuration, which also records the uid of
        the current process
    """
    data = {
        "algorithm": config["algorithm"],
        "parameters": config["parameters"],
        "channel": config["channel"],
        "uid": os.getuid(),
    }

    if "range" in config:
        data["range"] = config["range"]

    data["inputs"] = parse_inputs(config["inputs"])

    if "outputs" in config:
        # Only the channel and path of each output are relevant to the container
        data["outputs"] = {
            output_name: {"channel": details["channel"], "path": details["path"]}
            for output_name, details in config["outputs"].items()
        }
    else:
        # Blocks without outputs (analyzers) write a single result on the
        # block's own channel
        data["result"] = {
            "channel": config["channel"],
            "path": config["result"]["path"],
        }

    if "loop" in config:
        data["loop"] = convert_loop_to_container(config["loop"])

    return data
128
129


130
# ----------------------------------------------------------
131
132


133
class AccessMode:
    """Integer constants selecting how inputs are accessed.

    ``NONE`` disables access, ``LOCAL`` reads local cache files or databases,
    and ``REMOTE`` goes through a socket (remote data sources).
    """

    NONE, LOCAL, REMOTE = 0, 1, 2
139
140


Samuel GAIST's avatar
Samuel GAIST committed
141
142
143
144
145
146
147
148
149
150
151
152
def create_inputs_from_configuration(
    config,
    algorithm,
    prefix,
    cache_root,
    cache_access=AccessMode.NONE,
    db_access=AccessMode.NONE,
    unpack=True,
    socket=None,
    databases=None,
    no_synchronisation_listeners=False,
):
    """Create the inputs and data loaders of an algorithm from its configuration.

    Each entry of ``config["inputs"]`` becomes either an :py:class:`Input`
    grouped by channel (legacy algorithms, and the synchronized channel of
    sequential algorithms) or a data source registered on the
    :py:class:`DataLoader` of its channel (every other case).

    Parameters:
        config (dict): algorithm configuration; reads ``inputs``, ``channel``
            and optionally ``range``.
        algorithm: algorithm whose ``input_map``, ``type`` and
            ``is_sequential`` drive the creation.
        prefix (str): prefix forwarded to the data sources.
        cache_root (str): root directory of the cache files.
        cache_access (int): :py:class:`AccessMode` for cached inputs; only
            ``AccessMode.LOCAL`` creates them, other values skip them.
        db_access (int): :py:class:`AccessMode` for database inputs.
        unpack (bool): NOTE(review): currently ignored, every data source is
            set up with ``unpack=True``.
        socket: socket for remote data sources; required when ``db_access``
            is ``AccessMode.REMOTE``.
        databases: mapping indexed by database name; required when
            ``db_access`` is ``AccessMode.LOCAL``.
        no_synchronisation_listeners (bool): if True, created input groups get
            no :py:class:`SynchronizationListener`.

    Returns:
        tuple: ``(input_list, data_loader_list)``

    Raises:
        IOError: if a required database mapping or socket is missing, a
            database is unknown, or a data source cannot be set up.
    """
    views = {}
    input_list = InputList()
    data_loader_list = DataLoaderList()

    # This is used for parallelization purposes
    start_index, end_index = config.get("range", (None, None))

    def _needs_input(details):
        """Whether this input becomes an Input rather than a data loader source."""
        return algorithm.type == Algorithm.LEGACY or (
            algorithm.is_sequential and details["channel"] == config["channel"]
        )

    def _create_local_input(name, details):
        """Create an Input reading the local cache file of ``details``."""
        data_source = CachedDataSource()
        filename = os.path.join(cache_root, details["path"] + ".data")

        if details["channel"] == config["channel"]:  # synchronized
            status = data_source.setup(
                filename=filename,
                prefix=prefix,
                start_index=start_index,
                end_index=end_index,
                unpack=True,
            )
        else:
            status = data_source.setup(filename=filename, prefix=prefix, unpack=True)

        if not status:
            raise IOError("cannot load cache file `%s'" % details["path"])

        input_ = Input(name, algorithm.input_map[name], data_source)

        logger.debug(
            "Input '%s' created: group='%s', dataformat='%s', filename='%s'",
            name,
            details["channel"],
            algorithm.input_map[name],
            filename,
        )

        return input_

    def _get_data_loader_for(details):
        """Return the data loader of the input's channel, creating it if needed."""
        data_loader = data_loader_list[details["channel"]]
        if data_loader is None:
            data_loader = DataLoader(details["channel"])
            data_loader_list.add(data_loader)

            logger.debug("Data loader created: group='%s'", details["channel"])

        return data_loader

    def _create_data_source(name, details):
        """Register a cached data source for ``name`` on its channel's loader."""
        data_loader = _get_data_loader_for(details)

        filename = os.path.join(cache_root, details["path"] + ".data")

        data_source = CachedDataSource()
        result = data_source.setup(
            filename=filename,
            prefix=prefix,
            start_index=start_index,
            end_index=end_index,
            unpack=True,
        )

        if not result:
            raise IOError("cannot load cache file `%s'" % filename)

        data_loader.add(name, data_source)

        logger.debug(
            "Input '%s' added to data loader: group='%s', dataformat='%s', filename='%s'",
            name,
            details["channel"],
            algorithm.input_map[name],
            filename,
        )

    for name, details in config["inputs"].items():
        input_ = None

        if details.get("database", None) is not None:
            if db_access == AccessMode.LOCAL:
                if databases is None:
                    raise IOError("No databases provided")

                # Retrieve the database. A missing name in a mapping raises
                # KeyError (IndexError is kept for sequence-like containers).
                try:
                    db = databases[details["database"]]
                except (KeyError, IndexError):
                    raise IOError("Database '%s' not found" % details["database"])

                # Create or retrieve the database view (one per channel)
                channel = details["channel"]

                if channel not in views:
                    view = db.view(details["protocol"], details["set"])
                    view.setup(
                        os.path.join(cache_root, details["path"]),
                        pack=False,
                        start_index=start_index,
                        end_index=end_index,
                    )

                    views[channel] = view

                    logger.debug(
                        "Database view '%s/%s/%s' created: group='%s'",
                        details["database"],
                        details["protocol"],
                        details["set"],
                        channel,
                    )
                else:
                    view = views[channel]

                data_source = view.data_sources[details["output"]]

                if _needs_input(details):
                    input_ = Input(name, algorithm.input_map[name], data_source)

                    logger.debug(
                        "Input '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                        details["database"],
                        details["protocol"],
                        details["set"],
                        details["output"],
                    )
                else:
                    _get_data_loader_for(details).add(name, data_source)

                    logger.debug(
                        "DatabaseOutputDataSource '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'",
                        name,
                        channel,
                        algorithm.input_map[name],
                        details["database"],
                        details["protocol"],
                        details["set"],
                        details["output"],
                    )

            elif db_access == AccessMode.REMOTE:
                if socket is None:
                    raise IOError("No socket provided for remote data sources")

                data_source = RemoteDataSource()
                result = data_source.setup(
                    socket=socket,
                    input_name=name,
                    dataformat_name=algorithm.input_map[name],
                    prefix=prefix,
                    unpack=True,
                )

                if not result:
                    raise IOError("cannot setup remote data source '%s'" % name)

                if _needs_input(details):
                    input_ = Input(name, algorithm.input_map[name], data_source)

                    logger.debug(
                        "Input '%s' created: group='%s', dataformat='%s', database-file='%s'",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                        details["path"],
                    )
                else:
                    _get_data_loader_for(details).add(name, data_source)

                    logger.debug(
                        "RemoteDataSource '%s' created: group='%s', dataformat='%s', connected to a database",
                        name,
                        details["channel"],
                        algorithm.input_map[name],
                    )

        elif cache_access == AccessMode.LOCAL:
            if _needs_input(details):
                input_ = _create_local_input(name, details)
            else:  # unsynchronized channels and autonomous algorithms
                _create_data_source(name, details)

        # Synchronization bits: group inputs by channel
        if input_ is not None:
            group = input_list.group(details["channel"])

            if group is None:
                synchronization_listener = None
                if not no_synchronisation_listeners:
                    synchronization_listener = SynchronizationListener()

                group = InputGroup(
                    details["channel"],
                    synchronization_listener=synchronization_listener,
                    restricted_access=(details["channel"] == config["channel"]),
                )
                input_list.add(group)
                logger.debug("Group '%s' created", details["channel"])

            group.add(input_)

    return (input_list, data_loader_list)
377
378


379
# ----------------------------------------------------------
380
381


Samuel GAIST's avatar
Samuel GAIST committed
382
def _compute_output_range(config, cache_root, input_list, data_loaders):
    """Infer ``(start_index, end_index)`` from the inputs of the main channel.

    The first cached (non-database) input on ``config["channel"]`` gives the
    range through its index files. Otherwise the range starts at 0 and its
    end is read from ``input_list`` or, failing that, ``data_loaders``.
    Returns ``(None, None)`` when no input is on the main channel.

    Raises:
        IOError: if the cached input has no index files.
    """
    synchronized = [
        (input_name, details)
        for input_name, details in config["inputs"].items()
        if details["channel"] == config["channel"]
    ]

    input_path = next(
        (
            os.path.join(cache_root, details["path"] + ".data")
            for _, details in synchronized
            if "database" not in details
        ),
        None,
    )

    if input_path is not None:
        _, indices_filenames, _, _ = getAllFilenames(input_path)
        # Index file names carry the end index as their second-to-last
        # dot-separated component
        end_indices = [int(x.split(".")[-2]) for x in indices_filenames]
        if not end_indices:
            raise IOError("cannot find the indices of cache file `%s'" % input_path)
        return 0, max(end_indices)

    start_index = None
    end_index = None
    for input_name, _ in synchronized:
        start_index = 0
        if (input_list is not None) and (input_list[input_name] is not None):
            end_index = input_list[input_name].data_source.last_data_index()
            break
        if data_loaders is not None:
            end_index = data_loaders.main_loader.data_index_end
            break

    return start_index, end_index


def create_outputs_from_configuration(
    config,
    algorithm,
    prefix,
    cache_root,
    input_list=None,
    data_loaders=None,
    loop_socket=None,
):
    """Create the outputs of an algorithm and the cache sinks backing them.

    For analyzers (``"result"`` in ``config``) a single ``result`` output is
    created with the algorithm's result data format; otherwise one output is
    created per entry of ``config["outputs"]``.

    Parameters:
        config (dict): algorithm configuration; reads ``channel``, ``inputs``,
            ``outputs`` or ``result`` and optionally ``range``.
        algorithm: algorithm providing the output data formats.
        prefix (str): not used by this function.
        cache_root (str): root directory in which the ``.data`` files go.
        input_list: optional input list; shares the synchronization listener
            of the main channel and helps infer the data range.
        data_loaders: optional data loader list; helps infer the data range.
        loop_socket: when given, outputs are :py:class:`RemotelySyncedOutput`
            using this socket instead of plain :py:class:`Output`.

    Returns:
        tuple: ``(output_list, data_sinks)``

    Raises:
        IOError: if a cache sink cannot be created or the data range cannot
            be inferred from the cached input.
    """
    data_sinks = []
    output_list = OutputList()

    # This is used for parallelization purposes
    start_index, end_index = config.get("range", (None, None))

    # If the algorithm is an analyzer
    is_analyzer = "result" in config
    if is_analyzer:
        output_config = {"result": config["result"]}
    else:
        output_config = config["outputs"]

    # All outputs share the synchronization listener of the main input group
    synchronization_listener = None
    if input_list is not None:
        input_group = input_list.group(config["channel"])
        if input_group is not None:
            synchronization_listener = input_group.synchronization_listener

    # Without an explicit range, infer it once from the main-channel inputs
    if start_index is None and output_config:
        start_index, end_index = _compute_output_range(
            config, cache_root, input_list, data_loaders
        )

    for name, details in output_config.items():
        if is_analyzer:
            dataformat_name = "analysis:" + algorithm.name
            dataformat = algorithm.result_dataformat()
        else:
            dataformat_name = algorithm.output_map[name]
            dataformat = algorithm.dataformats[dataformat_name]

        path = os.path.join(cache_root, details["path"] + ".data")

        # Make sure that the directory exists while tolerating its
        # concurrent creation by another process (EEXIST is ignored)
        dirname = os.path.dirname(path)
        if dirname:
            try:
                os.makedirs(dirname)
            except OSError as exception:
                if exception.errno != errno.EEXIST:
                    raise

        data_sink = CachedDataSink()
        data_sinks.append(data_sink)

        status = data_sink.setup(
            filename=path,
            dataformat=dataformat,
            start_index=start_index,
            end_index=end_index,
            encoding="binary",
        )

        if not status:
            raise IOError("Cannot create cache sink '%s'" % details["path"])

        if loop_socket is not None:
            output = RemotelySyncedOutput(
                name,
                data_sink,
                loop_socket,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index,
            )
        else:
            output = Output(
                name,
                data_sink,
                synchronization_listener=synchronization_listener,
                force_start_index=start_index,
            )
        output_list.add(output)

        if not is_analyzer:
            logger.debug(
                "Output '%s' created: group='%s', dataformat='%s', filename='%s'",
                name,
                details["channel"],
                dataformat_name,
                path,
            )
        else:
            logger.debug(
                "Output '%s' created: dataformat='%s', filename='%s'",
                name,
                dataformat_name,
                path,
            )

    return (output_list, data_sinks)