Commit af22b1ba authored by Philip ABBET's avatar Philip ABBET

[algorithms] Add the command 'execute'

parent e3b5b4d6
......@@ -37,6 +37,8 @@
%(prog)s algorithms version <name>
%(prog)s algorithms fork <src> <dst>
%(prog)s algorithms rm [--remote] <name>...
%(prog)s algorithms execute <instructions>
%(prog)s algorithms execute --examples
%(prog)s algorithms --help
......@@ -51,13 +53,19 @@ Commands:
version Creates a new version of an existing algorithm
fork Forks a local algorithm
rm Deletes a local algorithm (unless --remote is specified)
execute Execute an algorithm following instructions in a JSON file
Options:
--force Performs operation regardless of conflicts
--dry-run Doesn't really perform the task, just comments what would do
--remote Only acts on the remote copy of the algorithm
--help Display this screen
--force Performs operation regardless of conflicts
--dry-run Doesn't really perform the task, just comments on what it would do
--remote Only acts on the remote copy of the algorithm
--examples Display some example JSON instruction files
--help Display this screen
Arguments for 'execute':
<instructions> JSON file containing the instructions
Examples:
......@@ -75,9 +83,21 @@ Examples:
import logging
logger = logging.getLogger(__name__)
import os
import sys
import docopt
import simplejson as json
from . import common
from beat.core import algorithm
from beat.core.execution import Executor
from beat.core.dock import Host
from beat.core import hash
from beat.backend.python.database import Storage as DatabaseStorage
from beat.backend.python.algorithm import Storage as AlgorithmStorage
def pull(webapi, prefix, names, force, indentation, format_cache, lib_cache):
......@@ -147,6 +167,180 @@ def pull(webapi, prefix, names, force, indentation, format_cache, lib_cache):
return status + df_status + lib_status
def print_examples():
    """Print example JSON instruction files for the 'execute' command.

    Covers the three supported scenarios: feeding an algorithm from a
    database set, feeding it from a file already in the cache, and
    running an analyzer.
    """
    # Parenthesized single-argument print is valid on both Python 2
    # (parentheses are just grouping) and Python 3 (function call),
    # unlike the bare `print` statement which Python 3 removed.
    print("""
To feed data from a database to an algorithm:
=============================================
{
"algorithm": "<username>/<algorithm>/<version>",
"channel": "main",
"inputs": {
"<input_name>": {
"database": "<database>/<version>",
"protocol": "<protocol>",
"set": "<set>",
"output": "<output_name>",
"channel": "main"
}
},
"outputs": {
"<output_name>": {
"channel": "train"
}
},
"parameters": {
"<parameter_name>": <value>
},
"environment": {
"name": "<environment_name>",
"version": "<environment_version>"
}
}
To feed data from a file in the cache:
======================================
{
"algorithm": "<username>/<algorithm>/<version>",
"channel": "main",
"inputs": {
"<input_name>": {
"hash": "<hash>",
"channel": "main"
}
},
"outputs": {
"<output_name>": {
"channel": "train"
}
},
"parameters": {
"<parameter_name>": <value>
},
"environment": {
"name": "<environment_name>",
"version": "<environment_version>"
}
}
To execute an analyzer:
=======================
{
"algorithm": "<username>/<algorithm>/<version>",
"channel": "main",
"inputs": {
"<input_name>": {
"hash": "<hash>",
"channel": "main"
}
},
"parameters": {
"<parameter_name>": <value>
},
"environment": {
"name": "<environment_name>",
"version": "<environment_version>"
}
}
""")
def execute(prefix, cache, instructions_file):
    """Execute an algorithm following the instructions of a JSON file.

    Parameters:
      prefix (str): path of the prefix holding the declared objects
        (algorithms, databases, ...)
      cache (str): path of the cache directory holding the data files
      instructions_file (str): path of the JSON file describing the run
        (see ``print_examples()`` for the accepted layouts)

    Returns:
      int: 0 on success, 1 on any failure (the error is logged)
    """
    try:
        # Load the JSON configuration
        if not os.path.exists(instructions_file):
            raise IOError("JSON instructions file `%s' not found" % instructions_file)

        with open(instructions_file, 'r') as f:
            configuration = json.load(f)

        # Add the configuration fields the executor expects but the
        # instruction file doesn't need to provide
        configuration['queue'] = 'unused'
        configuration['nb_slots'] = 1

        # NOTE: `in` replaces the deprecated dict.has_key() (removed in
        # Python 3, discouraged on Python 2)
        if 'parameters' not in configuration:
            configuration['parameters'] = {}

        for name, cfg in configuration['inputs'].items():
            cfg['endpoint'] = name

            if 'database' in cfg:  # Connected to a database output
                db = DatabaseStorage(prefix, cfg['database'])
                cfg['hash'] = hash.hashDatasetOutput(db.hash(), cfg['protocol'],
                                                     cfg['set'], cfg['output'])

            # Whether hashed from a database output or given directly,
            # the data is addressed by its hash in the cache
            cfg['path'] = hash.toPath(cfg['hash'], '')

        algo = AlgorithmStorage(prefix, configuration['algorithm'])

        if 'outputs' in configuration:  # Standard algorithm
            for name, cfg in configuration['outputs'].items():
                cfg['endpoint'] = name
                cfg['hash'] = hash.hashBlockOutput(
                    'block',
                    configuration['algorithm'],
                    algo.hash(),
                    configuration['parameters'],
                    configuration['environment'],
                    dict([(k, v['hash']) for k, v in configuration['inputs'].items()]),
                    name,
                )
                cfg['path'] = hash.toPath(cfg['hash'], '')

        else:  # Analyzer
            configuration['result'] = {}
            configuration['result']['hash'] = hash.hashAnalyzer(
                'block',
                configuration['algorithm'],
                algo.hash(),
                configuration['parameters'],
                configuration['environment'],
                dict([(k, v['hash']) for k, v in configuration['inputs'].items()]),
            )
            configuration['result']['path'] = hash.toPath(configuration['result']['hash'], '')

        # Sets up the execution
        dataformat_cache = {}
        database_cache = {}
        algorithm_cache = {}

        host = Host()
        host.setup(raise_on_errors=False)

        executor = Executor(prefix, configuration, cache, dataformat_cache,
                            database_cache, algorithm_cache)

        if not executor.valid:
            logger.error('Invalid configuration:\n * %s' % '\n * '.join(executor.errors))
            return 1

        # Execute the algorithm (the executor is a context manager that
        # prepares and cleans up the working environment)
        with executor:
            executor.process(host)

        # Display the results
        if 'outputs' in configuration:  # Standard algorithm
            print('Outputs of the algorithms available at:')
            for name, cfg in configuration['outputs'].items():
                print(' - %s: %s' % (name, cfg['path']))
        else:
            print('Results of the analyzer available at: %s' % configuration['result']['path'])

    except Exception:
        import traceback
        logger.error(traceback.format_exc())
        return 1

    return 0
def process(args):
if args['list']:
......@@ -190,6 +384,13 @@ def process(args):
return common.fork(args['config'].path, 'algorithm',
args['<src>'], args['<dst>'])
elif args['execute']:
if args['--examples']:
print_examples()
return 0
return execute(args['config'].path, args['config'].cache, args['<instructions>'])
elif args['rm']:
if args['--remote']:
with common.make_webapi(args['config']) as webapi:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment