base.py 12.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


'''Execution utilities'''

import os
import sys
import glob
import collections
import collections.abc  # the Mapping ABC lives here (collections.Mapping was removed in Python 3.10)

import logging
logger = logging.getLogger(__name__)

import simplejson

from .. import schema
from .. import database
from .. import algorithm
from .. import stats

from beat.backend.python.helpers import convert_experiment_configuration_to_container


class BaseExecutor(object):
  """Executors runs the code given an execution block information


  Parameters:

    prefix (str): Establishes the prefix of your installation.

    data (dict, str): The piece of data representing the block to be executed.
      It must validate against the schema defined for execution blocks. If a
      string is passed, it is supposed to be a fully qualified absolute path to
      a JSON file containing the block execution information.

    cache (str, optional): If your cache is not located under
      ``<prefix>/cache``, then specify a full path here. It will be used
      instead.

    dataformat_cache (dict, optional): A dictionary mapping dataformat names to
      loaded dataformats. This parameter is optional and, if passed, may
      greatly speed-up database loading times as dataformats that are already
      loaded may be re-used. If you use this parameter, you must guarantee that
      the cache is refreshed as appropriate in case the underlying dataformats
      change.

    database_cache (dict, optional): A dictionary mapping database names to
      loaded databases. This parameter is optional and, if passed, may
      greatly speed-up database loading times as databases that are already
      loaded may be re-used. If you use this parameter, you must guarantee that
      the cache is refreshed as appropriate in case the underlying databases
      change.

    algorithm_cache (dict, optional): A dictionary mapping algorithm names to
      loaded algorithms. This parameter is optional and, if passed, may
      greatly speed-up database loading times as algorithms that are already
      loaded may be re-used. If you use this parameter, you must guarantee that
      the cache is refreshed as appropriate in case the underlying algorithms
      change.

    library_cache (dict, optional): A dictionary mapping library names to
      loaded libraries. This parameter is optional and, if passed, may greatly
      speed-up library loading times as libraries that are already loaded may
      be re-used. If you use this parameter, you must guarantee that the cache
      is refreshed as appropriate in case the underlying libraries change.

    custom_root_folders (dict, optional): A dictionary mapping
      ``"database/<name>"`` keys to replacement root folders, overriding the
      ``root_folder`` declared by the database itself.


  Attributes:

    cache (str): The path to the cache currently being used

    errors (list): A list containing errors found while loading this execution
      block.

    data (dict): The original data for this executor, as loaded by our JSON
      decoder.

    algorithm (beat.core.algorithm.Algorithm): An object representing the
      algorithm to be run.

    databases (dict): A dictionary in which keys are strings with database
      names and values are :py:class:`database.Database`, representing the
      databases required for running this block. The dictionary may be empty
      in case all inputs are taken from the file cache.

    views (dict): A dictionary in which the keys are tuples pointing to the
      ``(<database-name>, <protocol>, <set>)`` and the value is a setup view
      for that particular combination of details. The dictionary may be empty
      in case all inputs are taken from the file cache.

    input_list (beat.core.inputs.InputList): A list of inputs that will be
      served to the algorithm.

    output_list (beat.core.outputs.OutputList): A list of outputs that the
      algorithm will produce.

    data_sources (list): A list with all data-sources created by our execution
      loader.

    data_sinks (list): A list with all data-sinks created by our execution
      loader. These are useful for clean-up actions in case of problems.

  """

  def __init__(self, prefix, data, cache=None, dataformat_cache=None,
          database_cache=None, algorithm_cache=None, library_cache=None,
          custom_root_folders=None):

    # Initialisations
    self.prefix = prefix
    self.cache = cache or os.path.join(self.prefix, 'cache')
    self.algorithm = None
    self.databases = {}
    self.input_list = None
    self.output_list = None
    self.data_sinks = []
    self.data_sources = []
    self.errors = []
    self.data = data

    # Check that the cache path exists
    if not os.path.exists(self.cache):
      raise IOError("Cache path `%s' does not exist" % self.cache)

    # Check the custom root folders
    if custom_root_folders is not None:
      # ``collections.Mapping`` was removed in Python 3.10; the abstract base
      # class lives in ``collections.abc``
      if not isinstance(custom_root_folders, collections.abc.Mapping):
        raise TypeError("The custom root folders must be in dictionary format")
    else:
      custom_root_folders = {}

    # Temporary caches, if the user has not set them, for performance
    database_cache = database_cache if database_cache is not None else {}
    dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
    algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
    library_cache = library_cache if library_cache is not None else {}

    # Basic validation of the data declaration, including JSON loading if required
    if not isinstance(data, dict):
      if not os.path.exists(data):
        self.errors.append('File not found: %s' % data)
        return

    self.data, self.errors = schema.validate('execution', data)
    if self.errors:
      return

    # Load the algorithm (using the algorithm cache if possible)
    if self.data['algorithm'] in algorithm_cache:
      self.algorithm = algorithm_cache[self.data['algorithm']]
    else:
      self.algorithm = algorithm.Algorithm(self.prefix, self.data['algorithm'],
                                           dataformat_cache, library_cache)
      algorithm_cache[self.algorithm.name] = self.algorithm

    if not self.algorithm.valid:
      self.errors += self.algorithm.errors
      return

    # Load the databases (if any is required)
    for name, details in self.data['inputs'].items():
      if 'database' in details:

        if details['database'] not in self.databases:

          if details['database'] in database_cache:
            db = database_cache[details['database']]
          else:
            db = database.Database(self.prefix, details['database'],
                                   dataformat_cache)

            # use a dedicated local so we don't shadow the loop variable
            # ``name`` (input name) with the custom-root-folder lookup key
            root_folder_key = "database/%s" % db.name
            if root_folder_key in custom_root_folders:
              db.data['root_folder'] = custom_root_folders[root_folder_key]

            database_cache[db.name] = db

          self.databases[db.name] = db

          if not db.valid:
            self.errors += db.errors


  def __enter__(self):
    """Prepares inputs and outputs for the processing task

    Raises:

      IOError: in case something cannot be properly setup

    """

    self._prepare_inputs()
    self._prepare_outputs()

    return self


  def __exit__(self, exc_type, exc_value, traceback):
    """Closes all sinks and disconnects inputs and outputs

    Sinks are closed (committing their data) only when the ``with`` block
    exited cleanly or through something that is not a regular ``Exception``
    (e.g. ``SystemExit``); after a real error, outputs are only reset.
    """

    for sink in self.data_sinks:
      # we save the output only if no valid error has been thrown
      # n.b.: a system exit will raise SystemExit which is not an Exception
      # ``exc_type`` is the exception *class* (or None on clean exit), so
      # ``issubclass`` is the correct test -- the previous
      # ``isinstance(exc_type, Exception)`` was always False, closing the
      # sink even when an error had been raised
      if exc_type is None or not issubclass(exc_type, Exception):
        sink.close()
      sink.reset()

    self.input_list = None
    self.output_list = None
    self.data_sinks = []
    self.data_sources = []


  def _prepare_inputs(self):
    """Prepares all input required by the execution."""

    raise NotImplementedError()


  def _prepare_outputs(self):
    """Prepares all output required by the execution."""

    raise NotImplementedError()


  def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
              timeout_in_minutes=0):
    """Executes the user algorithm code

    Parameters:

      virtual_memory_in_megabytes (int, Optional): The amount of virtual memory
        (in Megabytes) available for the job. If set to zero, no limit will be
        applied.

      max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed
        in a system. This number must be an integer number between 0 and
        ``100*number_of_cores`` in your system. For instance, if your system
        has 2 cores, this number can go between 0 and 200. If it is <= 0, then
        we don't track CPU usage.

      timeout_in_minutes (int): The number of minutes to wait for the user
        process to execute. After this amount of time, the user process is
        killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
        be applied.

    Returns:

      dict: A dictionary which is JSON formattable containing the summary of
        this block execution.

    """

    raise NotImplementedError()


  @property
  def valid(self):
    """A boolean that indicates if this executor is valid or not"""

    return not bool(self.errors)


  @property
  def analysis(self):
    """A boolean that indicates if the current block is an analysis block"""
    return 'result' in self.data


  @property
  def outputs_exist(self):
    """Returns ``True`` if outputs this block is supposed to produce exists."""

    if self.analysis:
      path = os.path.join(self.cache, self.data['result']['path']) + '*'
      if not glob.glob(path): return False

    else:
      for name, details in self.data['outputs'].items():
        path = os.path.join(self.cache, details['path']) + '*'
        if not glob.glob(path): return False

    # if you get to this point all outputs already exist
    return True


  @property
  def io_statistics(self):
    """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs

    Returns:

      dict: A dictionary summarizing current I/O statistics
    """

    return stats.io_statistics(self.data, self.input_list, self.output_list)


  def __str__(self):
    return simplejson.dumps(self.data, indent=4)


  def write(self, path):
    """Writes contents to precise filesystem location"""

    with open(path, 'wt') as f:
      f.write(str(self))


  def dump_runner_configuration(self, directory):
    """Exports contents useful for a backend runner to run the algorithm"""

    # NOTE(review): ``self.proxy_mode`` is never set by this class -- it is
    # presumably provided by a subclass; confirm before calling this on a
    # bare BaseExecutor
    data = convert_experiment_configuration_to_container(self.data, self.proxy_mode)

    # text mode: simplejson.dump() emits ``str``; writing it to a file opened
    # in binary mode ('wb') raises TypeError on Python 3
    with open(os.path.join(directory, 'configuration.json'), 'wt') as f:
      simplejson.dump(data, f, indent=2)

    tmp_prefix = os.path.join(directory, 'prefix')
    if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix)

    self.algorithm.export(tmp_prefix)


  def dump_databases_provider_configuration(self, directory):
    """Exports contents useful for a backend runner to run the algorithm"""

    # text mode: simplejson.dump() emits ``str`` (see
    # dump_runner_configuration)
    with open(os.path.join(directory, 'configuration.json'), 'wt') as f:
      simplejson.dump(self.data, f, indent=2)

    tmp_prefix = os.path.join(directory, 'prefix')
    if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix)

    for db in self.databases.values():
      db.export(tmp_prefix)