Commit 41726a0b authored by Philip ABBET's avatar Philip ABBET

Optimize the number of messages sent between the remote inputs and the message handler

parent 3e7fdebc
......@@ -235,31 +235,35 @@ class RemoteInput(BaseInput):
def __init__(self, name, data_format, socket, unpack=True):
super(RemoteInput, self).__init__(name, data_format)
self.socket = socket
self.comm_time = 0.0 # Total time spent on communication
self._unpack = unpack
self.socket = socket
self.comm_time = 0.0 # Total time spent on communication
self._unpack = unpack
self._has_more_data = None # To avoid repetitive requests
def hasMoreData(self):
"""Indicates if there is more data to process on the input"""
logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)
if self._has_more_data is None:
logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)
_start = time.time()
_start = time.time()
self.socket.send('hmd', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name)
self.socket.send('hmd', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
if answer == 'err':
process_error(self.socket)
if answer == 'err':
process_error(self.socket)
return answer == 'tru'
self._has_more_data = (answer == 'tru')
return self._has_more_data
def next(self):
......@@ -287,6 +291,7 @@ class RemoteInput(BaseInput):
self.nb_data_blocks_read += 1
self.data_same_as_previous = False
self._has_more_data = None
def unpack(self, packed):
......@@ -437,22 +442,11 @@ class InputGroup:
if self.socket is None:
return False
logger.debug('send: (hmd) has-more-data %s', self.channel)
for x in self.remoteInputs():
if x.hasMoreData():
return True
_start = time.time()
self.socket.send('hmd', zmq.SNDMORE)
self.socket.send(self.channel)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
if answer == 'err':
process_error(self.socket)
return answer == 'tru'
return False
def next(self):
......@@ -471,42 +465,10 @@ class InputGroup:
for input in [ x for x in inputs_to_update if isinstance(x, Input) ]:
input.next()
input.data_same_as_previous = False
remote_inputs_to_update = list([ x for x in inputs_to_update if isinstance(x, RemoteInput) ])
if len(remote_inputs_to_update) > 0:
logger.debug('send: (nxt) next %s', self.channel)
self.socket.send('nxt', zmq.SNDMORE)
self.socket.send(self.channel)
# read all incomming data
_start = time.time()
more = True
parts = []
while more:
parts.append(self.socket.recv())
if parts[-1] == 'err':
self.comm_time += time.time() - _start
process_error(self.socket)
more = self.socket.getsockopt(zmq.RCVMORE)
n = int(parts.pop(0))
logger.debug('recv: %d (inputs)', n)
for k in range(n):
name = parts.pop(0)
logger.debug('recv: %s (data follows)', name)
inpt = self[name]
if inpt is None:
raise RuntimeError("Could not find input `%s' at input group for " \
"channel `%s' while performing `next' operation on this group " \
"(current reading position is %d/%d)" % \
(name, self.channel, k, n))
inpt.data_index = int(parts.pop(0))
inpt.data_index_end = int(parts.pop(0))
inpt.unpack(parts.pop(0))
inpt.nb_data_blocks_read += 1
self.comm_time += time.time() - _start
for remote_input in remote_inputs_to_update:
remote_input.next()
for input in inputs_up_to_date:
input.data_same_as_previous = True
......
......@@ -105,8 +105,6 @@ class MessageHandler(threading.Thread):
more = self.socket.getsockopt(zmq.RCVMORE)
command = parts[0]
logger.debug("recv: %s", command)
if command in self.callbacks:
try: #to handle command
self.callbacks[command](*parts[1:])
......@@ -147,6 +145,8 @@ class MessageHandler(threading.Thread):
break
else:
logger.debug("recv: %s", command)
message = "Command `%s' is not implemented - stopping user process" \
% command
logger.error(message)
......@@ -170,81 +170,49 @@ class MessageHandler(threading.Thread):
return retval
def next(self, channel, name=None):
"""Syntax: nxt channel [name] ..."""
if name is not None: #single input
logger.debug('recv: nxt %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
input_candidate.next()
if input_candidate.data is None: #error
message = "User algorithm asked for more data for channel " \
"`%s' on input `%s', but it is over (no more data). This " \
"normally indicates a programming error on the user " \
"side." % (channel, name)
self.user_error += message + '\n'
raise RuntimeError(message)
if isinstance(input_candidate.data, baseformat.baseformat):
packed = input_candidate.data.pack()
else:
packed = input_candidate.data
logger.debug('send: <bin> (size=%d), indexes=(%d, %d)', len(packed),
input_candidate.data_index, input_candidate.data_index_end)
self.socket.send('%d' % input_candidate.data_index, zmq.SNDMORE)
self.socket.send('%d' % input_candidate.data_index_end, zmq.SNDMORE)
self.socket.send(packed)
else: #whole group data
logger.debug('recv: nxt %s', channel)
channel_group = self.input_list.group(channel)
# Call next() on the group
channel_group.restricted_access = False
channel_group.next()
channel_group.restricted_access = True
# Loop over the inputs
inputs_to_go = len(channel_group)
self.socket.send(str(inputs_to_go), zmq.SNDMORE)
for inp in channel_group:
logger.debug('send: %s', inp.name)
self.socket.send(str(inp.name), zmq.SNDMORE)
if inp.data is None:
message = "User algorithm process asked for more data on channel " \
"`%s' (all inputs), but input `%s' has nothing. This " \
"normally indicates a programming error on the user " \
"side." % (channel, inp.name)
self.user_error += message + '\n'
raise RuntimeError(message)
elif isinstance(inp.data, baseformat.baseformat):
packed = inp.data.pack()
else:
packed = inp.data
logger.debug('send: <bin> (size=%d), indexes=(%d, %d)', len(packed),
inp.data_index, inp.data_index_end)
self.socket.send('%d' % inp.data_index, zmq.SNDMORE)
self.socket.send('%d' % inp.data_index_end, zmq.SNDMORE)
inputs_to_go -= 1
if inputs_to_go > 0:
self.socket.send(packed, zmq.SNDMORE)
else:
self.socket.send(packed)
def next(self, channel, name):
"""Syntax: nxt channel name ..."""
logger.debug('recv: nxt %s %s', channel, name)
channel_group = self.input_list.group(channel)
restricted = channel_group.restricted_access
channel_group.restricted_access = False
input_candidate = self._get_input_candidate(channel, name)
input_candidate.next()
channel_group.restricted_access = restricted
if input_candidate.data is None: #error
message = "User algorithm asked for more data for channel " \
"`%s' on input `%s', but it is over (no more data). This " \
"normally indicates a programming error on the user " \
"side." % (channel, name)
self.user_error += message + '\n'
raise RuntimeError(message)
if isinstance(input_candidate.data, baseformat.baseformat):
packed = input_candidate.data.pack()
else:
packed = input_candidate.data
logger.debug('send: <bin> (size=%d), indexes=(%d, %d)', len(packed),
input_candidate.data_index, input_candidate.data_index_end)
self.socket.send('%d' % input_candidate.data_index, zmq.SNDMORE)
self.socket.send('%d' % input_candidate.data_index_end, zmq.SNDMORE)
self.socket.send(packed)
def has_more_data(self, channel, name=None):
"""Syntax: hmd channel [name]"""
if name: #single input
logger.debug('recv: hmd %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.hasMoreData() else 'fal'
def has_more_data(self, channel, name):
"""Syntax: hmd channel name"""
else: #for all channel names
logger.debug('recv: hmd %s', channel)
channel_group = self.input_list.group(channel)
what = 'tru' if channel_group.hasMoreData() else 'fal'
logger.debug('recv: hmd %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.hasMoreData() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment