Commit d5d67929 authored by Samuel GAIST

[experiments] pre-commit cleanup

parent 85b35556
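
Most of this diff is mechanical reformatting (single to double quotes, one argument per line, trailing commas); the one behavioural change is replacing bare `assert` statements with explicit error handling. A minimal sketch of that pattern follows, assuming only that `source.setup()` returns a boolean the way `CachedDataSource.setup` does here; the `load_first` name is hypothetical:

    def load_first(source, path, prefix):
        # A bare "assert source.setup(path, prefix)" is stripped when Python
        # runs with -O, so a failed setup would silently fall through.
        # The explicit check fails loudly regardless of interpreter flags.
        if not source.setup(path, prefix):
            raise RuntimeError("Failed to setup cached data source")
        data, start, end = source[0]
        return data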
@@ -61,31 +61,37 @@ logger = logging.getLogger(__name__)
 def run_experiment(configuration, name, force, use_docker, use_local, quiet):
-    '''Run experiments locally'''
+    """Run experiments locally"""

     def load_result(executor):
-        '''Loads the result of an experiment, in a single go'''
+        """Loads the result of an experiment, in a single go"""

         f = CachedDataSource()
-        assert f.setup(os.path.join(executor.cache,
-                                    executor.data['result']['path'] + '.data'),
-                       executor.prefix)
+        success = f.setup(
+            os.path.join(executor.cache, executor.data["result"]["path"] + ".data"),
+            executor.prefix,
+        )
+        if not success:
+            raise RuntimeError("Failed to setup cached data source")

         data, start, end = f[0]
         return data

     def print_results(executor):
         data = load_result(executor)
-        r = reindent(simplejson.dumps(data.as_dict(), indent=2,
-                                      cls=NumpyJSONEncoder), 2)
+        r = reindent(
+            simplejson.dumps(data.as_dict(), indent=2, cls=NumpyJSONEncoder), 2
+        )
         logger.info(" Results:\n%s", r)

     def reindent(s, n):
-        '''Re-indents output so it is more visible'''
-        margin = n * ' '
-        return margin + ('\n' + margin).join(s.split('\n'))
+        """Re-indents output so it is more visible"""
+        margin = n * " "
+        return margin + ("\n" + margin).join(s.split("\n"))

     def simplify_time(s):
-        '''Re-writes the time so it is easier to understand it'''
+        """Re-writes the time so it is easier to understand it"""

         minute = 60.0
         hour = 60 * minute
@@ -110,7 +116,7 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
return "%d days %d h %d m %.2f s" % (days, hours, minutes, seconds)
def simplify_size(s):
'''Re-writes the size so it is easier to understand it'''
"""Re-writes the size so it is easier to understand it"""
kb = 1024.0
mb = kb * kb
@@ -129,14 +135,17 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
     def index_experiment_databases(cache_path, experiment):
         for block_name, infos in experiment.datasets.items():
-            view = infos['database'].view(infos['protocol'], infos['set'])
-            filename = toPath(hashDataset(infos['database'].name,
-                                          infos['protocol'],
-                                          infos['set']),
-                              suffix='.db')
+            view = infos["database"].view(infos["protocol"], infos["set"])
+            filename = toPath(
+                hashDataset(infos["database"].name, infos["protocol"], infos["set"]),
+                suffix=".db",
+            )
             database_index_path = os.path.join(cache_path, filename)
             if not os.path.exists(database_index_path):
-                logger.info("Index for database %s not found, building it", infos['database'].name)
+                logger.info(
+                    "Index for database %s not found, building it",
+                    infos["database"].name,
+                )
                 view.index(database_index_path)

     dataformat_cache = {}
@@ -144,14 +153,19 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
     algorithm_cache = {}
     library_cache = {}

-    experiment = Experiment(configuration.path, name,
-                            dataformat_cache, database_cache,
-                            algorithm_cache, library_cache)
+    experiment = Experiment(
+        configuration.path,
+        name,
+        dataformat_cache,
+        database_cache,
+        algorithm_cache,
+        library_cache,
+    )

     if not experiment.valid:
         logger.error("Failed to load the experiment `%s':", name)
         for e in experiment.errors:
-            logger.error(' * %s', e)
+            logger.error(" * %s", e)
         return 1

     if not os.path.exists(configuration.cache):
@@ -173,45 +187,58 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
         executable = None # use the default

         if use_docker:
-            env = value['configuration']['environment']
-            search_key = '%s (%s)' % (env['name'], env['version'])
+            env = value["configuration"]["environment"]
+            search_key = "%s (%s)" % (env["name"], env["version"])
             if search_key not in host:
-                logger.error("Cannot execute block `%s' on environment `%s': "
-                             "environment was not found' - please install it",
-                             key, search_key)
+                logger.error(
+                    "Cannot execute block `%s' on environment `%s': "
+                    "environment was not found' - please install it",
+                    key,
+                    search_key,
+                )
                 return 1

         if use_docker:
-            executor = DockerExecutor(host, configuration.path,
-                                      value['configuration'],
-                                      configuration.cache, dataformat_cache,
-                                      database_cache, algorithm_cache,
-                                      library_cache)
+            executor = DockerExecutor(
+                host,
+                configuration.path,
+                value["configuration"],
+                configuration.cache,
+                dataformat_cache,
+                database_cache,
+                algorithm_cache,
+                library_cache,
+            )
         else:
-            executor = LocalExecutor(configuration.path,
-                                     value['configuration'],
-                                     configuration.cache, dataformat_cache,
-                                     database_cache, algorithm_cache,
-                                     library_cache,
-                                     configuration.database_paths)
+            executor = LocalExecutor(
+                configuration.path,
+                value["configuration"],
+                configuration.cache,
+                dataformat_cache,
+                database_cache,
+                algorithm_cache,
+                library_cache,
+                configuration.database_paths,
+            )

         if not executor.valid:
-            logger.error(
-                "Failed to load the execution information for `%s':", key)
+            logger.error("Failed to load the execution information for `%s':", key)
             for e in executor.errors:
-                logger.error(' * %s', e)
+                logger.error(" * %s", e)
             return 1

         if executor.outputs_exist and not force:
-            logger.info("Skipping execution of `%s' for block `%s' "
-                        "- outputs exist", executor.algorithm.name, key)
+            logger.info(
+                "Skipping execution of `%s' for block `%s' " "- outputs exist",
+                executor.algorithm.name,
+                key,
+            )
             if executor.analysis and not quiet:
                 logger.extra(" Outputs produced:")
                 print_results(executor)
             continue

-        logger.info("Running `%s' for block `%s'",
-                    executor.algorithm.name, key)
+        logger.info("Running `%s' for block `%s'", executor.algorithm.name, key)
         if executable is not None:
             logger.extra(" -> using executable at `%s'", executable)
         else:
@@ -220,49 +247,53 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
         with executor:
             result = executor.process()

-        if result['status'] != 0:
+        if result["status"] != 0:
             logger.error("Block did not execute properly - outputs were reset")
-            logger.error(" Standard output:\n%s",
-                         reindent(result['stdout'], 4))
-            logger.error(" Standard error:\n%s",
-                         reindent(result['stderr'], 4))
-            logger.error(" Captured user error:\n%s",
-                         reindent(result['user_error'], 4))
-            logger.error(" Captured system error:\n%s",
-                         reindent(result['system_error'], 4))
-            logger.extra(" Environment: %s" % 'default environment')
+            logger.error(" Standard output:\n%s", reindent(result["stdout"], 4))
+            logger.error(" Standard error:\n%s", reindent(result["stderr"], 4))
+            logger.error(
+                " Captured user error:\n%s", reindent(result["user_error"], 4)
+            )
+            logger.error(
+                " Captured system error:\n%s", reindent(result["system_error"], 4)
+            )
+            logger.extra(" Environment: %s" % "default environment")
             return 1

         elif use_docker:
-            stats = result['statistics']
-            cpu_stats = stats['cpu']
-            data_stats = stats['data']
+            stats = result["statistics"]
+            cpu_stats = stats["cpu"]
+            data_stats = stats["data"]

-            cpu_total = cpu_stats['total']
+            cpu_total = cpu_stats["total"]
             # Likely means that GPU was used
             if not cpu_total:
                 cpu_total = 1.0

-            logger.extra(" CPU time (user, system, total, percent): "
-                         "%s, %s, %s, %d%%",
-                         simplify_time(cpu_stats['user']),
-                         simplify_time(cpu_stats['system']),
-                         simplify_time(cpu_total),
-                         100. * (cpu_stats['user'] + cpu_stats['system']) /
-                         cpu_total)
-            logger.extra(" Memory usage: %s",
-                         simplify_size(stats['memory']['rss']))
-            logger.extra(" Cached input read: %s, %s",
-                         simplify_time(data_stats['time']['read']),
-                         simplify_size(data_stats['volume']['read']))
-            logger.extra(" Cached output write: %s, %s",
-                         simplify_time(data_stats['time']['write']),
-                         simplify_size(data_stats['volume']['write']))
-            logger.extra(" Communication time: %s (%d%%)",
-                         simplify_time(data_stats['network']['wait_time']),
-                         100. * data_stats['network']['wait_time'] /
-                         cpu_total)
+            logger.extra(
+                " CPU time (user, system, total, percent): " "%s, %s, %s, %d%%",
+                simplify_time(cpu_stats["user"]),
+                simplify_time(cpu_stats["system"]),
+                simplify_time(cpu_total),
+                100.0 * (cpu_stats["user"] + cpu_stats["system"]) / cpu_total,
+            )
+            logger.extra(" Memory usage: %s", simplify_size(stats["memory"]["rss"]))
+            logger.extra(
+                " Cached input read: %s, %s",
+                simplify_time(data_stats["time"]["read"]),
+                simplify_size(data_stats["volume"]["read"]),
+            )
+            logger.extra(
+                " Cached output write: %s, %s",
+                simplify_time(data_stats["time"]["write"]),
+                simplify_size(data_stats["volume"]["write"]),
+            )
+            logger.extra(
+                " Communication time: %s (%d%%)",
+                simplify_time(data_stats["network"]["wait_time"]),
+                100.0 * data_stats["network"]["wait_time"] / cpu_total,
+            )
         else:
-            logger.extra(" Environment: %s" % 'local environment')
+            logger.extra(" Environment: %s" % "local environment")

         if not quiet:
             if executor.analysis:
@@ -270,10 +301,10 @@ def run_experiment(configuration, name, force, use_docker, use_local, quiet):
logger.extra(" Outputs produced:")
if executor.analysis:
logger.extra(" * %s", executor.data['result']['path'])
logger.extra(" * %s", executor.data["result"]["path"])
else:
for name, details in executor.data['outputs'].items():
logger.extra(" * %s", details['path'])
for name, details in executor.data["outputs"].items():
logger.extra(" * %s", details["path"])
else:
logger.info("Done")
@@ -281,21 +312,26 @@
 def caches_impl(configuration, name, ls, delete, checksum):
-    '''List all cache files involved in this experiment'''
+    """List all cache files involved in this experiment"""

     dataformat_cache = {}
     database_cache = {}
     algorithm_cache = {}
     library_cache = {}

-    experiment = Experiment(configuration.path, name,
-                            dataformat_cache, database_cache,
-                            algorithm_cache, library_cache)
+    experiment = Experiment(
+        configuration.path,
+        name,
+        dataformat_cache,
+        database_cache,
+        algorithm_cache,
+        library_cache,
+    )

     if not experiment.valid:
         logger.error("Failed to load the experiment `%s':", name)
         for e in experiment.errors:
-            logger.error(' * %s', e)
+            logger.error(" * %s", e)
         return 1

     scheduled = experiment.setup()
@@ -303,45 +339,47 @@ def caches_impl(configuration, name, ls, delete, checksum):
     block_list = []
     for key, value in scheduled.items():
         block = {
-            'name': key,
-            'algorithm': value['configuration']['algorithm'],
-            'is_analyser': False,
-            'paths': []
+            "name": key,
+            "algorithm": value["configuration"]["algorithm"],
+            "is_analyser": False,
+            "paths": [],
         }

-        if 'outputs' in value['configuration']: # normal block
-            for name, data in value['configuration']['outputs'].items():
-                block['paths'].append(data['path'])
+        if "outputs" in value["configuration"]: # normal block
+            for name, data in value["configuration"]["outputs"].items():
+                block["paths"].append(data["path"])
         else: # analyzer
-            block['is_analyser'] = True
-            block['paths'].append(value['configuration']['result']['path'])
+            block["is_analyser"] = True
+            block["paths"].append(value["configuration"]["result"]["path"])

         block_list.append(block)

     for block in block_list:
-        block_type = 'analyzer' if block['is_analyser'] else 'algorithm'
-        logger.info("block: `%s'", block['name'])
-        logger.info(" %s: `%s'", block_type, block['algorithm'])
+        block_type = "analyzer" if block["is_analyser"] else "algorithm"
+        logger.info("block: `%s'", block["name"])
+        logger.info(" %s: `%s'", block_type, block["algorithm"])

-        for path in block['paths']:
+        for path in block["paths"]:
             # prefix cache path
             path = os.path.join(configuration.cache, path)
             logger.info(" output: `%s'", path)

             if ls:
-                for file in glob.glob(path + '.*'):
-                    logger.info(' %s' % file)
+                for file in glob.glob(path + ".*"):
+                    logger.info(" %s" % file)

             if delete:
-                for file in glob.glob(path + '.*'):
+                for file in glob.glob(path + ".*"):
                     logger.info("removing `%s'...", file)
                     os.unlink(file)

                 common.recursive_rmdir_if_empty(
-                    os.path.dirname(path), configuration.cache)
+                    os.path.dirname(path), configuration.cache
+                )

             if checksum:
-                assert load_data_index(configuration.cache, path + '.data')
+                if not load_data_index(configuration.cache, path + ".data"):
+                    logger.error("Failed to load data index for {}".format(path))
                 logger.info("index for `%s' can be loaded and checksums", path)

     return 0
@@ -383,13 +421,21 @@ def pull_impl(webapi, prefix, names, force, indentation, format_cache):
     from .algorithms import pull_impl as algorithms_pull
     from .databases import pull_impl as databases_pull

-    status, names = common.pull(webapi, prefix, 'experiment', names,
-                                ['declaration', 'description'], force,
-                                indentation)
+    if indentation == 0:
+        indentation = 4
+
+    status, names = common.pull(
+        webapi,
+        prefix,
+        "experiment",
+        names,
+        ["declaration", "description"],
+        force,
+        indentation,
+    )

     if status != 0:
-        logger.error(
-            "could not find any matching experiments - widen your search")
+        logger.error("could not find any matching experiments - widen your search")
         return status

     # see what dataformats one needs to pull
@@ -410,18 +456,37 @@ def pull_impl(webapi, prefix, names, force, indentation, format_cache):
     # downloads any formats to which we depend on
     format_cache = {}
     library_cache = {}
-    tc_status, _ = common.pull(webapi, prefix, 'toolchain',
-                               toolchains, ['declaration', 'description'],
-                               force, indentation + 2)
-    db_status = databases_pull(webapi, prefix, databases, force,
-                               indentation + 2, format_cache)
-    algo_status = algorithms_pull(webapi, prefix, algorithms, force,
-                                  indentation + 2, format_cache, library_cache)
+    tc_status, _ = common.pull(
+        webapi,
+        prefix,
+        "toolchain",
+        toolchains,
+        ["declaration", "description"],
+        force,
+        indentation,
+    )
+    db_status = databases_pull(
+        webapi, prefix, databases, force, indentation, format_cache
+    )
+    algo_status = algorithms_pull(
+        webapi, prefix, algorithms, force, indentation, format_cache, library_cache
+    )

     return status + tc_status + db_status + algo_status


-def plot_impl(webapi, configuration, prefix, names, remote_results, show, force, indentation, format_cache, outputfolder=None):
+def plot_impl(
+    webapi,
+    configuration,
+    prefix,
+    names,
+    remote_results,
+    show,
+    force,
+    indentation,
+    format_cache,
+    outputfolder=None,
+):
     """Plots experiments from the server.

     Parameters:
@@ -463,7 +528,10 @@ def plot_impl(webapi, configuration, prefix, names, remote_results, show, force, indentation, format_cache, outputfolder=None):
"""
status = 0
RESULTS_SIMPLE_TYPE_NAMES = ('int32', 'float32', 'bool', 'string')
RESULTS_SIMPLE_TYPE_NAMES = ("int32", "float32", "bool", "string")
if indentation == 0:
indentation = 4
if remote_results:
if outputfolder is None:
@@ -477,7 +545,11 @@ def plot_impl(webapi, configuration, prefix, names, remote_results, show, force, indentation, format_cache, outputfolder=None):
     for name in names:
         if not remote_results:
             if outputfolder is None:
-                output_folder = os.path.join(configuration.path, common.TYPE_PLURAL['experiment'], name.rsplit('/', 1)[0])
+                output_folder = os.path.join(
+                    configuration.path,
+                    common.TYPE_PLURAL["experiment"],
+                    name.rsplit("/", 1)[0],
+                )
             else:
                 # check if directory exists else create
                 if not os.path.isdir(outputfolder):
@@ -486,22 +558,46 @@ def plot_impl(webapi, configuration, prefix, names, remote_results, show, force, indentation, format_cache, outputfolder=None):
         check_plottable = False
         if not os.path.exists(configuration.cache) or remote_results:
-            experiment = simplejson.loads(simplejson.dumps(common.fetch_object(webapi, "experiment", name, ['results'])))
-            results = experiment['results']['analysis']
+            experiment = simplejson.loads(
+                simplejson.dumps(
+                    common.fetch_object(webapi, "experiment", name, ["results"])
+                )
+            )
+            results = experiment["results"]["analysis"]
             for key, value in results.iteritems():
                 # remove non plottable results
-                if value['type'] not in RESULTS_SIMPLE_TYPE_NAMES:
-                    output_name = name.rsplit('/', 1)[1] + '_' + key + '.png'
+                if value["type"] not in RESULTS_SIMPLE_TYPE_NAMES:
+                    output_name = name.rsplit("/", 1)[1] + "_" + key + ".png"
                     output_name = os.path.join(output_folder, output_name)
-                    pl_status = plotters_pull(webapi, configuration.path, [value['type']], force, indentation + 2, {})
-                    plot_status = plotters_plot(webapi, configuration.path, [value['type']], show, False, False, value['value'],
-                                                output_name, None, indentation + 2, format_cache)
+                    pl_status = plotters_pull(
+                        webapi,
+                        configuration.path,
+                        [value["type"]],
+                        force,
+                        indentation,
+                        {},
+                    )
+                    plot_status = plotters_plot(
+                        webapi,
+                        configuration.path,
+                        [value["type"]],
+                        show,
+                        False,
+                        False,
+                        value["value"],
+                        output_name,
+                        None,
+                        indentation,
+                        format_cache,
+                    )
                     status += pl_status
                     status += plot_status
                     check_plottable = True
         else:
             # make sure experiment exists locally or pull it
-            experiments = pull_impl(webapi, configuration.path, [name], force, indentation, format_cache)
+            pull_impl(
+                webapi, configuration.path, [name], force, indentation, format_cache
+            )

             # get information from cache
             dataformat_cache = {}
@@ -509,43 +605,77 @@ def plot_impl(webapi, configuration, prefix, names, remote_results, show, force, indentation, format_cache, outputfolder=None):
             algorithm_cache = {}
             library_cache = {}

-            experiment = Experiment(configuration.path, name,
-                                    dataformat_cache, database_cache,
-                                    algorithm_cache, library_cache)
+            experiment = Experiment(
+                configuration.path,
+                name,
+                dataformat_cache,
+                database_cache,
+                algorithm_cache,
+                library_cache,
+            )

             scheduled = experiment.setup()
             for key, value in scheduled.items():
-                executor = LocalExecutor(configuration.path,
-                                         value['configuration'],
-                                         configuration.cache, dataformat_cache,
-                                         database_cache, algorithm_cache,
-                                         library_cache,
-                                         configuration.database_paths)
+                executor = LocalExecutor(
+                    configuration.path,
+                    value["configuration"],
+                    configuration.cache,
+                    dataformat_cache,
+                    database_cache,
+                    algorithm_cache,
+                    library_cache,
+                    configuration.database_paths,
+                )

-                if 'result' in executor.data:
+                if "result" in executor.data:
                     f = CachedDataSource()
-                    assert f.setup(os.path.join(executor.cache,
-                                   executor.data['result']['path'] + '.data'),
-                                   executor.prefix)
+                    success = f.setup(
+                        os.path.join(
+                            executor.cache, executor.data["result"]["path"] + ".data"
+                        ),
+                        executor.prefix,
+                    )
+                    if not success:
+                        raise RuntimeError("Failed to setup cached data source")
+
                     data, start, end = f[0]

                     for the_data in data.as_dict():
                         attr = getattr(data, the_data)
-                        if attr.__class__.__name__.startswith('plot'):
-                            datatype = attr.__class__.__name__.replace('_','/')
+                        if attr.__class__.__name__.startswith("plot"):
+                            datatype = attr.__class__.__name__.replace("_", "/")
                             # remove non plottable results
                             if datatype not in RESULTS_SIMPLE_TYPE_NAMES:
-                                output_name = name.rsplit('/', 1)[1] + '_' + the_data + '.png'
+                                output_name = (
+                                    name.rsplit("/", 1)[1] + "_" + the_data + ".png"
+                                )
                                 output_name = os.path.join(output_folder, output_name)
-                                pl_status = plotters_pull(webapi, configuration.path, [datatype], force, indentation + 2, {})
-                                plot_status = plotters_plot(webapi, configuration.path, [datatype], show, False, False,
-                                                            data.as_dict()[the_data], output_name, None, indentation + 2, format_cache)