Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.web
Commits
40b8919b
Commit
40b8919b
authored
Sep 11, 2020
by
Flavio TARSETTI
Browse files
Merge branch 'cleanup_scripts' into 'django3_migration'
Scripts cleanup See merge request
!360
parents
8d888733
648b32d5
Pipeline
#42673
passed with stage
in 14 minutes and 58 seconds
Changes
1
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
beat/web/scripts/scheduler.py
View file @
40b8919b
...
...
@@ -64,22 +64,22 @@ Examples:
level is set to ``WARNING`` if no ``-v`` flag is passed.
"""
import
logging
import
os
import
sys
import
signal
import
sys
import
docopt
import
logging
import
simplejson
from
..version
import
__version__
from
beat.core.worker
import
WorkerController
from
..version
import
__version__
logger
=
None
#----------------------------------------------------------
#
----------------------------------------------------------
def
onWorkerReady
(
name
):
...
...
@@ -90,14 +90,16 @@ def onWorkerReady(name):
try
:
worker
=
Worker
.
objects
.
get
(
name
=
name
)
worker
.
active
=
True
worker
.
info
=
'
Connected to the scheduler
'
worker
.
info
=
"
Connected to the scheduler
"
worker
.
save
()
except
:
import
traceback
;
print
(
traceback
.
format_exc
())
except
Exception
:
import
traceback
print
(
traceback
.
format_exc
())
logger
.
error
(
"No worker named '%s' found in the database"
,
name
)
#----------------------------------------------------------
#
----------------------------------------------------------
def
onWorkerGone
(
name
):
...
...
@@ -108,27 +110,28 @@ def onWorkerGone(name):
try
:
worker
=
Worker
.
objects
.
get
(
name
=
name
)
worker
.
active
=
False
worker
.
info
=
'
Disconnected from the scheduler
'
worker
.
info
=
"
Disconnected from the scheduler
"
worker
.
save
()
except
:
except
Exception
:
logger
.
error
(
"No worker named '%s' found in the database"
,
name
)
#----------------------------------------------------------
#
----------------------------------------------------------
def remove_split_id_from(list, split_id):
    """Remove the first occurrence of ``split_id`` from ``list``, in place.

    Parameters:

      list (list): Collection of job split ids to update (modified in
        place). The name shadows the builtin but is kept so that existing
        keyword callers continue to work.

      split_id: Identifier to remove from the collection.

    A ``split_id`` that is not present is silently ignored.
    """

    try:
        # ``list.remove`` expects the *value* to drop. The previous code
        # passed ``list.index(split_id)`` (a position), which removed
        # whichever element happened to equal that position, or nothing.
        list.remove(split_id)
    except ValueError:
        # The id is not tracked in this collection: nothing to do
        pass
#----------------------------------------------------------
#
----------------------------------------------------------
stop
=
False
def
main
(
user_input
=
None
):
# Parse the command-line arguments
...
...
@@ -138,50 +141,46 @@ def main(user_input=None):
arguments
=
sys
.
argv
[
1
:]
arguments
=
docopt
.
docopt
(
__doc__
%
dict
(
prog
=
os
.
path
.
basename
(
sys
.
argv
[
0
]),
),
__doc__
%
dict
(
prog
=
os
.
path
.
basename
(
sys
.
argv
[
0
]),),
argv
=
arguments
,
version
=
'
v%s
'
%
__version__
,
version
=
"
v%s
"
%
__version__
,
)
# Initialisation of the application
os
.
environ
.
setdefault
(
'DJANGO_SETTINGS_MODULE'
,
arguments
[
'--settings'
])
from
django.conf
import
settings
os
.
environ
.
setdefault
(
"DJANGO_SETTINGS_MODULE"
,
arguments
[
"--settings"
])
from
django
import
setup
setup
()
from
django.conf
import
settings
setup
()
# Importations of beat.web modules must be done after the call to django.setup()
from
..backend.models
import
JobSplit
from
..backend.models
import
Worker
from
..backend.helpers
import
split_new_jobs
from
..backend.helpers
import
process_newly_cancelled_experiments
from
..backend.helpers
import
assign_splits_to_workers
from
..backend.helpers
import
get_configuration_for_split
from
..backend.helpers
import
on_split_
start
ed
from
..backend.helpers
import
on_split_
cancell
ed
from
..backend.helpers
import
on_split_done
from
..backend.helpers
import
on_split_fail
from
..backend.helpers
import
on_split_cancelled
from
..backend.helpers
import
on_split_started
from
..backend.helpers
import
process_newly_cancelled_experiments
from
..backend.helpers
import
split_new_jobs
from
..backend.models
import
JobSplit
from
..backend.models
import
Worker
# Setup the logging
formatter
=
logging
.
Formatter
(
fmt
=
"[%(asctime)s - Scheduler - "
+
\
"%(name)s] %(levelname)s: %(message)s"
,
datefmt
=
"%d/%b/%Y %H:%M:%S"
)
formatter
=
logging
.
Formatter
(
fmt
=
"[%(asctime)s - Scheduler - "
+
"%(name)s] %(levelname)s: %(message)s"
,
datefmt
=
"%d/%b/%Y %H:%M:%S"
,
)
handler
=
logging
.
StreamHandler
()
handler
.
setFormatter
(
formatter
)
root_logger
=
logging
.
getLogger
(
'
beat.web
'
)
root_logger
=
logging
.
getLogger
(
"
beat.web
"
)
root_logger
.
handlers
=
[]
root_logger
.
addHandler
(
handler
)
if
arguments
[
'
--verbose
'
]
==
1
:
if
arguments
[
"
--verbose
"
]
==
1
:
root_logger
.
setLevel
(
logging
.
INFO
)
elif
arguments
[
'
--verbose
'
]
>=
2
:
elif
arguments
[
"
--verbose
"
]
>=
2
:
root_logger
.
setLevel
(
logging
.
DEBUG
)
else
:
root_logger
.
setLevel
(
logging
.
WARNING
)
...
...
@@ -190,7 +189,6 @@ def main(user_input=None):
logger
=
logging
.
getLogger
(
__name__
)
logger
.
handlers
=
[]
# Installs SIGTERM handler
def
handler
(
signum
,
frame
):
# Ignore further signals
...
...
@@ -204,32 +202,26 @@ def main(user_input=None):
signal
.
signal
(
signal
.
SIGTERM
,
handler
)
signal
.
signal
(
signal
.
SIGINT
,
handler
)
# Reset the status of all the workers in the database
for
worker
in
Worker
.
objects
.
filter
(
active
=
True
):
worker
.
active
=
False
worker
.
info
=
'
Did not connect to the scheduler yet
'
worker
.
info
=
"
Did not connect to the scheduler yet
"
worker
.
save
()
# Initialisation of the worker controller
# TODO: Default values
worker_controller
=
WorkerController
(
arguments
[
'--address'
],
int
(
arguments
[
'--port'
]),
callbacks
=
dict
(
onWorkerReady
=
onWorkerReady
,
onWorkerGone
=
onWorkerGone
,
)
arguments
[
"--address"
],
int
(
arguments
[
"--port"
]),
callbacks
=
dict
(
onWorkerReady
=
onWorkerReady
,
onWorkerGone
=
onWorkerGone
,),
)
# Processing loop
from
..backend.helpers
import
split_new_jobs
from
..backend.helpers
import
assign_splits_to_workers
interval
=
int
(
arguments
[
'--interval'
])
\
if
arguments
[
'--interval'
]
else
settings
.
SCHEDULING_INTERVAL
interval
=
(
int
(
arguments
[
"--interval"
])
if
arguments
[
"--interval"
]
else
settings
.
SCHEDULING_INTERVAL
)
logger
.
info
(
"Scheduling every %d seconds"
,
interval
)
running_job_splits
=
[]
...
...
@@ -257,43 +249,47 @@ def main(user_input=None):
logger
.
error
(
"Worker '%s' sent: %s"
,
address
,
data
)
continue
split_id
=
int
(
split_id
)
# Retrieve the job split
try
:
split
=
JobSplit
.
objects
.
get
(
id
=
split_id
)
except
:
logger
.
error
(
"Received message '%s' for unknown job split #%d"
,
status
,
split_id
)
except
JobSplit
.
DoesNotExist
:
logger
.
error
(
"Received message '%s' for unknown job split #%d"
,
status
,
split_id
)
continue
# Is the job done?
if
status
==
WorkerController
.
DONE
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is DONE"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
)
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is DONE"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
on_split_done
(
split
,
simplejson
.
loads
(
data
[
0
]))
remove_split_id_from
(
running_job_splits
,
split_id
)
# Has the job failed?
elif
status
==
WorkerController
.
JOB_ERROR
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned an error"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
)
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned an error"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
try
:
error
=
simplejson
.
loads
(
data
[
0
])
except
:
except
Exception
:
error
=
data
[
0
]
splits_to_cancel
.
extend
(
on_split_fail
(
split
,
error
))
...
...
@@ -301,13 +297,15 @@ def main(user_input=None):
# Was the job cancelled?
elif
status
==
WorkerController
.
CANCELLED
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
)
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
on_split_cancelled
(
split
)
remove_split_id_from
(
cancelling_jobs
,
split_id
)
...
...
@@ -315,13 +313,16 @@ def main(user_input=None):
# Was there an error?
elif
status
==
WorkerController
.
ERROR
:
if
split_id
in
running_job_splits
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
data
[
0
])
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
data
[
0
],
)
splits_to_cancel
.
extend
(
on_split_fail
(
split
,
data
[
0
]))
remove_split_id_from
(
running_job_splits
,
split_id
)
...
...
@@ -332,15 +333,19 @@ def main(user_input=None):
# Cancel the necessary jobs (if any)
for
split_to_cancel
in
splits_to_cancel
:
if
split_to_cancel
.
id
in
running_job_splits
:
logger
.
info
(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'"
,
split_to_cancel
.
id
,
split_to_cancel
.
job
.
block
.
name
,
split_to_cancel
.
split_index
,
split_to_cancel
.
job
.
splits
.
count
(),
split_to_cancel
.
job
.
block
.
experiment
.
fullname
(),
split_to_cancel
.
worker
.
name
)
worker_controller
.
cancel
(
split_to_cancel
.
worker
.
name
,
split_to_cancel
.
id
)
logger
.
info
(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'"
,
split_to_cancel
.
id
,
split_to_cancel
.
job
.
block
.
name
,
split_to_cancel
.
split_index
,
split_to_cancel
.
job
.
splits
.
count
(),
split_to_cancel
.
job
.
block
.
experiment
.
fullname
(),
split_to_cancel
.
worker
.
name
,
)
worker_controller
.
cancel
(
split_to_cancel
.
worker
.
name
,
split_to_cancel
.
id
)
remove_split_id_from
(
running_job_splits
,
split_to_cancel
.
id
)
cancelling_jobs
.
append
(
split_to_cancel
.
id
)
...
...
@@ -357,18 +362,19 @@ def main(user_input=None):
configuration
=
get_configuration_for_split
(
split
)
logger
.
info
(
"Starting job split #%d (%s %d/%d @ %s) on '%s'"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
)
logger
.
info
(
"Starting job split #%d (%s %d/%d @ %s) on '%s'"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
worker_controller
.
execute
(
split
.
worker
.
name
,
split
.
id
,
configuration
)
on_split_started
(
split
)
# Cleanup
logger
.
info
(
"Gracefully exiting the scheduler"
)
worker_controller
.
destroy
()
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment