#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.cmdline module of the BEAT platform.          #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Usage:
  %(prog)s algorithms list [--remote]
  %(prog)s algorithms check [<name>]...
  %(prog)s algorithms pull [--force] [<name>]...
  %(prog)s algorithms push [--force] [--dry-run] [<name>]...
  %(prog)s algorithms diff <name>
  %(prog)s algorithms status
  %(prog)s algorithms create <name>...
  %(prog)s algorithms version <name>
  %(prog)s algorithms fork <src> <dst>
  %(prog)s algorithms rm [--remote] <name>...
  %(prog)s algorithms execute <instructions>
  %(prog)s algorithms execute --examples
  %(prog)s algorithms --help


Commands:
  list      Lists all the algorithms available on the platform
  check     Checks a local algorithm for validity
  pull      Downloads the specified algorithms from the server
  push      Uploads algorithms to the server
  diff      Shows changes between the local algorithm and the remote version
  status    Shows (editing) status for all available algorithms
  create    Creates a new local algorithm
  version   Creates a new version of an existing algorithm
  fork      Forks a local algorithm
  rm        Deletes a local algorithm (unless --remote is specified)
  execute   Execute an algorithm following instructions in a JSON file


Options:
  --force     Performs operation regardless of conflicts
  --dry-run   Doesn't really perform the task, just comments what would do
  --remote    Only acts on the remote copy of the algorithm
  --examples  Display some example JSON instruction files
  --help      Display this screen


Arguments for 'execute':
  <instructions>  JSON file containing the instructions


Examples:

  To list all algorithms available locally:

    $ %(prog)s al list

  To list all algorithms available at the platform:

    $ %(prog)s al list --remote

"""

import logging
logger = logging.getLogger(__name__)

import os
import sys

import docopt
import simplejson as json

from . import common

from beat.core import algorithm
from beat.core.execution import Executor
from beat.core.dock import Host
from beat.core import hash

from beat.backend.python.database import Storage as DatabaseStorage
from beat.backend.python.algorithm import Storage as AlgorithmStorage


def pull(webapi, prefix, names, force, indentation, format_cache, lib_cache):
    """Copies algorithms (and required libraries/dataformats) from the server.

    Parameters:

      webapi (object): An instance of our WebAPI class, prepared to access the
        BEAT server of interest

      prefix (str): A string representing the root of the path in which the
        user objects are stored

      names (list): A list of strings, each representing the unique relative
        path of the objects to retrieve or a list of usernames from which to
        retrieve objects. If the list is empty, then we pull all available
        objects of a given type. If no user is set, then pull all public
        objects of a given type.

      force (bool): If set to ``True``, then overwrites local changes with the
        remotely retrieved copies.

      indentation (int): The indentation level, useful if this function is
        called recursively while downloading different object types. This is
        normally set to ``0`` (zero).

      format_cache (dict): A dictionary containing all dataformats already
        downloaded.

      lib_cache (dict): A dictionary containing all libraries already
        downloaded.


    Returns:

      int: Indicating the exit status of the command, to be reported back to
        the calling process. This value should be zero if everything works OK,
        otherwise, different than zero (POSIX compliance).

    """

    # imported locally to avoid a circular dependency at module load time
    from .dataformats import pull as dataformats_pull
    from .libraries import pull as libraries_pull

    status, names = common.pull(webapi, prefix, 'algorithm', names,
                                ['declaration', 'code', 'description'],
                                force, indentation)

    if status != 0:
        return status

    # see what dataformats one needs to pull
    indent = indentation * ' '

    dataformats = []
    libraries = []
    for name in names:
        obj = algorithm.Algorithm(prefix, name)
        dataformats.extend(obj.dataformats.keys())
        libraries.extend(obj.libraries.keys())

    # downloads any formats to which we depend on
    df_status = dataformats_pull(webapi, prefix, dataformats, force,
                                 indentation + 2, format_cache)

    lib_status = libraries_pull(webapi, prefix, libraries, force,
                                indentation + 2, lib_cache)

    return status + df_status + lib_status


def print_examples():
    """Prints example JSON instruction files for the 'execute' subcommand"""

    print("""
To feed data from a database to an algorithm:
=============================================

{
    "algorithm": "<username>/<algorithm>/<version>",
    "channel": "main",
    "inputs": {
        "<input_name>": {
            "database": "<database>/<version>",
            "protocol": "<protocol>",
            "set": "<set>",
            "output": "<output_name>",
            "channel": "main"
        }
    },
    "outputs": {
        "<output_name>": {
            "channel": "train"
        }
    },
    "parameters": {
        "<parameter_name>": <value>
    },
    "environment": {
        "name": "<environment_name>",
        "version": "<environment_version>"
    }
}


To feed data from a file in the cache:
======================================

{
    "algorithm": "<username>/<algorithm>/<version>",
    "channel": "main",
    "inputs": {
        "<input_name>": {
            "hash": "<hash>",
            "channel": "main"
        }
    },
    "outputs": {
        "<output_name>": {
            "channel": "train"
        }
    },
    "parameters": {
        "<parameter_name>": <value>
    },
    "environment": {
        "name": "<environment_name>",
        "version": "<environment_version>"
    }
}


To execute an analyzer:
=======================

{
    "algorithm": "<username>/<algorithm>/<version>",
    "channel": "main",
    "inputs": {
        "<input_name>": {
            "hash": "<hash>",
            "channel": "main"
        }
    },
    "parameters": {
        "<parameter_name>": <value>
    },
    "environment": {
        "name": "<environment_name>",
        "version": "<environment_version>"
    }
}
""")


def execute(prefix, cache, instructions_file):
    """Executes an algorithm locally, driven by a JSON instructions file.

    Parameters:

      prefix (str): A string representing the root of the path in which the
        user objects are stored

      cache (str): Path of the directory where cached data is stored

      instructions_file (str): Path of the JSON file describing the algorithm
        to run, its inputs, outputs (or result, for an analyzer), parameters
        and environment


    Returns:

      int: Zero if the execution succeeded, one otherwise (POSIX compliance).

    """

    try:
        # Load the JSON configuration
        if not os.path.exists(instructions_file):
            raise IOError("JSON instructions file `%s' not found" %
                          instructions_file)

        with open(instructions_file, 'r') as f:
            configuration = json.load(f)

        # Add configuration fields the executor expects but that make no sense
        # for a purely local run
        configuration['queue'] = 'unused'
        configuration['nb_slots'] = 1

        if 'parameters' not in configuration:
            configuration['parameters'] = {}

        # Resolve the cache path of every input, either from a database output
        # or directly from a provided cache hash
        for name, cfg in configuration['inputs'].items():
            cfg['endpoint'] = name

            if 'database' in cfg:  # connected to a database output
                db = DatabaseStorage(prefix, cfg['database'])
                cfg['hash'] = hash.hashDatasetOutput(db.hash(),
                                                     cfg['protocol'],
                                                     cfg['set'],
                                                     cfg['output'])

            # with a database, the hash was just computed; otherwise the JSON
            # file must provide it directly
            cfg['path'] = hash.toPath(cfg['hash'], '')

        algo = AlgorithmStorage(prefix, configuration['algorithm'])

        if 'outputs' in configuration:  # standard algorithm
            for name, cfg in configuration['outputs'].items():
                cfg['endpoint'] = name
                cfg['hash'] = hash.hashBlockOutput(
                    'block',
                    configuration['algorithm'],
                    algo.hash(),
                    configuration['parameters'],
                    configuration['environment'],
                    dict([(k, v['hash'])
                          for k, v in configuration['inputs'].items()]),
                    name,
                )
                cfg['path'] = hash.toPath(cfg['hash'], '')

        else:  # analyzer
            configuration['result'] = {}
            configuration['result']['hash'] = hash.hashAnalyzer(
                'block',
                configuration['algorithm'],
                algo.hash(),
                configuration['parameters'],
                configuration['environment'],
                dict([(k, v['hash'])
                      for k, v in configuration['inputs'].items()]),
            )
            configuration['result']['path'] = \
                hash.toPath(configuration['result']['hash'], '')

        # Sets up the execution
        dataformat_cache = {}
        database_cache = {}
        algorithm_cache = {}

        host = Host()
        host.setup(raise_on_errors=False)

        executor = Executor(prefix, configuration, cache, dataformat_cache,
                            database_cache, algorithm_cache)

        if not executor.valid:
            logger.error('Invalid configuration:\n * %s' %
                         '\n * '.join(executor.errors))
            return 1

        # Execute the algorithm
        with executor:
            executor.process(host)

        # Display the results
        if 'outputs' in configuration:  # standard algorithm
            print('Outputs of the algorithms available at:')
            for name, cfg in configuration['outputs'].items():
                print(' - %s: %s' % (name, cfg['path']))
        else:  # analyzer
            print('Results of the analyzer available at: %s' %
                  configuration['result']['path'])

    except Exception:
        # catch-all boundary: any failure is logged with its traceback and
        # reported as a non-zero exit status instead of crashing the CLI
        import traceback
        logger.error(traceback.format_exc())
        return 1

    return 0


def process(args):
    """Dispatches the parsed command-line arguments to the proper action.

    Parameters:

      args (dict): Parsed docopt arguments; ``args['config']`` carries the
        local configuration (``path`` and ``cache`` attributes)


    Returns:

      int: The exit status of the selected subcommand (zero means success).

    """

    if args['list']:
        if args['--remote']:
            with common.make_webapi(args['config']) as webapi:
                return common.display_remote_list(webapi, 'algorithm')
        else:
            return common.display_local_list(args['config'].path, 'algorithm')

    elif args['check']:
        return common.check(args['config'].path, 'algorithm', args['<name>'])

    elif args['pull']:
        with common.make_webapi(args['config']) as webapi:
            return pull(webapi, args['config'].path, args['<name>'],
                        args['--force'], 0, {}, {})

    elif args['push']:
        with common.make_webapi(args['config']) as webapi:
            return common.push(webapi, args['config'].path, 'algorithm',
                               args['<name>'],
                               ['name', 'declaration', 'code', 'description'],
                               {}, args['--force'], args['--dry-run'], 0)

    elif args['diff']:
        with common.make_webapi(args['config']) as webapi:
            return common.diff(webapi, args['config'].path, 'algorithm',
                               args['<name>'][0],
                               ['declaration', 'code', 'description'])

    elif args['status']:
        with common.make_webapi(args['config']) as webapi:
            return common.status(webapi, args['config'].path, 'algorithm')[0]

    elif args['create']:
        return common.create(args['config'].path, 'algorithm', args['<name>'])

    elif args['version']:
        return common.new_version(args['config'].path, 'algorithm',
                                  args['<name>'][0])

    elif args['fork']:
        return common.fork(args['config'].path, 'algorithm',
                           args['<src>'], args['<dst>'])

    elif args['execute']:
        if args['--examples']:
            print_examples()
            return 0
        return execute(args['config'].path, args['config'].cache,
                       args['<instructions>'])

    elif args['rm']:
        if args['--remote']:
            with common.make_webapi(args['config']) as webapi:
                return common.delete_remote(webapi, 'algorithm',
                                            args['<name>'])
        else:
            return common.delete_local(args['config'].path, 'algorithm',
                                       args['<name>'])

    # Should not happen
    logger.error("unrecognized `algorithms' subcommand")
    return 1