Commit c41e4adb authored by Philip ABBET

Merge branch 'local-executor' into 'master'

Local executor

See merge request !9
parents eb756f3f af5070cd
Pipeline #11770 passed in 8 minutes and 28 seconds
......@@ -21,3 +21,12 @@ opsnr.stt
.DS_Store
src/
html/
.beat/
algorithms/
cache/
databases/
dataformats/
experiments/
libraries/
toolchains/
.noseids
......@@ -15,7 +15,7 @@ build:
- ./bin/buildout
- ./bin/python ${PREFIX}/bin/coverage run --source=${CI_PROJECT_NAME} ${PREFIX}/bin/nosetests -sv ${CI_PROJECT_NAME}
- ./bin/python ${PREFIX}/bin/coverage report
- ./bin/python ${PREFIX}/bin/sphinx-apidoc --separate -d 2 --output=doc/api ${CI_PROJECT_NAMESPACE}
- ./bin/python ${PREFIX}/bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
- ./bin/python ${PREFIX}/bin/sphinx-build doc html
tags:
- docker-build
......@@ -191,14 +191,15 @@ class Configuration(object):
if not os.path.exists(c): return
with open(c, 'rt') as f:
user_data = simplejson.load(f)
for k in DEFAULTS:
if k in user_data: self.__data[k] = user_data[k]
for k in user_data:
if self._is_valid_key(k): self.__data[k] = user_data[k]
except simplejson.JSONDecodeError:
print("WARNING: invalid state file at `%s' - removing and " \
"re-starting..." % c)
from beat.core.utils import safe_rmfile
safe_rmfile(c)
raise
# print("WARNING: invalid state file at `%s' - removing and " \
# "re-starting..." % c)
# from beat.core.utils import safe_rmfile
# safe_rmfile(c)
@property
......@@ -210,16 +211,23 @@ class Configuration(object):
return self.__data['cache']
@property
def database_paths(self):
'''A dict of paths for databases'''
return dict((k, self.__data[k]) for k in self.__data if self.is_database_key(k))
def set(self, key, value):
'''Sets or resets a field in the configuration'''
if key not in DEFAULTS:
if not self._is_valid_key(key):
print("ERROR: don't know about parameter `%s'" % key)
sys.exit(1)
if value is not None:
self.__data[key] = value
else:
elif key in DEFAULTS:
self.__data[key] = DEFAULTS[key]
self.save()
......@@ -249,6 +257,11 @@ class Configuration(object):
with os.fdopen(os.open(c, os.O_WRONLY | os.O_CREAT, 0600), 'wt') as f:
simplejson.dump(self.__data, f, indent=4)
def _is_valid_key(self, key):
return key in DEFAULTS or self.is_database_key(key)
def is_database_key(self, key):
return key.startswith('database/')
def __str__(self):
......@@ -264,6 +277,10 @@ class Configuration(object):
value = self.__data[key]
value = "`%s'" % value if value is not None else '<unset>'
retval.append(" * %-15s: %s" % (key, value))
for key in sorted([k for k in self.__data if self.is_database_key(k)]):
value = self.__data[key]
value = "`%s'" % value if value is not None else '<unset>'
retval.append(" * %-15s: %s" % (key, value))
for key in sorted([k for k in DOC if k.startswith('color')]):
value = self.__data[key]
color, on_color, attrs = colorlog_to_termcolor(value)
......
......@@ -27,7 +27,7 @@
"""Usage:
%(prog)s experiments run [--force] <name>
%(prog)s experiments run [--force] <name> [(--docker|--local)]
%(prog)s experiments caches [--list | --delete | --checksum] <name>
%(prog)s experiments list [--remote]
%(prog)s experiments check [<name>]...
......@@ -66,6 +66,8 @@ Options:
--help Display this screen
--path=<dir> Use path to write files to disk (instead of the current
directory)
--local Uses the local executor to execute the experiment on the local machine (default).
--docker Uses the docker executor to execute the experiment using docker containers.
"""
......@@ -80,13 +82,14 @@ import simplejson
from . import common
from beat.core.experiment import Experiment
from beat.core.execution import Executor
from beat.core.execution import Executor as DockerExecutor
from .local_execution import Executor as LocalExecutor
from beat.core.utils import NumpyJSONEncoder
from beat.core.data import CachedDataSource, load_data_index
from beat.core.dock import Host
def run_experiment(configuration, name, force):
def run_experiment(configuration, name, force, use_docker, use_local):
'''Run experiments locally'''
def load_result(executor):
......@@ -160,10 +163,11 @@ def run_experiment(configuration, name, force):
scheduled = experiment.setup()
# load existing environments
host = Host()
host.setup(raise_on_errors=False)
environments = host.environments
if use_docker:
# load existing environments
host = Host()
host.setup(raise_on_errors=False)
environments = host.environments
# can we execute it?
results = []
......@@ -172,17 +176,24 @@ def run_experiment(configuration, name, force):
# checks and sets-up executable
executable = None #use the default
env = value['configuration']['environment']
search_key = '%s (%s)' % (env['name'], env['version'])
if search_key not in environments:
logger.error("Cannot execute block `%s' on environment `%s': " \
"environment was not found' - please install it",
key, search_key)
return 1
if use_docker:
env = value['configuration']['environment']
search_key = '%s (%s)' % (env['name'], env['version'])
if search_key not in environments:
logger.error("Cannot execute block `%s' on environment `%s': " \
"environment was not found' - please install it",
key, search_key)
return 1
if use_docker:
executor = DockerExecutor(configuration.path, value['configuration'],
configuration.cache, dataformat_cache, database_cache,
algorithm_cache, library_cache)
else:
executor = LocalExecutor(configuration.path, value['configuration'],
configuration.cache, dataformat_cache, database_cache,
algorithm_cache, library_cache, configuration.database_paths)
executor = Executor(configuration.path, value['configuration'],
configuration.cache, dataformat_cache, database_cache,
algorithm_cache, library_cache)
if not executor.valid:
logger.error("Failed to load the execution information for `%s':", key)
......@@ -201,44 +212,50 @@ def run_experiment(configuration, name, force):
logger.extra(" -> using fallback (default) environment")
with executor:
result = executor.process(host)
if result['status'] != 0:
logger.error("Block did not execute properly - outputs were reset")
logger.error(" Standard output:\n%s", reindent(result['stdout'], 4))
logger.error(" Standard error:\n%s", reindent(result['stderr'], 4))
logger.error(" Captured user error:\n%s",
reindent(result['user_error'], 4))
logger.error(" Captured system error:\n%s",
reindent(result['system_error'], 4))
return 1
if use_docker:
result = executor.process(host)
else:
result = executor.process()
if use_docker:
if result['status'] != 0:
logger.error("Block did not execute properly - outputs were reset")
logger.error(" Standard output:\n%s", reindent(result['stdout'], 4))
logger.error(" Standard error:\n%s", reindent(result['stderr'], 4))
logger.error(" Captured user error:\n%s",
reindent(result['user_error'], 4))
logger.error(" Captured system error:\n%s",
reindent(result['system_error'], 4))
print(" Environment: %s" % 'default environment')
return 1
else:
stats = result['statistics']
logger.extra(" CPU time (user, system, total, percent): %s, %s, %s, %d%%",
simplify_time(stats['cpu']['user']),
simplify_time(stats['cpu']['system']),
simplify_time(stats['cpu']['total']),
100. * (stats['cpu']['user'] + stats['cpu']['system']) / stats['cpu']['total'],
)
logger.extra(" Memory usage: %s",
simplify_size(stats['memory']['rss']))
logger.extra(" Cached input read: %s, %s",
simplify_time(stats['data']['time']['read']),
simplify_size(stats['data']['volume']['read']))
logger.extra(" Cached output write: %s, %s",
simplify_time(stats['data']['time']['write']),
simplify_size(stats['data']['volume']['write']))
logger.extra(" Communication time: %s (%d%%)",
simplify_time(stats['data']['network']['wait_time']),
100. * stats['data']['network']['wait_time'] / stats['cpu']['total'])
else:
logger.extra(" Environment: %s" % 'local environment')
logger.extra(" Environment: %s" % 'default environment')
if executor.analysis:
data = load_result(executor)
r = reindent(simplejson.dumps(data.as_dict(), indent=2,
cls=NumpyJSONEncoder), 2)
logger.info(" Results:\n%s", r)
stats = result['statistics']
logger.extra(" CPU time (user, system, total, percent): %s, %s, %s, %d%%",
simplify_time(stats['cpu']['user']),
simplify_time(stats['cpu']['system']),
simplify_time(stats['cpu']['total']),
100. * (stats['cpu']['user'] + stats['cpu']['system']) / stats['cpu']['total'],
)
logger.extra(" Memory usage: %s",
simplify_size(stats['memory']['rss']))
logger.extra(" Cached input read: %s, %s",
simplify_time(stats['data']['time']['read']),
simplify_size(stats['data']['volume']['read']))
logger.extra(" Cached output write: %s, %s",
simplify_time(stats['data']['time']['write']),
simplify_size(stats['data']['volume']['write']))
logger.extra(" Communication time: %s (%d%%)",
simplify_time(stats['data']['network']['wait_time']),
100. * stats['data']['network']['wait_time'] / stats['cpu']['total'])
logger.extra(" Outputs produced:")
if executor.analysis:
logger.extra(" * %s", executor.data['result']['path'])
......@@ -246,6 +263,7 @@ def run_experiment(configuration, name, force):
for name, details in executor.data['outputs'].items():
logger.extra(" * %s", details['path'])
return 0
......@@ -374,7 +392,7 @@ def pull(webapi, prefix, names, force, indentation, format_cache):
def process(args):
if args['run']:
return run_experiment(args['config'], args['<name>'][0], args['--force'])
return run_experiment(args['config'], args['<name>'][0], args['--force'], args['--docker'], args['--local'])
if args['caches']:
return caches(args['config'], args['<name>'][0], args['--list'],
......
This diff is collapsed.
......@@ -31,6 +31,7 @@
import os
import nose.tools
from nose.tools import assert_raises
import simplejson
from . import tmp_prefix
......@@ -83,6 +84,41 @@ def test_set_token():
assert contents['token'] == token_value
@nose.tools.with_setup(teardown=cleanup)
def test_set_atnt_db():
db_config = 'database/atnt'
db_path = './atnt_db'
nose.tools.eq_(call('config', 'set', db_config, db_path), 0)
config = os.path.join(tmp_prefix, '.beat', 'config.json')
assert os.path.exists(config)
with open(config, 'rt') as f: contents = simplejson.load(f)
assert contents[db_config] == db_path
@nose.tools.with_setup(teardown=cleanup)
def test_set_get_atnt_db():
db_config = 'database/atnt'
db_path = './atnt_db'
nose.tools.eq_(call('config', 'set', db_config, db_path), 0)
nose.tools.eq_(call('config', 'get', db_config), 0)
@nose.tools.with_setup(teardown=cleanup)
def test_set_bad_config_key():
db_config = 'fail'
with assert_raises(SystemExit) as c:
call('config', 'set', db_config, db_config)
assert c.exception.code == 1
@nose.tools.with_setup(teardown=cleanup)
@nose.tools.raises(KeyError)
def test_get_bad_config_key():
db_config = 'fail'
nose.tools.eq_(call('config', 'get', db_config), 1)
@nose.tools.with_setup(teardown=cleanup)
def test_get_token():
nose.tools.eq_(call('config', 'get', 'token'), 0)
......
......@@ -183,20 +183,42 @@ def test_run_double_triangle_1():
@slow
@nose.tools.with_setup(teardown=cleanup)
def test_run_single_error_1():
@nose.tools.raises(NameError)
def test_run_single_error_1_local():
# When running locally, the module with the error is loaded
# inside the currently running process and will raise a NameError.
obj = 'user/user/single/1/single_error'
nose.tools.eq_(call('run', obj, cache=tmp_prefix), 1)
nose.tools.eq_(call('run', obj, '--local', cache=tmp_prefix), 1)
@slow
@nose.tools.with_setup(teardown=cleanup)
def test_run_single_error_twice():
def test_run_single_error_1_docker():
# When running on docker, the module is loaded in the docker
# container and the local process will return '1'.
obj = 'user/user/single/1/single_error'
nose.tools.eq_(call('run', obj, '--docker', cache=tmp_prefix), 1)
@slow
@nose.tools.with_setup(teardown=cleanup)
@nose.tools.raises(NameError)
def test_run_single_error_twice_local():
# This one makes sure our output reset works properly. Both tries should
# produce the same error.
obj = 'user/user/single/1/single_error'
nose.tools.eq_(call('run', obj, '--local', cache=tmp_prefix), 1)
nose.tools.eq_(call('run', obj, '--local', cache=tmp_prefix), 1)
@slow
@nose.tools.with_setup(teardown=cleanup)
def test_run_single_error_twice_docker():
# This one makes sure our output reset works properly. Both tries should
# produce the same error.
obj = 'user/user/single/1/single_error'
nose.tools.eq_(call('run', obj, cache=tmp_prefix), 1)
nose.tools.eq_(call('run', obj, cache=tmp_prefix), 1)
nose.tools.eq_(call('run', obj, '--docker', cache=tmp_prefix), 1)
nose.tools.eq_(call('run', obj, '--docker', cache=tmp_prefix), 1)
@nose.tools.with_setup(teardown=cleanup)
......
......@@ -10,6 +10,7 @@ develop = .
newest = false
eggs = beat.cmdline
beat.backend.python
beat.core
ipdb
[sources]
......
......@@ -46,7 +46,7 @@ needs_sphinx = '1.3'
extensions = [
'sphinx.ext.todo',
'sphinx.ext.coverage',
'sphinx.ext.pngmath',
'sphinx.ext.imgmath',
'sphinx.ext.ifconfig',
'sphinx.ext.autodoc',
'sphinx.ext.autosummary',
......@@ -69,7 +69,7 @@ autosummary_generate = True
# If we are on OSX, the 'dvipng' path may be different
dvipng_osx = '/opt/local/libexec/texlive/binaries/dvipng'
if os.path.exists(dvipng_osx): pngmath_dvipng = dvipng_osx
if os.path.exists(dvipng_osx): imgmath_dvipng = dvipng_osx
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
......
......@@ -72,6 +72,24 @@ flag:
...
When running an experiment via the ``beat`` application using the local
executor (the default executor, also behind the ``--local`` flag), ``beat``
will look in your configuration for any user-set options that follow the
format ``database/<db name>/<db version>``. ``beat`` expects each such
option to hold the path to the root folder of the actual database files
for the given database.
For example, the AT&T "Database of Faces" is available on the BEAT platform
as the "atnt" database. The third version of the "atnt" database would be
referenced as "atnt/3". The object "atnt/3" has a root folder defined on
the BEAT platform already, and changing this locally would mean creating a
new version of the database.
Instead, you may override that path by setting the configuration option
``database/atnt/3`` to your local path to the database files.
Assuming your username is "user" and you extracted the database files to
``~/Downloads/atnt_db``, you can set ``database/atnt/3`` to
``/home/user/Downloads/atnt_db``, and ``beat`` will find the database files.
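For instance, this is how such an override could be set and inspected (a
minimal sketch, reusing the hypothetical path from the example above):

.. code-block:: sh

   $ beat config set database/atnt/3 /home/user/Downloads/atnt_db
   $ beat config get database/atnt/3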
You may explore different configuration options with the ``--help`` flag of
``beat config``:
......
......@@ -21,7 +21,7 @@
.. with the BEAT platform. If not, see http://www.gnu.org/licenses/. ..
.. _beat-core-experiments-cmdline:
.. _beat-cmdline-experiments:
Experiments
-----------
......@@ -42,325 +42,60 @@ The commands available for experiments are:
:cwd: ..
.. _beat-core-experiments-running:
.. _beat-cmdline-experiments-running:
How to run an experiment?
.........................
The ``run_toolchain.py`` script can be used to perform the experiment defined
in a toolchain. It is the ideal way to debug an algorithm, since this script
doesn't try to do any advanced trick like the Scheduler (multi-processing,
optimizations, sandboxing, ...).
For example, we execute a simple toolchain with two processing blocks (found in
``src/beat.core/beat/core/test/toolchains/integers_addition2.json``):
.. code-block:: sh
$ ./bin/run_toolchain.py --prefix=src/beat.core/beat/core/test/ integers_addition2
Processing block 'addition1'...
Algorithm: sum
Inputs:
- a (single_integer): beat/src/beat.core/beat/core/test/databases/integers/output1.data
- b (single_integer): beat/src/beat.core/beat/core/test/databases/integers/output2.data
Outputs:
- sum (single_integer): beat/src/beat.core/beat/core/test/cache/addition1/sum.data
Processing block 'addition2'...
Algorithm: sum
Inputs:
- a (single_integer): beat/src/beat.core/beat/core/test/cache/addition1/sum.data
- b (single_integer): beat/src/beat.core/beat/core/test/databases/integers/output3.data
Outputs:
- sum (single_integer): beat/src/beat.core/beat/core/test/cache/addition2/sum.data
DONE
Results available at:
- addition2.sum: beat/src/beat.core/beat/core/test/cache/addition2/sum.data
The command ``beat experiments run <name>`` can be used to run the experiment
defined in an experiment definition file. It is the ideal way to debug an
experiment, since by default ``beat`` will use the local executor, which provides
a simple environment with PDB support without advanced features
(multi-processing, optimizations, sandboxing, multiple environments, etc.).
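For example, a run using the default (local) executor would look like this
(the experiment name here is hypothetical):

.. code-block:: sh

   $ beat experiments run user/user/single/1/single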
Here, the ``--prefix`` option is used to tell the scripts where all our data
formats, toolchains and algorithms are located, and ``integers_addition2`` is
the name of the toolchain we want to check (note that we don't add the
``.json`` extension, as this is the name of the toolchain, not the filename!).
formats, toolchains and algorithms are located. This option can be set
in your configuration file (see ``beat config``).
This script displays for each block the files containing the data to use as
This command displays for each block the files containing the data to use as
input, and the files generated by the outputs of the block.
By default, files are generated in binary format, but you can force them to be
in a more readable JSON format with the ``--json`` flag:
.. code-block:: sh
$ ./bin/run_toolchain.py --prefix=src/beat.core/beat/core/test/ --json integers_addition2
The default behavior is to not regenerate data files already present in the
cache. You can force the script to not take the content of the cache into
account with the ``--force`` flag:
.. code-block:: sh
$ ./bin/run_toolchain.py --prefix=src/beat.core/beat/core/test/ --force integers_addition2
.. _beat-core-experiments-displaydata:
account with the ``--force`` flag.
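For example, to re-run the same (hypothetical) experiment while ignoring any
cached results:

.. code-block:: sh

   $ beat experiments run --force user/user/single/1/single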
How to examine the content of a data file?
..........................................
The ``display_data.py`` script can be used to examine the content of a data
file generated by the execution of a toolchain.
For example, we look at the content of one of the data file used by the tests
of beat.core (found in
``src/beat.core/beat/core/test/data/single_integer.data``):
.. code-block:: sh
$ ./bin/display_data.py --prefix=src/beat.core/beat/core/test data/single_integer_delayed.data
Data format: single_integer
----------------------------------------------
Indexes: 0-1
{
"value": 0
}
----------------------------------------------
Indexes: 2-3
{
"value": 1
}
----------------------------------------------
Indexes: 4-5
{
"value": 2
}
----------------------------------------------
Indexes: 6-7
{
"value": 3
}
----------------------------------------------
Indexes: 8-9
{
"value": 4
}
----------------------------------------------
Indexes: 10-11
{
"value": 5
}
----------------------------------------------
Indexes: 12-13
{
"value": 6
}
----------------------------------------------
Indexes: 14-15
{
"value": 7
}
----------------------------------------------
Indexes: 16-17
{
"value": 8
}
----------------------------------------------
Indexes: 18-19
{
"value": 9
}
The script tells us that the data corresponds to the data format
``single_integer``, and displays each entry (with the indexes it corresponds
to) in a JSON representation.
Executors
=========
"Executors" are modules that execute each block in an experiment. On the BEAT
platform, there is only one executor, which executes the experiment using
Docker containers with advanced scheduling and security features. When
developing using ``beat.cmdline``, however, you have the option of using either
the BEAT platform's executor, behind the ``--docker`` flag, or the "local"
executor, provided in this project. The local executor, as explained above, is
much simpler, aimed at providing a smooth development experience. However,
there are two important tradeoffs:
.. _beat-core-experiments-example:
- Lower performance for non-trivial experiments, as it runs everything
synchronously in one process on the CPU.
- No multiple environments, as the Python environment that built
``beat.cmdline`` is used. This means that many BEAT experiments that
rely on different/multiple environments will not work.
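As a sketch, the executor is selected per run via the corresponding flag
(``--local`` being the default):

.. code-block:: sh

   $ beat experiments run <name> --local
   $ beat experiments run <name> --docker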
Putting it all together: a complete example
...........................................
If you want to use the local executor, pay attention to the Python
environment used to call ``buildout`` in your copy of ``beat.cmdline``. The
suggested way to use Bob libraries while developing with the local executor
is to install ``zc.buildout`` in a Python 2.7 conda environment with Bob
installed. Running the ``buildout`` command from that environment will make
the entire environment available to ``beat.cmdline``, even when the
environment is not active.
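A minimal sketch of that setup (the environment name is hypothetical, and
the exact way of installing Bob into the environment is left out):

.. code-block:: sh

   $ conda create -n bob-py27 python=2.7  # then install Bob into it
   $ source activate bob-py27
   $ pip install zc.buildout
   $ buildout  # run from your beat.cmdline checkout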
.. _beat-core-experiments-example-figure:
.. figure:: img/toolchain-example.*
A complete toolchain that trains and tests a face detector
.. _beat-cmdline-experiments-displaydata:
The following example describes the toolchain visible at :num:`figure
#beat-core-experiments-example-figure`, a complete toolchain that: