From 890da671ccfda6af0b247c43b7e097a49c65e5e7 Mon Sep 17 00:00:00 2001 From: Philip Abbet <philip.abbet@idiap.ch> Date: Wed, 15 Mar 2017 11:45:55 +0100 Subject: [PATCH] Inputs: receive data indexes alongside data units --- beat/backend/python/inputs.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/beat/backend/python/inputs.py b/beat/backend/python/inputs.py index eb812e3..bf7118a 100644 --- a/beat/backend/python/inputs.py +++ b/beat/backend/python/inputs.py @@ -68,8 +68,11 @@ class Input: self.data_format = data_format self.socket = socket self.data = None + self.data_index = -1 + self.data_index_end = -1 self.group = None self.comm_time = 0. #total time spent on communication + self.nb_data_blocks_read = 0 def isDataUnitDone(self): @@ -108,15 +111,19 @@ class Input: self.socket.send('nxt', zmq.SNDMORE) self.socket.send(self.group.channel, zmq.SNDMORE) self.socket.send(self.name) + self.data_index = int(self.socket.recv()) + self.data_index_end = int(self.socket.recv()) self.unpack(self.socket.recv()) self.comm_time += time.time() - _start + self.nb_data_blocks_read += 1 def unpack(self, packed): """Receives data through socket""" self.data = self.data_format.type() - logger.debug('recv: <bin> (size=%d)', len(packed)) + logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed), + self.data_index, self.data_index_end) self.data.unpack(packed) @@ -239,7 +246,10 @@ class InputGroup: "channel `%s' while performing `next' operation on this group " \ "(current reading position is %d/%d)" % \ (name, self.channel, k, n)) + inpt.data_index = int(parts.pop(0)) + inpt.data_index_end = int(parts.pop(0)) inpt.unpack(parts.pop(0)) + inpt.nb_data_blocks_read += 1 self.comm_time += time.time() - _start -- GitLab