Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.core
Commits
e3001bc0
Commit
e3001bc0
authored
May 30, 2018
by
Samuel GAIST
Browse files
[worker] Documented module
parent
266db371
Pipeline
#20730
passed with stages
in 58 minutes and 51 seconds
Changes
1
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
beat/core/worker.py
View file @
e3001bc0
...
...
@@ -34,7 +34,6 @@ Worker implementation
"""
import
logging
logger
=
logging
.
getLogger
(
__name__
)
import
zmq
import
socket
...
...
@@ -42,24 +41,46 @@ import simplejson
from
.utils
import
send_multipart
class
WorkerController
(
object
):
logger
=
logging
.
getLogger
(
__name__
)
# Status codes
READY
=
b
'rdy'
EXIT
=
b
'ext'
DONE
=
b
'don'
JOB_ERROR
=
b
'erj'
ERROR
=
b
'err'
CANCELLED
=
b
'cld'
class
WorkerController
(
object
):
"""Implements the controller that will handle the workers allocated.
Constants:
Status:
:py:const:`READY`
:py:const:`EXIT`
:py:const:`DONE`
:py:const:`JOB_ERROR`
:py:const:`ERROR`
:py:const:`CANCELLED`
Commands:
:py:const:`EXECUTE`
:py:const:`CANCEL`
:py:const:`ACK`
:py:const:`SCHEDULER_SHUTDOWN`
"""
# Status code
READY
=
b
'rdy'
#: The worker is ready to be used
EXIT
=
b
'ext'
#: The worker has exited
DONE
=
b
'don'
#: The worker as successfully finished its task
JOB_ERROR
=
b
'erj'
#: The worker failed to finish its task
ERROR
=
b
'err'
#: The worker encountered an error
CANCELLED
=
b
'cld'
#: The worker's task has been canceled
# Commands
EXECUTE
=
b
'exe'
CANCEL
=
b
'cnl'
ACK
=
b
'ack'
SCHEDULER_SHUTDOWN
=
b
'shd'
EXECUTE
=
b
'exe'
#: Execute the given job
CANCEL
=
b
'cnl'
#: Cancel the given job
ACK
=
b
'ack'
#: Acknowledge
SCHEDULER_SHUTDOWN
=
b
'shd'
#: Shutdown the scheduler
class
Callbacks
(
object
):
"""Set of callbacks used when a worker is ready or went away"""
def
__init__
(
self
):
self
.
onWorkerReady
=
None
...
...
@@ -115,6 +136,15 @@ class WorkerController(object):
def
execute
(
self
,
worker
,
job_id
,
configuration
):
"""Executes the given job by the given worker using passed
configuration
Parameters:
:param str worker: Address of the worker
:param int job_id: Identifier of the job to execute
:param dict configuration: Configuration for the job
"""
parts
=
[
worker
,
WorkerController
.
EXECUTE
,
...
...
@@ -125,6 +155,13 @@ class WorkerController(object):
def
cancel
(
self
,
worker
,
job_id
):
"""Cancels the given job on the given worker
Parameters:
:param str worker: Address of the worker
:param int job_id: Identifier of the job to execute
"""
parts
=
[
worker
,
WorkerController
.
CANCEL
,
...
...
@@ -134,6 +171,12 @@ class WorkerController(object):
def
ack
(
self
,
worker
):
"""Send acknowledge to worker
Parameters:
:param str worker: Address of the worker
"""
parts
=
[
worker
,
WorkerController
.
ACK
...
...
@@ -142,6 +185,18 @@ class WorkerController(object):
def
process
(
self
,
timeout
=
0
):
"""Processing loop
Gets processing information through ZeroMQ and acts accordingly.
Parameters:
:param int timeout: Maximum time allocate for processing
Returns:
tuple: Returns a tuple containing the worker address, job_id and
corresponding data if any or None in case of error.
"""
while
True
:
socks
=
dict
(
self
.
poller
.
poll
(
timeout
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment