Commit 917cd6ea authored by Philip ABBET

Update to beat.backend.python 1.5.x

parent 27797406
@@ -403,85 +403,3 @@ class Algorithm(BackendAlgorithm):
self.errors.append("language for used library `%s' (`%s') " \
"differs from current language for this algorithm (`%s')" % \
(library, self.libraries[library].language, self.language))
def json_dumps(self, indent=4):
"""Dumps the JSON declaration of this object in a string
Parameters:
indent (int): The number of indentation spaces at every indentation level
Returns:
str: The JSON representation for this object
"""
return simplejson.dumps(self.data, indent=indent,
cls=utils.NumpyJSONEncoder)
def __str__(self):
return self.json_dumps()
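For illustration, a minimal usage sketch of this serialization API (the prefix path and algorithm name are hypothetical, and the constructor call assumes the usual `Algorithm(prefix, name)' loading convention):

from beat.core.algorithm import Algorithm

algorithm = Algorithm('/path/to/prefix', 'user/my_algorithm/1')

# explicit dump, overriding the default 4-space indentation
print(algorithm.json_dumps(indent=2))

# str() delegates to json_dumps() with the defaults
print(str(algorithm))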
def write(self, storage=None):
"""Writes contents to prefix location
Parameters:
storage (Storage, optional): If you pass a new storage, then this object
will be written to that storage point rather than its default.
"""
if self.data['language'] == 'unknown':
raise RuntimeError("algorithm has no programming language set")
if storage is None:
if not self._name:
raise RuntimeError("algorithm has no name")
storage = self.storage #overwrite
storage.save(str(self), self.code, self.description)
def export(self, prefix):
"""Recursively exports itself into another prefix
Dataformats and associated libraries are also copied.
Parameters:
prefix (str): A path to a prefix that must be different from my own.
Returns:
None
Raises:
RuntimeError: If prefix and self.prefix point to the same directory.
"""
if not self._name:
raise RuntimeError("dataformat has no name")
if not self.valid:
raise RuntimeError("dataformat is not valid")
if os.path.samefile(prefix, self.prefix):
raise RuntimeError("Cannot export algorithm to the same prefix (%s == " \
"%s)" % (prefix, self.prefix))
for k in self.libraries.values(): k.export(prefix)
for k in self.dataformats.values(): k.export(prefix)
self.write(Storage(prefix, self.name, self.language))
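A hedged usage sketch of the export chain (continuing the hypothetical `algorithm' object from the sketch above): dependencies are exported first, then write() persists the algorithm itself under the new prefix.

import tempfile

export_prefix = tempfile.mkdtemp()
algorithm.export(export_prefix)  # libraries and dataformats are copied first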
@@ -28,12 +28,12 @@
from beat.backend.python.data import mixDataIndices
from beat.backend.python.data import getAllFilenames
from beat.backend.python.data import CachedFileLoader
from beat.backend.python.data import DataSource
from beat.backend.python.data import DataSink
from beat.backend.python.data import CachedDataSource
from beat.backend.python.data import DatabaseOutputDataSource
from beat.backend.python.data import RemoteDataSource
from beat.backend.python.data import DataSink
from beat.backend.python.data import CachedDataSink
from beat.backend.python.data import MemoryDataSource
from beat.backend.python.data import MemoryDataSink
from beat.backend.python.data import StdoutDataSink
from beat.backend.python.data import load_data_index
from beat.backend.python.data import foundSplitRanges
@@ -3,7 +3,7 @@
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
@@ -25,30 +25,7 @@
# #
###############################################################################
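# Note: each test double below deliberately executes `a = b', an assignment
# from an undefined name that raises NameError, to simulate a crash inside
# one specific lifecycle method (done(), next() or setup()).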
class DoneCrashes:
def setup(self, *args, **kwargs): return True
def next(self): return True
def done(self, last_data_index):
a = b
return True
class NextCrashes:
def setup(self, *args, **kwargs): return True
def done(self, last_data_index): return False
def next(self):
a = b
return True
class SetupCrashes:
def done(self, last_data_index): return True
def next(self): return True
def setup(self, *args, **kwargs):
a = b
return True
class SetupFails:
def setup(self, *args, **kwargs): return False
def done(self, last_data_index): return True
def next(self): return True
from beat.backend.python.data_loaders import DataLoaderList
from beat.backend.python.data_loaders import DataLoader
from beat.backend.python.data_loaders import DataView
@@ -191,174 +191,3 @@ class Database(BackendDatabase):
"subdirectory or points to a python module, what is " \
"unsupported by this version" % (_set['view'],)
)
@property
def name(self):
"""Returns the name of this object
"""
return self._name or '__unnamed_database__'
@name.setter
def name(self, value):
self._name = value
self.storage = Storage(self.prefix, value)
def hash_output(self, protocol, set, output):
"""Creates a unique hash the represents the output from the dataset
Parameters:
protocol (str): The name of the protocol containing the set and output
of interest
set (str): The name of the set in the protocol containing the output of
reference
output (str): The name of the output in the set.
Returns:
str: The hexadecimal digest of the hash representing the output.
Raises:
KeyError: If the protocol, set or output don't reference an existing
output for this database.
"""
# checks protocol, set and output names
set_data = self.set(protocol, set)
output_data = set_data['outputs'][output]
# dumps the hash
return hash.hashDatasetOutput(self.hash(), protocol, set, output)
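A usage sketch, assuming a loaded `database' object (the protocol, set and output names are hypothetical):

digest = database.hash_output('Main', 'templates', 'image')
# `digest' is a hexadecimal string that uniquely identifies this output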
@property
def description(self):
"""The short description for this object"""
return self.data.get('description', None)
@description.setter
def description(self, value):
"""Sets the short description for this object"""
self.data['description'] = value
@property
def documentation(self):
"""The full-length description for this object"""
if not self._name:
raise RuntimeError("database has no name")
if self.storage.doc.exists():
return self.storage.doc.load()
return None
@documentation.setter
def documentation(self, value):
"""Sets the full-length description for this object"""
if not self._name:
raise RuntimeError("database has no name")
if hasattr(value, 'read'):
self.storage.doc.save(value.read())
else:
self.storage.doc.save(value)
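Since the setter only checks for a `read' attribute, both plain strings and file-like objects are accepted (a sketch; the file name is hypothetical):

database.documentation = 'A short, plain-text description'

with open('doc.rst') as f:
    database.documentation = f  # file-like objects are read automatically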
def hash(self):
"""Returns the hexadecimal hash for its declaration"""
if not self._name:
raise RuntimeError("database has no name")
return self.storage.hash()
def json_dumps(self, indent=4):
"""Dumps the JSON declaration of this object in a string
Parameters:
indent (int): The number of indentation spaces at every indentation level
Returns:
str: The JSON representation for this object
"""
return simplejson.dumps(self.data, indent=indent,
cls=utils.NumpyJSONEncoder)
def __str__(self):
return self.json_dumps()
def write(self, storage=None):
"""Writes contents to prefix location
Parameters:
storage (Storage, optional): If you pass a new storage, then this object
will be written to that storage point rather than its default.
"""
if storage is None:
if not self._name:
raise RuntimeError("database has no name")
storage = self.storage #overwrite
storage.save(str(self), self.code, self.description)
def export(self, prefix):
"""Recursively exports itself into another prefix
Associated dataformats are also exported recursively.
Parameters:
prefix (str): A path to a prefix that must be different from my own.
Returns:
None
Raises:
RuntimeError: If prefix and self.prefix point to the same directory.
"""
if not self._name:
raise RuntimeError("database has no name")
if not self.valid:
raise RuntimeError("database is not valid")
if os.path.samefile(prefix, self.prefix):
raise RuntimeError("Cannot export database to the same prefix (%s == " \
"%s)" % (prefix, self.prefix))
for k in self.dataformats.values(): k.export(prefix)
self.write(Storage(prefix, self.name))
@@ -223,80 +223,3 @@ class DataFormat(BackendDataFormat):
# in ``self.resolved``.
if self.errors:
self.errors = utils.uniq(self.errors)
def json_dumps(self, indent=4):
"""Dumps the JSON declaration of this object in a string
Parameters:
indent (int): The number of indentation spaces at every indentation level
Returns:
str: The JSON representation for this object
"""
return simplejson.dumps(self.data, indent=indent,
cls=utils.NumpyJSONEncoder)
def __str__(self):
return self.json_dumps()
def write(self, storage=None):
"""Writes contents to prefix location
Parameters:
storage (Storage, optional): If you pass a new storage, then this object
will be written to that storage point rather than its default.
"""
if storage is None:
if not self._name:
raise RuntimeError("dataformat has no name")
storage = self.storage #overwrite
storage.save(str(self), self.description)
def export(self, prefix):
"""Recursively exports itself into another prefix
Other required dataformats are also copied.
Parameters:
prefix (str): A path to a prefix that must be different from my own.
Returns:
None
Raises:
RuntimeError: If prefix and self.prefix point to the same directory.
"""
if not self._name:
raise RuntimeError("dataformat has no name")
if not self.valid:
raise RuntimeError("dataformat is not valid")
if os.path.samefile(prefix, self.prefix):
raise RuntimeError("Cannot dataformat object to the same prefix (%s " \
"== %s)" % (prefix, self.prefix))
for k in self.referenced.values(): k.export(prefix)
self.write(Storage(prefix, self.name))
@@ -138,9 +138,9 @@ class BaseExecutor(object):
self.algorithm = None
self.databases = {}
self.input_list = None
self.data_loaders = None
self.output_list = None
self.data_sinks = []
self.data_sources = []
self.errors = []
self.data = data
@@ -237,8 +237,8 @@ class BaseExecutor(object):
logger.info("Start the execution of '%s'", self.algorithm.name)
self._prepare_inputs()
self._prepare_outputs()
# self._prepare_inputs()
# self._prepare_outputs()
return self
@@ -252,12 +252,11 @@ class BaseExecutor(object):
# n.b.: a system exit will raise SystemExit which is not an Exception
if not isinstance(exc_type, Exception):
sink.close()
sink.reset()
self.input_list = None
self.data_loaders = []
self.output_list = None
self.data_sinks = []
self.data_sources = []
def _prepare_inputs(self):
@@ -359,13 +358,14 @@ class BaseExecutor(object):
def dump_runner_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
data = convert_experiment_configuration_to_container(self.data, self.proxy_mode)
data = convert_experiment_configuration_to_container(self.data)
with open(os.path.join(directory, 'configuration.json'), 'wb') as f:
simplejson.dump(data, f, indent=2)
tmp_prefix = os.path.join(directory, 'prefix')
if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix)
if not os.path.exists(tmp_prefix):
os.makedirs(tmp_prefix)
self.algorithm.export(tmp_prefix)
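For reference, a sketch of the directory layout this method produces (derived from the code above):

directory/
  configuration.json   # container-ready execution settings (2-space indented JSON)
  prefix/              # freshly exported algorithm plus its dataformats and libraries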
@@ -32,6 +32,7 @@ import os
import requests
import simplejson
import zmq
import random
import logging
logger = logging.getLogger(__name__)
@@ -131,7 +132,7 @@ class DockerExecutor(RemoteExecutor):
def __init__(self, host, prefix, data, cache=None, dataformat_cache=None,
database_cache=None, algorithm_cache=None, library_cache=None,
custom_root_folders=None, proxy_mode=True):
custom_root_folders=None):
super(DockerExecutor, self).__init__(prefix, data, host.ip, cache=cache,
dataformat_cache=dataformat_cache,
@@ -143,16 +144,6 @@ class DockerExecutor(RemoteExecutor):
# Initialisations
self.host = host
# Check if the execution environment supports proxy_mode=False (if necessary)
if not self.proxy_mode:
envkey = '%(name)s (%(version)s)' % self.data['environment']
if envkey not in self.host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, self.host,
", ".join(self.host.processing_environments.keys())))
self.proxy_mode = 'direct_access' not in self.host.processing_environments[envkey].get('capabilities', [])
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
@@ -217,15 +208,15 @@ class DockerExecutor(RemoteExecutor):
def _kill():
self.host.kill(algorithm_container)
self.message_handler = message_handler.ProxyMessageHandler(
self.input_list, self.output_list, self.host.ip, kill_callback=_kill)
self.message_handler = message_handler.MessageHandler(self.host.ip,
kill_callback=_kill)
#----- (If necessary) Instantiate the docker container that provide the databases
databases_container = None
if self.db_address is not None:
if len(self.databases) > 0:
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
@@ -260,19 +251,24 @@ class DockerExecutor(RemoteExecutor):
# Creation of the container
# Note: we only support one databases image loaded at the same time
database_port = random.randint(51000, 60000)
cmd = [
'databases_provider',
self.db_address,
os.path.join('/tmp', os.path.basename(databases_configuration_path))
'0.0.0.0:%d' % database_port,
'/beat/prefix',
'/beat/cache'
]
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
databases_container = self.host.create_container(databases_environment, cmd)
databases_container.copy_path(databases_configuration_path, '/tmp')
# Specify the volumes to mount inside the container
databases_container.add_volume(databases_configuration_path, '/beat/prefix')
databases_container.add_volume(self.cache, '/beat/cache')
if 'datasets_root_path' not in self.data:
for db_name, db_path in database_paths.items():
databases_container.add_volume(db_path, os.path.join('/databases', db_name))
@@ -280,8 +276,24 @@ class DockerExecutor(RemoteExecutor):
databases_container.add_volume(self.data['datasets_root_path'],
self.data['datasets_root_path'])
# Start the container
self.host.start(databases_container)
while True:
try:
databases_container.add_port(database_port, database_port, host_address=self.host.ip)
self.host.start(databases_container)
break
except Exception as e:
if str(e).find('port is already allocated') < 0:
break
databases_container.reset_ports()
database_port = random.randint(51000, 60000)
cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
databases_container.command = cmd
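The loop above implements a draw-and-retry strategy for the database port: pick a random port in [51000, 60000], try to start the container and, if Docker reports the port as already allocated, rewrite the command with a fresh port and retry. A simplified standalone sketch of the same pattern (`start_container' is a hypothetical stand-in for the host start call; unlike the executor, which silently leaves the loop, this sketch re-raises unrelated errors):

import random

def start_on_free_port(start_container, low=51000, high=60000):
    while True:
        port = random.randint(low, high)
        try:
            start_container(port)
            return port
        except Exception as e:
            # only a port collision warrants another draw
            if 'port is already allocated' not in str(e):
                raise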
#----- Instantiate the algorithm container
@@ -293,20 +305,23 @@ class DockerExecutor(RemoteExecutor):
# Command to execute
cmd = [
'execute',
'--cache=/beat/cache',
self.message_handler.address,
os.path.join('/tmp', os.path.basename(configuration_path))
'/beat/prefix'
]
if len(self.databases) > 0:
cmd.append('tcp://' + self.host.ip + ':%d' % database_port)
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
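With at least one database in play and debug logging enabled, the assembled command ends up as (addresses and port are illustrative):

['execute', '--debug', '--cache=/beat/cache',
 'tcp://10.0.0.5:5555',    # self.message_handler.address
 '/beat/prefix',
 'tcp://10.0.0.5:51234']   # databases container address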
# Creation of the container
algorithm_container = self.host.create_container(processing_environment, cmd)
algorithm_container.copy_path(configuration_path, '/tmp')
# Volumes
if not self.proxy_mode:
algorithm_container.add_volume(self.cache, '/cache', read_only=False)
algorithm_container.add_volume(configuration_path, '/beat/prefix')
algorithm_container.add_volume(self.cache, '/beat/cache', read_only=False)
# Start the container
self.host.start(algorithm_container,
@@ -326,42 +341,41 @@ class DockerExecutor(RemoteExecutor):
except requests.exceptions.ReadTimeout:
logger.warn("user process has timed out after %d minutes", timeout_in_minutes)
timed_out = True
self.host.kill(algorithm_container)
status = self.host.wait(algorithm_container)
if databases_container is not None:
self.host.kill(databases_container)
self.host.wait(databases_container)
timed_out = True
except KeyboardInterrupt: # Developer pushed CTRL-C
logger.info("stopping user process on CTRL-C console request")
self.host.kill(algorithm_container)
status = self.host.wait(algorithm_container)
finally:
if databases_container is not None:
self.host.kill(databases_container)
self.host.wait(databases_container)
finally:
self.message_handler.stop.set()
self.message_handler.join()
# Collects final information and returns to caller
container_log = self.host.stderr(algorithm_container)
container_log = self.host.logs(algorithm_container)
stderr = ''
if status != 0:
stdout = ''
stderr = container_log
else:
stdout = container_log
stderr = ''
if logger.getEffectiveLevel() <= logging.DEBUG:
logger.debug("Log of the container: " + container_log)
retval = dict(
status = status,
stdout = self.host.stdout(algorithm_container),
stdout = stdout,
stderr = stderr,
timed_out = timed_out,
statistics = self.host.statistics(algorithm_container),
@@ -375,20 +389,17 @@ class DockerExecutor(RemoteExecutor):
self.host.rm(algorithm_container)
if databases_container is not None:
db_container_log = self.host.stderr(databases_container)
db_container_log = self.host.logs(databases_container)
if logger.getEffectiveLevel() <= logging.DEBUG:
logger.debug("Log of the database container: " + db_container_log)
if status != 0:
retval['stderr'] += '\n' + db_container_log
retval['stdout'] += '\n' + self.host.stdout(databases_container)
else:
retval['stdout'] += '\n' + db_container_log
self.host.rm(databases_container)
self.db_socket.setsockopt(zmq.LINGER, 0)
self.db_socket.close()
self.context.destroy()
self.message_handler.destroy()
self.message_handler = None
@@ -63,7 +63,8 @@ class LocalExecutor(BaseExecutor):
Parameters:
prefix (str): Establishes the prefix of your installation.
prefix (beat.backend.python.utils.Prefix): Establishes the prefix of
your installation.
data (dict, str): The piece of data representing the block to be executed.
It must validate against the schema defined for execution blocks. If a
@@ -161,10 +162,27 @@ class LocalExecutor(BaseExecutor):
custom_root_folders=custom_root_folders)
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
super(LocalExecutor, self).__enter__()