Skip to content
Snippets Groups Projects
Commit 7f0ee2b5 authored by Philip ABBET's avatar Philip ABBET
Browse files

Add a 'hasDataChanged()' method to the 'Input' class

parent 07818995
Branches
Tags
No related merge requests found
...@@ -78,20 +78,23 @@ class Input: ...@@ -78,20 +78,23 @@ class Input:
def __init__(self, name, data_format, data_source): def __init__(self, name, data_format, data_source):
self.group = None self.group = None
self.name = str(name) self.name = str(name)
self.data = None self.data = None
self.data_index = -1 self.data_index = -1
self.data_index_end = -1 self.data_index_end = -1
self.data_same_as_previous = False self.data_same_as_previous = True
self.data_format = data_format self.data_format = data_format
self.data_source = data_source self.data_source = data_source
self.nb_data_blocks_read = 0 self.nb_data_blocks_read = 0
def isDataUnitDone(self): def isDataUnitDone(self):
"""Indicates if the current data unit will change at the next iteration""" """Indicates if the current data unit will change at the next iteration"""
if (self.data_index_end >= 0) and (self.group.data_index_end == -1):
return True
return (self.data_index_end == self.group.data_index_end) return (self.data_index_end == self.group.data_index_end)
...@@ -101,9 +104,19 @@ class Input: ...@@ -101,9 +104,19 @@ class Input:
return self.data_source.hasMoreData() return self.data_source.hasMoreData()
def hasDataChanged(self):
"""Indicates if the current data unit is different than the one at the
previous iteration"""
return not self.data_same_as_previous
def next(self): def next(self):
"""Retrieves the next block of data""" """Retrieves the next block of data"""
if self.group.restricted_access:
raise RuntimeError('Not authorized')
(self.data, self.data_index, self.data_index_end) = self.data_source.next() (self.data, self.data_index, self.data_index_end) = self.data_source.next()
if self.data is None: if self.data is None:
...@@ -229,6 +242,29 @@ class RemoteInput: ...@@ -229,6 +242,29 @@ class RemoteInput:
return answer == 'tru' return answer == 'tru'
def hasDataChanged(self):
"""Indicates if the current data unit is different than the one at the
previous iteration"""
logger.debug('send: (hdc) has-data-changed %s %s', self.group.channel, self.name)
_start = time.time()
self.socket.send('hdc', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
if answer == 'err':
process_error(self.socket)
return answer == 'tru'
def next(self): def next(self):
"""Retrieves the next block of data""" """Retrieves the next block of data"""
......
...@@ -75,6 +75,7 @@ class MessageHandler(threading.Thread): ...@@ -75,6 +75,7 @@ class MessageHandler(threading.Thread):
self.callbacks = dict( self.callbacks = dict(
nxt = self.next, nxt = self.next,
hmd = self.has_more_data, hmd = self.has_more_data,
hdc = self.has_data_changed,
idd = self.is_dataunit_done, idd = self.is_dataunit_done,
don = self.done, don = self.done,
err = self.error, err = self.error,
...@@ -251,6 +252,16 @@ class MessageHandler(threading.Thread): ...@@ -251,6 +252,16 @@ class MessageHandler(threading.Thread):
self.socket.send(what) self.socket.send(what)
def has_data_changed(self, channel, name):
"""Syntax: hdc channel name"""
logger.debug('recv: hdc %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.hasDataChanged() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def is_dataunit_done(self, channel, name): def is_dataunit_done(self, channel, name):
"""Syntax: idd channel name""" """Syntax: idd channel name"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment