Commit 7f822144 authored by Philip ABBET's avatar Philip ABBET
Browse files

Bugfix: No synchronization listener used for blocks with inputs coming from a database

parent 7949cf16
......@@ -292,6 +292,7 @@ class Executor(object):
if group is None:
group = inputs.InputGroup(
details['channel'],
synchronization_listener=outputs.SynchronizationListener(),
restricted_access=(details['channel'] == self.data['channel'])
)
self.input_list.add(group)
......
{
"language": "python",
"splittable": true,
"groups": [
{
"name": "main",
"inputs": {
"value": {
"type": "user/single_integer/1"
},
"label": {
"type": "user/single_string/1"
}
},
"outputs": {
"result": {
"type": "user/single_integer/1"
}
}
}
],
"parameters": {
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
def __init__(self):
self.sum = np.int32(0)
def process(self, inputs, outputs):
self.sum += inputs['value'].data.value
if inputs['label'].isDataUnitDone():
outputs['result'].write({
'value': self.sum
})
return True
{
"language": "python",
"groups": [
{
"name": "main",
"inputs": {
"input": {
"type": "user/single_integer/1"
}
}
}
],
"results": {
"nb_data_units": {
"type": "int32",
"display": true
},
"indices": {
"type": "string",
"display": true
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
def __init__(self):
self.nb_data_units = 0
self.indices = ''
def process(self, inputs, output):
self.nb_data_units += 1
self.indices += '%d - %d\n' % (
inputs['input'].data_index,
inputs['input'].data_index_end
)
if not inputs['input'].hasMoreData():
output.write({
'nb_data_units': np.int32(self.nb_data_units),
'indices': self.indices
})
return True
......@@ -60,6 +60,21 @@
}
}
]
},
{
"name": "labelled",
"template": "labelled",
"sets": [
{
"name": "labelled",
"template": "labelled",
"view": "Labelled",
"outputs": {
"value": "user/single_integer/1",
"label": "user/single_string/1"
}
}
]
}
]
}
......@@ -59,6 +59,7 @@ class Double:
return True
class Triple:
def setup(self, root_folder, outputs, parameters):
......@@ -92,3 +93,50 @@ class Triple:
})
return True
class Labelled:
def setup(self, root_folder, outputs, parameters):
self.outputs = outputs
self.remaining = [
['A', [1, 2, 3, 4, 5]],
['B', [10, 20, 30, 40, 50]],
['C', [100, 200, 300, 400, 500]],
]
self.current_label = None
return True
def done(self):
return (len(self.remaining) == 0)
def next(self):
# Ensure that we are not done
if len(self.remaining) == 0:
return False
# Retrieve the next label and value
label = self.remaining[0][0]
value = self.remaining[0][1][0]
# Only write each label once on the output, with the correct range of indexes
if self.current_label != label:
self.outputs['label'].write({
'value': label,
}, self.outputs['label'].last_written_data_index + len(self.remaining[0][1]))
self.current_label = label
# Write the value
self.outputs['value'].write({
'value': numpy.int32(value),
})
# Remove the value (and if needed the label) from the list of remaining data
self.remaining[0][1] = self.remaining[0][1][1:]
if len(self.remaining[0][1]) == 0:
self.remaining = self.remaining[1:]
return True
{
"datasets": {
"integers": {
"database": "integers_db/1",
"protocol": "labelled",
"set": "labelled"
}
},
"blocks": {
"processing": {
"algorithm": "user/labelled_integers_sum/1",
"parameters": {
},
"inputs": {
"value": "value",
"label": "label"
},
"outputs": {
"result": "result"
}
}
},
"analyzers": {
"analysis": {
"algorithm": "user/synchronisation_analyzer/1",
"parameters": {
},
"inputs": {
"input": "input"
}
}
},
"globals": {
"environment": {
"name": "environment",
"version": "1"
},
"queue": "queue"
}
}
{
"datasets": [
{
"name": "integers",
"outputs": [
"value",
"label"
]
}
],
"blocks": [
{
"name": "processing",
"inputs": [
"value",
"label"
],
"outputs": [
"result"
],
"synchronized_channel": "integers"
}
],
"analyzers": [
{
"name": "analysis",
"inputs": [
"input"
],
"synchronized_channel": "integers"
}
],
"connections": [
{
"from": "integers.value",
"to": "processing.value",
"channel": "integers"
},
{
"from": "integers.label",
"to": "processing.label",
"channel": "integers"
},
{
"from": "processing.result",
"to": "analysis.input",
"channel": "integers"
}
],
"representation": {
"connections": {
},
"blocks": {
},
"channel_colors": {
"integers": "#5555ff"
}
}
}
......@@ -255,3 +255,11 @@ class TestExecution(unittest.TestCase):
def test_inputs_mix_4b(self):
assert self.execute('user/user/inputs_mix/4/test2', [{'sum': 25170, 'nb': 10}]) is None
def test_integers_labelled_1(self):
assert self.execute('user/user/integers_labelled/1/test', [
{
'nb_data_units': 3,
'indices': '0 - 4\n5 - 9\n10 - 14\n'
}
]) is None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment