Commit 0039a0ca authored by André Anjos's avatar André Anjos 💬

Merge branch 'implement_loop_with_input_test' into 'master'

Implement loop with input test

See merge request !47
parents b2052bea 2a8eebf5
Pipeline #25532 passed with stages
in 14 minutes and 49 seconds
......@@ -153,7 +153,7 @@ class DockerExecutor(RemoteExecutor):
self.host = host
def __create_db_container(self, datasets_uid, network_name):
def __create_db_container(self, datasets_uid, network_name, configuration_name=None):
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
self.dump_databases_provider_configuration(databases_configuration_path)
......@@ -195,6 +195,9 @@ class DockerExecutor(RemoteExecutor):
'/beat/cache'
]
if configuration_name:
cmd.append(configuration_name)
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
......@@ -336,7 +339,7 @@ class DockerExecutor(RemoteExecutor):
if self.loop_algorithm is not None:
if len(self.databases) > 0:
databases_infos['loop_db'] = self.__create_db_container(datasets_uid, network_name)
databases_infos['loop_db'] = self.__create_db_container(datasets_uid, network_name, "loop")
loop_algorithm_container_port = utils.find_free_port()
cmd = [
......
......@@ -175,7 +175,7 @@ class SubprocessExecutor(RemoteExecutor):
'databases_provider'))
def __create_db_process(self):
def __create_db_process(self, configuration_name=None):
databases_process = None
databases_configuration_path = None
database_port = None
......@@ -199,6 +199,9 @@ class SubprocessExecutor(RemoteExecutor):
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
if configuration_name is not None:
cmd.append(configuration_name)
databases_process_stdout = tempfile.NamedTemporaryFile(delete=False)
databases_process_stderr = tempfile.NamedTemporaryFile(delete=False)
......@@ -312,7 +315,7 @@ class SubprocessExecutor(RemoteExecutor):
if self.loop_algorithm is not None:
if len(self.databases) > 0:
database_infos['loop_db'] = self.__create_db_process()
database_infos['loop_db'] = self.__create_db_process("loop")
loop_algorithm_port = utils.find_free_port()
cmd = [
......@@ -433,6 +436,7 @@ class SubprocessExecutor(RemoteExecutor):
databases_process.terminate()
databases_process.communicate()
status = databases_process.returncode
with open(db_info['stdout'].name, 'r') as f:
retval['stdout'] += '\n' + f.read()
......@@ -445,6 +449,7 @@ class SubprocessExecutor(RemoteExecutor):
logger.debug("Stopping loop process")
loop_algorithm_process.terminate()
loop_algorithm_process.communicate()
status = loop_algorithm_process.returncode
with open(loop_algorithm_process_stdout.name, 'r') as f:
retval['stdout'] += '\n' + f.read()
......
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "loop",
"groups": [
{
"inputs": {
"in": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/single_integer/1"
},
"answer": {
"type": "user/single_integer/1"
}
}
}
],
"parameters": {
"threshold": {
"default": "9",
"type": "int8",
"description": "Value that will change loop result"
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2018 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.backend.python module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
def __init__(self):
self.threshold = None
self.max = 0
def setup(self, parameters):
self.threshold = parameters['threshold']
return True
def prepare(self, data_loaders):
data_loader = data_loaders.loaderOf('in')
for i in range(data_loader.count()):
view = data_loader.view('in', i)
(data, _, _) = view[view.count() - 1]
value = data['in'].value
self.max += value
return True
def validate(self, result):
value = result.value
result = value > self.threshold and value < self.max
delta = self.max - value
return (result, {'value': np.int32(delta)})
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "loop_user",
"splittable": false,
"parameters": {
},
"groups": [
{
"inputs": {
"in": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/single_integer/1"
},
"answer": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2018 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.backend.python module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
def process(self, data_loaders, outputs, loop_channel):
cnt = 1
is_valid, delta = loop_channel.validate({'value': np.int32(cnt)})
while not is_valid:
cnt = cnt + 1
is_valid, delta = loop_channel.validate({'value': np.int32(cnt)})
delta = delta.value
data_loader = data_loaders.loaderOf('in')
for i in range(data_loader.count()):
view = data_loader.view('in', i)
(data, _, end) = view[view.count() - 1]
value = data['in'].value
new_value = (value + cnt) * delta
outputs['out'].write({
'value': np.int32(new_value),
},
end
)
return True
......@@ -2,7 +2,7 @@
"schema_version": 2,
"blocks": {
"loop_user": {
"algorithm": "autonomous/loop_user/1",
"algorithm": "user/db_input_loop_user/1",
"parameters": {
},
"inputs": {
......@@ -14,8 +14,8 @@
}
},
"loops": {
"test_loop": {
"algorithm": "autonomous/loop/1",
"loop": {
"algorithm": "user/db_input_loop/1",
"parameters": {
"threshold": 1
},
......@@ -39,6 +39,11 @@
"database": "integers_db/1",
"protocol": "double",
"set": "double"
},
"integers2": {
"database": "simple/1",
"protocol": "protocol2",
"set": "set"
}
},
"globals": {
......
......@@ -7,8 +7,8 @@
"channel": "integers"
},
{
"from": "integers.b",
"to": "test_loop.in",
"from": "integers2.out",
"to": "loop.in",
"channel": "integers"
},
{
......@@ -20,10 +20,10 @@
"loop_connections": [
{
"from": "loop_user.request",
"to": "test_loop.request"
"to": "loop.request"
},
{
"from": "test_loop.answer",
"from": "loop.answer",
"to": "loop_user.answer"
}
],
......@@ -34,6 +34,12 @@
"b"
],
"name": "integers"
},
{
"outputs": [
"out"
],
"name": "integers2"
}
],
"blocks": [
......@@ -50,7 +56,7 @@
],
"loops": [
{
"name": "test_loop",
"name": "loop",
"synchronized_channel": "integers",
"inputs": [
"in"
......
......@@ -280,7 +280,7 @@ class BaseExecutionMixIn(object):
@slow
def test_loop_1(self):
assert self.execute('user/user/loop/1/loop', [{'sum': 45, 'nb': 9}]) is None
assert self.execute('user/user/loop/1/loop', [{'sum': 504, 'nb': 9}]) is None
# For benchmark purposes
# @slow
......
......@@ -443,7 +443,9 @@ class TestOneWorker(TestWorkerBase):
self.controller.execute(WORKER1, 1, config)
(worker, status, job_id, data) = self._wait()
message = self._wait()
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, WORKER1)
self.assertEqual(status, WorkerController.JOB_ERROR)
......@@ -461,7 +463,9 @@ class TestOneWorker(TestWorkerBase):
self.controller.execute(WORKER1, 1, config)
(worker, status, job_id, data) = self._wait()
message = self._wait()
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, WORKER1)
self.assertEqual(status, WorkerController.JOB_ERROR)
......@@ -475,7 +479,9 @@ class TestOneWorker(TestWorkerBase):
self.controller.execute(WORKER1, 1, config)
(worker, status, job_id, data) = self._wait()
message = self._wait()
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, WORKER1)
self.assertTrue(status in [WorkerController.ERROR, WorkerController.JOB_ERROR])
......@@ -514,7 +520,9 @@ class TestOneWorker(TestWorkerBase):
self.controller.execute(WORKER1, 1, config)
self.controller.cancel(WORKER1, 1)
(worker, status, job_id, data) = self._wait()
message = self._wait()
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, WORKER1)
self.assertEqual(status, WorkerController.CANCELLED)
......@@ -525,7 +533,9 @@ class TestOneWorker(TestWorkerBase):
def test_error_cancel_unknown_job(self):
self.controller.cancel(WORKER1, 1)
(worker, status, job_id, data) = self._wait()
message = self._wait()
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, WORKER1)
self.assertEqual(status, WorkerController.ERROR)
......@@ -557,6 +567,7 @@ class TestTwoWorkers(TestWorkerBase):
while message is None:
message = self.controller.process(100)
self.assertTrue(message is not None)
(worker, status, job_id, data) = message
self.assertEqual(worker, worker_name)
......
......@@ -32,8 +32,8 @@ check_env DOCKER_REGISTRY
# Select here images that are required for minimal operation (or tests)
IMAGES=(
"${DOCKER_REGISTRY}/beat/beat.env.system.python:1.3.0r3"
"${DOCKER_REGISTRY}/beat/beat.env.db.examples:1.4.0r3"
"${DOCKER_REGISTRY}/beat/beat.env.system.python:1.3.0r4"
"${DOCKER_REGISTRY}/beat/beat.env.db.examples:1.4.0r4"
"${DOCKER_REGISTRY}/beat/beat.env.cxx:2.0.0r1"
"${DOCKER_REGISTRY}/beat/beat.env.client:2.0.0r1"
)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment