Commit d89469c1 authored by Samuel GAIST's avatar Samuel GAIST

[test][algorithm] Add tests using the multiprocessing module

These test ensure that DataLoaders can be used with
the multiprocessing module if these are passed through
a queue.
parent 222a8dbd
Pipeline #39884 passed with stage
in 5 minutes and 16 seconds
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "autonomous",
"splittable": false,
"groups": [
{
"inputs": {
"in1": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
}
},
{
"inputs": {
"in2": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import multiprocessing
def foo(queue_in, queue_out, index):
text, data_loader = queue_in.get()
data, _, _ = data_loader[index]
value = data["in1"].value
queue_out.put("hello " + text + " {}".format(value))
queue_in.task_done()
class Algorithm:
def prepare(self, data_loaders):
data_loader = data_loaders.loaderOf("in2")
data, _, _ = data_loader[0]
self.offset = data["in2"].value
return True
def process(self, data_loaders, outputs):
data_loader = data_loaders.loaderOf("in1")
# ensure loader has been used before sending it
for i in range(data_loader.count()):
data, _, _ = data_loader[i]
data["in1"].value
num_thread = data_loader.count()
queue_in = multiprocessing.JoinableQueue(num_thread)
queue_out = []
# Start worker processes
jobs = []
for i in range(num_thread):
queue_out.append(multiprocessing.Queue())
p = multiprocessing.Process(target=foo, args=(queue_in, queue_out[i], i))
jobs.append(p)
p.start()
# Add None to the queue to kill the workers
for task in range(num_thread):
queue_in.put(("test {}".format(task), data_loader))
# Wait for all the tasks to finish
queue_in.join()
for i in range(data_loader.count()):
data, _, end = data_loader[i]
outputs["out"].write({"value": data["in1"].value + self.offset}, end)
return True
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "sequential",
"splittable": false,
"groups": [
{
"inputs": {
"in1": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
}
},
{
"inputs": {
"in2": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import multiprocessing
def foo(queue_in, queue_out, index):
text, data_loader = queue_in.get()
data, _, _ = data_loader[index]
value = data["in2"].value
queue_out.put("hello " + text + " {}".format(value))
queue_in.task_done()
class Algorithm:
def prepare(self, data_loaders):
data_loader = data_loaders.loaderOf("in2")
data, _, _ = data_loader[0]
self.offset = data["in2"].value
return True
def process(self, inputs, data_loaders, outputs):
data_loader = data_loaders.loaderOf("in2")
for i in range(data_loader.count()):
data, _, _ = data_loader[i]
data["in2"].value
num_thread = data_loader.count()
queue_in = multiprocessing.JoinableQueue(num_thread)
queue_out = []
# Start worker processes
jobs = []
for i in range(num_thread):
queue_out.append(multiprocessing.Queue())
p = multiprocessing.Process(target=foo, args=(queue_in, queue_out[i], i))
jobs.append(p)
p.start()
# Add None to the queue to kill the workers
for task in range(num_thread):
queue_in.put(("test {}".format(task), data_loader))
# Wait for all the tasks to finish
queue_in.join()
outputs["out"].write({"value": inputs["in1"].data.value + self.offset})
return True
......@@ -1078,6 +1078,36 @@ class TestSequentialAPI_Process(TestExecutionBase):
self.assertEqual(data_unit.end, 3)
self.assertEqual(data_unit.data.value, 2014)
def test_multiprocess(self):
self.writeData(
"in1",
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7)],
1000,
)
self.writeData("in2", [(0, 1), (2, 3)], 2000)
(data_loaders, inputs, outputs, data_sink) = self.create_io(
{"group1": ["in1"], "group2": ["in2"]}
)
algorithm = Algorithm(prefix, "sequential/multiprocess/1")
runner = algorithm.runner()
self.assertTrue(runner.setup({"sync": "in2"}))
self.assertTrue(runner.prepare(data_loaders=data_loaders))
while inputs.hasMoreData():
inputs.restricted_access = False
inputs.next()
inputs.restricted_access = True
self.assertTrue(
runner.process(
inputs=inputs, data_loaders=data_loaders, outputs=outputs
)
)
self.assertEqual(len(data_sink.written), 8)
# ----------------------------------------------------------
......@@ -1270,3 +1300,25 @@ class TestAutonomousAPI_Process(TestExecutionBase):
self.assertEqual(data_unit.start, 3)
self.assertEqual(data_unit.end, 3)
self.assertEqual(data_unit.data.value, 2014)
def test_multiprocess(self):
self.writeData(
"in1",
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7)],
1000,
)
self.writeData("in2", [(0, 1), (2, 3)], 2000)
(data_loaders, outputs, data_sink) = self.create_io(
{"group1": ["in1"], "group2": ["in2"]}
)
algorithm = Algorithm(prefix, "autonomous/multiprocess/1")
runner = algorithm.runner()
self.assertTrue(runner.setup({"sync": "in2"}))
self.assertTrue(runner.prepare(data_loaders=data_loaders.secondaries()))
self.assertTrue(runner.process(data_loaders=data_loaders, outputs=outputs))
self.assertEqual(len(data_sink.written), 8)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment