Commit a7a74713 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[bcpapi][broker] Fixed handling on worker disconnection annoucement

parent 85f2ddf8
......@@ -95,6 +95,7 @@ class BeatComputationBroker(object):
def __init__(self, verbose=False):
"""Initialize broker state."""
self.verbose = verbose
self.continue_ = True = {}
......@@ -163,14 +164,17 @@ class BeatComputationBroker(object):
def process_client(self, sender, msg):
"""Process a request coming from a client."""
# Service name + body
assert len(msg) >= 2 # nosec
service = msg.pop(0)
# Set reply return address to client sender
msg = [sender, b""] + msg
self.dispatch(self.require_service(service), msg)
def process_worker(self, sender, msg):
"""Process message sent to us by a worker."""
# At least, command
assert len(msg) >= 1 # nosec
......@@ -222,12 +226,14 @@ class BeatComputationBroker(object):
def delete_worker(self, worker, disconnect):
"""Deletes worker from all data structures, and deletes worker."""
assert worker is not None # nosec
if disconnect:
self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None)
if worker.service is not None:
if worker in worker.service.waiting:
on_disconnection = self.callbacks.get("on_disconnection", None)
if on_disconnection:
......@@ -236,8 +242,12 @@ class BeatComputationBroker(object):
if worker.identity in self.workers:
if worker in self.waiting:
def require_worker(self, address):
"""Finds the worker (creates if necessary)."""
assert address is not None # nosec
identity = hexlify(address)
worker = self.workers.get(identity)
......@@ -251,6 +261,7 @@ class BeatComputationBroker(object):
def require_service(self, name):
"""Locates the service (creates if necessary)."""
assert name is not None # nosec
service =
if service is None:
......@@ -264,11 +275,13 @@ class BeatComputationBroker(object):
We use a single socket for both clients and workers.
self.socket.bind(endpoint)"I: BCP broker/0.0.1 is active at %s", endpoint)
def send_heartbeats(self):
"""Send heartbeats to idle workers if it's time"""
if time.time() > self.heartbeat_at:
for worker in self.waiting:
self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None)
......@@ -276,16 +289,16 @@ class BeatComputationBroker(object):
self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL
def purge_workers(self):
"""Look for & kill expired workers.
"""Look for & kill expired workers."""
for item in self.waiting:
if item.expiry < time.time():"I: deleting expired worker: %s", item.identity)
self.delete_worker(item, False)
def worker_waiting(self, worker):
"""This worker is now waiting for work."""
# Queue to broker and service waiting lists
if worker not in self.waiting:
......@@ -298,10 +311,14 @@ class BeatComputationBroker(object):
def dispatch(self, service, msg):
"""Dispatch requests to waiting workers as possible"""
assert service is not None # nosec
if msg is not None: # Queue message if any
while service.waiting and service.requests:
msg = service.requests.pop(0)
worker = service.waiting.pop(0)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment