Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.core
Commits
0ae10fb4
Commit
0ae10fb4
authored
Feb 20, 2020
by
Flavio TARSETTI
Browse files
Merge branch 'improve_disconnection_handling' into 'master'
Improve disconnection handling See merge request
!112
parents
85f2ddf8
a1b40d53
Pipeline
#37583
passed with stages
in 17 minutes and 29 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
beat/core/bcp/worker.py
View file @
0ae10fb4
...
...
@@ -253,6 +253,7 @@ def run(
logger
.
info
(
"The scheduler shut down, we will wait for it"
)
worker
.
destroy
()
worker
.
send_to_broker
(
BCP
.
BCPW_DISCONNECT
)
worker
.
destroy
()
# Cleanup
for
execution_process
in
execution_processes
:
...
...
beat/core/bcpapi/broker.py
View file @
0ae10fb4
...
...
@@ -95,6 +95,7 @@ class BeatComputationBroker(object):
def
__init__
(
self
,
verbose
=
False
):
"""Initialize broker state."""
self
.
verbose
=
verbose
self
.
continue_
=
True
self
.
services
=
{}
...
...
@@ -163,14 +164,17 @@ class BeatComputationBroker(object):
def
process_client
(
self
,
sender
,
msg
):
"""Process a request coming from a client."""
# Service name + body
assert
len
(
msg
)
>=
2
# nosec
service
=
msg
.
pop
(
0
)
# Set reply return address to client sender
msg
=
[
sender
,
b
""
]
+
msg
self
.
dispatch
(
self
.
require_service
(
service
),
msg
)
def
process_worker
(
self
,
sender
,
msg
):
"""Process message sent to us by a worker."""
# At least, command
assert
len
(
msg
)
>=
1
# nosec
...
...
@@ -222,12 +226,14 @@ class BeatComputationBroker(object):
def
delete_worker
(
self
,
worker
,
disconnect
):
"""Deletes worker from all data structures, and deletes worker."""
assert
worker
is
not
None
# nosec
if
disconnect
:
self
.
send_to_worker
(
worker
,
BCP
.
BCPW_DISCONNECT
,
None
,
None
)
if
worker
.
service
is
not
None
:
worker
.
service
.
waiting
.
remove
(
worker
)
if
worker
in
worker
.
service
.
waiting
:
worker
.
service
.
waiting
.
remove
(
worker
)
on_disconnection
=
self
.
callbacks
.
get
(
"on_disconnection"
,
None
)
if
on_disconnection
:
...
...
@@ -236,8 +242,12 @@ class BeatComputationBroker(object):
if
worker
.
identity
in
self
.
workers
:
self
.
workers
.
pop
(
worker
.
identity
)
if
worker
in
self
.
waiting
:
self
.
waiting
.
pop
(
self
.
waiting
.
index
(
worker
))
def
require_worker
(
self
,
address
):
"""Finds the worker (creates if necessary)."""
assert
address
is
not
None
# nosec
identity
=
hexlify
(
address
)
worker
=
self
.
workers
.
get
(
identity
)
...
...
@@ -251,6 +261,7 @@ class BeatComputationBroker(object):
def
require_service
(
self
,
name
):
"""Locates the service (creates if necessary)."""
assert
name
is
not
None
# nosec
service
=
self
.
services
.
get
(
name
)
if
service
is
None
:
...
...
@@ -264,11 +275,13 @@ class BeatComputationBroker(object):
We use a single socket for both clients and workers.
"""
self
.
socket
.
bind
(
endpoint
)
logger
.
info
(
"I: BCP broker/0.0.1 is active at %s"
,
endpoint
)
def
send_heartbeats
(
self
):
"""Send heartbeats to idle workers if it's time"""
if
time
.
time
()
>
self
.
heartbeat_at
:
for
worker
in
self
.
waiting
:
self
.
send_to_worker
(
worker
,
BCP
.
BCPW_HEARTBEAT
,
None
,
None
)
...
...
@@ -276,16 +289,16 @@ class BeatComputationBroker(object):
self
.
heartbeat_at
=
time
.
time
()
+
1e-3
*
self
.
HEARTBEAT_INTERVAL
def
purge_workers
(
self
):
"""Look for & kill expired workers.
"""
"""Look for & kill expired workers.
"""
for
item
in
self
.
waiting
:
if
item
.
expiry
<
time
.
time
():
logger
.
info
(
"I: deleting expired worker: %s"
,
item
.
identity
)
self
.
delete_worker
(
item
,
False
)
self
.
waiting
.
pop
(
self
.
waiting
.
index
(
item
))
def
worker_waiting
(
self
,
worker
):
"""This worker is now waiting for work."""
# Queue to broker and service waiting lists
if
worker
not
in
self
.
waiting
:
...
...
@@ -298,10 +311,14 @@ class BeatComputationBroker(object):
def
dispatch
(
self
,
service
,
msg
):
"""Dispatch requests to waiting workers as possible"""
assert
service
is
not
None
# nosec
if
msg
is
not
None
:
# Queue message if any
service
.
requests
.
append
(
msg
)
self
.
purge_workers
()
while
service
.
waiting
and
service
.
requests
:
msg
=
service
.
requests
.
pop
(
0
)
worker
=
service
.
waiting
.
pop
(
0
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment