Commit f12f3765 authored by Samuel GAIST's avatar Samuel GAIST

[bcp] Implement Beat Computation Protocol (BCP)

This implements a new paradigm for the process nodes
communication. Based on the ZMQ Majordomo protocol,
its goal is to improve reliability of the setup.
parent bf8fcaf7
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""BEAT Computation Protocol definitions"""
# This is the version of BCP/Client we implement
BCPC_CLIENT = b"BCPC01"
# BCP/Client commands, as strings
BCPC_REQUEST = b"\001"
bcpc_commands = [None, b"REQUEST"]
# This is the version of BCP/Worker we implement
BCPW_WORKER = b"BCPW01"
# BCP/Server commands, as strings
BCPW_READY = b"\001"
BCPW_REQUEST = b"\002"
BCPW_REPLY = b"\003"
BCPW_HEARTBEAT = b"\004"
BCPW_DISCONNECT = b"\005"
bcpw_commands = [None, b"READY", b"REQUEST", b"REPLY", b"HEARTBEAT", b"DISCONNECT"]
# BCP/Processing commands, as strings
BCPP_JOB_RECEIVED = b"\001"
BCPP_JOB_STARTED = b"\002"
BCPP_JOB_DONE = b"\003"
BCPP_JOB_ERROR = b"\004"
BCPP_JOB_CANCELLED = b"\005"
BCPP_ERROR = b"\006"
bcpp_commands = [
None,
b"JOB_RECEIVED",
b"JOB_STARTED",
b"JOB_DONE",
b"JOB_ERROR",
b"JOB_CANCELLED",
b"ERROR",
]
# BCP/Execution commands
BCPE_EXECUTE = b"\001" #: Execute the given job
BCPE_CANCEL = b"\002" #: Cancel the given job
BCPE_SCHEDULER_SHUTDOWN = b"\003" #: Shutdown the scheduler
bcpe_commands = [None, "EXECUTE", "CANCEL", "BCPE_SCHEDULER_SHUTDOWN"]
This diff is collapsed.
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""BEAT Computation client
Inspired from the Majordomo client
"""
import logging
import zmq
from . import BCP
from .zhelpers import dump
logger = logging.getLogger(__name__)
class BeatComputationClient(object):
"""Beat Computation Protocol Client API, Python version."""
def __init__(self, broker, verbose=False):
self.timeout = 2500
self.client = None
self.broker = broker
self.verbose = verbose
self.ctx = zmq.Context()
self.poller = zmq.Poller()
self.reconnect_to_broker()
def reconnect_to_broker(self):
"""Connect or reconnect to broker"""
if self.client:
self.poller.unregister(self.client)
self.client.close()
self.client = self.ctx.socket(zmq.DEALER)
self.client.linger = 0
self.client.connect(self.broker)
self.poller.register(self.client, zmq.POLLIN)
if self.verbose:
logger.info("I: connecting to broker at %s...", self.broker)
def send(self, service, request):
"""Send request to broker
"""
if not isinstance(request, list):
request = [request]
# Prefix request with protocol frames
# Frame 0: empty (REQ emulation)
# Frame 1: "BCPCxy" (six bytes, BCP/Client x.y)
# Frame 2: Service name (printable string)
request = [b"", BCP.BCPC_CLIENT, service] + request
if self.verbose:
logger.warn("I: send request to '%s' service: ", service)
dump(request)
self.client.send_multipart(request)
def recv(self):
"""Returns the reply message or None if there was no reply."""
try:
items = self.poller.poll(self.timeout)
except KeyboardInterrupt:
return # interrupted
if items:
# if we got a reply, process it
msg = self.client.recv_multipart()
if self.verbose:
logger.info("I: received reply:")
dump(msg)
# Don't try to handle errors, just assert noisily
assert len(msg) >= 4 # nosec
msg.pop(0) # empty
header = msg.pop(0)
assert BCP.BCPC_CLIENT == header # nosec
msg.pop(0) # service, not used
return msg
else:
logger.warn("W: permanent error, abandoning request")
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""
==========
management
==========
Execution utilities
"""
import logging
import multiprocessing
import signal
import json
import zmq
from ..dock import Host
from ..execution.local import LocalExecutor
from ..execution.docker import DockerExecutor
from . import BCP
class ExecutionProcess(multiprocessing.Process):
"""Worker process using ZMQ to communicate back to the caller"""
def __init__(self, address, job_id, prefix, data, cache, docker, images_cache=None):
super(ExecutionProcess, self).__init__()
self.socket = None
self.queue = multiprocessing.Queue()
self.address = address
self.job_id = job_id
self.identity = "worker_{}".format(self.job_id.decode("utf-8"))
self.prefix = prefix
self.data = data
self.cache = cache
self.docker = docker
self.images_cache = images_cache
self.logger = logging.getLogger(self.identity)
def __send_message(self, status, message=None):
"""Format and send message"""
data = [status, self.job_id]
if message:
if not isinstance(message, list):
message = [message]
data += message
self.socket.send_multipart(data)
def run(self):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.queue.put("started")
ctx = zmq.Context()
self.socket = ctx.socket(zmq.DEALER)
self.socket.linger = 0
self.socket.setsockopt_string(zmq.IDENTITY, self.identity)
self.socket.connect(self.address)
self.logger.debug("Process (pid=%d) started for job #%s", self.pid, self.job_id)
# Create the executor
if self.docker:
host = Host(images_cache=self.images_cache, raise_on_errors=False)
executor = DockerExecutor(host, self.prefix, self.data, cache=self.cache)
else:
executor = LocalExecutor(self.prefix, self.data, cache=self.cache)
status = BCP.BCPP_JOB_DONE
message = None
if executor.valid:
try:
# Execute the algorithm
with executor:
result = executor.process()
if result["status"] != 0:
status = BCP.BCPP_JOB_ERROR
message = json.dumps(result).encode()
except Exception:
import traceback
status = BCP.BCPP_ERROR
message = traceback.format_exc().encode()
else:
status = BCP.BCPP_JOB_ERROR
message = [error.encode() for error in executor.errors]
self.logger.debug("Process (pid=%d) done", self.pid)
self.__send_message(status, message)
self.socket.close()
ctx.destroy()
self.socket = None
self.queue.put("done")
self.queue.close()
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""BEAT Computation worker"""
import logging
import zmq
from .zhelpers import dump
logger = logging.getLogger(__name__)
class BeatComputationProcessor(object):
"""BEAT Computation Protocol Processor API, Python version
"""
def __init__(self, poller, address, verbose=False):
self.verbose = verbose
self.ctx = zmq.Context()
self.sink = self.ctx.socket(zmq.ROUTER)
self.sink.linger = 0
self.sink.bind(address)
poller.register(self.sink, zmq.POLLIN)
def process(self):
msg = self.sink.recv_multipart()
worker_name = msg[0]
if self.verbose:
logger.debug("D: worker {} answered: ".format(worker_name))
dump(msg)
return msg
def destroy(self):
# context.destroy depends on pyzmq >= 2.1.10
self.ctx.destroy(0)
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""BEAT Computation worker"""
import logging
import time
import zmq
# BEAT Computation protocol constants:
from . import BCP
from .zhelpers import dump
logger = logging.getLogger(__name__)
class BeatComputationWorker(object):
"""BEAT Computation Protocol Worker API, Python version
"""
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
def __init__(self, poller, broker, service, verbose=False):
if isinstance(service, str):
service = service.encode("utf-8")
self.heartbeat_at = (
0
) # When to send HEARTBEAT (relative to time.time(), so in seconds)
self.liveness = 0 # How many attempts left
self.heartbeat = 2500 # Heartbeat delay, msecs
self.reconnect = 2500 # Reconnect delay, msecs
self.timeout = 2500 # poller timeout
self.reply_to = None # Return address, if any
self.worker = None
self.broker = broker
self.service = service
self.verbose = verbose
self.ctx = zmq.Context()
self.poller = poller
self.reconnect_to_broker()
def reconnect_to_broker(self):
"""Connect or reconnect to broker"""
if self.worker:
self.poller.unregister(self.worker)
self.worker.close()
self.worker = self.ctx.socket(zmq.DEALER)
self.worker.linger = 0
self.worker.connect(self.broker)
self.poller.register(self.worker, zmq.POLLIN)
if self.verbose:
logger.info("I: connecting to broker at %s…", self.broker)
# Register service with broker
self.send_to_broker(BCP.BCPW_READY, self.service, [])
# If liveness hits zero, queue is considered disconnected
self.liveness = self.HEARTBEAT_LIVENESS
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
def send_to_broker(self, command, option=None, msg=None):
"""Send message to broker.
If no msg is provided, creates one internally
"""
if msg is None:
msg = []
elif not isinstance(msg, list):
msg = [msg]
if option:
msg = [option] + msg
msg = [b"", BCP.BCPW_WORKER, command] + msg
if self.verbose:
logger.info("I: sending %s to broker", command)
dump(msg)
self.worker.send_multipart(msg)
def send(self, reply):
"""Send reply to broker"""
assert self.reply_to is not None # nosec
reply = [self.reply_to, b"", self.service] + reply
self.send_to_broker(BCP.BCPW_REPLY, msg=reply)
def process(self):
msg = self.worker.recv_multipart()
if self.verbose:
logger.info("I: received message from broker: ")
dump(msg)
self.liveness = self.HEARTBEAT_LIVENESS
# Don't try to handle errors, just assert noisily
assert len(msg) >= 3 # nosec
empty = msg.pop(0)
assert empty == b"" # nosec
header = msg.pop(0)
assert header == BCP.BCPW_WORKER # nosec
command = msg.pop(0)
if command == BCP.BCPW_REQUEST:
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one…
self.reply_to = msg.pop(0)
# pop empty
empty = msg.pop(0)
assert empty == b"" # nosec
return msg # We have a request to process
elif command == BCP.BCPW_HEARTBEAT:
# Do nothing for heartbeats
pass
elif command == BCP.BCPW_DISCONNECT:
self.reconnect_to_broker()
else:
logger.error("E: invalid input message: ")
dump(msg)
# Send HEARTBEAT if it's time
if time.time() > self.heartbeat_at:
self.send_to_broker(BCP.BCPW_HEARTBEAT)
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
return None
def recv(self):
"""Wait for next request."""
while True:
# Poll socket for a reply, with timeout
try:
items = dict(self.poller.poll(self.timeout))
except KeyboardInterrupt:
break # Interrupted
if items:
msg = self.worker.recv_multipart()
if self.verbose:
logger.info("I: received message from broker: ")
dump(msg)
self.liveness = self.HEARTBEAT_LIVENESS
# Don't try to handle errors, just assert noisily
assert len(msg) >= 3 # nosec
empty = msg.pop(0)
assert empty == b"" # nosec
header = msg.pop(0)
assert header == BCP.BCPW_WORKER # nosec
command = msg.pop(0)
if command == BCP.BCPW_REQUEST:
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one...
self.reply_to = msg.pop(0)
# pop empty
empty = msg.pop(0)
assert empty == b"" # nosec
return msg # We have a request to process
elif command == BCP.BCPW_HEARTBEAT:
# Do nothing for heartbeats
pass
elif command == BCP.BCPW_DISCONNECT:
self.reconnect_to_broker()
else:
logger.error("E: invalid input message: ")
dump(msg)
else:
self.liveness -= 1
if self.liveness == 0:
if self.verbose:
logger.warn("W: disconnected from broker - retrying…")
try:
time.sleep(1e-3 * self.reconnect)
except KeyboardInterrupt:
break
self.reconnect_to_broker()
# Send HEARTBEAT if it's time
if time.time() > self.heartbeat_at:
self.send_to_broker(BCP.BCPW_HEARTBEAT)
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
logger.warn("W: interrupt received, killing worker…")
return None
def destroy(self):
# context.destroy depends on pyzmq >= 2.1.10
self.ctx.destroy(0)
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""
Helper module for common zmq task
Based on Majordomo protocol zhelpers.
"""
import binascii
import logging
import zmq
logger = logging.getLogger(__name__)
def dump(msg_or_socket):
"""Receives all message parts from socket, printing each frame neatly"""
if isinstance(msg_or_socket, zmq.Socket):
# it's a socket, call on current message
msg = msg_or_socket.recv_multipart()
else:
msg = msg_or_socket
logger.info("----------------------------------------")
for part in msg:
logger.info("[%03d]" % len(part))
try:
logger.info(part.decode("ascii"))
except UnicodeDecodeError:
logger.info(r"0x%s" % (binascii.hexlify(part).decode("ascii")))
logger.info("----------------------------------------")
......@@ -35,30 +35,26 @@ Helper methods
Forward imports from :py:mod:`beat.backend.python.utils`
"""
import os
import six
import sys
import tempfile
import socket
import contextlib
import string
import random
import logging
import numpy
import simplejson
from beat.backend.python.utils import *
from beat.backend.python.utils import * # noqa: F401, F403
from . import hash
# ----------------------------------------------------------
def temporary_directory(prefix='beat_'):
def temporary_directory(prefix="beat_"):
"""Generates a temporary directory"""
if sys.platform == 'darwin':
return tempfile.mkdtemp(prefix=prefix, dir='/tmp')
if sys.platform == "darwin":
return tempfile.mkdtemp(prefix=prefix, dir="/tmp") # nosec
else:
return tempfile.mkdtemp(prefix=prefix)
......@@ -67,12 +63,13 @@ def temporary_directory(prefix='beat_'):
def uniq(seq):
'''Order preserving (very fast) uniq function for sequences'''
"""Order preserving (very fast) uniq function for sequences"""
seen = set()
result = []
for item in seq:
if item in seen: continue
if item in seen:
continue
seen.add(item)
result.append(item)
......@@ -90,7 +87,7 @@ def send_multipart(socket, parts):
for index, item in enumerate(parts):
if isinstance(item, six.string_types):
parts[index] = item.encode('utf-8')
parts[index] = item.encode("utf-8")
socket.send_multipart(parts)
......@@ -99,38 +96,76 @@ def send_multipart(socket, parts):
def find_free_port():
'''Returns the value of a free random port'''
"""Returns the value of a free random port"""