Commit 68fe111b authored by Philip ABBET's avatar Philip ABBET

More changes to the helper functions

parent 3aaf8666
......@@ -66,7 +66,8 @@ class Storage(utils.CodeStorage):
self.prefix = prefix
self.fullname = name
path = utils.hashed_or_simple(self.prefix, 'algorithms', name)
path = utils.hashed_or_simple(self.prefix, 'algorithms', name + '.json')
path = path[:-5]
super(Storage, self).__init__(path, language)
......
......@@ -68,7 +68,8 @@ class Storage(utils.CodeStorage):
self.name, self.version = name.split('/')
self.fullname = name
path = prefix.path(os.path.join('databases', name))
path = prefix.path(os.path.join('databases', name + '.json'))
path = path[:-5]
super(Storage, self).__init__(path, 'python') #views are coded in Python
......
......@@ -64,7 +64,8 @@ class Storage(utils.Storage):
self.username, self.name, self.version = name.split('/')
self.fullname = name
path = utils.hashed_or_simple(prefix, 'dataformats', name)
path = utils.hashed_or_simple(prefix, 'dataformats', name + '.json')
path = path[:-5]
super(Storage, self).__init__(path)
......
......@@ -110,7 +110,8 @@ class Executor(object):
# Loads algorithm outputs
(self.output_list, _) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, cache_root, self.input_list
self.data, self.algorithm, self.prefix, cache_root,
input_list=self.input_list, data_loaders=self.data_loaders
)
else:
......@@ -121,7 +122,8 @@ class Executor(object):
# Loads algorithm outputs
(self.output_list, _) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, cache_root, self.input_list
self.data, self.algorithm, self.prefix, cache_root,
input_list=self.input_list, data_loaders=self.data_loaders
)
......
......@@ -183,7 +183,7 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
if not views.has_key(channel):
view = db.view(details['protocol'], details['set'])
view.setup(os.path.join(cache_root, details['path']),
view.setup(os.path.join(cache_root, details['path']), pack=False,
start_index=start_index, end_index=end_index)
views[channel] = view
......@@ -286,7 +286,8 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
#----------------------------------------------------------
def create_outputs_from_configuration(config, algorithm, prefix, cache_root, input_list):
def create_outputs_from_configuration(config, algorithm, prefix, cache_root,
input_list=None, data_loaders=None):
data_sinks = []
output_list = OutputList()
......@@ -314,7 +315,8 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
dataformat_name = algorithm.output_map[name]
dataformat = algorithm.dataformats[dataformat_name]
input_group = input_list.group(details['channel'])
if input_list is not None:
input_group = input_list.group(config['channel'])
if input_group is not None:
synchronization_listener = input_group.synchronization_listener
......@@ -330,19 +332,39 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
raise
if start_index is None:
input_path = None
for k, v in config['inputs'].items():
if v['channel'] == config['channel']:
if v['channel'] != config['channel']:
continue
if 'database' not in v:
input_path = os.path.join(cache_root, v['path'] + '.data')
break
(data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \
getAllFilenames(input_path)
if input_path is not None:
(data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \
getAllFilenames(input_path)
end_indices = [ int(x.split('.')[-2]) for x in indices_filenames ]
end_indices.sort()
start_index = 0
end_index = end_indices[-1]
else:
for k, v in config['inputs'].items():
if v['channel'] != config['channel']:
continue
end_indices = [ int(x.split('.')[-2]) for x in indices_filenames ]
end_indices.sort()
start_index = 0
start_index = 0
end_index = end_indices[-1]
if (input_list is not None) and (input_list[k] is not None):
end_index = input_list[k].data_source.last_data_index()
break
elif data_loaders is not None:
end_index = data_loaders.main_loader.data_index_end
break
data_sink = CachedDataSink()
data_sinks.append(data_sink)
......
......@@ -61,7 +61,8 @@ class Storage(utils.CodeStorage):
self.prefix = prefix
self.fullname = name
path = utils.hashed_or_simple(self.prefix, 'libraries', name)
path = utils.hashed_or_simple(self.prefix, 'libraries', name + '.json')
path = path[:-5]
super(Storage, self).__init__(path, language)
......
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "autonomous",
"splittable": false,
"groups": [
{
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
class Algorithm:
def process(self, data_loaders, outputs):
a = b
return True
{
"language": "python",
"splittable": false,
"groups": [
{
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
class Algorithm:
def process(self, inputs, outputs):
a = b
return True
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "sequential",
"splittable": false,
"groups": [
{
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
class Algorithm:
def process(self, inputs, data_loaders, outputs):
a = b
return True
......@@ -587,6 +587,17 @@ class TestLegacyAPI_Process(TestExecutionBase):
return (inputs, outputs, data_sink)
def test_process_crashing_algorithm(self):
algorithm = Algorithm(prefix, 'legacy/process_crash/1')
self.assertTrue(algorithm.valid)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
with self.assertRaises(NameError):
runnable.process(inputs=InputList(), outputs=OutputList())
def test_one_group_of_one_input(self):
self.writeData('in', [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
......@@ -791,6 +802,17 @@ class TestSequentialAPI_Process(TestExecutionBase):
return (data_loaders, inputs, outputs, data_sink)
def test_process_crashing_algorithm(self):
algorithm = Algorithm(prefix, 'sequential/process_crash/1')
self.assertTrue(algorithm.valid)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
with self.assertRaises(NameError):
runnable.process(inputs=InputList(), data_loaders=DataLoaderList(), outputs=OutputList())
def test_one_group_of_one_input(self):
self.writeData('in', [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
......@@ -1032,6 +1054,17 @@ class TestAutonomousAPI_Process(TestExecutionBase):
return (data_loaders, outputs, data_sink)
def test_process_crashing_algorithm(self):
algorithm = Algorithm(prefix, 'autonomous/process_crash/1')
self.assertTrue(algorithm.valid)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
with self.assertRaises(NameError):
runnable.process(data_loaders=DataLoaderList(), outputs=OutputList())
def test_one_group_of_one_input(self):
self.writeData('in', [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment