base.py 17.4 KB
Newer Older
1
2
3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

Samuel GAIST's avatar
Samuel GAIST committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
35
36


37
38
39
40
41
42
43
44
45
"""
====
base
====

Execution utilities
"""


46
47
48
49
import os
import glob
import collections
import logging
50
import simplejson as json
51
52
53
54
55
56
57
58
59

from .. import schema
from .. import database
from .. import algorithm
from .. import stats

from beat.backend.python.helpers import convert_experiment_configuration_to_container


Samuel GAIST's avatar
Samuel GAIST committed
60
61
62
# Module-level logger named after this module (``__name__``)
logger = logging.getLogger(__name__)


63
class BaseExecutor(object):
    """Executors run the code given an execution block information

    Parameters:

      prefix (str): Establishes the prefix of your installation.

      data (dict, str): The piece of data representing the block to be executed.
        It must validate against the schema defined for execution blocks. If a
        string is passed, it is supposed to be a fully qualified absolute path to
        a JSON file containing the block execution information.

      cache (:py:class:`str`, Optional): If your cache is not located under
        ``<prefix>/cache``, then specify a full path here. It will be used
        instead.

      dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
        dataformat names to loaded dataformats. This parameter is optional and,
        if passed, may greatly speed-up database loading times as dataformats
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying dataformats change.

      database_cache (:py:class:`dict`, Optional): A dictionary mapping
        database names to loaded databases. This parameter is optional and, if
        passed, may greatly speed-up database loading times as databases that
        are already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying databases change.

      algorithm_cache (:py:class:`dict`, Optional): A dictionary mapping
        algorithm names to loaded algorithms. This parameter is optional and,
        if passed, may greatly speed-up database loading times as algorithms
        that are already loaded may be re-used. If you use this parameter, you
        must guarantee that the cache is refreshed as appropriate in case the
        underlying algorithms change.

      library_cache (:py:class:`dict`, Optional): A dictionary mapping library
        names to loaded libraries. This parameter is optional and, if passed,
        may greatly speed-up library loading times as libraries that are
        already loaded may be re-used. If you use this parameter, you must
        guarantee that the cache is refreshed as appropriate in case the
        underlying libraries change.

      custom_root_folders (:py:class:`dict`, Optional): A mapping whose keys
        are ``"database/<database name>"``. When a database is freshly loaded
        (i.e. not taken from ``database_cache``) and its key is present, the
        database's ``root_folder`` is replaced by the associated value.


    Raises:

      IOError: If the cache path does not exist.

      TypeError: If ``custom_root_folders`` is given but is not a mapping.


    Attributes:

      cache (str): The path to the cache currently being used

      errors (list): A list containing errors found while loading this execution
        block.

      data (dict): The original data for this executor, as loaded by our JSON
        decoder.

      algorithm (beat.core.algorithm.Algorithm): An object representing the
        algorithm to be run.

      loop_algorithm (beat.core.algorithm.Algorithm): The algorithm declared
        in the ``loop`` section of the block, or ``None`` if there is none.

      databases (dict): A dictionary in which keys are strings with database
        names and values are :py:class:`.database.Database`, representing the
        databases required for running this block. The dictionary may be empty
        in case all inputs are taken from the file cache.

      views (dict): A dictionary in which the keys are tuples pointing to the
        ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
        for that particular combination of details. The dictionary may be empty
        in case all inputs are taken from the file cache.

      input_list (beat.backend.python.inputs.InputList): A list of inputs that
        will be served to the algorithm.

      output_list (beat.backend.python.outputs.OutputList): A list of outputs
        that the algorithm will produce.

      data_sources (list): A list with all data-sources created by our execution
        loader.

      data_sinks (list): A list with all data-sinks created by our execution
        loader. These are useful for clean-up actions in case of problems.

    """

    def __init__(
        self,
        prefix,
        data,
        cache=None,
        dataformat_cache=None,
        database_cache=None,
        algorithm_cache=None,
        library_cache=None,
        custom_root_folders=None,
    ):
        # ``collections.Mapping`` was removed in Python 3.10; the abstract base
        # class lives in ``collections.abc``.
        from collections.abc import Mapping

        # Initialisations
        self.prefix = prefix
        self.cache = cache or os.path.join(self.prefix, "cache")
        self.algorithm = None
        self.loop_algorithm = None
        self.databases = {}
        self.input_list = None
        self.data_loaders = None
        self.output_list = None
        self.data_sinks = []
        self.errors = []
        self.data = data
        self.debug = False

        # Check that the cache path exists
        if not os.path.exists(self.cache):
            raise IOError("Cache path `%s' does not exist" % self.cache)

        # Check the custom root folders
        if custom_root_folders is None:
            custom_root_folders = {}
        elif not isinstance(custom_root_folders, Mapping):
            raise TypeError("The custom root folders must be in dictionary format")

        # Temporary caches, if the user has not set them, for performance
        database_cache = database_cache if database_cache is not None else {}
        dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
        algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
        library_cache = library_cache if library_cache is not None else {}

        # Basic validation of the data declaration, including JSON loading if
        # required (a non-dict ``data`` is treated as a path)
        if not isinstance(data, dict) and not os.path.exists(data):
            self.errors.append("File not found: %s" % data)
            return

        self.data, self.errors = schema.validate("execution", data)
        if self.errors:
            return

        # Load the algorithm (using the algorithm cache if possible)
        self.algorithm = self._load_algorithm(
            self.data["algorithm"], algorithm_cache, dataformat_cache, library_cache
        )
        if not self.algorithm.valid:
            self.errors += self.algorithm.errors
            return

        if "loop" in self.data:
            loop = self.data["loop"]
            self.loop_algorithm = self._load_algorithm(
                loop["algorithm"], algorithm_cache, dataformat_cache, library_cache
            )
            # The loop inputs are specific to this block, so they are checked
            # whether or not the algorithm came from the cache. An invalid
            # algorithm cannot be inspected, so only its errors are reported.
            if self.loop_algorithm.valid:
                self._check_loop_inputs(loop)
            else:
                self.errors += self.loop_algorithm.errors

        # Check that the mapping is coherent
        self._check_algorithm_io()

        if self.errors:
            return

        # Load the databases (if any is required)
        self._update_db_cache(
            self.data["inputs"], custom_root_folders, database_cache, dataformat_cache
        )

        if "loop" in self.data:
            self._update_db_cache(
                self.data["loop"]["inputs"],
                custom_root_folders,
                database_cache,
                dataformat_cache,
            )

    def _load_algorithm(self, name, algorithm_cache, dataformat_cache, library_cache):
        """Returns the algorithm ``name``, loading it and caching it if needed

        A freshly loaded algorithm is stored in ``algorithm_cache`` under its
        own ``name`` attribute, whether it is valid or not.
        """

        if name in algorithm_cache:
            return algorithm_cache[name]

        loaded = algorithm.Algorithm(self.prefix, name, dataformat_cache, library_cache)
        algorithm_cache[loaded.name] = loaded
        return loaded

    def _check_loop_inputs(self, loop):
        """Appends to ``self.errors`` any mismatch between the ``loop`` section
        inputs and the inputs declared by ``self.loop_algorithm``"""

        input_map = self.loop_algorithm.input_map

        if len(loop["inputs"]) != len(input_map):
            self.errors.append(
                "The number of inputs of the loop algorithm doesn't correspond"
            )

        for name in loop["inputs"]:
            if name not in input_map:
                self.errors.append(
                    "The input '%s' doesn't exist in the loop algorithm" % name
                )

    def _check_algorithm_io(self):
        """Appends to ``self.errors`` any mismatch between the block's inputs,
        outputs and loop channels and those declared by ``self.algorithm``"""

        inputs = self.data["inputs"]
        has_outputs = "outputs" in self.data

        if len(inputs) != len(self.algorithm.input_map):
            self.errors.append(
                "The number of inputs of the algorithm doesn't correspond"
            )

        if has_outputs and len(self.data["outputs"]) != len(self.algorithm.output_map):
            self.errors.append(
                "The number of outputs of the algorithm doesn't correspond"
            )

        for name in inputs:
            if name not in self.algorithm.input_map:
                self.errors.append(
                    "The input '%s' doesn't exist in the algorithm" % name
                )

        if has_outputs:
            for name in self.data["outputs"]:
                if name not in self.algorithm.output_map:
                    self.errors.append(
                        "The output '%s' doesn't exist in the algorithm" % name
                    )

        if "loop" in self.data:
            for name in ["request", "answer"]:
                if name not in self.algorithm.loop_map:
                    self.errors.append(
                        "The loop '%s' doesn't exist in the algorithm" % name
                    )

    def __enter__(self):
        """Prepares inputs and outputs for the processing task

        Raises:

          IOError: in case something cannot be properly setup

        """

        logger.info("Start the execution of '%s'", self.algorithm.name)

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Closes all sinks and disconnects inputs and outputs

        Sinks are closed (and their output thus saved) only when the ``with``
        block did not raise an :py:class:`Exception`. Exceptions that do not
        derive from :py:class:`Exception` (e.g. :py:class:`SystemExit`) still
        let the sinks be closed.
        """

        # ``exc_type`` is a class (or None), never an instance, so it must be
        # tested with issubclass(); isinstance() would always be False here.
        save_outputs = exc_type is None or not issubclass(exc_type, Exception)
        if save_outputs:
            for sink in self.data_sinks:
                sink.close()

        self.input_list = None
        self.data_loaders = []
        self.output_list = None
        self.data_sinks = []

    def _update_db_cache(
        self, inputs, custom_root_folders, database_cache, dataformat_cache
    ):
        """Loads the databases referenced by ``inputs`` into ``self.databases``

        Databases are taken from ``database_cache`` when present; otherwise they
        are loaded, get their root folder overridden from
        ``custom_root_folders`` if configured, and are added to the cache.
        Errors of invalid databases are appended to ``self.errors``.
        """

        for details in inputs.values():
            if "database" not in details:
                continue

            db_name = details["database"]
            if db_name in self.databases:
                continue

            if db_name in database_cache:
                db = database_cache[db_name]
            else:
                db = database.Database(self.prefix, db_name, dataformat_cache)

                root_folder_key = "database/%s" % db.name
                if root_folder_key in custom_root_folders:
                    db.data["root_folder"] = custom_root_folders[root_folder_key]

                database_cache[db.name] = db

            self.databases[db.name] = db

            if not db.valid:
                self.errors += db.errors

    def _prepare_inputs(self):
        """Prepares all input required by the execution."""

        raise NotImplementedError()

    def _prepare_outputs(self):
        """Prepares all output required by the execution."""

        raise NotImplementedError()

    def process(
        self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
    ):
        """Executes the user algorithm code

        Parameters:

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The amount
            of virtual memory (in Megabytes) available for the job. If set to
            zero, no limit will be applied.

          max_cpu_percent (:py:class:`int`, Optional): The maximum amount of
            CPU usage allowed in a system. This number must be an integer
            number between 0 and ``100*number_of_cores`` in your system. For
            instance, if your system has 2 cores, this number can go between 0
            and 200. If it is <= 0, then we don't track CPU usage.

          timeout_in_minutes (int): The number of minutes to wait for the user
            process to execute. After this amount of time, the user process is
            killed with ``signal.SIGKILL``. If set to zero, no timeout will be
            applied.

        Returns:

          dict: A dictionary which is JSON formattable containing the summary of
            this block execution.

        """

        raise NotImplementedError()

    @property
    def valid(self):
        """A boolean that indicates if this executor is valid or not"""

        return not bool(self.errors)

    @property
    def analysis(self):
        """A boolean that indicates if the current block is an analysis block"""

        return "result" in self.data

    @property
    def outputs_exist(self):
        """Returns ``True`` if all outputs this block is supposed to produce exist.

        An output is considered present when at least one file in the cache
        starts with its declared path.
        """

        if self.analysis:
            paths = [self.data["result"]["path"]]
        else:
            paths = [details["path"] for details in self.data["outputs"].values()]

        return all(glob.glob(os.path.join(self.cache, path) + "*") for path in paths)

    @property
    def io_statistics(self):
        """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs

        Returns:

          dict: A dictionary summarizing current I/O statistics
        """

        return stats.io_statistics(self.data, self.input_list, self.output_list)

    def __str__(self):
        return json.dumps(self.data, indent=4)

    def write(self, path):
        """Writes contents to precise filesystem location"""

        with open(path, "wt") as f:
            f.write(str(self))

    def _prepare_export_directory(self, directory, data):
        """Writes ``data`` as ``configuration.json`` into ``directory`` and
        returns the path of its ``prefix`` sub-folder, created if missing"""

        with open(os.path.join(directory, "configuration.json"), "wb") as f:
            json_data = json.dumps(data, indent=2)
            f.write(json_data.encode("utf-8"))

        tmp_prefix = os.path.join(directory, "prefix")
        # exist_ok avoids the race of a separate exists() check
        os.makedirs(tmp_prefix, exist_ok=True)
        return tmp_prefix

    def dump_runner_configuration(self, directory):
        """Exports contents useful for a backend runner to run the algorithm"""

        data = convert_experiment_configuration_to_container(self.data)
        tmp_prefix = self._prepare_export_directory(directory, data)

        self.algorithm.export(tmp_prefix)

        if self.loop_algorithm:
            self.loop_algorithm.export(tmp_prefix)

    def dump_databases_provider_configuration(self, directory):
        """Exports contents useful for a backend runner to run the algorithm"""

        tmp_prefix = self._prepare_export_directory(directory, self.data)

        for db in self.databases.values():
            db.export(tmp_prefix)