Commit 7b3efc83 authored by Philip ABBET's avatar Philip ABBET

Modifications needed by the 'SubprocessExecutor' class of beat.core

parent ac5c51d8
......@@ -79,7 +79,7 @@ class Executor(object):
"""
def __init__(self, socket, directory, dataformat_cache=None,
database_cache=None, library_cache=None):
database_cache=None, library_cache=None, cache_root='/cache'):
self.socket = socket
......@@ -108,14 +108,14 @@ class Executor(object):
cache_access = AccessMode.LOCAL
(self.input_list, _) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, '/cache',
self.data, self.algorithm, self.prefix, cache_root,
cache_access=cache_access, db_access=AccessMode.REMOTE,
socket=self.socket
)
# Loads algorithm outputs
(self.output_list, _) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, '/cache', self.input_list,
self.data, self.algorithm, self.prefix, cache_root, self.input_list,
cache_access=cache_access, socket=self.socket
)
......
......@@ -41,10 +41,10 @@ from . import outputs
def convert_experiment_configuration_to_container(config, proxy_mode):
data = {
'proxy_mode': proxy_mode,
'algorithm': config['algorithm'],
'parameters': config['parameters'],
'channel': config['channel'],
'uid': os.getuid(),
'algorithm': config['algorithm'],
'parameters': config['parameters'],
'channel': config['channel'],
'uid': os.getuid(),
}
if 'range' in config:
......@@ -75,7 +75,8 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
cache_access=AccessMode.NONE,
db_access=AccessMode.NONE,
unpack=True, socket=None,
databases=None):
databases=None,
no_synchronisation_listeners=False):
data_sources = []
views = {}
......@@ -179,9 +180,13 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
# Synchronization bits
group = input_list.group(details['channel'])
if group is None:
synchronization_listener = None
if not no_synchronisation_listeners:
synchronization_listener = outputs.SynchronizationListener()
group = inputs.InputGroup(
details['channel'],
synchronization_listener=outputs.SynchronizationListener(),
synchronization_listener=synchronization_listener,
restricted_access=(details['channel'] == config['channel'])
)
input_list.add(group)
......@@ -216,6 +221,8 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
for name, details in output_config.items():
synchronization_listener = None
if 'result' in config:
dataformat_name = 'analysis:' + algorithm.name
dataformat = algorithm.result_dataformat()
......@@ -223,6 +230,9 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
dataformat_name = algorithm.output_map[name]
dataformat = algorithm.dataformats[dataformat_name]
input_group = input_list.group(details['channel'])
if input_group is not None:
synchronization_listener = input_group.synchronization_listener
if cache_access == AccessMode.LOCAL:
......@@ -250,14 +260,6 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
if not status:
raise IOError("Cannot create cache sink '%s'" % details['path'])
synchronization_listener = None
if 'result' not in config:
input_group = input_list.group(details['channel'])
if (input_group is not None) and hasattr(input_group, 'synchronization_listener'):
synchronization_listener = input_group.synchronization_listener
output_list.add(outputs.Output(name, data_sink,
synchronization_listener=synchronization_listener,
force_start_index=start_index or 0)
......@@ -274,7 +276,10 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
if socket is None:
raise IOError("No socket provided for remote outputs")
output_list.add(outputs.RemoteOutput(name, dataformat, socket))
output_list.add(outputs.RemoteOutput(name, dataformat, socket,
synchronization_listener=synchronization_listener,
force_start_index=start_index or 0)
)
logger.debug("RemoteOutput '%s' created: group='%s', dataformat='%s'" % \
(name, details['channel'], dataformat_name))
......
......@@ -29,7 +29,7 @@
"""Executes a single algorithm. (%(version)s)
usage:
%(prog)s [--debug] <addr> <dir>
%(prog)s [--debug] [--cache=<path>] <addr> <dir>
%(prog)s (--help)
%(prog)s (--version)
......@@ -44,6 +44,7 @@ options:
-h, --help Shows this help message and exit
-V, --version Shows program's version number and exit
-d, --debug Runs executor in debugging mode
-c, --cache=<path> Cache prefix, otherwise defaults to '/cache'
"""
......@@ -169,6 +170,10 @@ def main():
logger = logging.getLogger(__name__)
# Retrieve the cache path
cache = args['--cache'] if args['--cache'] is not None else '/cache'
# Creates the 0MQ socket for communication with BEAT
context = zmq.Context()
socket = context.socket(zmq.PAIR)
......@@ -184,46 +189,49 @@ def main():
return 1
# Create a new user with less privileges
# Load the configuration
with open(os.path.join(args['<dir>'], 'configuration.json'), 'r') as f:
cfg = simplejson.load(f)
retcode = subprocess.call(['adduser', '--uid', str(cfg['uid']),
'--no-create-home', '--disabled-password',
'--disabled-login', '--gecos', '""', '-q',
'beat-nobody'])
if retcode != 0:
send_error(logger, socket, 'sys', 'Failed to create an user with the UID %d' % cfg['uid'])
close(logger, socket, context)
return 1
# Create a new user with less privileges (if necessary)
if os.getuid() != cfg['uid']:
retcode = subprocess.call(['adduser', '--uid', str(cfg['uid']),
'--no-create-home', '--disabled-password',
'--disabled-login', '--gecos', '""', '-q',
'beat-nobody'])
if retcode != 0:
send_error(logger, socket, 'sys', 'Failed to create an user with the UID %d' % cfg['uid'])
close(logger, socket, context)
return 1
# Ensure that the needed files are readable by the new user
access = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
os.chmod(args['<dir>'], access)
# Ensure that the needed files are readable by the new user
access = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH
for root, dirs, files in os.walk(args['<dir>']):
for d in dirs:
os.chmod(os.path.join(root, d), access)
for f in files:
os.chmod(os.path.join(root, f), access)
os.chmod(args['<dir>'], access)
for root, dirs, files in os.walk(args['<dir>']):
for d in dirs:
os.chmod(os.path.join(root, d), access)
for f in files:
os.chmod(os.path.join(root, f), access)
# Change to the user with less privileges
try:
os.setgid(cfg['uid'])
os.setuid(cfg['uid'])
except:
import traceback
send_error(logger, socket, 'sys', traceback.format_exc())
close(logger, socket, context)
return 1
# Change to the user with less privileges
try:
os.setgid(cfg['uid'])
os.setuid(cfg['uid'])
except:
import traceback
send_error(logger, socket, 'sys', traceback.format_exc())
close(logger, socket, context)
return 1
try:
# Sets up the execution
executor = Executor(socket, args['<dir>'])
executor = Executor(socket, args['<dir>'], cache_root=cache)
try:
status = executor.setup()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment