Commit dfbc4f22 authored by André Anjos

[backend] Improve worker job handling (cannot use multiprocessing)

parent cf4e7bca
1 merge request: !194 Scheduler
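
A note on the title: `multiprocessing.Process` forks the running worker, so the child inherits the parent's open Django database connections and other process state, which is a likely reason the backend stops using it. Spawning a fresh interpreter per job split avoids sharing any of that. The sketch below illustrates the launch pattern this diff adopts, assuming the `process.py` entry point and flags introduced further down; the `launch_split()` wrapper itself is hypothetical, not part of the commit.

  # Hypothetical wrapper around the launch pattern used by work() below;
  # `process.py', `--cpulimit' and `-vv' come from the diff, the rest is
  # illustrative.
  import subprocess

  def launch_split(process_py, execute, split_pk, cpulimit=None, debug=False):
    cmdline = [process_py]
    if cpulimit is not None:
      cmdline += ['--cpulimit=%s' % cpulimit]
    if debug:
      cmdline += ['-vv']
    # only the primary key crosses the process boundary; the child re-fetches
    # the JobSplit from the database over its own, fresh connection
    return subprocess.Popen(cmdline + [execute, str(split_pk)])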
@@ -27,12 +27,16 @@
 '''Scheduling functions and utilities'''

+import os
+import sys
+import time
 import socket
+import signal
+import subprocess

 import logging
 logger = logging.getLogger(__name__)

-import multiprocessing
+import psutil
 import simplejson

 from django.conf import settings
@@ -242,7 +246,8 @@ def schedule():
   return assigned_splits

-def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
+@transaction.atomic
+def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
   '''Process assigned job splits using beat.core

   This task executes the user algorithm on a subprocess. It also serves the
@@ -265,10 +270,10 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
   Parameters:

-    split (:py:class:JobSplit): The JobSplit to process
+    split_pk (int): The primary-key of the JobSplit to process

-    environments (dict): A dictionary mapping environment objects from the
-      Django database to their actual location on the file system
+    execute (str): The path to the ``execute`` program to use for running the
+      user code associated with this job split.

     cpulimit (str, Optional): The path to the ``cpulimit`` program to use for
       limiting the user code in CPU usage. If not set, then don't use it,
@@ -280,7 +285,8 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
   '''

-  split.start() #officially starts the split
+  # lock split
+  split = JobSplit.objects.select_for_update().get(pk=split_pk)

   config = simplejson.loads(split.job.block.command)
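
The lock taken by `select_for_update()` above only holds inside a database transaction, which is why `process()` gained the `@transaction.atomic` decorator. A minimal sketch of the idiom, assuming `JobSplit` is importable from this application's models:

  # Row-locking sketch; the import path is an assumption mirroring this module.
  from django.db import transaction
  from .models import JobSplit  # assumed location

  @transaction.atomic
  def lock_split(split_pk):
    # SELECT ... FOR UPDATE: other transactions trying to lock this row
    # block until the current transaction commits or rolls back
    split = JobSplit.objects.select_for_update().get(pk=split_pk)
    return split  # safe to mutate and save while the lock is held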
@@ -288,11 +294,11 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
   if split.job.block.required_slots > 1:

     if (split.start_index is None) or (split.end_index is None):
-      message = "The split %d/%d running on worker `%s' for " \
+      message = "The split %d/%d (pid=%d) running on worker `%s' for " \
           "block `%s' of experiment `%s' could not " \
           "be completed: indexes are missing!" % \
           (split.split_index+1, split.job.block.required_slots,
-          split.worker, split.job.block.name,
+          split.process_id, split.worker, split.job.block.name,
           split.job.block.experiment.fullname())
       logger.error(message)
       split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
@@ -323,25 +329,6 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
         )
     raise RuntimeError(message)

-  # Check we have a compatible environment to execute the user algorithm
-  envkey = config['environment']
-  envinfo = environments.get('%(name)s (%(version)s)' % envkey)
-  execute_path = envinfo['execute'] if envinfo else None
-
-  if execute_path is None:
-    message = "Environment `%s' is not available for split %d/%d " \
-        "running at worker `%s', for block `%s' of experiment " \
-        "`%s': %s" % (split.job.block.environment,
-            split.split_index+1,
-            split.job.block.required_slots,
-            split.worker,
-            split.job.block.name,
-            split.job.block.experiment.fullname(),
-            "Available environments are `%s'" % \
-                '|'.join(environments.keys()),
-            )
-    raise RuntimeError(message)
-
   queue = split.job.block.queue
   nb_cores = queue.cores_per_slot
   if (nb_cores > 0) and (cpulimit is None):
@@ -355,7 +342,7 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
   # n.b.: with executor may crash on the database view setup
   with executor:
     result = executor.process(
-        execute_path=execute_path,
+        execute_path=execute,
         virtual_memory_in_megabytes=queue.memory_limit,
         max_cpu_percent=int(100*float(nb_cores)), #allows for 150%
         cpulimit_path=cpulimit,
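
The `max_cpu_percent` argument above converts the queue's per-slot core grant into the percentage ceiling understood by `cpulimit`, so fractional grants work naturally:

  # Worked example of the cap computation; 1.5 cores become a 150% ceiling,
  # matching the "allows for 150%" remark in the diff.
  nb_cores = 1.5
  max_cpu_percent = int(100 * float(nb_cores))
  assert max_cpu_percent == 150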
@@ -371,35 +358,54 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
           syserr=result['system_error'],
           _stats=simplejson.dumps(result['statistics'], indent=2),
           ))
+    logger.info("Process `%s' for split `%s' ended gracefully",
+        split.process_id, split)

   except:
     from traceback import format_exc
-    from beat.core.stats import Statistics
     logger.error(format_exc())
+    logger.warn("Process `%s' for split `%s' ended with an error",
+        split.process_id, split)
     split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
       syserr=format_exc(),))

-def multiprocess(*args, **kwargs):
-  '''Runs :py:func:`process` through a forked subprocess
-
-  Input arguments are the same as for :py:func:`process`.
-
-  Returns:
-
-    multiprocessing.Process: an instance of a Process you can call
-      ``join()`` at.
-
-  '''
-
-  retval = multiprocessing.Process(target=process, args=args, kwargs=kwargs)
-  retval.start()
-  return retval
+@transaction.atomic
+def worker_update():
+  '''Updates the worker state in the database from local readings'''
+
+  # myself, raises if I cannot find me
+  worker = Worker.objects.select_for_update().get(name=socket.gethostname())
+
+  # check I have at least all cores and memory I'm supposed to have
+  cores = psutil.cpu_count()
+  ram = psutil.virtual_memory().total/(1024*1024)
+  worker.info = ''
+
+  if cores < worker.cores:
+    logger.warn("Worker `%s' only has %d cores, which is less than the " \
+        "value declared on the database - it's not a problem, but note " \
+        "this worker may get overloaded", worker, cores)
+    worker.info += 'only %d cores;' % cores
+
+  if ram < worker.memory:
+    logger.warn("Worker `%s' only has %d Mb of RAM, which is less than " \
+        "the value declared on the database - it's not a problem, but " \
+        "note this worker may get overloaded", worker, ram)
+    worker.info += 'only %d Mb of RAM;' % ram
+
+  # update process and memory usage
+  worker.used_cores = int(psutil.cpu_percent())
+  worker.used_memory = int(psutil.virtual_memory().percent)
+
+  # save current worker state
+  worker.info += 'updated: ' + time.asctime()
+  worker.active = True
+  worker.save()

 @transaction.atomic
-def work(environments, cpulimit):
+def work(environments, cpulimit, process):
   '''Launches user code on isolated processes

   This function is supposed to be called asynchronously, by a scheduling
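
An aside on the new `worker_update()` above: it compares live hardware readings against what the database declares for this worker. The psutil calls are standard; note that `psutil.cpu_percent()` without an interval reports usage since its previous call (the very first call returns 0.0), which suits a periodically invoked updater like this one:

  # Standalone illustration of the psutil readings used by worker_update().
  import psutil

  cores = psutil.cpu_count()                              # logical core count
  ram_mb = psutil.virtual_memory().total / (1024 * 1024)  # total RAM, in MB
  cpu_pct = psutil.cpu_percent()  # usage since previous call; 0.0 on first use
  mem_pct = psutil.virtual_memory().percent               # RAM usage, percent
  print(cores, ram_mb, cpu_pct, mem_pct)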
@@ -409,26 +415,74 @@ def work(environments, cpulimit):
   Job split cancelling is executed by setting the split state as
   ``CANCELLED`` and waiting for this function to handle it.

+  Parameters:
+
+    environments (dict): A dictionary containing installed environments,
+      their description and execute-file paths.
+
+    cpulimit (str): The path to the ``cpulimit`` program to use for
+      limiting the user code in CPU usage. If set to ``None``, then don't
+      use it, even if the selected user queue has limits.
+
+    process (str): The path to the ``process.py`` program to use for
+      running the user code on isolated processes.
+
   '''

   # myself, raises if I cannot find me
-  worker = Worker.objects.select_for_update().get(name=socket.gethostname())
+  worker = Worker.objects.get(name=socket.gethostname())

   # cancel job splits
   for j in JobSplit.objects.select_for_update().filter(worker=worker,
       status=Job.CANCELLED, end_date__isnull=True, process_id__isnull=False):
-    import signal
-    signal.signal(signal.SIGKILL, j.process_id)
+    os.kill(j.process_id, signal.SIGKILL)
     j.end(None, Job.CANCELLED)

+  # cmdline base argument
+  cmdline = [process]
+  if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit]
+  if settings.DEBUG: cmdline += ['-vv']
+
   # start newly assigned job splits
   for j in JobSplit.objects.select_for_update().filter(worker=worker,
-      status=Job.SCHEDULED, start_date__isnull=True, process_id__isnull=True):
-    multiprocess(j, environments, cpulimit)
+      status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True):
+    execute = pick_execute(j, environments)
+
+    if execute is None:
+      message = "Environment `%s' is not available for split %d/%d " \
+          "running at worker `%s', for block `%s' of experiment " \
+          "`%s': %s" % (j.job.block.environment,
+              j.split_index+1,
+              j.job.block.required_slots,
+              j.worker,
+              j.job.block.name,
+              j.job.block.experiment.fullname(),
+              "Available environments are `%s'" % \
+                  '|'.join(environments.keys()),
+              )
+      j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
+        syserr=message))
+      continue
+
+    # if we get to this point, then we launch the user process
+    subprocess.Popen(cmdline + [execute, str(j.pk)])
+
+
+def resolve_process_path():
+  '''Returns the path to the ``process.py`` program'''
+
+  basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
+  r = os.path.join(basedir, 'process.py')
+
+  if not os.path.exists(r):
+    raise RuntimeError("Cannot find `process.py' at `%s' - please check " \
+        "your installation" % basedir)
+
+  return r


-def refresh_environments(paths=None):
-  '''Refresh current list of known environments
+def find_environments(paths=None):
+  '''Finds the list of known environments

   Parameters:
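
Cancellation above relies on the PID recorded in `JobSplit.process_id` when the child is spawned. Since `SIGKILL` cannot be caught or ignored by the child, all bookkeeping must happen on the parent side, which is what the `j.end(...)` calls do. A minimal sketch of the kill step; the exception guard is an assumption on top of the diff:

  # PID-based cancellation sketch; the try/except guards against a child
  # that already exited on its own and is an assumption, not in the diff.
  import os
  import signal

  def kill_child(pid):
    try:
      os.kill(pid, signal.SIGKILL)  # cannot be trapped by the child
    except OSError:
      pass  # process is already gone; nothing left to clean up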
@@ -467,3 +521,45 @@ def find_environments(paths=None):
   path = pkg_resources.resource_filename(__name__, 'environments')
   logger.debug("Search for environments at `%s'", path)
   return discover_environments([path])
+
+
+@transaction.atomic
+def worker_shutdown():
+  """Standard worker shutdown procedure
+
+  Stop all running/assigned splits and then mark the worker as inactive.
+  """
+
+  # myself, raises if I cannot find me
+  worker = Worker.objects.select_for_update().get(name=socket.gethostname())
+
+  message = 'Cancelled on forced worker shutdown (maintenance)' \
+      ' - you may retry submitting your experiment shortly'
+
+  # cancel job splits which are running
+  for j in JobSplit.objects.select_for_update().filter(worker=worker,
+      status__in=(Job.CANCELLED, Job.PROCESSING), end_date__isnull=True,
+      process_id__isnull=False):
+    os.kill(j.process_id, signal.SIGKILL)
+    j.end(Result(status=1, usrerr=message))
+
+  # cancel job splits which were not yet started
+  for j in JobSplit.objects.select_for_update().filter(worker=worker,
+      status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True):
+    j.end(Result(status=1, usrerr=message))
+
+  # lock worker and modify it
+  worker.active = False
+  worker.used_cores = 0
+  worker.used_memory = 0
+  worker.info = 'Worker shutdown by system administrator'
+  worker.save()
+
+
+def pick_execute(split, environments):
+  """Resolves the path to the ``execute`` program to use for the split"""
+
+  # check we have a compatible environment to execute the user algorithm
+  envinfo = environments.get(split.job.block.environment.natural_key())
+  return envinfo['execute'] if envinfo else None
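
`pick_execute()` assumes `environments` maps an environment's natural key to a record that carries the path to its `execute` program. A usage sketch; every key and path below is invented for illustration:

  # Hypothetical environments mapping; keys and paths are illustrative only.
  environments = {
    ('Python', '2.7'): {'execute': '/opt/beat/envs/py27/bin/execute'},
  }

  def lookup(natural_key):
    envinfo = environments.get(natural_key)
    return envinfo['execute'] if envinfo else None

  assert lookup(('Python', '2.7')) is not None
  assert lookup(('Matlab', '2014b')) is None  # unknown: caller reports the error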