From d89469c1f8f78d62ccf598b0d143fb31f14ae25e Mon Sep 17 00:00:00 2001
From: Samuel Gaist <samuel.gaist@idiap.ch>
Date: Fri, 15 May 2020 17:29:33 +0200
Subject: [PATCH] [test][algorithm] Add tests using the multiprocessing module

These test ensure that DataLoaders can be used with
the multiprocessing module if these are passed through
a queue.
---
 .../algorithms/autonomous/multiprocess/1.json | 28 ++++++
 .../algorithms/autonomous/multiprocess/1.py   | 90 +++++++++++++++++++
 .../algorithms/autonomous/multiprocess/1.rst  |  1 +
 .../algorithms/sequential/multiprocess/1.json | 28 ++++++
 .../algorithms/sequential/multiprocess/1.py   | 87 ++++++++++++++++++
 .../algorithms/sequential/multiprocess/1.rst  |  1 +
 beat/backend/python/test/test_algorithm.py    | 52 +++++++++++
 7 files changed, 287 insertions(+)
 create mode 100644 beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.json
 create mode 100755 beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.py
 create mode 100644 beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.rst
 create mode 100644 beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.json
 create mode 100755 beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.py
 create mode 100644 beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.rst

diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.json b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.json
new file mode 100644
index 0000000..ae67a98
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.json
@@ -0,0 +1,28 @@
+{
+    "schema_version": 2,
+    "language": "python",
+    "api_version": 2,
+    "type": "autonomous",
+    "splittable": false,
+    "groups": [
+        {
+            "inputs": {
+                "in1": {
+                    "type": "user/single_integer/1"
+                }
+            },
+            "outputs": {
+                "out": {
+                    "type": "user/single_integer/1"
+                }
+            }
+        },
+        {
+            "inputs": {
+                "in2": {
+                    "type": "user/single_integer/1"
+                }
+            }
+        }
+    ]
+}
diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.py b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.py
new file mode 100755
index 0000000..c000d7f
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###################################################################################
+#                                                                                 #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
+# Contact: beat.support@idiap.ch                                                  #
+#                                                                                 #
+# Redistribution and use in source and binary forms, with or without              #
+# modification, are permitted provided that the following conditions are met:     #
+#                                                                                 #
+# 1. Redistributions of source code must retain the above copyright notice, this  #
+# list of conditions and the following disclaimer.                                #
+#                                                                                 #
+# 2. Redistributions in binary form must reproduce the above copyright notice,    #
+# this list of conditions and the following disclaimer in the documentation       #
+# and/or other materials provided with the distribution.                          #
+#                                                                                 #
+# 3. Neither the name of the copyright holder nor the names of its contributors   #
+# may be used to endorse or promote products derived from this software without   #
+# specific prior written permission.                                              #
+#                                                                                 #
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
+#                                                                                 #
+###################################################################################
+
+import multiprocessing
+
+
+def foo(queue_in, queue_out, index):
+    text, data_loader = queue_in.get()
+    data, _, _ = data_loader[index]
+    value = data["in1"].value
+
+    queue_out.put("hello " + text + " {}".format(value))
+    queue_in.task_done()
+
+
+class Algorithm:
+    def prepare(self, data_loaders):
+        data_loader = data_loaders.loaderOf("in2")
+
+        data, _, _ = data_loader[0]
+        self.offset = data["in2"].value
+
+        return True
+
+    def process(self, data_loaders, outputs):
+        data_loader = data_loaders.loaderOf("in1")
+
+        # ensure loader has been used before sending it
+        for i in range(data_loader.count()):
+            data, _, _ = data_loader[i]
+            data["in1"].value
+
+        num_thread = data_loader.count()
+
+        queue_in = multiprocessing.JoinableQueue(num_thread)
+        queue_out = []
+
+        # Start worker processes
+        jobs = []
+        for i in range(num_thread):
+            queue_out.append(multiprocessing.Queue())
+            p = multiprocessing.Process(target=foo, args=(queue_in, queue_out[i], i))
+            jobs.append(p)
+            p.start()
+
+        # Add None to the queue to kill the workers
+        for task in range(num_thread):
+            queue_in.put(("test {}".format(task), data_loader))
+
+        # Wait for all the tasks to finish
+        queue_in.join()
+
+        for i in range(data_loader.count()):
+            data, _, end = data_loader[i]
+
+            outputs["out"].write({"value": data["in1"].value + self.offset}, end)
+
+        return True
diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.rst b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.rst
new file mode 100644
index 0000000..e62c1c4
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/autonomous/multiprocess/1.rst
@@ -0,0 +1 @@
+Test documentation
diff --git a/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.json b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.json
new file mode 100644
index 0000000..34eaad0
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.json
@@ -0,0 +1,28 @@
+{
+    "schema_version": 2,
+    "language": "python",
+    "api_version": 2,
+    "type": "sequential",
+    "splittable": false,
+    "groups": [
+        {
+            "inputs": {
+                "in1": {
+                    "type": "user/single_integer/1"
+                }
+            },
+            "outputs": {
+                "out": {
+                    "type": "user/single_integer/1"
+                }
+            }
+        },
+        {
+            "inputs": {
+                "in2": {
+                    "type": "user/single_integer/1"
+                }
+            }
+        }
+    ]
+}
diff --git a/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.py b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.py
new file mode 100755
index 0000000..f5389eb
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###################################################################################
+#                                                                                 #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
+# Contact: beat.support@idiap.ch                                                  #
+#                                                                                 #
+# Redistribution and use in source and binary forms, with or without              #
+# modification, are permitted provided that the following conditions are met:     #
+#                                                                                 #
+# 1. Redistributions of source code must retain the above copyright notice, this  #
+# list of conditions and the following disclaimer.                                #
+#                                                                                 #
+# 2. Redistributions in binary form must reproduce the above copyright notice,    #
+# this list of conditions and the following disclaimer in the documentation       #
+# and/or other materials provided with the distribution.                          #
+#                                                                                 #
+# 3. Neither the name of the copyright holder nor the names of its contributors   #
+# may be used to endorse or promote products derived from this software without   #
+# specific prior written permission.                                              #
+#                                                                                 #
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
+#                                                                                 #
+###################################################################################
+
+import multiprocessing
+
+
+def foo(queue_in, queue_out, index):
+    text, data_loader = queue_in.get()
+
+    data, _, _ = data_loader[index]
+    value = data["in2"].value
+
+    queue_out.put("hello " + text + " {}".format(value))
+    queue_in.task_done()
+
+
+class Algorithm:
+    def prepare(self, data_loaders):
+        data_loader = data_loaders.loaderOf("in2")
+
+        data, _, _ = data_loader[0]
+        self.offset = data["in2"].value
+
+        return True
+
+    def process(self, inputs, data_loaders, outputs):
+        data_loader = data_loaders.loaderOf("in2")
+
+        for i in range(data_loader.count()):
+            data, _, _ = data_loader[i]
+            data["in2"].value
+
+        num_thread = data_loader.count()
+
+        queue_in = multiprocessing.JoinableQueue(num_thread)
+        queue_out = []
+
+        # Start worker processes
+        jobs = []
+        for i in range(num_thread):
+            queue_out.append(multiprocessing.Queue())
+            p = multiprocessing.Process(target=foo, args=(queue_in, queue_out[i], i))
+            jobs.append(p)
+            p.start()
+
+        # Add None to the queue to kill the workers
+        for task in range(num_thread):
+            queue_in.put(("test {}".format(task), data_loader))
+
+        # Wait for all the tasks to finish
+        queue_in.join()
+
+        outputs["out"].write({"value": inputs["in1"].data.value + self.offset})
+
+        return True
diff --git a/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.rst b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.rst
new file mode 100644
index 0000000..e62c1c4
--- /dev/null
+++ b/beat/backend/python/test/prefix/algorithms/sequential/multiprocess/1.rst
@@ -0,0 +1 @@
+Test documentation
diff --git a/beat/backend/python/test/test_algorithm.py b/beat/backend/python/test/test_algorithm.py
index 21bad30..8b90e6f 100644
--- a/beat/backend/python/test/test_algorithm.py
+++ b/beat/backend/python/test/test_algorithm.py
@@ -1078,6 +1078,36 @@ class TestSequentialAPI_Process(TestExecutionBase):
         self.assertEqual(data_unit.end, 3)
         self.assertEqual(data_unit.data.value, 2014)
 
