Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
beat
beat.web
Commits
af8183b7
Commit
af8183b7
authored
Sep 11, 2020
by
Samuel GAIST
Browse files
[scripts] Remove the scheduler script
It's not used anymore. Part of
#567
parent
8c2840f6
Pipeline
#42711
passed with stage
in 15 minutes
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
beat/web/scripts/__init__.py
deleted
100644 → 0
View file @
8c2840f6
beat/web/scripts/scheduler.py
deleted
100755 → 0
View file @
8c2840f6
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""
\
Starts the scheduling process.
Usage:
%(prog)s [-v ... | --verbose ...] [--settings=<file>] [--interval=<seconds>]
[--address=<address>] [--port=<port>]
%(prog)s (-h | --help)
%(prog)s (-V | --version)
Options:
-h, --help Show this help message
-V, --version Show program's version number
-v, --verbose Increases the output verbosity level
-S <file>, --settings=<file> The module name of the Django settings
file [default: beat.web.settings.settings]
-i <seconds>, --interval=<seconds> The time, in seconds, in which this
scheduler will try to allocate job splits
to existing workers. If not set, use the
value available on the Django settings
file, at the variable `SCHEDULING_INTERVAL`.
-a <address>, --address=<address> The address to which the processing nodes
must establish a connection to
-p <port>, --port=<port> The port to which the processing nodes
must establish a connection to
Examples:
To start the scheduling process do the following:
$ %(prog)s
You can pass the ``-v`` flag to start the scheduler with the logging level
set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging
level is set to ``WARNING`` if no ``-v`` flag is passed.
"""
import
logging
import
os
import
signal
import
sys
import
docopt
import
simplejson
from
beat.core.worker
import
WorkerController
from
..version
import
__version__
logger
=
None
# ----------------------------------------------------------
def
onWorkerReady
(
name
):
from
..backend.models
import
Worker
logger
.
info
(
"Worker '%s' is ready"
,
name
)
try
:
worker
=
Worker
.
objects
.
get
(
name
=
name
)
worker
.
active
=
True
worker
.
info
=
"Connected to the scheduler"
worker
.
save
()
except
Exception
:
import
traceback
print
(
traceback
.
format_exc
())
logger
.
error
(
"No worker named '%s' found in the database"
,
name
)
# ----------------------------------------------------------
def
onWorkerGone
(
name
):
from
..backend.models
import
Worker
logger
.
info
(
"Worker '%s' is gone"
,
name
)
try
:
worker
=
Worker
.
objects
.
get
(
name
=
name
)
worker
.
active
=
False
worker
.
info
=
"Disconnected from the scheduler"
worker
.
save
()
except
Exception
:
logger
.
error
(
"No worker named '%s' found in the database"
,
name
)
# ----------------------------------------------------------
def
remove_split_id_from
(
list
,
split_id
):
try
:
list
.
remove
(
list
.
index
(
split_id
))
except
ValueError
:
pass
# ----------------------------------------------------------
stop
=
False
def
main
(
user_input
=
None
):
# Parse the command-line arguments
if
user_input
is
not
None
:
arguments
=
user_input
else
:
arguments
=
sys
.
argv
[
1
:]
arguments
=
docopt
.
docopt
(
__doc__
%
dict
(
prog
=
os
.
path
.
basename
(
sys
.
argv
[
0
]),),
argv
=
arguments
,
version
=
"v%s"
%
__version__
,
)
# Initialisation of the application
os
.
environ
.
setdefault
(
"DJANGO_SETTINGS_MODULE"
,
arguments
[
"--settings"
])
from
django
import
setup
from
django.conf
import
settings
setup
()
# Importations of beat.web modules must be done after the call to django.setup()
from
..backend.helpers
import
assign_splits_to_workers
from
..backend.helpers
import
get_configuration_for_split
from
..backend.helpers
import
on_split_cancelled
from
..backend.helpers
import
on_split_done
from
..backend.helpers
import
on_split_fail
from
..backend.helpers
import
on_split_started
from
..backend.helpers
import
process_newly_cancelled_experiments
from
..backend.helpers
import
split_new_jobs
from
..backend.models
import
JobSplit
from
..backend.models
import
Worker
# Setup the logging
formatter
=
logging
.
Formatter
(
fmt
=
"[%(asctime)s - Scheduler - "
+
"%(name)s] %(levelname)s: %(message)s"
,
datefmt
=
"%d/%b/%Y %H:%M:%S"
,
)
handler
=
logging
.
StreamHandler
()
handler
.
setFormatter
(
formatter
)
root_logger
=
logging
.
getLogger
(
"beat.web"
)
root_logger
.
handlers
=
[]
root_logger
.
addHandler
(
handler
)
if
arguments
[
"--verbose"
]
==
1
:
root_logger
.
setLevel
(
logging
.
INFO
)
elif
arguments
[
"--verbose"
]
>=
2
:
root_logger
.
setLevel
(
logging
.
DEBUG
)
else
:
root_logger
.
setLevel
(
logging
.
WARNING
)
global
logger
logger
=
logging
.
getLogger
(
__name__
)
logger
.
handlers
=
[]
# Installs SIGTERM handler
def
handler
(
signum
,
frame
):
# Ignore further signals
signal
.
signal
(
signal
.
SIGTERM
,
signal
.
SIG_IGN
)
signal
.
signal
(
signal
.
SIGINT
,
signal
.
SIG_IGN
)
logger
.
info
(
"Signal %d caught, terminating..."
,
signum
)
global
stop
stop
=
True
signal
.
signal
(
signal
.
SIGTERM
,
handler
)
signal
.
signal
(
signal
.
SIGINT
,
handler
)
# Reset the status of all the workers in the database
for
worker
in
Worker
.
objects
.
filter
(
active
=
True
):
worker
.
active
=
False
worker
.
info
=
"Did not connect to the scheduler yet"
worker
.
save
()
# Initialisation of the worker controller
# TODO: Default values
worker_controller
=
WorkerController
(
arguments
[
"--address"
],
int
(
arguments
[
"--port"
]),
callbacks
=
dict
(
onWorkerReady
=
onWorkerReady
,
onWorkerGone
=
onWorkerGone
,),
)
# Processing loop
interval
=
(
int
(
arguments
[
"--interval"
])
if
arguments
[
"--interval"
]
else
settings
.
SCHEDULING_INTERVAL
)
logger
.
info
(
"Scheduling every %d seconds"
,
interval
)
running_job_splits
=
[]
cancelling_jobs
=
[]
global
stop
while
not
stop
:
logger
.
debug
(
"Starting scheduler cycle..."
)
# Process all the incoming messages
splits_to_cancel
=
[]
while
True
:
# Wait for a message
message
=
worker_controller
.
process
(
interval
*
1000
)
if
message
is
None
:
break
(
address
,
status
,
split_id
,
data
)
=
message
# Was there an error?
if
status
==
WorkerController
.
ERROR
:
if
split_id
is
None
:
if
data
!=
"Worker isn't busy"
:
logger
.
error
(
"Worker '%s' sent: %s"
,
address
,
data
)
continue
split_id
=
int
(
split_id
)
# Retrieve the job split
try
:
split
=
JobSplit
.
objects
.
get
(
id
=
split_id
)
except
JobSplit
.
DoesNotExist
:
logger
.
error
(
"Received message '%s' for unknown job split #%d"
,
status
,
split_id
)
continue
# Is the job done?
if
status
==
WorkerController
.
DONE
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is DONE"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
on_split_done
(
split
,
simplejson
.
loads
(
data
[
0
]))
remove_split_id_from
(
running_job_splits
,
split_id
)
# Has the job failed?
elif
status
==
WorkerController
.
JOB_ERROR
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned an error"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
try
:
error
=
simplejson
.
loads
(
data
[
0
])
except
Exception
:
error
=
data
[
0
]
splits_to_cancel
.
extend
(
on_split_fail
(
split
,
error
))
remove_split_id_from
(
running_job_splits
,
split_id
)
# Was the job cancelled?
elif
status
==
WorkerController
.
CANCELLED
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
on_split_cancelled
(
split
)
remove_split_id_from
(
cancelling_jobs
,
split_id
)
# Was there an error?
elif
status
==
WorkerController
.
ERROR
:
if
split_id
in
running_job_splits
:
logger
.
info
(
"Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
data
[
0
],
)
splits_to_cancel
.
extend
(
on_split_fail
(
split
,
data
[
0
]))
remove_split_id_from
(
running_job_splits
,
split_id
)
# Effectively cancel newly-cancelled experiments
splits_to_cancel
.
extend
(
process_newly_cancelled_experiments
())
# Cancel the necessary jobs (if any)
for
split_to_cancel
in
splits_to_cancel
:
if
split_to_cancel
.
id
in
running_job_splits
:
logger
.
info
(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'"
,
split_to_cancel
.
id
,
split_to_cancel
.
job
.
block
.
name
,
split_to_cancel
.
split_index
,
split_to_cancel
.
job
.
splits
.
count
(),
split_to_cancel
.
job
.
block
.
experiment
.
fullname
(),
split_to_cancel
.
worker
.
name
,
)
worker_controller
.
cancel
(
split_to_cancel
.
worker
.
name
,
split_to_cancel
.
id
)
remove_split_id_from
(
running_job_splits
,
split_to_cancel
.
id
)
cancelling_jobs
.
append
(
split_to_cancel
.
id
)
# If we must stop, don't start new jobs
if
stop
:
break
# Start new jobs
split_new_jobs
()
assigned_splits
=
assign_splits_to_workers
()
for
split
in
assigned_splits
:
running_job_splits
.
append
(
split
.
id
)
configuration
=
get_configuration_for_split
(
split
)
logger
.
info
(
"Starting job split #%d (%s %d/%d @ %s) on '%s'"
,
split
.
id
,
split
.
job
.
block
.
name
,
split
.
split_index
,
split
.
job
.
splits
.
count
(),
split
.
job
.
block
.
experiment
.
fullname
(),
split
.
worker
.
name
,
)
worker_controller
.
execute
(
split
.
worker
.
name
,
split
.
id
,
configuration
)
on_split_started
(
split
)
# Cleanup
logger
.
info
(
"Gracefully exiting the scheduler"
)
worker_controller
.
destroy
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment