Commit 15724fb2 authored by Amir MOHAMMADI

remove prefix requirement from execution

parent 057a240d
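This commit drops the hard requirement of an on-disk prefix when executing experiments: LocalExecutor gains a use_temp_prefix flag, the exception-processing helper tolerates a missing prefix, AlgorithmExecutor can be built via from_dict(), Experiment gains a from_dict() constructor, database and algorithm references may be passed as pre-loaded objects instead of names, and a new test builds an Iris LDA experiment entirely in memory. A minimal sketch of how the new constructor could be used; the experiment_data and toolchain values are placeholders the caller must supply (e.g. with in-memory Database_/Algorithm_ objects), not part of this commit:

    # Sketch only: relies on the Experiment.from_dict() classmethod added below.
    from beat.core.experiment import Experiment

    def build_in_memory_experiment(experiment_data, toolchain):
        # Validates `experiment_data` against an already-built Toolchain object,
        # bypassing the on-disk prefix lookup used by the regular constructor.
        return Experiment.from_dict(experiment_data, toolchain, label="in-memory")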
@@ -163,6 +163,7 @@ class LocalExecutor(BaseExecutor):
algorithm_cache=None,
library_cache=None,
custom_root_folders=None,
use_temp_prefix=True,
):
super(LocalExecutor, self).__init__(
@@ -186,6 +187,7 @@ class LocalExecutor(BaseExecutor):
self.loop_socket = None
self.zmq_context = None
self.use_temp_prefix = use_temp_prefix
def __cleanup(self, early=False):
if self.loop_executor:
@@ -274,7 +276,9 @@ class LocalExecutor(BaseExecutor):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
contributions_prefix = os.path.join(prefix, contribution_kind) + os.sep
contributions_prefix = ""
if prefix is not None:
contributions_prefix = os.path.join(prefix, contribution_kind) + os.sep
for first_line, line in enumerate(tb):
if line[0].startswith(contributions_prefix):
@@ -300,64 +304,81 @@ class LocalExecutor(BaseExecutor):
self.executor_socket = self.zmq_context.socket(zmq.PAIR)
self.executor_socket.connect(self.message_handler.address)
self.working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(self.working_dir, "prefix")
if self.use_temp_prefix:
self.working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(self.working_dir, "prefix")
self.dump_runner_configuration(self.working_dir)
self.algorithm.export(working_prefix)
self.dump_runner_configuration(self.working_dir)
self.algorithm.export(working_prefix)
if self.loop_algorithm:
self.loop_algorithm.export(working_prefix)
self.loop_message_handler = LoopMessageHandler("127.0.0.1")
self.loop_socket = self.zmq_context.socket(zmq.PAIR)
self.loop_socket.connect(self.loop_message_handler.address)
if self.loop_algorithm:
self.loop_algorithm.export(working_prefix)
self.loop_message_handler = LoopMessageHandler("127.0.0.1")
self.loop_socket = self.zmq_context.socket(zmq.PAIR)
self.loop_socket.connect(self.loop_message_handler.address)
self.loop_executor = LoopExecutor(
self.loop_message_handler,
self.loop_executor = LoopExecutor(
self.loop_message_handler,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache,
)
try:
retval = self.loop_executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
retval = False
else:
message = None
if not retval:
self.__cleanup()
error = "Loop algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.loop_executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = False
else:
message = None
if not prepared:
self.__cleanup()
error = "Loop algorithm {} prepare failed".format(
self.algorithm.name
)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
self.loop_executor.process()
self.executor = AlgorithmExecutor(
self.executor_socket,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache,
loop_socket=self.loop_socket,
)
else:
from beat.backend.python.helpers import (
convert_experiment_configuration_to_container,
)
try:
retval = self.loop_executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
retval = False
else:
message = None
if not retval:
self.__cleanup()
error = "Loop algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.loop_executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = False
else:
message = None
if not prepared:
self.__cleanup()
error = "Loop algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
self.loop_executor.process()
self.executor = AlgorithmExecutor(
self.executor_socket,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache,
loop_socket=self.loop_socket,
)
config = convert_experiment_configuration_to_container(self.data)
config["algorithm"] = self.algorithm
self.executor = AlgorithmExecutor.from_dict(
config,
socket=self.executor_socket,
loop_socket=self.loop_socket,
database_cache=self.databases,
cache_root=self.cache,
)
try:
status = self.executor.setup()
......
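For reference, the guard added to the exception-processing hunk above falls back to an empty contributions prefix when no prefix is configured; since every string starts with the empty string, the subsequent line[0].startswith(contributions_prefix) test then matches every traceback frame instead of raising on os.path.join(None, ...). A small standalone illustration of that behaviour (the helper name is hypothetical, not part of the codebase):

    import os

    # Hypothetical helper mirroring the guard above: with no prefix configured,
    # return "" so that startswith() matches every traceback frame.
    def contributions_prefix(prefix, contribution_kind):
        if prefix is None:
            return ""
        return os.path.join(prefix, contribution_kind) + os.sep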
@@ -206,6 +206,19 @@ class Experiment(object):
self.prefix = prefix
# initializes the internal object cache
self._init()
# temporary caches, if the user has not set them, for performance
database_cache = database_cache if database_cache is not None else {}
dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
library_cache = library_cache if library_cache is not None else {}
self._load(
data, database_cache, dataformat_cache, algorithm_cache, library_cache
)
def _init(self):
self.toolchain = None
self._label = None
@@ -219,16 +232,30 @@ class Experiment(object):
self.databases = {}
self.algorithms = {}
return self
# temporary caches, if the user has not set them, for performance
database_cache = database_cache if database_cache is not None else {}
dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
library_cache = library_cache if library_cache is not None else {}
self._load(
data, database_cache, dataformat_cache, algorithm_cache, library_cache
@classmethod
def from_dict(
cls,
data,
toolchain,
dataformat_cache=None,
database_cache=None,
algorithm_cache=None,
library_cache=None,
label=None,
):
self = cls.__new__(cls)._init()
self.data = data
self._label = label
self.toolchain = toolchain
self._validate(
dataformat_cache,
database_cache,
algorithm_cache,
library_cache,
)
return self
def _load(
self, data, database_cache, dataformat_cache, algorithm_cache, library_cache
@@ -260,16 +287,34 @@ class Experiment(object):
# this runs basic validation, including JSON loading if required
self.data, self.errors = schema.validate("experiment", experiment_data)
# load toolchain
self._load_toolchain(toolchain_data)
if self.errors:
return # don't proceed with the rest of validation
self._validate(
dataformat_cache,
database_cache,
algorithm_cache,
library_cache,
)
def _validate(
self,
dataformat_cache,
database_cache,
algorithm_cache,
library_cache,
):
# checks all internal aspects of the experiment
self._check_datasets(database_cache, dataformat_cache)
self._check_blocks(algorithm_cache, dataformat_cache, library_cache)
self._check_loops(algorithm_cache, dataformat_cache, library_cache)
self._check_analyzers(algorithm_cache, dataformat_cache, library_cache)
self._check_global_parameters()
self._load_toolchain(toolchain_data)
if self.errors:
return # stop, if up to here there were problems
@@ -310,7 +355,9 @@ class Experiment(object):
if dbname not in self.databases:
# load database
if dbname in database_cache:
if not isinstance(dbname, str):
dbname, db = dbname.name, dbname
elif dbname in database_cache:
db = database_cache[dbname]
else:
db = database.Database(self.prefix, dbname, dataformat_cache)
@@ -366,7 +413,9 @@ class Experiment(object):
if algoname not in self.algorithms:
# loads the algorithm
if algoname in algorithm_cache:
if not isinstance(algoname, str):
algoname, thisalgo = algoname.name, algoname
elif algoname in algorithm_cache:
thisalgo = algorithm_cache[algoname]
else:
thisalgo = algorithm.Algorithm(
@@ -536,7 +585,9 @@ class Experiment(object):
if algoname not in self.algorithms:
# loads the algorithm
if algoname in algorithm_cache:
if not isinstance(algoname, str):
thisalgo, algoname = algoname, algoname.name
elif algoname in algorithm_cache:
thisalgo = algorithm_cache[algoname]
else:
thisalgo = algorithm.Algorithm(
@@ -1340,7 +1391,6 @@ class Experiment(object):
dependencies=exec_order[key], configuration=self._configuration(key)
)
# import ipdb; ipdb.set_trace()
for key, value in exec_order.items():
# now compute missing hashes - because we're in execution order,
# there should be no missing input hashes in any of the blocks.
......
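The dataset, block and analyzer hunks above all apply the same dispatch: a reference may now be either a string name (resolved through the cache, or loaded from the prefix) or an already-instantiated object whose .name attribute supplies the key. A small hypothetical illustration of that pattern (the resolve() helper and load_from_prefix callable are illustrative only, not part of the codebase):

    # Hypothetical illustration of the dispatch added above: accept either a
    # name (str) or a pre-loaded contribution object carrying a .name attribute.
    def resolve(ref, cache, load_from_prefix):
        if not isinstance(ref, str):
            name, obj = ref.name, ref               # pre-loaded object passed directly
        elif ref in cache:
            name, obj = ref, cache[ref]             # reuse the cached instance
        else:
            name, obj = ref, load_from_prefix(ref)  # load by name from the prefix
        return name, obj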
@@ -92,6 +92,7 @@ def setup_package():
for path in prefixes:
sp.check_call(["rsync", "-arz", path, prefix_folder])
sp.check_call(["rsync", "-arz", path, "/home/amir/idiap/git/beat/beat.test.prefix"])
if DOCKER_NETWORK_TEST_ENABLED:
import docker
......
import unittest
import numpy as np
import os
import tempfile
from collections import namedtuple
import logging
import simplejson
from beat.backend.python.algorithm import Algorithm_, Analyzer_
from beat.backend.python.dataformat import DataFormat_
from beat.backend.python.database import Database_, ViewRunner, View
from beat.core.toolchain import Toolchain
from beat.core.experiment import Experiment
from beat.core.hash import hashDataset
from beat.core.hash import toPath
from beat.core.execution import LocalExecutor
from beat.core.data import CachedDataSource
from beat.core.utils import NumpyJSONEncoder
logger = logging.getLogger(__name__)
def index_experiment_databases(cache_path, experiment):
for block_name, infos in experiment.datasets.items():
filename = toPath(
hashDataset(infos["database"].name, infos["protocol"], infos["set"]),
suffix=".db",
)
database_index_path = os.path.join(cache_path, filename)
if not os.path.exists(database_index_path):
print(f"Index for database {infos['database'].name} not found, building it",)
view = infos["database"].view(infos["protocol"], infos["set"])
view.index(database_index_path)
def reindent(s, n):
"""Re-indents output so it is more visible"""
margin = n * " "
return margin + ("\n" + margin).join(s.split("\n"))
def load_result(executor, analyzer):
"""Loads the result of an experiment, in a single go"""
f = CachedDataSource()
f.dataformat = analyzer.result_dataformat()
success = f.setup(
os.path.join(executor.cache, executor.data["result"]["path"] + ".data"),
executor.prefix,
)
if not success:
raise RuntimeError("Failed to setup cached data source")
data, start, end = f[0]
return data
def print_results(executor, analyzer):
data = load_result(executor, analyzer)
r = reindent(simplejson.dumps(data.as_dict(), indent=2, cls=NumpyJSONEncoder), 2)
print(f" Results:\n{r}")
class IrisLDATest(unittest.TestCase):
def setup_data_formats(self):
self.features_type = DataFormat_(
definition={"value": [0, float]}, name="user/1d_float_arrays/1"
)
self.labels_type = DataFormat_(
definition={"value": int}, name="user/integers/1"
)
self.model_type = DataFormat_(definition={"text": str}, name="user/strings/1")
self.scores_type = DataFormat_(
definition={"value": float}, name="user/floats/1"
)
def setup_database(self):
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X, y = iris.data, iris.target
# this will convert our problem to a binary classification problem
y = np.clip(y, 0, 1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=1 / 3, random_state=0, shuffle=True
)
Entry = namedtuple("Entry", ["features", "labels"])
def get_view(X, y, name):
class IrisView(View):
def index(self, root_folder, parameters):
return [Entry(x_, y_) for x_, y_ in zip(X, y)]
def get(self, output, index):
obj = self.objs[index]
return {"value": getattr(obj, output)}
view = ViewRunner(
module=IrisView(),
definition={
"outputs": {
"features": self.features_type,
"labels": self.labels_type,
}
},
name=name,
)
return view
train_view = get_view(X_train, y_train, "train")
test_view = get_view(X_test, y_test, "test")
self.database = Database_(
protocols={"main": {"sets": {"train": train_view, "test": test_view}}},
name="iris_two_class/1",
)
self.validate("database", self.database)
def setup_algorithms(self):
import pickle
import base64
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
class TrainLDA:
def setup(self, parameters):
# Retrieve the value of the parameters
self.estimator = LDA(**parameters)
return True
def __init__(self):
self.features = []
self.labels = []
def process(self, inputs, data_loaders, outputs):
# accumulate features and labels
self.features.append(inputs["features"].data.value)
self.labels.append(inputs["labels"].data.value)
if not (inputs["features"].hasMoreData()):
# train the estimator when all data is accumulated
self.estimator.fit(self.features, self.labels)
# pickle, "stringify", and output estimator
out = pickle.dumps(self.estimator)
out = base64.b64encode(out).decode("ascii")
outputs["model"].write({"text": out})
self.features, self.labels = [], []
return True
self.train_algorithm = Algorithm_(
algorithm=TrainLDA(),
groups=[
{
"inputs": {
"features": {"type": self.features_type},
"labels": {"type": self.labels_type},
},
"outputs": {"model": {"type": self.model_type}},
}
],
type=Algorithm_.SEQUENTIAL,
name="user/train_lda/1",
)
self.validate("train_algorithm", self.train_algorithm)
def load_model(text):
return pickle.loads(base64.b64decode(text))
class TestLDA:
def prepare(self, data_loaders):
# Loads the model at the beginning
loader = data_loaders.loaderOf("model")
for i in range(loader.count()):
view = loader.view("model", i)
data, _, _ = view[0]
self.transformer = load_model(data["model"].text)
return True
def process(self, inputs, data_loaders, outputs):
# N.B.: this will be called for every unit in `features'
out = self.transformer.predict([inputs["features"].data.value])[0]
# Writes the output
outputs["scores"].write({"value": out})
return True
self.test_algorithm = Algorithm_(
algorithm=TestLDA(),
groups=[
{
"name": "main",
"inputs": {"features": {"type": self.features_type}},
"outputs": {"scores": {"type": self.scores_type}},
},
{"name": "model", "inputs": {"model": {"type": self.model_type}}},
],
type=Algorithm_.SEQUENTIAL,
name="user/score_with_lda/1",
)
self.validate("test_algorithm", self.test_algorithm)
def setup_analyzer(self):
import numpy as np
from sklearn.metrics import accuracy_score
class AccuracyAnalyzer:
def __init__(self):
self.scores = []
self.labels = []
def process(self, inputs, data_loaders, output):
# accumulate features and labels
self.scores.append(inputs["scores"].data.value)
self.labels.append(inputs["labels"].data.value)
if not (inputs["scores"].hasMoreData()):
y_true = np.asarray(self.labels)
y_score = np.asarray(self.scores)
acc = accuracy_score(y_true, y_score)
output.write({"accuracy": {"value": acc}})
self.__init__()
return True
self.analyzer = Analyzer_(
algorithm=AccuracyAnalyzer(),
input_groups=[
{
"inputs": {
"scores": {"type": self.scores_type},
"labels": {"type": self.labels_type},
},
},
],
results={"accuracy": {"type": self.scores_type, "display": True}},
type=Algorithm_.SEQUENTIAL,
name="user/analyzer/1",
)
self.validate("analyzer", self.analyzer)
def setup_toolchain(self):
self.toolchain = Toolchain.from_dict(
{
"analyzers": [
{
"inputs": ["labels", "scores"],
"name": "analyzer",
"synchronized_channel": "test",
}
],
"blocks": [
{
"inputs": ["features", "labels"],
"name": "train_lda",
"outputs": ["model"],
"synchronized_channel": "train",
},
{
"inputs": ["features", "model"],
"name": "score_with_lda",
"outputs": ["scores"],
"synchronized_channel": "test",
},
],
"connections": [
{
"channel": "train",
"from": "train.features",
"to": "train_lda.features",
},
{
"channel": "train",
"from": "train.labels",
"to": "train_lda.labels",
},
{
"channel": "train",
"from": "train_lda.model",
"to": "score_with_lda.model",
},
{
"channel": "test",
"from": "test.labels",
"to": "analyzer.labels",
},
{
"channel": "test",
"from": "score_with_lda.scores",
"to": "analyzer.scores",
},
{
"channel": "test",
"from": "test.features",
"to": "score_with_lda.features",
},
],
"datasets": [
{"name": "test", "outputs": ["features", "labels"]},