#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Validation for algorithms"""

import os
import sys

import six
import numpy
import simplejson

from . import dataformat
from . import library
from . import schema
from . import prototypes
from . import utils

from beat.backend.python.algorithm import Storage
from beat.backend.python.algorithm import Runner
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm



class Algorithm(BackendAlgorithm):
  """Algorithms represent runnable components within the platform.

  This class can only parse the meta-parameters of the algorithm (i.e., input
  and output declaration, grouping, synchronization details, parameters and
  splittability). The actual algorithm is not directly treated by this class -
  it can, however, provide you with a loader for actually running the
  algorithmic code (see :py:meth:`Algorithm.runner`).


  Parameters:

    prefix (str): Establishes the prefix of your installation.

    data (object, optional): The piece of data representing the algorithm. It
      must validate against the schema defined for algorithms. If a string is
      passed, it is supposed to be a valid path to an algorithm in the
      designated prefix area. If a tuple is passed (or a list), then we
      consider that the first element represents the algorithm declaration,
      while the second, the code for the algorithm (either in its source format
      or as a binary blob). If ``None`` is passed, loads our default prototype
      for algorithms (source code will be in Python).

    dataformat_cache (dict, optional): A dictionary mapping dataformat names to
      loaded dataformats. This parameter is optional and, if passed, may
      greatly speed-up algorithm loading times as dataformats that are already
      loaded may be re-used.

    library_cache (dict, optional): A dictionary mapping library names to
      loaded libraries. This parameter is optional and, if passed, may greatly
      speed-up library loading times as libraries that are already loaded may
      be re-used.


  Attributes:

    name (str): The algorithm name

    description (str): The short description string, loaded from the JSON
      file if one was set.

    documentation (str): The full-length docstring for this object.

    storage (object): A simple object that provides information about file
      paths for this algorithm

    dataformats (dict): A dictionary containing all pre-loaded dataformats used
      by this algorithm. Data format objects will be of type
      :py:class:`beat.core.dataformat.DataFormat`.

    libraries (dict): A mapping object defining other libraries this algorithm
      needs to load so it can work properly.

    uses (dict): A mapping object defining the required library import names
      (keys) and the full library names (values).

    parameters (dict): A dictionary containing all pre-defined parameters that
      this algorithm accepts.

    splittable (bool): A boolean value that indicates if this algorithm is
      automatically parallelizable by our backend.

    input_map (dict): A dictionary where the key is the input name and the
      value, its type. All input names (potentially from different groups) are
      included in this dictionary.

    output_map (dict): A dictionary where the key is the output name and the
      value, its type. All output names (potentially from different groups) are
      included in this dictionary.

    results (dict): If this algorithm is actually an analyzer (i.e., there are
      no formal outputs, but results that must be saved by the platform), then
      this dictionary contains the names and data types of those elements.

    groups (list): A list of dictionaries, each describing the inputs and
      outputs belonging to the same synchronization group.

    errors (list): A list containing errors found while loading this
      algorithm.

    data (dict): The original data for this algorithm, as loaded by our JSON
      decoder.

    code (str): The code that is associated with this algorithm, loaded as a
      text (or binary) file.

  """

  def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
    super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)


  def _load(self, data, dataformat_cache, library_cache):
    """Loads the algorithm"""

    self.errors = []
    self.data = None
    self.code = None

    self._name = None
    self.storage = None
    self.dataformats = {} # preloaded dataformats
    self.libraries = {} # preloaded libraries
    code = None

    if data is None: #loads prototype and validates it

      data = None
      code = None

    elif isinstance(data, (tuple, list)): #user has passed individual info

      data, code = data #break down into two components


    if isinstance(data, six.string_types): #user has passed an algorithm name

      self._name = data
      self.storage = Storage(self.prefix, self._name)
      if not self.storage.json.exists():
        self.errors.append('Algorithm declaration file not found: %s' % data)
        return

      data = self.storage.json.path #points to the JSON declaration on disk


    # At this point, `data' can be a dictionary, a path to a JSON file or
    # ``None``
    if data is None: # loads the default declaration for an algorithm
      self.data, self.errors = prototypes.load('algorithm')
      assert not self.errors, "\n  * %s" % "\n  *".join(self.errors)
    else: # just assign it
      # this runs basic validation, including JSON loading if required
      self.data, self.errors = schema.validate('algorithm', data)


    if self.errors: return #don't proceed with the rest of validation

    if self.storage is not None: #loading from the disk, check code
      if not self.storage.code.exists():
        if self.data['language'] != 'cxx':
          self.errors.append('Algorithm code not found: %s' % \
                  self.storage.code.path)
          return
      else:
        code = self.storage.code.load()


    # At this point, `code' can be a string (or a binary blob) or ``None``
    if code is None: # loads the default code for an algorithm
      self.code = prototypes.binary_load('algorithm.py')
      self.data['language'] = 'python'

    else: # just assign it - notice that in this case, no language is set
      self.code = code


    if self.errors: return #don't proceed with the rest of validation


    # if no errors so far, make sense out of the declaration data
    self.groups = self.data['groups']

    # now we check for consistency
    self._check_endpoint_uniqueness()

    # create maps for easy access to data
    self.input_map = dict([(k,v['type']) for g in self.groups \
            for k,v in g['inputs'].items()])
    self.output_map = dict([(k,v['type']) for g in self.groups \
            for k,v in g.get('outputs', {}).items()])
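
    # For illustration only (hypothetical declaration; the format names below
    # are made up): a single group such as
    #
    #   {"inputs": {"image": {"type": "user/array_2d_uint8/1"}},
    #    "outputs": {"score": {"type": "user/float/1"}}}
    #
    # results in input_map == {'image': 'user/array_2d_uint8/1'} and
    # output_map == {'score': 'user/float/1'}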

    self._validate_required_dataformats(dataformat_cache)
    self._convert_parameter_types()

    # finally, the libraries
    self._validate_required_libraries(library_cache)
    self._check_language_consistence()


  def _check_endpoint_uniqueness(self):
    """Checks for name clashes accross input/output groups
    """

    all_input_names = []
    for group in self.groups: all_input_names.extend(group['inputs'].keys())
    if len(set(all_input_names)) != len(all_input_names):
      self.errors.append("repeated input name in algorithm `%s' " \
              "declaration: %s" % (self.name, ', '.join(all_input_names)))

    # all outputs must have unique names
    all_output_names = []
    for group in self.groups:
      if 'outputs' not in group: continue
      all_output_names.extend(group['outputs'].keys())
    if len(set(all_output_names)) != len(all_output_names):
      self.errors.append("repeated output name in algorithm `%s' " \
              "declaration: %s" % (self.name, ', '.join(all_output_names)))


  def _validate_required_dataformats(self, dataformat_cache):
    """Makes sure we can load all requested formats
    """

    for group in self.groups:

      for name, input in group['inputs'].items():
        if input['type'] in self.dataformats: continue

        if dataformat_cache and input['type'] in dataformat_cache: #reuse
          thisformat = dataformat_cache[input['type']]
        else: #load it
          thisformat = dataformat.DataFormat(self.prefix, input['type'])
          if dataformat_cache is not None: #update it
            dataformat_cache[input['type']] = thisformat

        self.dataformats[input['type']] = thisformat

        if thisformat.errors:
          self.errors.append("found error validating data format `%s' " \
                  "for input `%s' on algorithm `%s': %s" % \
                  (input['type'], name, self.name,
                      '\n'.join(thisformat.errors)))

      if 'outputs' not in group: continue

      for name, output in group['outputs'].items():
        if output['type'] in self.dataformats: continue

        if dataformat_cache and output['type'] in dataformat_cache: #reuse
          thisformat = dataformat_cache[output['type']]
        else: #load it
          thisformat = dataformat.DataFormat(self.prefix, output['type'])
          if dataformat_cache is not None: #update it
            dataformat_cache[output['type']] = thisformat

        self.dataformats[output['type']] = thisformat

        if thisformat.errors:
          self.errors.append("found error validating data format `%s' " \
                  "for output `%s' on algorithm `%s': %s" % \
                  (output['type'], name, self.name,
                      '\n'.join(thisformat.errors)))

    if self.results:

      for name, result in self.results.items():

        if result['type'].find('/') != -1:

          if result['type'] in self.dataformats: continue

          if dataformat_cache and result['type'] in dataformat_cache: #reuse
            thisformat = dataformat_cache[result['type']]
          else:
            thisformat = dataformat.DataFormat(self.prefix, result['type'])
            if dataformat_cache is not None: #update it
              dataformat_cache[result['type']] = thisformat

          self.dataformats[result['type']] = thisformat

          if thisformat.errors:
            self.errors.append("found error validating data format `%s' " \
                    "for result `%s' on algorithm `%s': %s" % \
                    (result['type'], name, self.name,
                        '\n'.join(thisformat.errors)))


  def _convert_parameter_types(self):
    """Converts types to numpy equivalents, checks defaults, ranges and choices
    """

    def _try_convert(name, tp, value, desc):
      try:
        return tp.type(value)
      except Exception as e:
        self.errors.append("%s for parameter `%s' cannot be cast to type " \
                "`%s': %s" % (desc, name, tp.name, e))

    if self.parameters is None: return
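
    # Illustration only (hypothetical parameter declaration): an entry such as
    #
    #   "threshold": {"type": "float64", "default": 0.5, "range": [0.0, 1.0]}
    #
    # is converted below so that `type' becomes numpy.dtype('float64') and the
    # default and range values are cast through that dtype; any cast failure
    # or out-of-range default is recorded in `self.errors' instead of raising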

    for name, parameter in self.parameters.items():
      if parameter['type'] == 'string':
        parameter['type'] = numpy.dtype('str')
      else:
        parameter['type'] = numpy.dtype(parameter['type'])

      if 'range' in parameter:
        parameter['range'][0] = _try_convert(name, parameter['type'],
            parameter['range'][0], 'start of range')
        parameter['range'][1] = _try_convert(name, parameter['type'],
            parameter['range'][1], 'end of range')
        if parameter['range'][0] >= parameter['range'][1]:
          self.errors.append("range for parameter `%s' has a start greater " \
                  "then the end value (%r >= %r)" % \
                  (name, parameter['range'][0], parameter['range'][1]))

      if 'choice' in parameter:
        for i, choice in enumerate(parameter['choice']):
          parameter['choice'][i] = _try_convert(name, parameter['type'],
              parameter['choice'][i], 'choice[%d]' % i)

      if 'default' in parameter:
        parameter['default'] = _try_convert(name, parameter['type'],
            parameter['default'], 'default')

        if 'range' in parameter: #check range
          if parameter['default'] < parameter['range'][0] or \
                  parameter['default'] > parameter['range'][1]:
            self.errors.append("default for parameter `%s' (%r) is not " \
              "within parameter range [%r, %r]" % (name, parameter['default'],
                  parameter['range'][0], parameter['range'][1]))

        if 'choice' in parameter: #check choices
          if parameter['default'] not in parameter['choice']:
            self.errors.append("default for parameter `%s' (%r) is not " \
              "a valid choice `[%s]'" % (name, parameter['default'],
                  ', '.join(['%r' % k for k in parameter['choice']])))


  def _validate_required_libraries(self, library_cache):

    # all used libraries must be loadable; cannot use self as a library

    if self.uses:

      for name, value in self.uses.items():

        self.libraries[value] = library_cache.setdefault(value,
                library.Library(self.prefix, value, library_cache))

        if not self.libraries[value].valid:
          self.errors.append("referred library `%s' (%s) is not valid" % \
                  (self.libraries[value].name, name))


  def _check_language_consistence(self):

    # all used libraries must be programmed with the same language
    if self.language == 'unknown': return #bail out on unknown language

    if self.uses:

      for name, value in self.uses.items():

        if value not in self.libraries: continue #invalid

        if self.libraries[value].data is None:
          self.errors.append("language for used library `%s' cannot be " \
                  "inferred as the library was not properly loaded" % \
                  (value,))
          continue

        if self.libraries[value].language != self.language:
          self.errors.append("language for used library `%s' (`%s') " \
                  "differs from current language for this algorithm (`%s')" % \
                  (value, self.libraries[value].language, self.language))


  def json_dumps(self, indent=4):
    """Dumps the JSON declaration of this object in a string


    Parameters:

      indent (int): The number of indentation spaces at every indentation level


    Returns:

      str: The JSON representation for this object

    """

    return simplejson.dumps(self.data, indent=indent,
        cls=utils.NumpyJSONEncoder)


  def __str__(self):

    return self.json_dumps()


  def write(self, storage=None):
    """Writes contents to prefix location

    Parameters:

      storage (Storage, optional): If you pass a new storage, then this object
        will be written to that storage point rather than its default.
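
    Example:

      A minimal sketch (writes this algorithm back to its default location
      under the prefix)::

        algorithm.write()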

    """

    if self.data['language'] == 'unknown':
      raise RuntimeError("algorithm has no programming language set")

    if storage is None:
      if not self._name:
        raise RuntimeError("algorithm has no name")
      storage = self.storage #overwrite

    storage.save(str(self), self.code, self.description)


  def export(self, prefix):
    """Recursively exports itself into another prefix

    Dataformats and associated libraries are also copied.


    Parameters:

      prefix (str): A path to a prefix that must be different from my own.


    Returns:

      None


    Raises:

      RuntimeError: If prefix and self.prefix point to the same directory.
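
    Example:

      A minimal sketch (the target prefix path is hypothetical and must differ
      from the current one)::

        algorithm.export('/path/to/another/prefix')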

    """

    if not self._name:
      raise RuntimeError("dataformat has no name")

    if not self.valid:
      raise RuntimeError("dataformat is not valid")

    if os.path.samefile(prefix, self.prefix):
      raise RuntimeError("Cannot export algorithm to the same prefix (%s == " \
              "%s)" % (prefix, self.prefix))

    for k in self.libraries.values(): k.export(prefix)
    for k in self.dataformats.values(): k.export(prefix)
    self.write(Storage(prefix, self.name, self.language))