diff --git a/beat/core/scripts/databases_provider.py b/beat/core/scripts/databases_provider.py new file mode 100644 index 0000000000000000000000000000000000000000..68e17d3b58507e933631a10d5c67c6d19b0d7e19 --- /dev/null +++ b/beat/core/scripts/databases_provider.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +"""Executes some database views. (%(version)s) + +usage: + %(prog)s [--debug] <addr> <dir> + %(prog)s (--help) + %(prog)s (--version) + + +arguments: + <addr> Address of the server for I/O requests + <dir> Directory containing all configuration required to run the views + + +options: + -h, --help Shows this help message and exit + -V, --version Shows program's version number and exit + -d, --debug Runs executor in debugging mode + +""" + +import logging + +import os +import sys +import docopt + +import zmq + +from ..dbexecutor import DBExecutor + + +class UserError(Exception): + + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +def send_error(logger, socket, tp, message): + """Sends a user (usr) or system (sys) error message to the infrastructure""" + + logger.debug('send: (err) error') + socket.send('err', zmq.SNDMORE) + socket.send(tp, zmq.SNDMORE) + logger.debug('send: """%s"""' % message.rstrip()) + socket.send(message) + + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + + this_try = 1 + max_tries = 5 + timeout = 1000 #ms + while this_try <= max_tries: + socks = dict(poller.poll(timeout)) #blocks here, for 5 seconds at most + if socket in socks and socks[socket] == zmq.POLLIN: + answer = socket.recv() #ack + logger.debug('recv: %s', answer) + break + logger.warn('(try %d) waited %d ms for "ack" from server', + this_try, timeout) + this_try += 1 + if this_try > max_tries: + logger.error('could not send error message to server') + logger.error('stopping 0MQ client anyway') + + +def main(): + package = __name__.rsplit('.', 2)[0] + version = package + ' v' + \ + __import__('pkg_resources').require(package)[0].version + prog = os.path.basename(sys.argv[0]) + + args = docopt.docopt(__doc__ % dict(prog=prog, version=version), + version=version) + + # Sets up the logging system + if args['--debug']: + logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s', + level=logging.DEBUG) + else: + logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s', + level=logging.WARNING) + + logger = logging.getLogger(__name__) + + # Creates the 0MQ socket for communication with BEAT + context = zmq.Context() + socket = context.socket(zmq.PAIR) + address = args['<addr>'] + socket.connect(address) + logger.debug("zmq client connected to `%s'", address) + + try: + + # Check the dir + if not os.path.exists(args['<dir>']): + raise IOError("Running directory `%s' not found" % args['<dir>']) + + # Sets up the execution + dataformat_cache = {} + database_cache = {} + + try: + dbexecutor = DBExecutor(args['<dir>'], 'configuration.json', + dataformat_cache, database_cache) + except (MemoryError): + raise + except Exception as e: + import traceback + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.extract_tb(exc_traceback) + s = ''.join(traceback.format_list(tb[4:])) #exclude this file + s = s.replace(args['<dir>']).strip() + raise UserError("%s%s: %s" % (s, type(e).__name__, e)) + + # Execute the code + try: + with dbexecutor: + dbexecutor.process(context, socket) + dbexecutor.wait() + except (MemoryError): + raise + except Exception as e: + import traceback + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.extract_tb(exc_traceback) + s = ''.join(traceback.format_list(tb[4:])) + s = s.replace(args['<dir>']).strip() + raise UserError("%s%s: %s" % (s, type(e).__name__, e)) + + except UserError as e: + msg = str(e).decode('string_escape').strip("'") + send_error(logger, socket, 'usr', msg) + return 1 + + except MemoryError as e: + # Say something meaningful to the user + msg = "The user process for this block ran out of memory. We " \ + "suggest you optimise your code to reduce memory usage or, " \ + "if this is not an option, choose an appropriate processing " \ + "queue with enough memory." + send_error(logger, socket, 'usr', msg) + return 1 + + except Exception as e: + import traceback + send_error(logger, socket, 'sys', traceback.format_exc()) + return 1 + + finally: + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.term() + logger.debug("0MQ client finished") + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/setup.py b/setup.py index 07149bd055dad067b7cc17e7746ef0a04551ab1e..fbe56b920453c7675a59e3955e9d7176d573a654 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,12 @@ setup( install_requires=requires, + entry_points={ + 'console_scripts': [ + 'databases_provider = beat.core.scripts.databases_provider:main', + ], + }, + classifiers = [ 'Framework :: BEAT', 'Development Status :: 5 - Production/Stable',