+    def test_multiprocess(self):
+        self.writeData(
+            "in1",
+            [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7)],
+            1000,
+        )
+        self.writeData("in2", [(0, 1), (2, 3)], 2000)
+
+        (data_loaders, inputs, outputs, data_sink) = self.create_io(
+            {"group1": ["in1"], "group2": ["in2"]}
+        )
+
+        algorithm = Algorithm(prefix, "sequential/multiprocess/1")
+        runner = algorithm.runner()
+
+        self.assertTrue(runner.setup({"sync": "in2"}))
+        self.assertTrue(runner.prepare(data_loaders=data_loaders))
+
+        while inputs.hasMoreData():
+            inputs.restricted_access = False
+            inputs.next()
+            inputs.restricted_access = True
+            self.assertTrue(
+                runner.process(
+                    inputs=inputs, data_loaders=data_loaders, outputs=outputs
+                )
+            )
+
+        self.assertEqual(len(data_sink.written), 8)
+
 
 # ----------------------------------------------------------
 
@@ -1270,3 +1300,25 @@ class TestAutonomousAPI_Process(TestExecutionBase):
         self.assertEqual(data_unit.start, 3)
         self.assertEqual(data_unit.end, 3)
         self.assertEqual(data_unit.data.value, 2014)
+
+    def test_multiprocess(self):
+        self.writeData(
+            "in1",
+            [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7)],
+            1000,
+        )
+        self.writeData("in2", [(0, 1), (2, 3)], 2000)
+
+        (data_loaders, outputs, data_sink) = self.create_io(
+            {"group1": ["in1"], "group2": ["in2"]}
+        )
+
+        algorithm = Algorithm(prefix, "autonomous/multiprocess/1")
+        runner = algorithm.runner()
+
+        self.assertTrue(runner.setup({"sync": "in2"}))
+
+        self.assertTrue(runner.prepare(data_loaders=data_loaders.secondaries()))
+        self.assertTrue(runner.process(data_loaders=data_loaders, outputs=outputs))
+
+        self.assertEqual(len(data_sink.written), 8)
-- 
GitLab