Commit 166eec7c authored by Philip ABBET's avatar Philip ABBET
Browse files

Add support for the 'no proxy' mode for outputs in containers

parent 69bb80cc
......@@ -322,6 +322,22 @@ class Agent(object):
'mode': 'ro',
}
if 'result' in configuration.data:
outputs_config = {
'result': configuration.data['result']
}
else:
outputs_config = configuration.data['outputs']
for name, details in outputs_config.items():
basename = os.path.join(configuration.cache, details['path'])
dirname = os.path.dirname(basename)
volumes[dirname] = {
'bind': os.path.join('/cache', dirname.replace(configuration.cache + '/', '')),
'mode': 'rw',
}
self.process = dock.Popen(
host,
envkey,
......
......@@ -31,7 +31,6 @@
import os
import sys
import glob
import errno
import tempfile
import subprocess
import zmq.green as zmq
......@@ -53,6 +52,7 @@ from . import dock
from beat.backend.python.helpers import convert_experiment_configuration_to_container
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import CacheAccess
......@@ -299,75 +299,15 @@ class Executor(object):
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
self.output_list = outputs.OutputList()
# This is used for parallelization purposes
start_index, end_index = self.data.get('range', (None, None))
if 'outputs' in self.data: #it is a normal block (not analyzer)
for name, details in self.data['outputs'].items():
if self.proxy_mode:
cache_access = CacheAccess.LOCAL
else:
cache_access = CacheAccess.NONE
path = os.path.join(self.cache, details['path'] + '.data')
dirname = os.path.dirname(path)
# Make sure that the directory exists while taking care of race
# conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
try:
if (len(dirname) > 0):
os.makedirs(dirname)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
data_sink = data.CachedDataSink()
self.data_sinks.append(data_sink)
status = data_sink.setup(
filename=path,
dataformat=self.algorithm.dataformats[self.algorithm.output_map[name]],
encoding='binary',
max_size=0, #in bytes, for individual file chunks
)
if not status:
raise IOError("cannot create cache sink `%s'" % details['path'])
input_group = self.input_list.group(details['channel'])
if (input_group is None) or not hasattr(input_group, 'synchronization_listener'):
synchronization_listener = None
else:
synchronization_listener = input_group.synchronization_listener
self.output_list.add(outputs.Output(name, data_sink,
synchronization_listener=synchronization_listener,
force_start_index=start_index or 0)
)
else: #it is an analyzer
name = 'result'
details = self.data[name]
path = os.path.join(self.cache, details['path'] + '.data')
dirname = os.path.dirname(path)
# Make sure that the directory exists while taking care of race
# conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
try:
if (len(dirname) > 0):
os.makedirs(dirname)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
data_sink = data.CachedDataSink()
self.data_sinks.append(data_sink)
status = data_sink.setup(
filename=path,
dataformat=self.algorithm.result_dataformat(),
encoding='binary',
)
if not status:
raise IOError("cannot create cache sink `%s'" % details['path'])
self.output_list.add(outputs.Output(name, data_sink,
force_start_index=start_index or 0))
(self.output_list, self.data_sinks) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache, self.input_list,
cache_access=cache_access
)
def process(self, host, virtual_memory_in_megabytes=0,
......
File mode changed from 100755 to 100644
{
"language": "python",
"splittable": true,
"groups": [
{
"name": "main",
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_data": {
"type": "user/empty_1d_array_of_integers/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
    """Generates a large block of pseudo-random integers per input item.

    Used as a benchmark/test algorithm: for every input received it emits a
    1,000,000-element int32 array on ``out_data``.
    """

    def setup(self, parameters):
        """Seed numpy's RNG so every run produces the same stream.

        ``parameters`` is accepted for interface compatibility but unused.
        Always returns True (setup cannot fail here).
        """
        np.random.seed(0)
        return True

    def process(self, inputs, outputs):
        """Emit one million random integers drawn from [0, max(1, |value|)).

        The upper bound tracks the (absolute) input value, clamped to at
        least 1 so ``randint`` never receives an empty range.
        Returns True to signal successful processing.
        """
        upper = max(1, abs(inputs['in_data'].data.value))
        val = np.int32(np.random.randint(0, upper, size=(1000000,)))
        outputs['out_data'].write({
            'value': val
        })
        return True
{
"language": "python",
"splittable": true,
"groups": [
{
"name": "main",
"inputs": {
"in_data": {
"type": "user/empty_1d_array_of_integers/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy as np
class Algorithm:
    """Reduces each incoming integer array to its sum.

    For every item on ``in_data``, writes a single int32 (the array's sum)
    on ``out_data``.
    """

    def process(self, inputs, outputs):
        """Sum the input array and forward the result downstream."""
        incoming = inputs['in_data'].data.value
        total = np.int32(incoming.sum())
        outputs['out_data'].write({'value': total})
        return True
{
"root_folder": "/path/not/set",
"protocols": [
{
"name": "large",
"template": "template",
"sets": [
{
"name": "data",
"template": "set",
"view": "LargeView",
"outputs": {
"out": "user/empty_1d_array_of_integers/1"
}
}
]
},
{
"name": "small",
"template": "template",
"sets": [
{
"name": "data",
"template": "set",
"view": "SmallView",
"outputs": {
"out": "user/single_integer/1"
}
}
]
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import numpy
class LargeView:
    """Database view that synthesizes blocks of one million random integers.

    ``done()`` stops the iteration after 1000 blocks have been written.
    """

    def setup(self, root_folder, outputs, parameters):
        """Remember the output sinks and seed the RNG.

        Seeding keeps the generated stream reproducible across runs.
        Always returns True.
        """
        numpy.random.seed(0)  # So it is kept reproducible
        self.outputs = outputs
        return True

    def done(self):
        """Report completion once the 'out' sink has written index 1000."""
        last = self.outputs['out'].last_written_data_index
        return last == 1000

    def next(self):
        """Emit one million random integers in [0, 100) on 'out'."""
        sample = numpy.random.randint(100, size=(1000000,))
        self.outputs['out'].write({
            'value': numpy.int32(sample),
        })
        return True
class SmallView:
    """Database view that synthesizes single random integers.

    ``done()`` stops the iteration after 1000 values have been written.
    """

    def setup(self, root_folder, outputs, parameters):
        """Remember the output sinks and seed the RNG.

        Seeding keeps the generated stream reproducible across runs.
        Always returns True.
        """
        numpy.random.seed(0)  # So it is kept reproducible
        self.outputs = outputs
        return True

    def done(self):
        """Report completion once the 'out' sink has written index 1000."""
        last = self.outputs['out'].last_written_data_index
        return last == 1000

    def next(self):
        """Emit a single random integer in [0, 100) on 'out'."""
        drawn = numpy.random.randint(0, 100)
        self.outputs['out'].write({
            'value': numpy.int32(drawn),
        })
        return True
{
"analyzers": {
"analysis": {
"algorithm": "user/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo1": {
"algorithm": "user/integers_array_sum/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
},
"echo2": {
"algorithm": "user/integers_echo/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "large/1",
"protocol": "large",
"set": "data"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python 2.7",
"version": "1.1.0"
}
}
}
{
"analyzers": {
"analysis": {
"algorithm": "user/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo1": {
"algorithm": "user/integers_array_generator/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
},
"echo2": {
"algorithm": "user/integers_array_sum/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "large/1",
"protocol": "small",
"set": "data"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python 2.7",
"version": "1.1.0"
}
}
}
......@@ -136,7 +136,7 @@ class TestExecution(unittest.TestCase):
assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)
with executor:
result = executor.process(self.host, timeout_in_minutes=1)
result = executor.process(self.host, timeout_in_minutes=3)
assert result
assert 'status' in result
assert 'stdout' in result
......@@ -270,6 +270,20 @@ class TestExecution(unittest.TestCase):
}
]) is None
# For benchmark purposes
# def test_double_1_large(self):
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large', [{'out_data': 49489830}]) is None
# print time.time() - start
# For benchmark purposes
# def test_double_1_large2(self):
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large2', [{'out_data': 21513820}]) is None
# print time.time() - start
class TestExecutionNoProxy(TestExecution):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment