Commit bec61e60 authored by Philip ABBET's avatar Philip ABBET
Browse files

Database indexing now works on the platform

parent 22ef482c
Pipeline #9083 failed with stage
in 5 minutes and 42 seconds
......@@ -34,7 +34,7 @@
%(prog)s databases diff <name>
%(prog)s databases status
%(prog)s databases version <name>
%(prog)s databases index [--list | --delete | --checksum] [<name>]...
%(prog)s databases index [--list | --delete | --checksum] [--uid=<uid>] [--db-root=<path>] [<name>]...
%(prog)s databases view [--exclude=<output>] <set_name>
%(prog)s databases --help
......@@ -166,7 +166,7 @@ def pull(webapi, prefix, names, force, indentation, format_cache):
return status + df_status
def index_output(configuration, names, ls, delete, checksum):
def index_output(configuration, names, ls, delete, checksum, uid, db_root):
names = common.make_up_local_list(configuration.path, 'database', names)
......@@ -244,8 +244,9 @@ def index_output(configuration, names, ls, delete, checksum):
protocols = [protocol_filter]
host = dock.Host()
host.setup(raise_on_errors=True)
if not (ls or delete or checksum): #we're indexing
host = dock.Host()
host.setup(raise_on_errors=True)
for protocol_name in protocols:
......@@ -263,83 +264,98 @@ def index_output(configuration, names, ls, delete, checksum):
for set_name in sets:
zmq_context = zmq.Context()
db_socket = zmq_context.socket(zmq.PAIR)
db_address = 'tcp://' + host.ip
port = db_socket.bind_to_random_port(db_address)
db_address += ':%d' % port
db_set = database.set(protocol_name, set_name)
input_list = inputs.InputList()
if not (ls or delete or checksum): #we're indexing
zmq_context = zmq.Context()
db_socket = zmq_context.socket(zmq.PAIR)
db_address = 'tcp://' + host.ip
port = db_socket.bind_to_random_port(db_address)
db_address += ':%d' % port
input_group = inputs.RemoteInputGroup(set_name, restricted_access=True, socket=db_socket)
input_list.add(input_group)
input_list = inputs.InputList()
db_set = database.set(protocol_name, set_name)
input_group = inputs.RemoteInputGroup(set_name, restricted_access=True, socket=db_socket)
input_list.add(input_group)
db_configuration = {
'inputs': {},
'channel': set_name,
}
for output_name, dataformat_name in db_set['outputs'].items():
input = inputs.RemoteInput(output_name, database.dataformats[dataformat_name], db_socket)
input_group.add(input)
db_configuration['inputs'][output_name] = dict(
database=db_name,
protocol=protocol_name,
set=set_name,
output=output_name,
channel=set_name
)
db_configuration = {
'inputs': {},
'channel': set_name,
}
db_tempdir = utils.temporary_directory()
if uid is not None:
db_configuration['datasets_uid'] = uid
with open(os.path.join(db_tempdir, 'configuration.json'), 'wb') as f:
simplejson.dump(db_configuration, f, indent=4)
if db_root is not None:
db_configuration['datasets_root_path'] = db_root
tmp_prefix = os.path.join(db_tempdir, 'prefix')
if not os.path.exists(tmp_prefix):
os.makedirs(tmp_prefix)
for output_name, dataformat_name in db_set['outputs'].items():
input = inputs.RemoteInput(output_name, database.dataformats[dataformat_name], db_socket)
input_group.add(input)
database.export(tmp_prefix)
db_configuration['inputs'][output_name] = dict(
database=db_name,
protocol=protocol_name,
set=set_name,
output=output_name,
channel=set_name
)
json_path = os.path.join(tmp_prefix, 'databases', db_name + '.json')
db_tempdir = utils.temporary_directory()
with open(json_path, 'r') as f:
db_data = simplejson.load(f)
with open(os.path.join(db_tempdir, 'configuration.json'), 'wb') as f:
simplejson.dump(db_configuration, f, indent=4)
database_path = db_data['root_folder']
db_data['root_folder'] = os.path.join('/databases', db_name)
tmp_prefix = os.path.join(db_tempdir, 'prefix')
if not os.path.exists(tmp_prefix):
os.makedirs(tmp_prefix)
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
database.export(tmp_prefix)
try:
db_envkey = host.db2docker([db_name])
except:
raise RuntimeError("No environment found for the database `%s' " \
"- available environments are %s" % (
db_name,
", ".join(host.db_environments.keys())))
if db_root is None:
json_path = os.path.join(tmp_prefix, 'databases', db_name + '.json')
tmp_dir = os.path.join('/tmp', os.path.basename(db_tempdir))
db_cmd = ['databases_provider', db_address, tmp_dir]
with open(json_path, 'r') as f:
db_data = simplejson.load(f)
volumes = {}
volumes[database_path] = {
'bind': os.path.join('/databases', db_name),
'mode': 'ro',
}
database_path = db_data['root_folder']
db_data['root_folder'] = os.path.join('/databases', db_name)
# Note: we only support one databases image loaded at the same time
db_process = dock.Popen(
host,
db_envkey,
command=db_cmd,
tmp_archive=db_tempdir,
volumes=volumes
)
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
try:
db_envkey = host.db2docker([db_name])
except:
raise RuntimeError("No environment found for the database `%s' " \
"- available environments are %s" % (
db_name,
", ".join(host.db_environments.keys())))
tmp_dir = os.path.join('/tmp', os.path.basename(db_tempdir))
db_cmd = ['databases_provider', db_address, tmp_dir]
volumes = {}
if db_root is None:
volumes[database_path] = {
'bind': os.path.join('/databases', db_name),
'mode': 'ro',
}
else:
volumes[db_root] = {
'bind': db_root,
'mode': 'ro',
}
# Note: we only support one databases image loaded at the same time
db_process = dock.Popen(
host,
db_envkey,
command=db_cmd,
tmp_archive=db_tempdir,
volumes=volumes
)
index_filenames = []
previous_data_indices = []
......@@ -363,8 +379,8 @@ def index_output(configuration, names, ls, delete, checksum):
configuration.cache)
elif checksum:
assert load_data_index(index_filename, index_filename)
logger.info("index for `%s' can be loaded and checksums",
assert load_data_index(configuration.cache, toPath(index_hash, '.index'))
logger.info("index for `%s' can be loaded and checksumed",
index_filename)
else:
......@@ -423,13 +439,14 @@ def index_output(configuration, names, ls, delete, checksum):
retcode += 1
continue
db_process.kill()
db_process.wait()
db_process.rm()
if not (ls or delete or checksum): #we're indexing
db_process.kill()
db_process.wait()
db_process.rm()
db_socket.setsockopt(zmq.LINGER, 0)
db_socket.close()
zmq_context.term()
db_socket.setsockopt(zmq.LINGER, 0)
db_socket.close()
zmq_context.term()
return retcode
......@@ -563,7 +580,9 @@ def process(args):
elif args['index']:
return index_output(args['config'], args['<name>'], args['--list'],
args['--delete'], args['--checksum'])
args['--delete'], args['--checksum'],
int(args['--uid']) if args['--uid'] is not None else None,
args['--db-root'])
# Should not happen
logger.error("unrecognized `databases' subcommand")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment