beat / beat.core / Commits / 0569936c

Commit 0569936c authored Jun 04, 2018 by Samuel GAIST

Merge branch 'improve_documentation' into '1.6.x'

Improve documentation

See merge request !25

Parents: 72ae9448 e3001bc0
Pipeline #20806 passed with stages in 49 minutes and 25 seconds
Changes: 26    Pipelines: 2
beat/core/algorithm.py
...
...
@@ -25,15 +25,21 @@
#                                                                             #
###############################################################################

-"""Validation for algorithms"""
+"""
+=========
+algorithm
+=========
+
+Validation for algorithms
+
+Forward importing from :py:mod:`beat.backend.python.algorithm`
+:py:class:`beat.backend.python.algorithm.Storage`
+:py:class:`beat.backend.python.algorithm.Runner`
+"""

import os
import sys

import six
import numpy
import simplejson

from . import dataformat
from . import library
...
...
@@ -46,14 +52,13 @@ from beat.backend.python.algorithm import Runner
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm


class Algorithm(BackendAlgorithm):
    """Algorithms represent runnable components within the platform.

    This class can only parse the meta-parameters of the algorithm (i.e., input
    and output declaration, grouping, synchronization details, parameters and
-    splittability). The actual algorithm is not directly treated by this class -
-    it can, however, provide you with a loader for actually running the
+    splittability). The actual algorithm is not directly treated by this class.
+    It can, however, provide you with a loader for actually running the
    algorithmic code (see :py:meth:`.runner`).
...
...
@@ -93,33 +98,34 @@ class Algorithm(BackendAlgorithm):
    storage (object): A simple object that provides information about file
      paths for this algorithm

-    dataformats (dict): A dictionary containing all pre-loaded dataformats used
-      by this algorithm. Data format objects will be of type
+    dataformats (dict): A dictionary containing all pre-loaded dataformats
+      used by this algorithm. Data format objects will be of type
      :py:class:`beat.core.dataformat.DataFormat`.

-    libraries (dict): A mapping object defining other libraries this algorithm
-      needs to load so it can work properly.
+    libraries (dict): A mapping object defining other libraries this
+      algorithm needs to load so it can work properly.

    uses (dict): A mapping object defining the required library import name
      (keys) and the full-names (values).

-    parameters (dict): A dictionary containing all pre-defined parameters that
-      this algorithm accepts.
+    parameters (dict): A dictionary containing all pre-defined parameters
+      that this algorithm accepts.

    splittable (bool): A boolean value that indicates if this algorithm is
      automatically parallelizeable by our backend.

    input_map (dict): A dictionary where the key is the input name and the
-      value, its type. All input names (potentially from different groups) are
-      comprised in this dictionary.
+      value, its type. All input names (potentially from different groups)
+      are comprised in this dictionary.

    output_map (dict): A dictionary where the key is the output name and the
-      value, its type. All output names (potentially from different groups) are
-      comprised in this dictionary.
+      value, its type. All output names (potentially from different groups)
+      are comprised in this dictionary.

-    results (dict): If this algorithm is actually an analyzer (i.e., there are
-      no formal outputs, but results that must be saved by the platform), then
-      this dictionary contains the names and data types of those elements.
+    results (dict): If this algorithm is actually an analyzer (i.e., there
+      are no formal outputs, but results that must be saved by the platform),
+      then this dictionary contains the names and data types of those
+      elements.

    groups (dict): A list containing dictionaries with inputs and outputs
      belonging to the same synchronization group.
...
...
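The attributes above are filled in by the constructor shown in the following hunks. A minimal usage sketch, with placeholder prefix path and algorithm name, and assuming the usual `Algorithm(prefix, name, dataformat_cache=None, library_cache=None)` call signature (the signature itself is not part of this diff):

# Sketch only: 'path/to/prefix' and 'user/my_algorithm/1' are placeholders.
from beat.core.algorithm import Algorithm

prefix = 'path/to/prefix'                 # root of the BEAT object tree
algorithm = Algorithm(prefix, 'user/my_algorithm/1')

if algorithm.errors:                      # filled in by the validation code below
    print('\n'.join(algorithm.errors))
else:
    runner = algorithm.runner()           # loader for the actual algorithmic code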
@@ -148,21 +154,21 @@ class Algorithm(BackendAlgorithm):
        self._name = None
        self.storage = None
        self.dataformats = {}  # preloaded dataformats
        self.libraries = {}  # preloaded libraries

        code = None

        if data is None:  # loads prototype and validates it
            data = None
            code = None

        elif isinstance(data, (tuple, list)):  # user has passed individual info
            data, code = data  # break down into two components

        if isinstance(data, six.string_types):  # user has passed a file pointer
            self._name = data
            self.storage = Storage(self.prefix, self._name)
...
...
@@ -170,21 +176,21 @@ class Algorithm(BackendAlgorithm):
                self.errors.append('Algorithm declaration file not found: %s' % data)
                return

            data = self.storage.json.path  # loads data from JSON declaration

        # At this point, `data' can be a dictionary or ``None``
        if data is None:  # loads the default declaration for an algorithm
            self.data, self.errors = prototypes.load('algorithm')
            assert not self.errors, "\n  * %s" % "\n  *".join(self.errors)
        else:  # just assign it
            # this runs basic validation, including JSON loading if required
            self.data, self.errors = schema.validate('algorithm', data)

        if self.errors: return  # don't proceed with the rest of validation

        if self.storage is not None:  # loading from the disk, check code
            if not self.storage.code.exists():
                if self.data['language'] != 'cxx':
                    self.errors.append('Algorithm code not found: %s' % \
...
...
@@ -195,15 +201,15 @@ class Algorithm(BackendAlgorithm):
        # At this point, `code' can be a string (or a binary blob) or ``None``
        if code is None:  # loads the default code for an algorithm
            self.code = prototypes.binary_load('algorithm.py')
            self.data['language'] = 'python'
        else:  # just assign it - notice that in this case, no language is set
            self.code = code

        if self.errors: return  # don't proceed with the rest of validation

        # if no errors so far, make sense out of the declaration data
...
...
@@ -255,11 +261,11 @@ class Algorithm(BackendAlgorithm):
            for name, input in group['inputs'].items():

                if input['type'] in self.dataformats: continue

                if dataformat_cache and input['type'] in dataformat_cache:  # reuse
                    thisformat = dataformat_cache[input['type']]
                else:  # load it
                    thisformat = dataformat.DataFormat(self.prefix, input['type'])
                    if dataformat_cache is not None:  # update it
                        dataformat_cache[input['type']] = thisformat

                self.dataformats[input['type']] = thisformat
...
...
@@ -275,11 +281,11 @@ class Algorithm(BackendAlgorithm):
            for name, output in group['outputs'].items():

                if output['type'] in self.dataformats: continue

                if dataformat_cache and output['type'] in dataformat_cache:  # reuse
                    thisformat = dataformat_cache[output['type']]
                else:  # load it
                    thisformat = dataformat.DataFormat(self.prefix, output['type'])
                    if dataformat_cache is not None:  # update it
                        dataformat_cache[output['type']] = thisformat

                self.dataformats[output['type']] = thisformat
...
...
@@ -298,11 +304,11 @@ class Algorithm(BackendAlgorithm):
            if result['type'] in self.dataformats: continue

            if dataformat_cache and result['type'] in dataformat_cache:  # reuse
                thisformat = dataformat_cache[result['type']]
            else:
                thisformat = dataformat.DataFormat(self.prefix, result['type'])
                if dataformat_cache is not None:  # update it
                    dataformat_cache[result['type']] = thisformat

            self.dataformats[result['type']] = thisformat
...
...
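The `reuse` / `load it` / `update it` branches repeated above for inputs, outputs and results are a simple memoization of parsed dataformats: callers can pass a shared dictionary so each format is read from disk only once. A sketch of how a caller would share such a cache across several loads (prefix and algorithm names are placeholders; the cache argument follows the pattern visible in these hunks):

# Sketch only: prefix and algorithm names are placeholders.
from beat.core.algorithm import Algorithm

prefix = 'path/to/prefix'
dataformat_cache = {}   # dataformat name -> loaded DataFormat object

for name in ['user/algo1/1', 'user/algo2/1']:
    algorithm = Algorithm(prefix, name, dataformat_cache)
    # formats seen before come from the cache ("reuse"); new ones are
    # loaded from the prefix ("load it") and written back ("update it")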
@@ -315,7 +321,8 @@ class Algorithm(BackendAlgorithm):
    def _convert_parameter_types(self):
-        """Converts types to numpy equivalents, checks defaults, ranges and choices
+        """Converts types to numpy equivalents, checks defaults, ranges and
+        choices
        """

        def _try_convert(name, tp, value, desc):
...
...
@@ -352,14 +359,14 @@ class Algorithm(BackendAlgorithm):
            parameter['default'] = _try_convert(name, parameter['type'],
                    parameter['default'], 'default')

            if 'range' in parameter:  # check range
                if parameter['default'] < parameter['range'][0] or \
                        parameter['default'] > parameter['range'][1]:
                    self.errors.append("default for parameter `%s' (%r) is not " \
                            "within parameter range [%r, %r]" % (name,
                            parameter['default'], parameter['range'][0],
                            parameter['range'][1]))

            if 'choice' in parameter:  # check choices
                if parameter['default'] not in parameter['choice']:
                    self.errors.append("default for parameter `%s' (%r) is not " \
                            "a valid choice `[%s]'" % (name, parameter['default'],
...
...
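The two checks above compare a parameter's converted default against its optional `range` and `choice` constraints. A standalone sketch of the same rule on a hypothetical declaration (this is an illustration of the logic only, not the beat.core API):

# Standalone illustration of the range/choice checks above.
parameter = {'type': 'int32', 'default': 11, 'range': [0, 10]}

errors = []
if 'range' in parameter:  # check range
    if parameter['default'] < parameter['range'][0] or \
            parameter['default'] > parameter['range'][1]:
        errors.append("default (%r) is not within parameter range [%r, %r]"
                      % (parameter['default'], parameter['range'][0],
                         parameter['range'][1]))
if 'choice' in parameter:  # check choices
    if parameter['default'] not in parameter['choice']:
        errors.append("default (%r) is not a valid choice" % parameter['default'])

# errors == ["default (11) is not within parameter range [0, 10]"]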
@@ -385,13 +392,13 @@ class Algorithm(BackendAlgorithm):
    def _check_language_consistence(self):

        # all used libraries must be programmed with the same language
        if self.language == 'unknown': return  # bail out on unknown language

        if self.uses:
            for name, library in self.uses.items():
                if library not in self.libraries: continue  # invalid

                if self.libraries[library].data is None:
                    self.errors.append("language for used library `%s' cannot be " \
...
...
beat/core/baseformat.py
...
...
@@ -25,5 +25,12 @@
#                                                                             #
###############################################################################

+"""
+==========
+baseformat
+==========
+
+Froward imports from :py:mod:`beat.backend.python.baseformat`
+"""
+
from beat.backend.python.baseformat import *
beat/core/data.py
...
...
@@ -25,6 +25,25 @@
#                                                                             #
###############################################################################

+"""
+====
+data
+====
+
+Forward importing from :py:mod:`beat.backend.python.data`:
+:py:func:`beat.backend.python.data.mixDataIndices`
+:py:func:`beat.backend.python.data.getAllFilenames`
+:py:class:`beat.backend.python.data.DataSource`
+:py:class:`beat.backend.python.data.CachedDataSource`
+:py:class:`beat.backend.python.data.DatabaseOutputDataSource`
+:py:class:`beat.backend.python.data.RemoteDataSource`
+:py:class:`beat.backend.python.data.DataSink`
+:py:class:`beat.backend.python.data.CachedDataSink`
+:py:class:`beat.backend.python.data.StdoutDataSink`
+:py:func:`beat.backend.python.data.load_data_index`
+:py:func:`beat.backend.python.data.load_data_index_db`
+:py:func:`beat.backend.python.data.foundSplitRanges`
+"""

from beat.backend.python.data import mixDataIndices
from beat.backend.python.data import getAllFilenames
...
...
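Since these are plain forward imports, the names documented in the new docstring are the very objects defined in `beat.backend.python.data`; for example:

# The forwarded name and the backend name are the same class object.
from beat.core.data import CachedDataSource
from beat.backend.python.data import CachedDataSource as BackendCachedDataSource

assert CachedDataSource is BackendCachedDataSource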
beat/core/data_loaders.py
...
...
@@ -25,6 +25,17 @@
#                                                                             #
###############################################################################

+"""
+============
+data_loaders
+============
+
+Forward importing from :py:mod:`beat.backend.python.data_loaders`
+:py:class:`beat.backend.python.data_loaders.DataLoaderList`
+:py:class:`beat.backend.python.data_loaders.DataLoader`
+:py:class:`beat.backend.python.data_loaders.DataView`
+"""

from beat.backend.python.data_loaders import DataLoaderList
from beat.backend.python.data_loaders import DataLoader
...
...
beat/core/database.py
...
...
@@ -26,29 +26,30 @@
###############################################################################

-"""Validation of databases"""
+"""
+========
+database
+========
+
+Validation of databases
+
+Forward importing from :py:mod:`beat.backend.python.database`:
+:py:class:`beat.backend.python.database.Storage`
+"""

import os
import sys
import collections

import six
import simplejson

from . import schema
from . import loader
from .dataformat import DataFormat

from . import hash
from . import utils
from . import prototypes

from beat.backend.python.database import Storage
from beat.backend.python.database import View
from beat.backend.python.database import Database as BackendDatabase


class Database(BackendDatabase):
    """Databases define the start point of the dataflow in an experiment.
...
...
@@ -58,9 +59,9 @@ class Database(BackendDatabase):
    prefix (str): Establishes the prefix of your installation.

    data (dict, str): The piece of data representing the database. It must
-      validate against the schema defined for databases. If a string is passed,
-      it is supposed to be a valid path to an database in the designated prefix
-      area.
+      validate against the schema defined for databases. If a string is
+      passed, it is supposed to be a valid path to an database in the
+      designated prefix area.

    dataformat_cache (:py:class:`dict`, Optional): A dictionary mapping
      dataformat names to loaded dataformats. This parameter is optional and,
...
...
@@ -99,14 +100,14 @@ class Database(BackendDatabase):
        self._name = None
        self.storage = None
        self.dataformats = {}  # preloaded dataformats

        code = None

        if isinstance(data, (tuple, list)):  # user has passed individual info
            data, code = data  # break down into two components

        if isinstance(data, six.string_types):  # user has passed a file pointer
            self._name = data
            self.storage = Storage(self.prefix, self._name)
...
...
@@ -118,9 +119,9 @@ class Database(BackendDatabase):
        # this runs basic validation, including JSON loading if required
        self.data, self.errors = schema.validate('database', data)

        if self.errors: return  # don't proceed with the rest of validation

        if self.storage is not None:  # loading from the disk, check code
            if not self.storage.code.exists():
                self.errors.append('Database view code not found: %s' % \
                    self.storage.code.path)
...
...
@@ -130,14 +131,14 @@ class Database(BackendDatabase):
        # At this point, `code' can be a string (or a binary blob) or ``None``
        if code is None:  # loads the default code for an algorithm
            self.code = prototypes.binary_load('view.py')
        else:  # just assign it - notice that in this case, no language is set
            self.code = code

        if self.errors: return  # don't proceed with the rest of validation

        self._validate_semantics(dataformat_cache)
...
...
@@ -169,7 +170,7 @@ class Database(BackendDatabase):
            if value in self.dataformats: continue

            if value in dataformat_cache:  # re-use
                dataformat = dataformat_cache[value]
            else:
                dataformat = DataFormat(self.prefix, value)
...
...
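As with algorithms, the `dataformat_cache` argument documented in the Database docstring lets several loads share parsed dataformats (see the `re-use` branch just above). A minimal sketch, with placeholder prefix and database name, and the `Database(prefix, data, dataformat_cache=None)` signature assumed from that docstring:

# Sketch only: 'path/to/prefix' and 'simple/1' are placeholders.
from beat.core.database import Database

prefix = 'path/to/prefix'
dataformat_cache = {}

db = Database(prefix, 'simple/1', dataformat_cache)
if db.errors:
    print('\n'.join(db.errors))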
beat/core/dataformat.py
...
...
@@ -26,7 +26,16 @@
###############################################################################

-"""Validation and parsing for dataformats"""
+"""
+==========
+dataformat
+==========
+
+Validation and parsing for dataformats
+
+Forward importing from :py:mod:`beat.backend.python.dataformat`:
+:py:class:`beat.backend.python.dataformat.Storage`
+"""

import os
import copy
...
...
beat/core/dock.py
...
...
@@ -25,25 +25,32 @@
#                                                                             #
###############################################################################

+"""
+====
+dock
+====
+
+Docker helper classes
+"""

import os
import six
import simplejson
import socket
import tarfile
import tempfile
import time
import docker
import subprocess as sp
import logging
-logger = logging.getLogger(__name__)

-from . import stats
+from beat.core import stats
+
+logger = logging.getLogger(__name__)


class Host(object):
-    '''An object of this class can connect to the docker host and resolve stuff'''
+    """An object of this class can connect to the docker host and resolve stuff
+    """

    images_cache = {}
...
...
@@ -86,14 +93,14 @@ class Host(object):
    def env2docker(self, key):
-        '''Returns a nice docker image name given a BEAT environment key'''
+        """Returns a nice docker image name given a BEAT environment key"""

        attrs = self.processing_environments[key]
        return attrs['image']

    def db2docker(self, db_names):
-        '''Returns a nice docker image name given a database name'''
+        """Returns a nice docker image name given a database name"""

        def _all_in(db_names, databases):
            return len([x for x in db_names if x in databases]) == len(db_names)
...
...
@@ -125,14 +132,14 @@ class Host(object):
    @property
    def ip(self):
-        '''The IP address of the docker host'''
+        """The IP address of the docker host"""

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 1))  # connecting to a UDP address doesn't send packets
        return s.getsockname()[0]

    def _discover_environments(self):
-        '''Returns a dictionary containing information about docker environments
+        """Returns a dictionary containing information about docker environments

        Raises:
...
...
@@ -140,10 +147,10 @@ class Host(object):
            found environments that override each other for their description
            keys (``<name>(<version>)``), which should be unique.
-        '''
+        """

        def _describe(image):
-            '''Tries to run the "describe" app on the image, collect results'''
+            """Tries to run the "describe" app on the image, collect results"""

            if image in Host.images_cache:
                return Host.images_cache[image]
...
...
@@ -174,8 +181,8 @@ class Host(object):
        def _must_replace(image, environments, key):

-            # this check avoids we do a new environment and, by mistake, override
-            # it with a previous version or the contrary.
+            # this check avoids we do a new environment and, by mistake,
+            # override it with a previous version or the contrary.
            if self.raise_on_errors:
                raise RuntimeError("Environments at '%s' and '%s' have the " \
                    "same name ('%s'). Distinct environments must be " \
...
...
@@ -293,7 +300,7 @@ class Host(object):
    def create_container(self, image, command):

        if image in self:  # Replace by a real image name
            image = self.env2docker(image)

        return Container(image, command)
...
...
@@ -303,8 +310,8 @@ class Host(object):
        """Starts the execution of a container

        The process will be run in the background, and its standard output and
-        standard error will be read after it finishes, into a limited size circular
-        buffer.
+        standard error will be read after it finishes, into a limited size
+        circular buffer.

        Parameters:
...
...
@@ -319,7 +326,6 @@ class Host(object):
            CPU the user process may consume on the host. The value ``100``
            equals to using 100% of a single core. If not specified, then a CPU
            limitation is not put in place.
        """

        cmd = [
...
...
@@ -335,32 +341,35 @@ class Host(object):
        if virtual_memory_in_megabytes:
-            # For this to work properly, memory swap limitation has to be enabled on
-            # the kernel. This typically goes by setting "cgroup_enable=memory" as a
-            # boot parameter to kernels which are compiled with this support.
+            # For this to work properly, memory swap limitation has to be
+            # enabled on the kernel. This typically goes by setting
+            # "cgroup_enable=memory" as a boot parameter to kernels which are
+            # compiled with this support.
            # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
            logger.debug('Setting maximum memory to %dMB' % virtual_memory_in_megabytes)
            cmd.append('--memory=%dm' % virtual_memory_in_megabytes)
            cmd.append('--memory-swap=%dm' % virtual_memory_in_megabytes)

        if max_cpu_percent:
-            # The period corresponds to the scheduling interval for the CFS in Linux
-            # The quota corresponds to a fraction or a multiple of the period, the
-            # container will get. A quota that is 2x the period gets the container up
-            # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
-            # container gets up to 50% the cpu time. Each core represents 100%. A
-            # system with 2 cores has 200% computing power.
+            # The period corresponds to the scheduling interval for the CFS in
+            # Linux. The quota corresponds to a fraction or a multiple of the
+            # period, the container will get. A quota that is 2x the period
+            # gets the container up to 200% cpu time (2 cores). If the quota is
+            # 0.5x the period, the container gets up to 50% the cpu time. Each
+            # core represents 100%. A system with 2 cores has 200% computing
+            # power.
            #
            # More info:
            # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
            #
-            # For this to work properly, CPU bandwidth provisioning for the Linux CFS
-            # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
+            # For this to work properly, CPU bandwidth provisioning for the
+            # Linux CFS must be enabled on the kernel. More info on how to do
+            # it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
            #
            # If your system is running on a virtual machine, having more cores
            # available to docker engine normally translates to more precise
            # scheduling.
            period = 100000  # microseconds
            quota = max_cpu_percent / 100.0

            logger.debug('Setting CPU quota to %d%%' % max_cpu_percent)
...
...
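The rewrapped comment above describes the CFS arithmetic used for the CPU limit: the period is fixed at 100000 microseconds and the quota is `max_cpu_percent / 100` of that period. The lines that actually emit the docker options fall outside this hunk; the sketch below only illustrates the arithmetic, assuming the standard `--cpu-period` / `--cpu-quota` docker flags:

# Illustration of the quota/period arithmetic from the comment above; the
# flag-building lines themselves are not shown in this diff.
max_cpu_percent = 150              # e.g. allow up to 1.5 cores

period = 100000                    # microseconds (CFS scheduling interval)
quota = max_cpu_percent / 100.0    # fraction/multiple of the period

cmd = []
cmd.append('--cpu-period=%d' % period)
cmd.append('--cpu-quota=%d' % int(quota * period))   # 150000 -> up to 150% CPU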
@@ -410,14 +419,15 @@ class Host(object):
    def wait(self, container, timeout=None):
-        '''Wait for the container to finish its job
+        """Wait for the container to finish its job

        Parameters:

          timeout (:py:class:`float`, Optional): A timeout in seconds to wait
            for the user process to finish. If a timeout value is not given,
            waits forever.

-        '''
+        """

        (status, stdout, stderr) = self._exec(['docker', 'wait', container.id],
                timeout=timeout)

        if status != 0:
...
...
@@ -427,7 +437,7 @@ class Host(object):
    def status(self, container):
-        '''Checks the status of a given container'''
+        """Checks the status of a given container"""

        logger.debug("Inspect container %s", container.id)
...
...
@@ -442,7 +452,8 @@ class Host(object):
    def logs(self, container):
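Taken together, the `Host` methods touched in this file cover most of a container's lifecycle. A hedged sketch (environment key and command are placeholders, `Host()` with default arguments is assumed, and the call that actually starts the container is elided because its signature is not shown in this diff):

# Sketch only: environment key and command are placeholders.
from beat.core.dock import Host

host = Host()
image = host.env2docker('Python 2.7 (1.1.0)')        # hypothetical environment key
container = host.create_container(image, ['echo', 'hello'])

# ... start the container through the Host API (elided in this diff) ...

host.wait(container, timeout=60)                     # block until it finishes
print(host.status(container))                        # inspect the final state
print(host.logs(container))                          # captured stdout/stderr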