Commit 46484fa8 authored by André Anjos's avatar André Anjos 💬

Merge branch 'soft_loop' into '1.7.x'

Soft loop

See merge request !43
parents 937d5d21 3fbc1172
Pipeline #25367 passed with stage
in 20 minutes and 50 seconds
......@@ -4,12 +4,12 @@
# Definition of global variables (all stages)
variables:
CONDA_ROOT: "${CI_PROJECT_DIR}/miniconda"
DOCKER_REGISTRY: docker.idiap.ch
# Definition of our build pipeline order
stages:
- build
- docker
- deploy
- pypi
......@@ -23,6 +23,9 @@ stages:
- chmod 755 _ci/install.sh
- ./_ci/install.sh _ci master #installs ci support scripts
- ./_ci/before_build.sh
- docker info
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN docker.idiap.ch
- ./scripts/before_test.sh build #pulls required docker images and compiles
script:
- ./_ci/build.sh
after_script:
......@@ -37,8 +40,7 @@ stages:
.build_linux_template: &linux_build_job
<<: *build_job
tags:
- docker
image: continuumio/conda-concourse-ci
- docker-build
artifacts:
expire_in: 1 week
paths:
......@@ -63,23 +65,6 @@ stages:
key: "macosx-cache"
# Docker host based testing (must be run inside dind or docker-enabled host)
.docker_test_linux_template: &linux_docker_job
stage: docker
before_script:
# safe keep artifacts as before_build.sh will erase those...
- mv ${CONDA_ROOT}/conda-bld .
- ./_ci/install.sh _ci master #updates ci support scripts
- ./_ci/before_build.sh
- mv conda-bld ${CONDA_ROOT}
- ./scripts/before_test.sh
script:
- export BEAT_DOCKER_TESTS=true
- BOB_TEST_ONLY=true ./_ci/build.sh
after_script:
- ./_ci/after_build.sh
build_linux_36:
<<: *linux_build_job
variables:
......@@ -100,17 +85,6 @@ build_macosx_36:
PYTHON_VERSION: "3.6"
# Docker host based testing
docker_linux_36:
<<: *linux_docker_job
variables:
PYTHON_VERSION: "3.6"
dependencies:
- build_linux_36
tags:
- docker-build
# Deploy targets
.deploy_template: &deploy_job
stage: deploy
......
......@@ -223,6 +223,8 @@ class Algorithm(BackendAlgorithm):
for k,v in g['inputs'].items()])
self.output_map = dict([(k,v['type']) for g in self.groups \
for k,v in g.get('outputs', {}).items()])
self.loop_map = dict([(k,v['type']) for g in self.groups \
for k,v in g.get('loop', {}).items()])
self._validate_required_dataformats(dataformat_cache)
self._convert_parameter_types()
......
......@@ -182,7 +182,7 @@ class Database(BackendDatabase):
self.errors.append("found error validating data format `%s' " \
"for output `%s' on set `%s' of protocol `%s': %s" % \
(value, key, _set['name'], protocol['name'],
str(dataformat.errors))
"\n".join(dataformat.errors))
)
# all view names must be relative to the database root path
......
......@@ -145,6 +145,7 @@ class BaseExecutor(object):
self.prefix = prefix
self.cache = cache or os.path.join(self.prefix, 'cache')
self.algorithm = None
self.loop_algorithm = None
self.databases = {}
self.input_list = None
self.data_loaders = None
......@@ -185,14 +186,34 @@ class BaseExecutor(object):
if self.data['algorithm'] in algorithm_cache:
self.algorithm = algorithm_cache[self.data['algorithm']]
else:
self.algorithm = algorithm.Algorithm(self.prefix, self.data['algorithm'],
dataformat_cache, library_cache)
self.algorithm = algorithm.Algorithm(self.prefix,
self.data['algorithm'],
dataformat_cache,
library_cache)
algorithm_cache[self.algorithm.name] = self.algorithm
if not self.algorithm.valid:
self.errors += self.algorithm.errors
return
if 'loop' in self.data:
loop = self.data['loop']
if loop['algorithm'] in algorithm_cache:
self.loop_algorithm = algorithm_cache[loop['algorithm']]
else:
self.loop_algorithm = algorithm.Algorithm(self.prefix,
loop['algorithm'],
dataformat_cache,
library_cache)
algorithm_cache[self.loop_algorithm.name] = self.loop_algorithm
if len(loop['inputs']) != len(self.loop_algorithm.input_map):
self.errors.append("The number of inputs of the loop algorithm doesn't correspond")
for name in self.data['inputs'].keys():
if name not in self.algorithm.input_map.keys():
self.errors.append("The input '%s' doesn't exist in the loop algorithm" % name)
# Check that the mapping is coherent
if len(self.data['inputs']) != len(self.algorithm.input_map):
self.errors.append("The number of inputs of the algorithm doesn't correspond")
......@@ -209,31 +230,19 @@ class BaseExecutor(object):
if name not in self.algorithm.output_map.keys():
self.errors.append("The output '%s' doesn't exist in the algorithm" % name)
if 'loop' in self.data:
for name in ['request', 'answer']:
if name not in self.algorithm.loop_map.keys():
self.errors.append("The loop '%s' doesn't exist in the algorithm" % name)
if self.errors:
return
# Load the databases (if any is required)
for name, details in self.data['inputs'].items():
if 'database' in details:
if details['database'] not in self.databases:
if details['database'] in database_cache:
db = database_cache[details['database']]
else:
db = database.Database(self.prefix, details['database'],
dataformat_cache)
name = "database/%s" % db.name
if name in custom_root_folders:
db.data['root_folder'] = custom_root_folders[name]
database_cache[db.name] = db
self.databases[db.name] = db
self._update_db_cache(self.data['inputs'], custom_root_folders, database_cache, dataformat_cache)
if not db.valid:
self.errors += db.errors
if 'loop' in self.data:
self._update_db_cache(self.data['loop']['inputs'], custom_root_folders, database_cache, dataformat_cache)
def __enter__(self):
......@@ -247,9 +256,6 @@ class BaseExecutor(object):
logger.info("Start the execution of '%s'", self.algorithm.name)
# self._prepare_inputs()
# self._prepare_outputs()
return self
......@@ -269,6 +275,32 @@ class BaseExecutor(object):
self.data_sinks = []
def _update_db_cache(self, inputs, custom_root_folders, database_cache, dataformat_cache):
""" Update the database cache based on the input list given"""
for name, details in inputs.items():
if 'database' in details:
if details['database'] not in self.databases:
if details['database'] in database_cache:
db = database_cache[details['database']]
else:
db = database.Database(self.prefix, details['database'],
dataformat_cache)
name = "database/%s" % db.name
if name in custom_root_folders:
db.data['root_folder'] = custom_root_folders[name]
database_cache[db.name] = db
self.databases[db.name] = db
if not db.valid:
self.errors += db.errors
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
......@@ -380,6 +412,9 @@ class BaseExecutor(object):
self.algorithm.export(tmp_prefix)
if self.loop_algorithm:
self.loop_algorithm.export(tmp_prefix)
def dump_databases_provider_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
......
This diff is collapsed.
......@@ -36,10 +36,8 @@ Execution utilities
import os
import sys
import glob
import errno
import tempfile
import subprocess
import shutil
import zmq
import time
......@@ -48,20 +46,17 @@ logger = logging.getLogger(__name__)
import simplejson
from .. import schema
from .. import database
from .. import algorithm
from .. import inputs
from .. import outputs
from .. import data
from .. import stats
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import AccessMode
from beat.backend.python.execution import AlgorithmExecutor
from beat.backend.python.execution import MessageHandler
from beat.backend.python.execution import LoopExecutor
from beat.backend.python.execution import LoopMessageHandler
class LocalExecutor(BaseExecutor):
"""LocalExecutor runs the code given an execution block information
......@@ -167,41 +162,38 @@ class LocalExecutor(BaseExecutor):
library_cache=library_cache,
custom_root_folders=custom_root_folders)
self.working_dir = None
self.executor = None
self.message_handler = None
self.executor_socket = None
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
self.loop_executor = None
self.loop_message_handler = None
self.loop_socket = None
super(LocalExecutor, self).__enter__()
self.zmq_context = None
self._prepare_inputs()
self._prepare_outputs()
return self
def __cleanup(self):
if self.loop_executor:
self.loop_executor.wait()
for handler in [self.message_handler, self.loop_message_handler]:
if handler:
handler.kill()
handler.join()
handler.destroy()
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
for socket in [self.executor_socket, self.loop_socket]:
if socket:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
(self.input_list, self.data_loaders) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
cache_access=AccessMode.LOCAL, db_access=AccessMode.LOCAL,
databases=self.databases
)
if self.zmq_context is not None:
self.zmq_context.destroy()
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
(self.output_list, self.data_sinks) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
input_list=self.input_list, data_loaders=self.data_loaders
)
if self.working_dir is not None:
shutil.rmtree(self.working_dir)
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
......@@ -281,79 +273,76 @@ class LocalExecutor(BaseExecutor):
raise RuntimeError("execution information is bogus:\n * %s" % \
'\n * '.join(self.errors))
self.runner = self.algorithm.runner()
self.message_handler = MessageHandler('127.0.0.1')
self.message_handler.start()
self.zmq_context = zmq.Context()
self.executor_socket = self.zmq_context.socket(zmq.PAIR)
self.executor_socket.connect(self.message_handler.address)
self.working_dir = tempfile.mkdtemp(prefix=__name__)
working_prefix = os.path.join(self.working_dir, 'prefix')
self.dump_runner_configuration(self.working_dir)
self.algorithm.export(working_prefix)
retval = self.runner.setup(self.data['parameters'])
if self.loop_algorithm:
self.loop_algorithm.export(working_prefix)
self.loop_message_handler = LoopMessageHandler('127.0.0.1')
self.loop_socket = self.zmq_context.socket(zmq.PAIR)
self.loop_socket.connect(self.loop_message_handler.address)
self.loop_executor = LoopExecutor(self.loop_message_handler,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache)
retval = self.loop_executor.setup()
if not retval:
self.__cleanup()
raise RuntimeError("Loop algorithm setup failed")
prepared = self.loop_executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Loop algorithm prepare failed")
self.loop_executor.process()
self.executor = AlgorithmExecutor(self.executor_socket,
self.working_dir,
database_cache=self.databases,
cache_root=self.cache,
loop_socket=self.loop_socket)
retval = self.executor.setup()
if not retval:
self.__cleanup()
raise RuntimeError("Algorithm setup failed")
prepared = self.runner.prepare(self.data_loaders)
prepared = self.executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Algorithm prepare failed")
if not self.input_list or not self.output_list:
raise RuntimeError("I/O for execution block has not yet been set up")
_start = time.time()
try:
if self.algorithm.type == algorithm.Algorithm.AUTONOMOUS:
if self.analysis:
result = self.runner.process(data_loaders=self.data_loaders,
output=self.output_list[0])
else:
result = self.runner.process(data_loaders=self.data_loaders,
outputs=self.output_list)
if not result:
return False
else:
main_group = self.input_list.main_group
while main_group.hasMoreData():
main_group.restricted_access = False
main_group.next()
main_group.restricted_access = True
try:
if self.algorithm.type == algorithm.Algorithm.LEGACY:
if self.analysis:
success = self.runner.process(inputs=self.input_list,
output=self.output_list[0])
else:
success = self.runner.process(inputs=self.input_list,
outputs=self.output_list)
elif self.algorithm.type == algorithm.Algorithm.SEQUENTIAL:
if self.analysis:
success = self.runner.process(inputs=self.input_list,
data_loaders=self.data_loaders,
output=self.output_list[0])
else:
success = self.runner.process(inputs=self.input_list,
data_loaders=self.data_loaders,
outputs=self.output_list)
if not success:
return _create_result(1, "The algorithm returned 'False'")
except Exception as e:
message = _process_exception(e, self.prefix, 'algorithms')
return _create_result(1, message)
processed = self.executor.process()
except Exception as e:
message = _process_exception(e, self.prefix, 'databases')
self.__cleanup()
return _create_result(1, message)
missing_data_outputs = [x for x in self.output_list if x.isDataMissing()]
if not processed:
self.__cleanup()
raise RuntimeError("Algorithm process failed")
proc_time = time.time() - _start
if missing_data_outputs:
raise RuntimeError("Missing data on the following output(s): %s" % \
', '.join([x.name for x in missing_data_outputs]))
# some local information
logger.debug("Total processing time was %.3f seconds" , proc_time)
self.__cleanup()
return _create_result(0)
This diff is collapsed.
......@@ -185,6 +185,7 @@ class Experiment(object):
self.storage = None
self.datasets = {}
self.blocks = {}
self.loops = {}
self.analyzers = {}
self.databases = {}
......@@ -235,6 +236,7 @@ class Experiment(object):
# checks all internal aspects of the experiment
self._check_datasets(database_cache, dataformat_cache)
self._check_blocks(algorithm_cache, dataformat_cache, library_cache)
self._check_loops(algorithm_cache, dataformat_cache, library_cache)
self._check_analyzers(algorithm_cache, dataformat_cache, library_cache)
self._check_global_parameters()
self._load_toolchain(toolchain_data)
......@@ -324,7 +326,7 @@ class Experiment(object):
self.algorithms[algoname] = thisalgo
if thisalgo.errors:
self.errors.append("/blocks/%s: algorithm `%s' is invalid: %s" % \
(blockname, algoname, str(thisalgo.errors)))
(blockname, algoname, "\n".join(thisalgo.errors)))
else:
thisalgo = self.algorithms[algoname]
if thisalgo.errors: continue # already done
......@@ -362,6 +364,63 @@ class Experiment(object):
self.blocks[blockname] = block
def _check_loops(self, algorithm_cache, dataformat_cache, library_cache):
"""checks all loops are valid"""
if 'loops' not in self.data:
return
for loopname, loop in self.data['loops'].items():
algoname = loop['algorithm']
if algoname not in self.algorithms:
# loads the algorithm
if algoname in algorithm_cache:
thisalgo = algorithm_cache[algoname]
else:
thisalgo = algorithm.Algorithm(self.prefix,
algoname,
dataformat_cache,
library_cache)
algorithm_cache[algoname] = thisalgo
self.algorithms[algoname] = thisalgo
if thisalgo.errors:
self.errors.append("/loops/%s: algorithm `%s' is invalid:\n%s" % \
(loopname, algoname, "\n".join(thisalgo.errors)))
continue
else:
thisalgo = self.algorithms[algoname]
if thisalgo.errors: continue # already done
# checks all inputs correspond
for algoin, loop_input in loop['inputs'].items():
if algoin not in thisalgo.input_map:
self.errors.append("/analyzers/%s/inputs/%s: algorithm `%s' does " \
"not have an input named `%s' - valid algorithm inputs " \
"are %s" % (loopname, loop_input, algoname, algoin,
", ".join(thisalgo.input_map.keys())))
# checks if parallelization makes sense
if loop.get('nb_slots', 1) > 1 and not thisalgo.splittable:
self.errors.append("/loop/%s/nb_slots: you have set the number " \
"of slots for algorithm `%s' to %d, but it is not " \
"splittable" % (analyzername, thisalgo.name,
loop['nb_slots']))
# check parameter consistence
for parameter, value in loop.get('parameters', {}).items():
try:
thisalgo.clean_parameter(parameter, value)
except Exception as e:
self.errors.append("/loop/%s/parameters/%s: cannot convert " \
"value `%s' to required type: %s" % \
(loopname, parameter, value, e))
self.loops[loopname] = loop
def _check_analyzers(self, algorithm_cache, dataformat_cache, library_cache):
"""checks all analyzers are valid"""
......@@ -557,13 +616,16 @@ class Experiment(object):
)['outputs'][from_endpt[1]]
from_name = "dataset"
else: # it is a block
elif from_endpt[0] in self.blocks: # it is a block
block = self.blocks[from_endpt[0]]
mapping = block['outputs']
imapping = dict(zip(mapping.values(), mapping.keys()))
algout = imapping[from_endpt[1]] #name of output on algorithm
from_dtype = self.algorithms[block['algorithm']].output_map[algout]
from_name = "block"
else:
self.errors.append("Unknown endpoint %s" % to_endpt[0])
continue
to_endpt = connection['to'].split('.', 1)
......@@ -575,13 +637,24 @@ class Experiment(object):
to_dtype = self.algorithms[block['algorithm']].input_map[algoin]
to_name = "block"
else: # it is an analyzer
elif to_endpt[0] in self.loops:
loop = self.loops[to_endpt[0]]
mapping = loop['inputs']
imapping = dict(zip(mapping.values(), mapping.keys()))
algoin = imapping[to_endpt[1]] #name of input on algorithm
to_dtype = self.algorithms[loop['algorithm']].input_map[algoin]
to_name = "loop"
elif to_endpt[0] in self.analyzers: # it is an analyzer
analyzer = self.analyzers[to_endpt[0]]
mapping = analyzer['inputs']
imapping = dict(zip(mapping.values(), mapping.keys()))
algoin = imapping[to_endpt[1]] #name of input on algorithm
to_dtype = self.algorithms[analyzer['algorithm']].input_map[algoin]
to_name = "analyzer"
else:
self.errors.append("Unknown endpoint %s" % to_endpt[0])
continue
if from_dtype == to_dtype: continue #OK
......@@ -714,7 +787,12 @@ class Experiment(object):
retval = dict()
config_data = self.blocks.get(name, self.analyzers.get(name))
# config_data = self.blocks.get(name, self.analyzers.get(name))
for item in [self.blocks, self.loops, self.analyzers]:
if name in item:
config_data = item[name]
break
if config_data is None:
raise KeyError("did not find `%s' among blocks or analyzers" % name)
......@@ -806,10 +884,10 @@ class Experiment(object):
KeyError: if the block name does not exist in this experiment.
"""
if name in self.blocks:
config_data = self.blocks[name]
else:
config_data = self.analyzers[name]
for item in [self.blocks, self.loops, self.analyzers]:
if name in item:
config_data = item[name]
break
# resolve parameters taking globals in consideration
parameters = self.data['globals'].get(config_data['algorithm'])
......@@ -826,10 +904,10 @@ class Experiment(object):
self.data['globals']['environment'])
nb_slots = config_data.get('nb_slots', 1)
toolchain_data = self.toolchain.blocks.get(name,
self.toolchain.analyzers.get(name))
toolchain_data = self.toolchain.algorithm_item(name)
if toolchain_data is None:
raise KeyError("did not find `%s' among blocks or analyzers" % name)
raise KeyError("did not find `%s' among blocks, loops or analyzers" % name)
retval = dict(
inputs=self._inputs(name),
......@@ -841,6 +919,25 @@ class Experiment(object):
nb_slots=nb_slots,
)
loop = self.toolchain.get_loop_for_block(name)
if loop is not None:
loop_name = loop['name']
loop_toolchain_data = self.toolchain.algorithm_item(loop_name)
loop_config_data = self.data['loops'][loop_name]
loop_algorithm = loop_config_data['algorithm']
parameters = self.data['globals'].get(loop_algorithm, dict())
parameters.update(loop_config_data.get('parameters', dict()))
loop_data = dict(
inputs=self._inputs(loop_name),
channel=loop_toolchain_data['synchronized_channel'],
algorithm=loop_algorithm,
parameters=parameters
)
retval['loop'] = loop_data
if name in self.blocks:
retval['outputs'] = self._block_outputs(name)
else:
......
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Algorithm v3 descriptor",
"description": "This schema defines the properties of a v3 algorithm",
"oneOf": [
{ "$ref": "2.json#/definitions/block" },
{ "$ref": "2.json#/definitions/analyzer" },
{ "$ref": "#/definitions/loop_user" },
{ "$ref": "#/definitions/loop" }
],
"definitions": {
"loop_io_group": {
"type": "object",