#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.cmdline module of the BEAT platform.          #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software Foundation and   #
# appearing in the file LICENSE.AGPL included in the packaging of this file.  #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Usage:
  %(prog)s experiments run [--force] <name> [(--docker|--local)]
  %(prog)s experiments caches [--list | --delete | --checksum] <name>
  %(prog)s experiments list [--remote]
  %(prog)s experiments check [<name>]...
  %(prog)s experiments pull [--force] [<name>]...
  %(prog)s experiments push [--force] [--dry-run] [<name>]...
  %(prog)s experiments diff <name>
  %(prog)s experiments status
  %(prog)s experiments fork <src> <dst>
  %(prog)s experiments rm [--remote] <name>...
  %(prog)s experiments draw [--path=<dir>] [<name>]...
  %(prog)s experiments --help


Commands:
  run       Runs an experiment locally
  caches    Lists all cache files used by this experiment
  list      Lists all the experiments available on the platform
  check     Checks a local experiment for validity
  pull      Downloads the specified experiments from the server
  push      Uploads experiments to the server
  diff      Shows changes between the local experiment and the remote version
  status    Shows (editing) status for all available experiments
  fork      Forks a local experiment
  rm        Deletes a local experiment (unless --remote is specified)
  draw      Creates a visual representation of the experiment


Options:
  --force         Performs operation regardless of conflicts
  --dry-run       Doesn't really perform the task, only shows what would be
                  done
  --remote        Only acts on the remote copy of the experiment
  --list          Lists the cache files matching the experiment outputs, if
                  they exist
  --delete        Deletes the cache files matching the experiment outputs, if
                  they exist (also recursively deletes empty directories)
  --checksum      Checksums the indexes of the cache files
  --help          Display this screen
  --path=<dir>    Use path to write files to disk (instead of the current
                  directory)
  --local         Uses the local executor to run the experiment on the local
                  machine (default)
  --docker        Uses the docker executor to run the experiment inside
                  docker containers
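
Examples:

  To run an experiment using the local executor, replace <name> below by the
  name of an experiment available in your prefix (e.g. as printed by the
  `list' command):

    $ %(prog)s experiments run <name> --local

  To list the cache files such an experiment would produce or re-use:

    $ %(prog)s experiments caches --list <name>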

"""

import os
import logging
import glob
import oset
import simplejson

from . import common

from beat.core.experiment import Experiment
from beat.core.execution import DockerExecutor
from beat.core.execution import LocalExecutor
from beat.core.utils import NumpyJSONEncoder
from beat.core.data import CachedDataSource, load_data_index
from beat.core.dock import Host
from beat.core.hash import toPath
from beat.core.hash import hashDataset


logger = logging.getLogger(__name__)


def run_experiment(configuration, name, force, use_docker, use_local):
    '''Runs an experiment locally, using the docker or the local executor'''

    def load_result(executor):
        '''Loads the result of an experiment, in a single go'''

        f = CachedDataSource()
        assert f.setup(os.path.join(executor.cache,
                                    executor.data['result']['path'] + '.data'),
                       executor.prefix)
        data, start, end = f[0]
        return data

    def reindent(s, n):
        '''Re-indents output so it is more visible'''
        margin = n * ' '
        return margin + ('\n' + margin).join(s.split('\n'))

    def simplify_time(s):
        '''Re-writes the time so it is easier to understand it'''

        minute = 60.0
        hour = 60 * minute
        day = 24 * hour

        if s <= minute:
            return "%.2f s" % s
        elif s <= hour:
            minutes = s // minute
            seconds = s - (minute * minutes)
            return "%d m %.2f s" % (minutes, seconds)
        elif s <= day:
            hours = s // hour
            minutes = (s - (hour * hours)) // minute
            seconds = s - (hour * hours + minute * minutes)
            return "%d h %d m %.2f s" % (hours, minutes, seconds)
        else:
            days = s // day
            hours = (s - (day * days)) // hour
            minutes = (s - (day * days + hour * hours)) // minute
            seconds = s - (day * days + hour * hours + minute * minutes)
            return "%d days %d h %d m %.2f s" % (days, hours, minutes, seconds)

    def simplify_size(s):
        '''Re-writes the size so it is easier to understand it'''

        kb = 1024.0
        mb = kb * kb
        gb = kb * mb
        tb = kb * gb

        if s <= kb:
            return "%d bytes" % s
        elif s <= mb:
            return "%.2f kilobytes" % (s / kb)
        elif s <= gb:
            return "%.2f megabytes" % (s / mb)
        elif s <= tb:
            return "%.2f gigabytes" % (s / gb)
        return "%.2f terabytes" % (s / tb)

    def index_experiment_databases(cache_path, experiment):
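        '''Indexes the databases used by the experiment, if needed'''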
        for block_name, infos in experiment.datasets.items():
            view = infos['database'].view(infos['protocol'], infos['set'])
            filename = toPath(hashDataset(infos['database'].name,
                                          infos['protocol'],
                                          infos['set']),
                              suffix='.db')
            database_index_path = os.path.join(cache_path, filename)
            if not os.path.exists(database_index_path):
                logger.info("Index for database %s not found, building it", infos['database'].name)
                view.index(database_index_path)

    dataformat_cache = {}
    database_cache = {}
    algorithm_cache = {}
    library_cache = {}

    experiment = Experiment(configuration.path, name,
                            dataformat_cache, database_cache,
                            algorithm_cache, library_cache)

    if not experiment.valid:
        logger.error("Failed to load the experiment `%s':", name)
        for e in experiment.errors:
            logger.error('  * %s', e)
        return 1

    if not os.path.exists(configuration.cache):
        os.makedirs(configuration.cache)
        logger.info("Created cache path `%s'", configuration.cache)

    index_experiment_databases(configuration.cache, experiment)

    scheduled = experiment.setup()
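    # `scheduled' maps each block name to a dictionary holding its execution
    # `configuration' (consumed by the loop below)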

    if use_docker:
        # load existing environments
        host = Host(raise_on_errors=False)

    # can we execute it?
    for key, value in scheduled.items():

        # checks and sets up the executable
        executable = None  # use the default

        if use_docker:
            env = value['configuration']['environment']
            search_key = '%s (%s)' % (env['name'], env['version'])
            if search_key not in host:
                logger.error("Cannot execute block `%s' on environment `%s': "
                             "environment was not found - please install it",
                             key, search_key)
                return 1

            executor = DockerExecutor(host, configuration.path,
                                      value['configuration'],
                                      configuration.cache, dataformat_cache,
                                      database_cache, algorithm_cache,
                                      library_cache)
        else:
            executor = LocalExecutor(configuration.path,
                                     value['configuration'],
                                     configuration.cache, dataformat_cache,
                                     database_cache, algorithm_cache,
                                     library_cache,
                                     configuration.database_paths)

        if not executor.valid:
            logger.error(
                "Failed to load the execution information for `%s':", key)
            for e in executor.errors:
                logger.error('  * %s', e)
            return 1

        if executor.outputs_exist and not force:
            logger.info("Skipping execution of `%s' for block `%s' "
                        "- outputs exist", executor.algorithm.name, key)
            continue

        logger.info("Running `%s' for block `%s'",
                    executor.algorithm.name, key)
        if executable is not None:
            logger.extra("  -> using executable at `%s'", executable)
        else:
            logger.extra("  -> using fallback (default) environment")

        with executor:
            result = executor.process()

        if result['status'] != 0:
            logger.error("Block did not execute properly - outputs were reset")
            logger.error("  Standard output:\n%s",
                         reindent(result['stdout'], 4))
            logger.error("  Standard error:\n%s",
                         reindent(result['stderr'], 4))
            logger.error("  Captured user error:\n%s",
                         reindent(result['user_error'], 4))
            logger.error("  Captured system error:\n%s",
                         reindent(result['system_error'], 4))
            logger.extra("  Environment: %s" % 'default environment')
            return 1
        elif use_docker:
            stats = result['statistics']
            cpu_stats = stats['cpu']
            data_stats = stats['data']

            logger.extra("  CPU time (user, system, total, percent): "
                         "%s, %s, %s, %d%%",
                         simplify_time(cpu_stats['user']),
                         simplify_time(cpu_stats['system']),
                         simplify_time(cpu_stats['total']),
                         100. * (cpu_stats['user'] + cpu_stats['system']) /
                         cpu_stats['total'])
            logger.extra("  Memory usage: %s",
                         simplify_size(stats['memory']['rss']))
            logger.extra("  Cached input read: %s, %s",
                         simplify_time(data_stats['time']['read']),
                         simplify_size(data_stats['volume']['read']))
            logger.extra("  Cached output write: %s, %s",
                         simplify_time(data_stats['time']['write']),
                         simplify_size(data_stats['volume']['write']))
            logger.extra("  Communication time: %s (%d%%)",
                         simplify_time(data_stats['network']['wait_time']),
                         100. * data_stats['network']['wait_time'] /
                         cpu_stats['total'])
        else:
            logger.extra("  Environment: %s" % 'local environment')

        if executor.analysis:
            data = load_result(executor)
            r = reindent(simplejson.dumps(data.as_dict(), indent=2,
                                          cls=NumpyJSONEncoder), 2)
            logger.info("  Results:\n%s", r)

        logger.extra("  Outputs produced:")
        if executor.analysis:
            logger.extra("    * %s", executor.data['result']['path'])
        else:
            for details in executor.data['outputs'].values():
                logger.extra("    * %s", details['path'])

    return 0


def caches(configuration, name, ls, delete, checksum):
    '''List all cache files involved in this experiment'''

    dataformat_cache = {}
    database_cache = {}
    algorithm_cache = {}
    library_cache = {}

    experiment = Experiment(configuration.path, name,
                            dataformat_cache, database_cache,
                            algorithm_cache, library_cache)

    if not experiment.valid:
        logger.error("Failed to load the experiment `%s':", name)
        for e in experiment.errors:
            logger.error('  * %s', e)
        return 1

    scheduled = experiment.setup()

    s = []
    for key, value in scheduled.items():
        if 'outputs' in value['configuration']:  # normal block
            for data in value['configuration']['outputs'].values():
                s.append(data['path'])
        else:  # analyzer
            s.append(value['configuration']['result']['path'])

    # prefix cache path
    s = [os.path.join(configuration.cache, k) for k in s]

    for k in s:
        logger.info("output: `%s'", k)

        if ls:
            for g in glob.glob(k + '.*'):
                logger.info(g)

        if delete:

            for g in glob.glob(k + '.*'):
                logger.info("removing `%s'...", g)
                os.unlink(g)

            common.recursive_rmdir_if_empty(
                os.path.dirname(k), configuration.cache)

        if checksum:
            assert load_data_index(configuration.cache, k + '.data')
            logger.info("index for `%s' can be loaded and checksums", k)

    return 0


def pull(webapi, prefix, names, force, indentation, format_cache):
    """Copies experiments (and required toolchains/algorithms) from the server.

    Parameters:

      webapi (object): An instance of our WebAPI class, prepared to access the
        BEAT server of interest

      prefix (str): A string representing the root of the path in which the
        user objects are stored

      names (list): A list of strings, each representing the unique relative
        path of the objects to retrieve or a list of usernames from which to
        retrieve objects. If the list is empty, then we pull all available
        objects of a given type. If no user is set, then pull all public
        objects of a given type.

      force (bool): If set to ``True``, then overwrites local changes with the
        remotely retrieved copies.

      indentation (int): The indentation level, useful if this function is
        called recursively while downloading different object types. This is
        normally set to ``0`` (zero).
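
      format_cache (dict): A dictionary for caching loaded dataformats across
        the pull operations (note: currently re-initialized internally before
        the dependent database and algorithm pulls)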


    Returns:

      int: Indicating the exit status of the command, to be reported back to
        the calling process. This value should be zero if everything works OK,
        otherwise, different than zero (POSIX compliance).
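
    Example:

      A minimal usage sketch, mirroring the way ``process()`` below calls
      this function (``'<name>'`` stands for an experiment name)::

        pull(webapi, prefix, ['<name>'], force=False, indentation=0,
             format_cache={})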

    """

    from .algorithms import pull as algorithms_pull
    from .databases import pull as databases_pull

    status, names = common.pull(webapi, prefix, 'experiment', names,
                                ['declaration', 'description'], force,
                                indentation)

    if status != 0:
        logger.error(
            "could not find any matching experiments - widen your search")
        return status

    # collect the toolchains, databases and algorithms these experiments use
    databases = oset.oset()
    toolchains = oset.oset()
    algorithms = oset.oset()
    for name in names:
        try:
            obj = Experiment(prefix, name)
            if obj.toolchain:
                toolchains.add(obj.toolchain.name)
            databases |= obj.databases.keys()
            algorithms |= obj.algorithms.keys()

        except Exception as e:
            logger.error("loading `%s': %s...", name, str(e))

    # downloads the toolchains, databases and algorithms we depend on
    format_cache = {}
    library_cache = {}
    tc_status, _ = common.pull(webapi, prefix, 'toolchain',
                               toolchains, ['declaration', 'description'],
                               force, indentation + 2)
    db_status = databases_pull(webapi, prefix, databases, force,
                               indentation + 2, format_cache)
    algo_status = algorithms_pull(webapi, prefix, algorithms, force,
                                  indentation + 2, format_cache, library_cache)

    return status + tc_status + db_status + algo_status


def process(args):
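    '''Processes the `experiments' subcommands'''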

    config = args['config']
    names = args['<name>']
    force = args['--force']
    if args['run']:
        return run_experiment(config, names[0], force,
                              args['--docker'], args['--local'])

    if args['caches']:
        return caches(config, names[0], args['--list'],
                      args['--delete'], args['--checksum'])

    elif args['list']:
        if args['--remote']:
            with common.make_webapi(config) as webapi:
                return common.display_remote_list(webapi, 'experiment')
        else:
            return common.display_local_list(config.path, 'experiment')

    elif args['check']:
        return common.check(config.path, 'experiment', names)

    elif args['pull']:
        with common.make_webapi(config) as webapi:
            return pull(webapi, config.path, names, force,
                        0, {})

    elif args['push']:
        with common.make_webapi(config) as webapi:
            return common.push(webapi, config.path, 'experiment',
                               names,
                               ['name', 'declaration',
                                'toolchain', 'description'],
                               {}, force, args['--dry-run'], 0)

    elif args['diff']:
        with common.make_webapi(config) as webapi:
            return common.diff(webapi, config.path, 'experiment',
                               names[0], ['declaration', 'description'])

    elif args['status']:
        with common.make_webapi(config) as webapi:
            return common.status(webapi, config.path, 'experiment')[0]

    elif args['fork']:
        return common.fork(config.path, 'experiment',
                           args['<src>'], args['<dst>'])

    elif args['rm']:
        if args['--remote']:
            with common.make_webapi(config) as webapi:
                return common.delete_remote(webapi, 'experiment', names)
        else:
            return common.delete_local(config.path, 'experiment', names)

    elif args['draw']:
        return common.dot_diagram(config.path, 'experiment',
                                  names, args['--path'], [])

    # Should not happen
    logger.error("unrecognized `experiments' subcommand")
    return 1