Skip to content
Snippets Groups Projects
Commit 81392e5a authored by Philip ABBET's avatar Philip ABBET
Browse files

Refactoring: 'databases_provider.py' is now part of this package (from beat.core)

parent 6af74906
Branches
Tags
1 merge request!8Refactoring and support of dataset providing in a container
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.backend.python module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""Executes some database views. (%(version)s)
usage:
%(prog)s [--debug] <addr> <dir>
%(prog)s (--help)
%(prog)s (--version)
arguments:
<addr> Address of the server for I/O requests
<dir> Directory containing all configuration required to run the views
options:
-h, --help Shows this help message and exit
-V, --version Shows program's version number and exit
-d, --debug Runs executor in debugging mode
"""
import logging
import os
import sys
import docopt
import zmq
from ..dbexecution import DBExecutor
class UserError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def send_error(logger, socket, tp, message):
"""Sends a user (usr) or system (sys) error message to the infrastructure"""
logger.debug('send: (err) error')
socket.send('err', zmq.SNDMORE)
socket.send(tp, zmq.SNDMORE)
logger.debug('send: """%s"""' % message.rstrip())
socket.send(message)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
this_try = 1
max_tries = 5
timeout = 1000 #ms
while this_try <= max_tries:
socks = dict(poller.poll(timeout)) #blocks here, for 5 seconds at most
if socket in socks and socks[socket] == zmq.POLLIN:
answer = socket.recv() #ack
logger.debug('recv: %s', answer)
break
logger.warn('(try %d) waited %d ms for "ack" from server',
this_try, timeout)
this_try += 1
if this_try > max_tries:
logger.error('could not send error message to server')
logger.error('stopping 0MQ client anyway')
def main():
package = __name__.rsplit('.', 2)[0]
version = package + ' v' + \
__import__('pkg_resources').require(package)[0].version
prog = os.path.basename(sys.argv[0])
args = docopt.docopt(__doc__ % dict(prog=prog, version=version),
version=version)
# Sets up the logging system
if args['--debug']:
logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s',
level=logging.DEBUG)
else:
logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s',
level=logging.WARNING)
logger = logging.getLogger(__name__)
# Creates the 0MQ socket for communication with BEAT
context = zmq.Context()
socket = context.socket(zmq.PAIR)
address = args['<addr>']
socket.connect(address)
logger.debug("zmq client connected to `%s'", address)
try:
# Check the dir
if not os.path.exists(args['<dir>']):
raise IOError("Running directory `%s' not found" % args['<dir>'])
# Sets up the execution
dataformat_cache = {}
database_cache = {}
try:
dbexecutor = DBExecutor(os.path.join(args['<dir>'], 'prefix'),
os.path.join(args['<dir>'], 'configuration.json'),
dataformat_cache, database_cache)
except (MemoryError):
raise
except Exception as e:
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
s = ''.join(traceback.format_list(tb[4:])) #exclude this file
s = s.replace(args['<dir>'], '').strip()
raise UserError("%s%s: %s" % (s, type(e).__name__, e))
# Execute the code
try:
with dbexecutor:
dbexecutor.process(context, socket)
dbexecutor.wait()
except (MemoryError):
raise
except Exception as e:
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
s = ''.join(traceback.format_list(tb[4:]))
s = s.replace(args['<dir>'], '').strip()
raise UserError("%s%s: %s" % (s, type(e).__name__, e))
except UserError as e:
msg = str(e).decode('string_escape').strip("'")
send_error(logger, socket, 'usr', msg)
return 1
except MemoryError as e:
# Say something meaningful to the user
msg = "The user process for this block ran out of memory. We " \
"suggest you optimise your code to reduce memory usage or, " \
"if this is not an option, choose an appropriate processing " \
"queue with enough memory."
send_error(logger, socket, 'usr', msg)
return 1
except Exception as e:
import traceback
send_error(logger, socket, 'sys', traceback.format_exc())
return 1
finally:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.term()
logger.debug("0MQ client finished")
return 0
if __name__ == '__main__':
sys.exit(main())
...@@ -65,6 +65,7 @@ setup( ...@@ -65,6 +65,7 @@ setup(
'console_scripts': [ 'console_scripts': [
'execute = beat.backend.python.scripts.execute:main', 'execute = beat.backend.python.scripts.execute:main',
'describe = beat.backend.python.scripts.describe:main', 'describe = beat.backend.python.scripts.describe:main',
'databases_provider = beat.backend.python.scripts.databases_provider:main',
], ],
}, },
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment