Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.backend.python
Commits
0549151d
Commit
0549151d
authored
Jul 11, 2019
by
Samuel GAIST
Browse files
[scripts][loop_execute] pre-commit cleanup
parent
9aedbc01
Changes
1
Hide whitespace changes
Inline
Side-by-side
beat/backend/python/scripts/loop_execute.py
View file @
0549151d
...
@@ -61,9 +61,6 @@ import logging
...
@@ -61,9 +61,6 @@ import logging
import
os
import
os
import
sys
import
sys
import
docopt
import
docopt
import
simplejson
import
pwd
import
stat
import
zmq
import
zmq
...
@@ -72,25 +69,25 @@ from beat.backend.python.execution import LoopMessageHandler
...
@@ -72,25 +69,25 @@ from beat.backend.python.execution import LoopMessageHandler
from
beat.backend.python.exceptions
import
UserError
from
beat.backend.python.exceptions
import
UserError
#----------------------------------------------------------
#
----------------------------------------------------------
def
process_traceback
(
tb
,
prefix
):
def
process_traceback
(
tb
,
prefix
):
import
traceback
import
traceback
algorithms_prefix
=
os
.
path
.
join
(
prefix
,
'
algorithms
'
)
+
os
.
sep
algorithms_prefix
=
os
.
path
.
join
(
prefix
,
"
algorithms
"
)
+
os
.
sep
for
first_line
,
line
in
enumerate
(
tb
):
for
first_line
,
line
in
enumerate
(
tb
):
if
line
[
0
].
startswith
(
algorithms_prefix
):
if
line
[
0
].
startswith
(
algorithms_prefix
):
break
break
s
=
''
.
join
(
traceback
.
format_list
(
tb
[
first_line
:]))
s
=
""
.
join
(
traceback
.
format_list
(
tb
[
first_line
:]))
s
=
s
.
replace
(
algorithms_prefix
,
''
).
strip
()
s
=
s
.
replace
(
algorithms_prefix
,
""
).
strip
()
return
s
return
s
#----------------------------------------------------------
#
----------------------------------------------------------
def
main
(
arguments
=
None
):
def
main
(
arguments
=
None
):
...
@@ -99,74 +96,69 @@ def main(arguments=None):
...
@@ -99,74 +96,69 @@ def main(arguments=None):
if
arguments
is
None
:
if
arguments
is
None
:
arguments
=
sys
.
argv
[
1
:]
arguments
=
sys
.
argv
[
1
:]
package
=
__name__
.
rsplit
(
'.'
,
2
)[
0
]
package
=
__name__
.
rsplit
(
"."
,
2
)[
0
]
version
=
package
+
' v'
+
\
version
=
package
+
" v"
+
__import__
(
"pkg_resources"
).
require
(
package
)[
0
].
version
__import__
(
'pkg_resources'
).
require
(
package
)[
0
].
version
prog
=
os
.
path
.
basename
(
sys
.
argv
[
0
])
prog
=
os
.
path
.
basename
(
sys
.
argv
[
0
])
args
=
docopt
.
docopt
(
args
=
docopt
.
docopt
(
__doc__
%
dict
(
prog
=
prog
,
version
=
version
),
__doc__
%
dict
(
prog
=
prog
,
version
=
version
),
argv
=
arguments
,
version
=
version
argv
=
arguments
,
version
=
version
)
)
# Setup the logging system
# Setup the logging system
formatter
=
logging
.
Formatter
(
fmt
=
"[%(asctime)s - loop_provider.py - "
\
formatter
=
logging
.
Formatter
(
"%(name)s] %(levelname)s: %(message)s"
,
fmt
=
"[%(asctime)s - loop_provider.py - "
"%(name)s] %(levelname)s: %(message)s"
,
datefmt
=
"%d/%b/%Y %H:%M:%S"
)
datefmt
=
"%d/%b/%Y %H:%M:%S"
,
)
handler
=
logging
.
StreamHandler
()
handler
=
logging
.
StreamHandler
()
handler
.
setFormatter
(
formatter
)
handler
.
setFormatter
(
formatter
)
root_logger
=
logging
.
getLogger
(
'
beat.backend.python
'
)
root_logger
=
logging
.
getLogger
(
"
beat.backend.python
"
)
root_logger
.
addHandler
(
handler
)
root_logger
.
addHandler
(
handler
)
if
args
[
'
--debug
'
]:
if
args
[
"
--debug
"
]:
root_logger
.
setLevel
(
logging
.
DEBUG
)
root_logger
.
setLevel
(
logging
.
DEBUG
)
else
:
else
:
root_logger
.
setLevel
(
logging
.
INFO
)
root_logger
.
setLevel
(
logging
.
INFO
)
logger
=
logging
.
getLogger
(
__name__
)
logger
=
logging
.
getLogger
(
__name__
)
# Create the message handler
# Create the message handler
message_handler
=
LoopMessageHandler
(
args
[
'
<addr>
'
])
message_handler
=
LoopMessageHandler
(
args
[
"
<addr>
"
])
context
=
None
context
=
None
db_socket
=
None
db_socket
=
None
if
args
[
'
<db_addr>
'
]:
if
args
[
"
<db_addr>
"
]:
context
=
zmq
.
Context
()
context
=
zmq
.
Context
()
db_socket
=
context
.
socket
(
zmq
.
PAIR
)
db_socket
=
context
.
socket
(
zmq
.
PAIR
)
db_socket
.
connect
(
args
[
'<db_addr>'
])
db_socket
.
connect
(
args
[
"<db_addr>"
])
logger
.
debug
(
"loop: zmq client connected to db `%s'"
,
args
[
'<db_addr>'
])
logger
.
debug
(
"loop: zmq client connected to db `%s'"
,
args
[
"<db_addr>"
])
# If necessary, change to another user (with less privileges, but has access
# If necessary, change to another user (with less privileges, but has access
# to the databases)
# to the databases)
with
open
(
os
.
path
.
join
(
args
[
'<dir>'
],
'configuration.json'
),
'r'
)
as
f
:
cfg
=
simplejson
.
load
(
f
)
try
:
try
:
# Check the dir
# Check the dir
if
not
os
.
path
.
exists
(
args
[
'
<dir>
'
]):
if
not
os
.
path
.
exists
(
args
[
"
<dir>
"
]):
raise
IOError
(
"Running directory `%s' not found"
%
args
[
'
<dir>
'
])
raise
IOError
(
"Running directory `%s' not found"
%
args
[
"
<dir>
"
])
# Sets up the execution
# Sets up the execution
try
:
try
:
loop_executor
=
LoopExecutor
(
message_handler
=
message_handler
,
loop_executor
=
LoopExecutor
(
directory
=
args
[
'<dir>'
],
message_handler
=
message_handler
,
cache_root
=
args
[
'<cache>'
],
directory
=
args
[
"<dir>"
],
db_socket
=
db_socket
)
cache_root
=
args
[
"<cache>"
],
db_socket
=
db_socket
,
)
except
(
MemoryError
):
except
(
MemoryError
):
raise
raise
except
Exception
as
e
:
except
Exception
as
e
:
import
traceback
import
traceback
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
tb
=
traceback
.
extract_tb
(
exc_traceback
)
tb
=
traceback
.
extract_tb
(
exc_traceback
)
s
=
process_traceback
(
tb
,
os
.
path
.
join
(
args
[
'
<dir>
'
],
'
prefix
'
))
s
=
process_traceback
(
tb
,
os
.
path
.
join
(
args
[
"
<dir>
"
],
"
prefix
"
))
raise
UserError
(
"%s%s: %s"
%
(
s
,
type
(
e
).
__name__
,
e
))
raise
UserError
(
"%s%s: %s"
%
(
s
,
type
(
e
).
__name__
,
e
))
try
:
try
:
...
@@ -177,6 +169,7 @@ def main(arguments=None):
...
@@ -177,6 +169,7 @@ def main(arguments=None):
raise
raise
except
Exception
as
e
:
except
Exception
as
e
:
import
traceback
import
traceback
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
tb
=
traceback
.
extract_tb
(
exc_traceback
)
tb
=
traceback
.
extract_tb
(
exc_traceback
)
s
=
process_traceback
(
tb
,
loop_executor
.
prefix
)
s
=
process_traceback
(
tb
,
loop_executor
.
prefix
)
...
@@ -191,6 +184,7 @@ def main(arguments=None):
...
@@ -191,6 +184,7 @@ def main(arguments=None):
raise
raise
except
Exception
as
e
:
except
Exception
as
e
:
import
traceback
import
traceback
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
tb
=
traceback
.
extract_tb
(
exc_traceback
)
tb
=
traceback
.
extract_tb
(
exc_traceback
)
s
=
process_traceback
(
tb
,
loop_executor
.
prefix
)
s
=
process_traceback
(
tb
,
loop_executor
.
prefix
)
...
@@ -205,29 +199,33 @@ def main(arguments=None):
...
@@ -205,29 +199,33 @@ def main(arguments=None):
raise
raise
except
Exception
as
e
:
except
Exception
as
e
:
import
traceback
import
traceback
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
exc_type
,
exc_value
,
exc_traceback
=
sys
.
exc_info
()
tb
=
traceback
.
extract_tb
(
exc_traceback
)
tb
=
traceback
.
extract_tb
(
exc_traceback
)
s
=
process_traceback
(
tb
,
os
.
path
.
join
(
args
[
'
<dir>
'
],
'
prefix
'
))
s
=
process_traceback
(
tb
,
os
.
path
.
join
(
args
[
"
<dir>
"
],
"
prefix
"
))
raise
UserError
(
"%s%s: %s"
%
(
s
,
type
(
e
).
__name__
,
e
))
raise
UserError
(
"%s%s: %s"
%
(
s
,
type
(
e
).
__name__
,
e
))
except
UserError
as
e
:
except
UserError
as
e
:
message_handler
.
send_error
(
str
(
e
),
'
usr
'
)
message_handler
.
send_error
(
str
(
e
),
"
usr
"
)
message_handler
.
destroy
()
message_handler
.
destroy
()
return
1
return
1
except
MemoryError
as
e
:
except
MemoryError
:
# Say something meaningful to the user
# Say something meaningful to the user
msg
=
"The user process for this block ran out of memory. We "
\
msg
=
(
"suggest you optimise your code to reduce memory usage or, "
\
"The user process for this block ran out of memory. We "
"if this is not an option, choose an appropriate processing "
\
"suggest you optimise your code to reduce memory usage or, "
"queue with enough memory."
"if this is not an option, choose an appropriate processing "
message_handler
.
send_error
(
msg
,
'usr'
)
"queue with enough memory."
)
message_handler
.
send_error
(
msg
,
"usr"
)
message_handler
.
destroy
()
message_handler
.
destroy
()
return
1
return
1
except
Exception
as
e
:
except
Exception
:
import
traceback
import
traceback
message_handler
.
send_error
(
traceback
.
format_exc
(),
'sys'
)
message_handler
.
send_error
(
traceback
.
format_exc
(),
"sys"
)
message_handler
.
destroy
()
message_handler
.
destroy
()
return
1
return
1
...
@@ -243,6 +241,5 @@ def main(arguments=None):
...
@@ -243,6 +241,5 @@ def main(arguments=None):
return
0
return
0
if
__name__
==
"__main__"
:
if
__name__
==
'__main__'
:
sys
.
exit
(
main
())
sys
.
exit
(
main
())
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment