Commit e552efb4 authored by Philip ABBET's avatar Philip ABBET

Add a 0MQ-based worker script

parent d03a2718
Pipeline #12369 failed with stage
in 10 minutes and 27 seconds
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""Starts the worker process (%(version)s)
Usage:
%(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
[--cache=<path>] [--docker] <address>
%(prog)s (--help | -h)
%(prog)s (--version | -V)
Options:
-h, --help Show this screen
-V, --version Show version
-v, --verbose Increases the output verbosity level
-n <name>, --name=<name> The unique name of this worker on the database.
This is typically the assigned hostname of the node,
but not necessarily [default: %(hostname)s]
-p, --prefix=<path> Prefix of your local data [default: .]
-c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache'
"""
import logging
import gevent
import zmq.green as zmq
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import os
import sys
import signal
import simplejson
import threading
from docopt import docopt
from socket import gethostname
from ..version import __version__
from ..execution.local import LocalExecutor
from ..execution.docker import DockerExecutor
from ..dock import Host
from ..worker import WorkerController
stop = False
class ExecutorTask(threading.Thread): #gevent.Greenlet):
def __init__(self, executor):
self.executor = executor
super(ExecutorTask, self).__init__()
# def _run(self):
def run(self):
with self.executor:
result = self.executor.process()
self.result = result
# return result
def main(user_input=None):
# Parse the command-line arguments
if user_input is not None:
arguments = user_input
else:
arguments = sys.argv[1:]
prog = os.path.basename(sys.argv[0])
completions = dict(
prog=prog,
version=__version__,
hostname=gethostname(),
)
args = docopt(
__doc__ % completions,
argv=arguments,
options_first=True,
version='v%s' % __version__,
)
# Change the verbosity level
if args['--verbose'] == 1:
logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.INFO)
elif args['--verbose'] >= 2:
logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.DEBUG)
else:
logging.basicConfig(format='[%(name)s] %(levelname)s: %(message)s', level=logging.WARNING)
logger = logging.getLogger(__name__)
# Check the prefix path
prefix = args['--prefix'] if args['--prefix'] is not None else '.'
if not os.path.exists(prefix):
logger.error("Prefix not found at: '%s'", prefix)
return 1
# Check the cache path
cache = args['--cache'] if args['--cache'] is not None else os.path.join(prefix, 'cache')
if not os.path.exists(cache):
logger.error("Cache not found at: '%s'", cache)
return 1
# Install a signal handler
def handler(signum, frame):
#ignore further signals
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
logger.info("Signal %d caught, terminating...", signum)
global stop
stop = True
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
# (If necessary) Docker-related initialisations
host = None
if args['--docker']:
host = Host(raise_on_errors=False)
# Starts our 0MQ server
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, args['--name'])
address = args['<address>']
if address.find('://') < 0:
address = 'tcp://' + address
socket.connect(address)
logger.info("Connected to '%s'", address)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
# Send READY messages until the scheduler acknowlege us
while True:
socket.send('rdy')
socks = dict(poller.poll(100))
if not (socket in socks) or (socks[socket] != zmq.POLLIN):
continue
response = socket.recv()
if response != WorkerController.ACK:
logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'",
address, WorkerController.ACK, response)
return 1
break
# Process the requests
global stop
task = None
current_job_id = None
while not stop:
# Send the result of the processing (if any)
if (task is not None) and not task.is_alive(): #task.ready():
message = simplejson.dumps(task.result)
# message = simplejson.dumps(task.value)
logger.debug('send: """%s"""' % message.rstrip())
socket.send_multipart([
WorkerController.DONE,
current_job_id,
message
])
task = None
if task is None:
timeout = 1000 # ms
else:
timeout = 100
socks = dict(poller.poll(timeout))
if not (socket in socks) or (socks[socket] != zmq.POLLIN):
continue
# Read the next command
parts = socket.recv_multipart()
command = parts[0]
logger.debug("recv: %s", command)
# Command: execute <job-id> <json-command>
if command == WorkerController.EXECUTE:
job_id = parts[1]
data = simplejson.loads(parts[2])
# Check that the worker isn't busy
if task is not None:
socket.send_multipart([
WorkerController.JOB_ERROR,
job_id,
'Worker is already busy'
])
continue
# Create the executor
if args['--docker']:
executor = DockerExecutor(host, prefix, data, cache=cache)
else:
executor = LocalExecutor(prefix, data, cache=cache)
if not executor.valid:
logger.error("Failed to load the execution information:")
message = [
WorkerController.JOB_ERROR,
job_id,
]
message += executor.errors
socket.send_multipart(message)
else:
logger.info("Running '%s'", executor.algorithm.name)
current_job_id = job_id
task = ExecutorTask(executor)
task.start()
# Command: cancel
elif command == WorkerController.CANCEL:
# Check that the worker is busy
if task is None:
socket.send_multipart([
WorkerController.ERROR,
"Worker isn't busy"
])
continue
# Kill the processing thread
# task.kill()
task = None
socket.send_multipart([
WorkerController.CANCELLED,
current_job_id,
])
socket.send(WorkerController.EXIT)
# Cleanup
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.destroy()
return 0
{
"language": "python",
"splittable": true,
"groups": [
{
"name": "main",
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import time
class Algorithm:
def process(self, inputs, outputs):
time.sleep(.5)
outputs['out_data'].write(inputs['in_data'].data)
return True
This diff is collapsed.
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import gevent
import zmq.green as zmq
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import simplejson
class WorkerController(object):
# Status codes
READY = 'rdy'
EXIT = 'ext'
DONE = 'don'
JOB_ERROR = 'erj'
ERROR = 'err'
CANCELLED = 'cld'
# Commands
EXECUTE = 'exe'
CANCEL = 'cnl'
ACK = 'ack'
def __init__(self, address, port):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
if port is not None:
self.address = 'tcp://%s:%d' % (address, port)
self.socket.bind(self.address)
else:
self.address = 'tcp://%s' % address
port = self.socket.bind_to_random_port(self.address, min_port=50000)
self.address += ':%d' % port
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.workers = []
def destroy(self):
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
self.context.destroy()
def execute(self, worker, job_id, configuration):
self.socket.send_multipart([
worker,
WorkerController.EXECUTE,
str(job_id),
simplejson.dumps(configuration)
])
def cancel(self, worker):
self.socket.send_multipart([
worker,
WorkerController.CANCEL
])
def ack(self, worker):
self.socket.send_multipart([
worker,
WorkerController.ACK
])
def process(self, timeout=0):
while True:
socks = dict(self.poller.poll(timeout))
if not (self.socket in socks) or (socks[self.socket] != zmq.POLLIN):
return None
(address, status, data) = self._receive()
if status == WorkerController.READY:
if address not in self.workers:
self.workers.append(address)
self.ack(address)
timeout = 0
elif status == WorkerController.EXIT:
self.workers.remove(address)
timeout = 0
elif status in [ WorkerController.DONE, WorkerController.JOB_ERROR,
WorkerController.CANCELLED ]:
job_id = int(data[0])
return (address, status, job_id, data[1:])
else:
job_id = None
return (address, status, job_id, data)
def _receive(self):
parts = self.socket.recv_multipart()
return parts[0], parts[1], parts[2:]
......@@ -67,14 +67,15 @@ setup(
namespace_packages=[
"beat",
],
],
install_requires=requires,
entry_points={
'console_scripts': [
],
},
'worker = beat.core.scripts.worker:main',
],
},
classifiers = [
'Framework :: BEAT',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment