diff --git a/beat/backend/python/inputs.py b/beat/backend/python/inputs.py index eb812e36cf8b5c4f54286ef8ee33da0ba88e0682..bf7118a92c593cfba5097d49aa4a68dea5f824da 100644 --- a/beat/backend/python/inputs.py +++ b/beat/backend/python/inputs.py @@ -68,8 +68,11 @@ class Input: self.data_format = data_format self.socket = socket self.data = None + self.data_index = -1 + self.data_index_end = -1 self.group = None self.comm_time = 0. #total time spent on communication + self.nb_data_blocks_read = 0 def isDataUnitDone(self): @@ -108,15 +111,19 @@ class Input: self.socket.send('nxt', zmq.SNDMORE) self.socket.send(self.group.channel, zmq.SNDMORE) self.socket.send(self.name) + self.data_index = int(self.socket.recv()) + self.data_index_end = int(self.socket.recv()) self.unpack(self.socket.recv()) self.comm_time += time.time() - _start + self.nb_data_blocks_read += 1 def unpack(self, packed): """Receives data through socket""" self.data = self.data_format.type() - logger.debug('recv: <bin> (size=%d)', len(packed)) + logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed), + self.data_index, self.data_index_end) self.data.unpack(packed) @@ -239,7 +246,10 @@ class InputGroup: "channel `%s' while performing `next' operation on this group " \ "(current reading position is %d/%d)" % \ (name, self.channel, k, n)) + inpt.data_index = int(parts.pop(0)) + inpt.data_index_end = int(parts.pop(0)) inpt.unpack(parts.pop(0)) + inpt.nb_data_blocks_read += 1 self.comm_time += time.time() - _start