Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.cmdline
Commits
22ef482c
Commit
22ef482c
authored
May 10, 2017
by
Philip ABBET
Browse files
Database indexing now works on local development environments
parent
a582e6f2
Changes
1
Hide whitespace changes
Inline
Side-by-side
beat/cmdline/databases.py
100644 → 100755
View file @
22ef482c
...
...
@@ -91,6 +91,8 @@ Examples:
import
os
import
glob
import
zmq.green
as
zmq
import
logging
logger
=
logging
.
getLogger
()
...
...
@@ -101,6 +103,9 @@ from beat.core.utils import NumpyJSONEncoder
from
beat.core.database
import
Database
from
beat.core.dataformat
import
DataFormat
from
beat.core.data
import
load_data_index
from
beat.core
import
dock
from
beat.core
import
inputs
from
beat.core
import
utils
from
.
import
common
...
...
@@ -239,6 +244,9 @@ def index_output(configuration, names, ls, delete, checksum):
protocols
=
[
protocol_filter
]
host
=
dock
.
Host
()
host
.
setup
(
raise_on_errors
=
True
)
for
protocol_name
in
protocols
:
sets
=
database
.
set_names
(
protocol_name
)
...
...
@@ -255,26 +263,89 @@ def index_output(configuration, names, ls, delete, checksum):
for
set_name
in
sets
:
try
:
view
=
database
.
view
(
protocol_name
,
set_name
)
view
.
prepare_outputs
()
view
.
setup
()
except
Exception
:
logger
.
error
(
"Failed to setup the set `%s/%s/%s':"
,
db_name
,
protocol_name
,
set_name
)
import
traceback
for
e
in
traceback
.
format_exc
().
split
(
'
\n
'
):
logger
.
error
(
' %s'
,
e
)
retcode
+=
1
continue
zmq_context
=
zmq
.
Context
()
db_socket
=
zmq_context
.
socket
(
zmq
.
PAIR
)
db_address
=
'tcp://'
+
host
.
ip
port
=
db_socket
.
bind_to_random_port
(
db_address
)
db_address
+=
':%d'
%
port
input_list
=
inputs
.
InputList
()
input_group
=
inputs
.
RemoteInputGroup
(
set_name
,
restricted_access
=
True
,
socket
=
db_socket
)
input_list
.
add
(
input_group
)
db_set
=
database
.
set
(
protocol_name
,
set_name
)
db_configuration
=
{
'inputs'
:
{},
'channel'
:
set_name
,
}
for
output_name
,
dataformat_name
in
db_set
[
'outputs'
].
items
():
input
=
inputs
.
RemoteInput
(
output_name
,
database
.
dataformats
[
dataformat_name
],
db_socket
)
input_group
.
add
(
input
)
db_configuration
[
'inputs'
][
output_name
]
=
dict
(
database
=
db_name
,
protocol
=
protocol_name
,
set
=
set_name
,
output
=
output_name
,
channel
=
set_name
)
input_group
=
view
.
input_group
()
db_tempdir
=
utils
.
temporary_directory
()
with
open
(
os
.
path
.
join
(
db_tempdir
,
'configuration.json'
),
'wb'
)
as
f
:
simplejson
.
dump
(
db_configuration
,
f
,
indent
=
4
)
tmp_prefix
=
os
.
path
.
join
(
db_tempdir
,
'prefix'
)
if
not
os
.
path
.
exists
(
tmp_prefix
):
os
.
makedirs
(
tmp_prefix
)
database
.
export
(
tmp_prefix
)
json_path
=
os
.
path
.
join
(
tmp_prefix
,
'databases'
,
db_name
+
'.json'
)
with
open
(
json_path
,
'r'
)
as
f
:
db_data
=
simplejson
.
load
(
f
)
database_path
=
db_data
[
'root_folder'
]
db_data
[
'root_folder'
]
=
os
.
path
.
join
(
'/databases'
,
db_name
)
with
open
(
json_path
,
'w'
)
as
f
:
simplejson
.
dump
(
db_data
,
f
,
indent
=
4
)
try
:
db_envkey
=
host
.
db2docker
([
db_name
])
except
:
raise
RuntimeError
(
"No environment found for the database `%s' "
\
"- available environments are %s"
%
(
db_name
,
", "
.
join
(
host
.
db_environments
.
keys
())))
tmp_dir
=
os
.
path
.
join
(
'/tmp'
,
os
.
path
.
basename
(
db_tempdir
))
db_cmd
=
[
'databases_provider'
,
db_address
,
tmp_dir
]
volumes
=
{}
volumes
[
database_path
]
=
{
'bind'
:
os
.
path
.
join
(
'/databases'
,
db_name
),
'mode'
:
'ro'
,
}
# Note: we only support one databases image loaded at the same time
db_process
=
dock
.
Popen
(
host
,
db_envkey
,
command
=
db_cmd
,
tmp_archive
=
db_tempdir
,
volumes
=
volumes
)
index_filenames
=
[]
previous_data_indices
=
[]
for
output
in
view
.
outputs
:
index_hash
=
database
.
hash_output
(
protocol_name
,
set_name
,
output
.
name
)
for
output_name
in
db_set
[
'outputs'
].
keys
():
index_hash
=
database
.
hash_output
(
protocol_name
,
set_name
,
output_name
)
index_filename
=
os
.
path
.
join
(
configuration
.
cache
,
toPath
(
index_hash
,
'.index'
))
...
...
@@ -303,7 +374,7 @@ def index_output(configuration, names, ls, delete, checksum):
logger
.
info
(
"Indexing database `%s', protocol `%s', set `%s', "
\
"output `%s' -> `%s'"
,
db_name
,
protocol_name
,
set_name
,
output
.
name
,
index_filename
)
output
_
name
,
index_filename
)
if
os
.
path
.
exists
(
index_filename
):
logger
.
extra
(
"Overwriting existing index file `%s'"
,
...
...
@@ -352,6 +423,14 @@ def index_output(configuration, names, ls, delete, checksum):
retcode
+=
1
continue
db_process
.
kill
()
db_process
.
wait
()
db_process
.
rm
()
db_socket
.
setsockopt
(
zmq
.
LINGER
,
0
)
db_socket
.
close
()
zmq_context
.
term
()
return
retcode
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment