From 7f0ee2b598d091d5b84924268d3afe5b32ca7ca8 Mon Sep 17 00:00:00 2001 From: Philip ABBET <philip.abbet@idiap.ch> Date: Mon, 16 Oct 2017 15:24:40 +0200 Subject: [PATCH] Add a 'hasDataChanged()' method to the 'Input' class --- beat/backend/python/inputs.py | 54 +++++++++++++++++++++----- beat/backend/python/message_handler.py | 11 ++++++ 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/beat/backend/python/inputs.py b/beat/backend/python/inputs.py index ea06402..59a704f 100755 --- a/beat/backend/python/inputs.py +++ b/beat/backend/python/inputs.py @@ -78,20 +78,23 @@ class Input: def __init__(self, name, data_format, data_source): - self.group = None - self.name = str(name) - self.data = None - self.data_index = -1 - self.data_index_end = -1 - self.data_same_as_previous = False - self.data_format = data_format - self.data_source = data_source - self.nb_data_blocks_read = 0 + self.group = None + self.name = str(name) + self.data = None + self.data_index = -1 + self.data_index_end = -1 + self.data_same_as_previous = True + self.data_format = data_format + self.data_source = data_source + self.nb_data_blocks_read = 0 def isDataUnitDone(self): """Indicates if the current data unit will change at the next iteration""" + if (self.data_index_end >= 0) and (self.group.data_index_end == -1): + return True + return (self.data_index_end == self.group.data_index_end) @@ -101,9 +104,19 @@ class Input: return self.data_source.hasMoreData() + def hasDataChanged(self): + """Indicates if the current data unit is different than the one at the + previous iteration""" + + return not self.data_same_as_previous + + def next(self): """Retrieves the next block of data""" + if self.group.restricted_access: + raise RuntimeError('Not authorized') + (self.data, self.data_index, self.data_index_end) = self.data_source.next() if self.data is None: @@ -229,6 +242,29 @@ class RemoteInput: return answer == 'tru' + def hasDataChanged(self): + """Indicates if the current data unit is different than the one at the + previous iteration""" + + logger.debug('send: (hdc) has-data-changed %s %s', self.group.channel, self.name) + + _start = time.time() + + self.socket.send('hdc', zmq.SNDMORE) + self.socket.send(self.group.channel, zmq.SNDMORE) + self.socket.send(self.name) + + answer = self.socket.recv() + + self.comm_time += time.time() - _start + logger.debug('recv: %s', answer) + + if answer == 'err': + process_error(self.socket) + + return answer == 'tru' + + def next(self): """Retrieves the next block of data""" diff --git a/beat/backend/python/message_handler.py b/beat/backend/python/message_handler.py index 2cd7be7..38623ce 100755 --- a/beat/backend/python/message_handler.py +++ b/beat/backend/python/message_handler.py @@ -75,6 +75,7 @@ class MessageHandler(threading.Thread): self.callbacks = dict( nxt = self.next, hmd = self.has_more_data, + hdc = self.has_data_changed, idd = self.is_dataunit_done, don = self.done, err = self.error, @@ -251,6 +252,16 @@ class MessageHandler(threading.Thread): self.socket.send(what) + def has_data_changed(self, channel, name): + """Syntax: hdc channel name""" + + logger.debug('recv: hdc %s %s', channel, name) + input_candidate = self._get_input_candidate(channel, name) + what = 'tru' if input_candidate.hasDataChanged() else 'fal' + logger.debug('send: %s', what) + self.socket.send(what) + + def is_dataunit_done(self, channel, name): """Syntax: idd channel name""" -- GitLab