Commit a0356fee authored by Philip ABBET's avatar Philip ABBET

Refactoring: Move LocalExecutor into beat.core

parent c45e0782
......@@ -91,7 +91,7 @@ import simplejson as json
from . import common
from beat.core import algorithm
from beat.core.execution import Executor
from beat.core.execution import DockerExecutor
from beat.core.dock import Host
from beat.core import hash
......@@ -313,8 +313,8 @@ def execute(prefix, cache, instructions_file):
host = Host()
host.setup(raise_on_errors=False)
executor = Executor(host, prefix, configuration, cache, dataformat_cache,
database_cache, algorithm_cache)
executor = DockerExecutor(host, prefix, configuration, cache, dataformat_cache,
database_cache, algorithm_cache)
if not executor.valid:
logger.error('Invalid configuration:\n * %s' % '\n * '.join(executor.errors))
......
......@@ -82,8 +82,8 @@ import simplejson
from . import common
from beat.core.experiment import Experiment
from beat.core.execution import Executor as DockerExecutor
from .local_execution import Executor as LocalExecutor
from beat.core.execution import DockerExecutor
from beat.core.execution import LocalExecutor
from beat.core.utils import NumpyJSONEncoder
from beat.core.data import CachedDataSource, load_data_index
from beat.core.dock import Host
......@@ -213,36 +213,35 @@ def run_experiment(configuration, name, force, use_docker, use_local):
with executor:
result = executor.process()
if use_docker:
if result['status'] != 0:
logger.error("Block did not execute properly - outputs were reset")
logger.error(" Standard output:\n%s", reindent(result['stdout'], 4))
logger.error(" Standard error:\n%s", reindent(result['stderr'], 4))
logger.error(" Captured user error:\n%s",
reindent(result['user_error'], 4))
logger.error(" Captured system error:\n%s",
reindent(result['system_error'], 4))
print(" Environment: %s" % 'default environment')
return 1
else:
stats = result['statistics']
logger.extra(" CPU time (user, system, total, percent): %s, %s, %s, %d%%",
simplify_time(stats['cpu']['user']),
simplify_time(stats['cpu']['system']),
simplify_time(stats['cpu']['total']),
100. * (stats['cpu']['user'] + stats['cpu']['system']) / stats['cpu']['total'],
)
logger.extra(" Memory usage: %s",
simplify_size(stats['memory']['rss']))
logger.extra(" Cached input read: %s, %s",
simplify_time(stats['data']['time']['read']),
simplify_size(stats['data']['volume']['read']))
logger.extra(" Cached output write: %s, %s",
simplify_time(stats['data']['time']['write']),
simplify_size(stats['data']['volume']['write']))
logger.extra(" Communication time: %s (%d%%)",
simplify_time(stats['data']['network']['wait_time']),
100. * stats['data']['network']['wait_time'] / stats['cpu']['total'])
if result['status'] != 0:
logger.error("Block did not execute properly - outputs were reset")
logger.error(" Standard output:\n%s", reindent(result['stdout'], 4))
logger.error(" Standard error:\n%s", reindent(result['stderr'], 4))
logger.error(" Captured user error:\n%s",
reindent(result['user_error'], 4))
logger.error(" Captured system error:\n%s",
reindent(result['system_error'], 4))
print(" Environment: %s" % 'default environment')
return 1
elif use_docker:
stats = result['statistics']
logger.extra(" CPU time (user, system, total, percent): %s, %s, %s, %d%%",
simplify_time(stats['cpu']['user']),
simplify_time(stats['cpu']['system']),
simplify_time(stats['cpu']['total']),
100. * (stats['cpu']['user'] + stats['cpu']['system']) / stats['cpu']['total'],
)
logger.extra(" Memory usage: %s",
simplify_size(stats['memory']['rss']))
logger.extra(" Cached input read: %s, %s",
simplify_time(stats['data']['time']['read']),
simplify_size(stats['data']['volume']['read']))
logger.extra(" Cached output write: %s, %s",
simplify_time(stats['data']['time']['write']),
simplify_size(stats['data']['volume']['write']))
logger.extra(" Communication time: %s (%d%%)",
simplify_time(stats['data']['network']['wait_time']),
100. * stats['data']['network']['wait_time'] / stats['cpu']['total'])
else:
logger.extra(" Environment: %s" % 'local environment')
......
This diff is collapsed.
......@@ -183,10 +183,9 @@ def test_run_double_triangle_1():
@slow
@nose.tools.with_setup(teardown=cleanup)
@nose.tools.raises(NameError)
def test_run_single_error_1_local():
# When running locally, the module with the error is loaded
# inside the currently running process and will raise a NameError.
# inside the currently running process and will return '1'.
obj = 'user/user/single/1/single_error'
nose.tools.eq_(call('run', obj, '--local', cache=tmp_prefix), 1)
......@@ -202,7 +201,6 @@ def test_run_single_error_1_docker():
@slow
@nose.tools.with_setup(teardown=cleanup)
@nose.tools.raises(NameError)
def test_run_single_error_twice_local():
# This one makes sure our output reset is working properly. Both tries should
# give out the same error.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment