Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.backend.python
Commits
8fbee90b
Commit
8fbee90b
authored
Aug 14, 2020
by
Amir MOHAMMADI
Browse files
Use a global prefix to keep things around, add an iris test
parent
73491015
Pipeline
#42047
failed with stage
in 90 minutes and 5 seconds
Changes
11
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
beat/backend/python/algorithm.py
View file @
8fbee90b
...
...
@@ -50,13 +50,17 @@ import numpy
import
simplejson
as
json
import
six
from
.
import
config
from
.
import
dataformat
from
.
import
library
from
.
import
loader
from
.
import
utils
from
.library
import
Library
logger
=
logging
.
getLogger
(
__name__
)
ALGORITHM_TYPE
=
"algorithm"
ALGORITHM_FOLDER
=
"algorithms"
# ----------------------------------------------------------
...
...
@@ -66,28 +70,26 @@ class Storage(utils.CodeStorage):
Parameters:
prefix (str): Establishes the prefix of your installation.
name (str): The name of the algorithm object in the format
``<user>/<name>/<version>``.
"""
asset_type
=
"algorithm"
asset_folder
=
"algorithms"
asset_type
=
ALGORITHM_TYPE
asset_folder
=
ALGORITHM_FOLDER
def
__init__
(
self
,
prefix
,
name
,
language
=
None
):
def
__init__
(
self
,
name
,
language
=
None
,
prefix
=
None
):
if
name
.
count
(
"/"
)
!=
2
:
raise
RuntimeError
(
"invalid algorithm name: `%s'"
%
name
)
self
.
username
,
self
.
name
,
self
.
version
=
name
.
split
(
"/"
)
self
.
fullname
=
name
if
prefix
is
None
:
prefix
=
config
.
get_config
()[
"prefix"
]
self
.
prefix
=
prefix
path
=
utils
.
hashed_or_simple
(
self
.
prefix
,
self
.
asset_folder
,
name
,
suffix
=
".json"
)
path
=
utils
.
hashed_or_simple
(
prefix
,
self
.
asset_folder
,
name
,
suffix
=
".json"
)
path
=
path
[:
-
5
]
super
(
Storage
,
self
).
__init__
(
path
,
language
)
...
...
@@ -375,7 +377,16 @@ class Runner(object):
# ----------------------------------------------------------
class
Algorithm
(
object
):
def
_to_name
(
v
,
convert_basic_types
=
False
):
if
hasattr
(
v
,
"name"
):
v
=
v
.
name
if
convert_basic_types
:
if
not
isinstance
(
v
,
str
):
v
=
numpy
.
dtype
(
v
).
name
return
v
class
Algorithm
(
metaclass
=
config
.
PrefixMeta
):
"""Algorithms represent runnable components within the platform.
This class can only parse the meta-parameters of the algorithm (i.e., input
...
...
@@ -387,20 +398,8 @@ class Algorithm(object):
Parameters:
prefix (str): Establishes the prefix of your installation.
name (str): The fully qualified algorithm name (e.g. ``user/algo/1``)
dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
dataformat names to loaded dataformats. This parameter is optional and,
if passed, may greatly speed-up algorithm loading times as dataformats
that are already loaded may be re-used.
library_cache (:py:class:`dict`, Optional): A dictionary mapping library
names to loaded libraries. This parameter is optional and, if passed,
may greatly speed-up library loading times as libraries that are
already loaded may be re-used.
Attributes:
...
...
@@ -449,6 +448,9 @@ class Algorithm(object):
"""
asset_type
=
ALGORITHM_TYPE
asset_folder
=
ALGORITHM_FOLDER
LEGACY
=
"legacy"
SEQUENTIAL
=
"sequential"
AUTONOMOUS
=
"autonomous"
...
...
@@ -459,27 +461,83 @@ class Algorithm(object):
dataformat_klass
=
dataformat
.
DataFormat
def
__init__
(
self
,
prefix
,
name
,
dataformat_cache
=
None
,
library_cache
=
None
):
def
_init
(
self
):
self
.
_name
=
None
self
.
storage
=
None
self
.
prefix
=
prefix
self
.
dataformats
=
{}
self
.
libraries
=
{}
self
.
groups
=
[]
self
.
errors
=
[]
return
self
def
__init__
(
self
,
name
):
self
.
_init
()
self
.
_load
(
name
)
@
classmethod
def
new
(
cls
,
code_path
,
name
,
groups
,
parameters
=
None
,
description
=
None
,
type
=
"autonomous"
,
splittable
=
False
,
api_version
=
2
,
language
=
"python"
,
uses
=
None
,
schema_version
=
2
,
**
kwargs
,
):
self
=
cls
.
__new__
(
cls
).
_init
()
uses
=
{
k
:
_to_name
(
v
)
for
k
,
v
in
(
uses
or
{}).
items
()}
or
None
# convert dataformats to their names in groups
for
i
,
grp
in
enumerate
(
groups
):
for
key
,
value
in
grp
.
items
():
if
key
not
in
(
"inputs"
,
"outputs"
):
continue
for
node_details
in
value
.
values
():
node_details
[
"type"
]
=
_to_name
(
node_details
[
"type"
])
data
=
dict
(
api_version
=
api_version
,
description
=
description
,
groups
=
groups
,
language
=
language
,
parameters
=
parameters
,
schema_version
=
schema_version
,
splittable
=
splittable
,
type
=
type
,
uses
=
uses
,
)
data
=
{
k
:
v
for
k
,
v
in
data
.
items
()
if
v
is
not
None
}
self
.
data
=
data
if
not
name
:
raise
ValueError
(
f
"Invalid
{
name
}
. The name should be a non-empty string!"
)
dataformat_cache
=
dataformat_cache
if
dataformat_cache
is
not
None
else
{}
library_cache
=
library_cache
if
library_cache
is
not
None
else
{}
self
.
name
=
name
self
.
_load
(
name
,
dataformat_cache
,
library_cache
)
self
.
storage
=
Storage
(
self
.
name
)
self
.
code_path
=
self
.
storage
.
code
.
path
=
code_path
with
open
(
code_path
,
"rt"
)
as
f
:
self
.
code
=
self
.
storage
.
code
.
contents
=
f
.
read
()
self
.
storage
.
json
.
contents
=
str
(
self
)
def
_load
(
self
,
data
,
dataformat_cache
,
library_cache
):
self
.
_resolve
()
return
self
def
_load
(
self
,
name
):
"""Loads the algorithm"""
self
.
_
name
=
data
self
.
name
=
name
self
.
storage
=
Storage
(
self
.
prefix
,
data
)
self
.
storage
=
Storage
(
self
.
name
)
json_path
=
self
.
storage
.
json
.
path
if
not
self
.
storage
.
exists
():
self
.
errors
.
append
(
"Algorithm declaration file not found: %s"
%
json_path
)
...
...
@@ -497,7 +555,9 @@ class Algorithm(object):
self
.
code_path
=
self
.
storage
.
code
.
path
self
.
code
=
self
.
storage
.
code
.
load
()
self
.
_resolve
()
def
_resolve
(
self
):
self
.
groups
=
self
.
data
[
"groups"
]
# create maps for easy access to data
...
...
@@ -511,46 +571,46 @@ class Algorithm(object):
for
k
,
v
in
g
.
get
(
"outputs"
,
{}).
items
()
]
)
# find the group name of outputs
self
.
output_group
=
None
for
g
in
self
.
groups
:
for
k
,
v
in
g
.
get
(
"outputs"
,
{}).
items
():
self
.
output_group
=
g
break
break
self
.
loop_map
=
dict
(
[(
k
,
v
[
"type"
])
for
g
in
self
.
groups
for
k
,
v
in
g
.
get
(
"loop"
,
{}).
items
()]
)
self
.
_load_dataformats
(
dataformat_cache
)
self
.
_load_dataformats
()
self
.
_convert_parameter_types
()
self
.
_load_libraries
(
library_cache
)
self
.
_load_libraries
()
def
_update_dataformat_cache
(
self
,
type_name
,
dataformat_cache
):
def
_update_dataformat_cache
(
self
,
type_name
):
"""Update the data format cache based on the type name"""
if
type_name
not
in
self
.
dataformats
:
if
dataformat_cache
and
type_name
in
dataformat_cache
:
# reuse
thisformat
=
dataformat_cache
[
type_name
]
else
:
# load it
thisformat
=
self
.
dataformat_klass
(
self
.
prefix
,
type_name
)
if
dataformat_cache
is
not
None
:
# update it
dataformat_cache
[
type_name
]
=
thisformat
thisformat
=
self
.
dataformat_klass
[
type_name
]
self
.
dataformats
[
type_name
]
=
thisformat
return
self
.
dataformats
[
type_name
]
def
_update_dataformat_cache_for_group
(
self
,
group
,
dataformat_cache
):
def
_update_dataformat_cache_for_group
(
self
,
group
):
for
_
,
entry
in
group
.
items
():
self
.
_update_dataformat_cache
(
entry
[
"type"
]
,
dataformat_cache
)
self
.
_update_dataformat_cache
(
entry
[
"type"
])
def
_load_dataformats
(
self
,
dataformat_cache
):
def
_load_dataformats
(
self
):
"""Makes sure we can load all requested formats
"""
for
group
in
self
.
groups
:
self
.
_update_dataformat_cache_for_group
(
group
[
"inputs"
]
,
dataformat_cache
)
self
.
_update_dataformat_cache_for_group
(
group
[
"inputs"
])
if
"outputs"
in
group
:
self
.
_update_dataformat_cache_for_group
(
group
[
"outputs"
],
dataformat_cache
)
self
.
_update_dataformat_cache_for_group
(
group
[
"outputs"
])
if
"loop"
in
group
:
self
.
_update_dataformat_cache_for_group
(
group
[
"loop"
]
,
dataformat_cache
)
self
.
_update_dataformat_cache_for_group
(
group
[
"loop"
])
if
self
.
results
:
...
...
@@ -559,7 +619,7 @@ class Algorithm(object):
# results can only contain base types and plots therefore, only
# process plots
if
result_type
.
find
(
"/"
)
!=
-
1
:
self
.
_update_dataformat_cache
(
result_type
,
dataformat_cache
)
self
.
_update_dataformat_cache
(
result_type
)
def
_convert_parameter_types
(
self
):
"""Converts types to numpy equivalents, checks defaults, ranges and choices
...
...
@@ -639,7 +699,7 @@ class Algorithm(object):
)
)
def
_load_libraries
(
self
,
library_cache
):
def
_load_libraries
(
self
):
# all used libraries must be loadable; cannot use self as a library
...
...
@@ -647,9 +707,7 @@ class Algorithm(object):
for
name
,
value
in
self
.
uses
.
items
():
self
.
libraries
[
value
]
=
library_cache
.
setdefault
(
value
,
library
.
Library
(
self
.
prefix
,
value
,
library_cache
)
)
self
.
libraries
[
value
]
=
Library
[
value
]
@
property
def
name
(
self
):
...
...
@@ -668,8 +726,11 @@ class Algorithm(object):
if
self
.
data
[
"language"
]
==
"unknown"
:
raise
RuntimeError
(
"algorithm has no programming language set"
)
if
"/"
not
in
value
:
value
=
f
"
{
config
.
get_config
()[
'user'
]
}
/
{
value
}
/1"
self
.
_name
=
value
self
.
storage
=
Storage
(
self
.
prefix
,
value
,
self
.
data
[
"language"
])
self
.
storage
=
Storage
(
value
,
self
.
data
[
"language"
])
@
property
def
schema_version
(
self
):
...
...
@@ -898,7 +959,7 @@ class Algorithm(object):
)
format
=
dataformat
.
DataFormat
(
self
.
prefix
,
dict
([(
k
,
v
[
"type"
])
for
k
,
v
in
self
.
results
.
items
()])
dict
([(
k
,
v
[
"type"
])
for
k
,
v
in
self
.
results
.
items
()])
)
format
.
name
=
"analysis:"
+
self
.
name
...
...
@@ -1034,7 +1095,7 @@ class Algorithm(object):
Raises:
RuntimeError: If prefix and
self.
prefix point to the same directory.
RuntimeError: If prefix and prefix point to the same directory.
"""
...
...
@@ -1044,19 +1105,53 @@ class Algorithm(object):
if
not
self
.
valid
:
raise
RuntimeError
(
"algorithm is not valid:
\n
%s"
%
"
\n
"
.
join
(
self
.
errors
))
if
prefix
==
self
.
prefix
:
raise
RuntimeError
(
"Cannot export algorithm to the same prefix ("
"%s)"
%
prefix
)
for
k
in
self
.
libraries
.
values
():
k
.
export
(
prefix
)
for
k
in
self
.
dataformats
.
values
():
k
.
export
(
prefix
)
self
.
write
(
Storage
(
prefix
,
self
.
name
,
self
.
language
))
self
.
write
(
Storage
(
self
.
name
,
self
.
language
,
prefix
=
prefix
))
class
Analyzer
(
Algorithm
):
"""docstring for Analyzer"""
@
classmethod
def
new
(
cls
,
code_path
,
name
,
groups
,
results
,
description
=
None
,
type
=
"autonomous"
,
api_version
=
2
,
language
=
"python"
,
uses
=
None
,
schema_version
=
2
,
**
kwargs
,
):
self
=
super
().
new
(
code_path
=
code_path
,
name
=
name
,
groups
=
groups
,
description
=
description
,
type
=
type
,
api_version
=
api_version
,
language
=
language
,
uses
=
uses
,
schema_version
=
schema_version
,
# no parameters or splittable for analyzers
parameters
=
None
,
splittable
=
None
,
**
kwargs
,
)
# convert dataformats to their names in results
for
name
,
details
in
results
.
items
():
details
[
"type"
]
=
_to_name
(
details
[
"type"
],
convert_basic_types
=
True
)
self
.
data
[
"results"
]
=
results
self
.
_resolve
()
return
self
beat/backend/python/config.py
View file @
8fbee90b
...
...
@@ -99,7 +99,7 @@ def set_config(**kwargs):
"""
supported_keys
=
set
(
DEFAULTS
.
keys
())
set_keys
=
set
(
kwargs
.
keys
())
if
set_keys
not
in
supported_keys
:
if
not
set_keys
.
issubset
(
supported_keys
)
:
raise
ValueError
(
f
"Only
{
supported_keys
}
are valid configurations. "
f
"Got these extra values:
{
set_keys
-
supported_keys
}
"
...
...
@@ -137,14 +137,14 @@ def config_context(**new_config):
"""
old_config
=
get_config
().
copy
()
# also backup prefix
old_prefix
=
Prefix
().
copy
()
prefix
=
Prefix
()
old_prefix
=
prefix
.
copy
()
set_config
(
**
new_config
)
try
:
yield
finally
:
set_config
(
**
old_config
)
prefix
=
Prefix
()
prefix
.
clear
()
prefix
.
update
(
old_prefix
)
...
...
@@ -154,7 +154,6 @@ def config_context(**new_config):
class
Singleton
(
type
):
"""A Singleton metaclass
The singleton class calls the __init__ method each time the instance is requested.
From: https://stackoverflow.com/a/6798042/1286165
"""
...
...
@@ -167,11 +166,55 @@ class Singleton(type):
class
Prefix
(
dict
,
metaclass
=
Singleton
):
def
__init__
(
self
,
path
=
None
,
*
args
,
**
kwargs
):
super
().
__init__
(
*
args
,
**
kwargs
)
pass
class
PrefixMeta
(
type
):
# cache instances when __init__ is called
def
__call__
(
cls
,
*
args
,
**
kwargs
):
instance
=
super
().
__call__
(
*
args
,
**
kwargs
)
folder
=
f
"
{
cls
.
asset_folder
}
/
{
instance
.
name
}
"
prefix
=
Prefix
()
prefix
[
folder
]
=
instance
print
(
f
"caching
{
cls
.
__name__
}
/
{
instance
.
name
}
in __init__ calls"
)
return
instance
def
__init__
(
cls
,
name
,
bases
,
clsdict
):
# cache instances when new is called
if
"new"
in
clsdict
:
def
caching_new
(
*
args
,
**
kwargs
):
instance
=
clsdict
[
"new"
].
__func__
(
cls
,
*
args
,
**
kwargs
)
folder
=
f
"
{
cls
.
asset_folder
}
/
{
instance
.
name
}
"
prefix
=
Prefix
()
prefix
[
folder
]
=
instance
print
(
f
"caching
{
cls
.
__name__
}
/
{
instance
.
name
}
in new calls"
)
return
instance
setattr
(
cls
,
"new"
,
caching_new
)
# change the key of instance when its name is changed
if
"name"
in
clsdict
:
name_property
=
clsdict
[
"name"
]
actual_fset
=
name_property
.
fset
def
updating_fset
(
self
,
value
):
print
(
f
"updating name of
{
cls
.
__name__
}
/
{
self
.
name
}
in cached prefix"
)
if
self
.
name
in
cls
:
del
cls
[
self
.
name
]
actual_fset
(
self
,
value
)
cls
[
self
.
name
]
=
self
name_property
=
property
(
name_property
.
fget
,
updating_fset
,
name_property
.
fdel
,
name_property
.
__doc__
,
)
setattr
(
cls
,
"name"
,
name_property
)
def
__contains__
(
cls
,
key
):
return
f
"
{
cls
.
asset_folder
}
/
{
key
}
"
in
Prefix
()
...
...
@@ -191,3 +234,8 @@ class PrefixMeta(type):
folder
=
f
"
{
cls
.
asset_folder
}
/
{
key
}
"
prefix
=
Prefix
()
prefix
[
folder
]
=
value
def
__delitem__
(
cls
,
key
):
folder
=
f
"
{
cls
.
asset_folder
}
/
{
key
}
"
prefix
=
Prefix
()
del
prefix
[
folder
]
beat/backend/python/data.py
View file @
8fbee90b
...
...
@@ -351,7 +351,7 @@ class CachedDataSource(DataSource):
)
self
.
dataformat
=
algo
.
result_dataformat
()
else
:
self
.
dataformat
=
DataFormat
(
self
.
prefix
,
dataformat_name
)
self
.
dataformat
=
DataFormat
[
dataformat_name
]
if
not
self
.
dataformat
.
valid
:
raise
RuntimeError
(
...
...
@@ -614,7 +614,7 @@ class DatabaseOutputDataSource(DataSource):
self
.
output_name
=
output_name
self
.
pack
=
pack
self
.
dataformat
=
DataFormat
(
self
.
prefix
,
dataformat_name
)
self
.
dataformat
=
DataFormat
[
dataformat_name
]
if
not
self
.
dataformat
.
valid
:
raise
RuntimeError
(
"the dataformat `%s' is not valid"
%
dataformat_name
)
...
...
@@ -731,7 +731,7 @@ class RemoteDataSource(DataSource):
self
.
input_name
=
input_name
self
.
unpack
=
unpack
self
.
dataformat
=
DataFormat
(
prefix
,
dataformat_name
)
self
.
dataformat
=
DataFormat
[
dataformat_name
]
if
not
self
.
dataformat
.
valid
:
raise
RuntimeError
(
"the dataformat `%s' is not valid"
%
dataformat_name
)
...
...
beat/backend/python/database.py
View file @
8fbee90b
...
...
@@ -60,6 +60,9 @@ from .exceptions import OutputError
from
.outputs
import
OutputList
from
.protocoltemplate
import
ProtocolTemplate
DATABASE_TYPE
=
"database"
DATABASE_FOLDER
=
"databases"
# ----------------------------------------------------------
...
...
@@ -73,20 +76,21 @@ class Storage(utils.CodeStorage):
"""
asset_type
=
"database"
asset_folder
=
"databases"
asset_type
=
DATABASE_TYPE
asset_folder
=
DATABASE_FOLDER
def
__init__
(
self
,
name
):
def
__init__
(
self
,
name
,
prefix
=
None
):
if
name
.
count
(
"/"
)
!=
1
:
raise
RuntimeError
(
"invalid database name: `%s'"
%
name
)
self
.
name
,
self
.
version
=
name
.
split
(
"/"
)
self
.
fullname
=
name
if
prefix
is
None
:
prefix
=
config
.
get_config
()[
"prefix"
]
self
.
prefix
=
prefix
path
=
os
.
path
.
join
(
config
.
get_config
()[
"prefix"
],
self
.
asset_folder
,
name
+
".json"
)
path
=
os
.
path
.
join
(
prefix
,
self
.
asset_folder
,
name
+
".json"
)
path
=
path
[:
-
5
]
# views are coded in Python
super
(
Storage
,
self
).
__init__
(
path
,
"python"
)
...
...
@@ -217,7 +221,7 @@ class Runner(object):
# ----------------------------------------------------------
class
Database
(
object
):
class
Database
(
metaclass
=
config
.
PrefixMeta
):
"""Databases define the start point of the dataflow in an experiment.
...
...
@@ -234,12 +238,16 @@ class Database(object):
"""
asset_type
=
DATABASE_TYPE
asset_folder
=
DATABASE_FOLDER
def
_init
(
self
):
self
.
_name
=
None
self
.
dataformats
=
{}
# preloaded dataformats
self
.
storage
=
None
self
.
errors
=
[]
self
.
data
=
None
return
self
def
__init__
(
self
,
name
):
...
...
@@ -256,8 +264,7 @@ class Database(object):
schema_version
=
2
,
root_folder
=
"/foo/bar"
,
):
self
=
cls
.
__new__
(
cls
)
self
.
_init
()
self
=
cls
.
__new__
(
cls
).
_init
()
if
not
name
:
raise
ValueError
(
f
"Invalid
{
name
}
. The name should be a non-empty string!"
)
...
...
@@ -275,7 +282,7 @@ class Database(object):
for
i
,
proto
in
enumerate
(
protocols
):
protocols
[
i
][
"template"
]
=
protocoltemplate_name
(
proto
[
"template"
])
data
=
dict
(
protocols
=
protocols
)
data
=
dict
(
protocols
=
protocols
,
root_folder
=
root_folder
)
if
description
is
not
None
:
data
[
"description"
]
=
description
if
schema_version
is
not
None
:
...
...
@@ -283,12 +290,11 @@ class Database(object):
self
.
data
=
data
self
.
storage
=
Storage
(
name
)
# save the code into storage
self
.
storage
=
Storage
(
self
.
name
)
self
.
code_path
=
self
.
storage
.
code
.
path
=
code_path
with
open
(
code_path
,
"rt"
)
as
f
:
self
.
storage
.
code
.
save
(
f
.
read
())
self
.
code_path
=
self
.
storage
.
code
.
path
self
.
code
=
self
.
storage
.
code
.
load
()
self
.
code
=
self
.
storage
.
code
.
contents
=
f
.
read
()
self
.
storage
.
json
.
contents
=
str
(
self
)
self
.
_load_v2
()
...
...
@@ -496,7 +502,8 @@ class Database(object):
protocol
=
self
.
protocol
(
protocol_name
)
template_name
=
protocol
[
"template"
]
protocol_template
=
ProtocolTemplate
[
template_name
]