Skip to content
Snippets Groups Projects
Commit 8109f8bc authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend] Implement optional termination of worker processes

parent bcca87c7
No related branches found
No related tags found
1 merge request!194Scheduler
Pipeline #
...@@ -344,11 +344,15 @@ class Worker(models.Model): ...@@ -344,11 +344,15 @@ class Worker(models.Model):
self.save() self.save()
def shutdown(self): def terminate(self):
'''Cleanly shuts down a particular worker at the database '''Cleanly terminates a particular worker at the database
.. note::
Stop all running/assigned splits and then mark the worker as This method does not destroy running or assigned processes that may
inactive. be running or assigned to this worker. This is implemented in this
way to allow for a clean replacement of the worker program w/o an
interruption of the backend service.
''' '''
...@@ -358,9 +362,29 @@ class Worker(models.Model): ...@@ -358,9 +362,29 @@ class Worker(models.Model):
self_.active = False self_.active = False
self_.used_cores = 0 self_.used_cores = 0
self_.used_memory = 0 self_.used_memory = 0
self_.info = 'Worker shutdown by system administrator' self_.info = 'Worker deactivated by system administrator'
self_.save() self_.save()
# terminate the processes of job splits that are already marked for cancellation
for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL,
end_date__isnull=True, process_id__isnull=False):
if psutil.pid_exists(j.process_id):
os.kill(j.process_id, signal.SIGTERM)
# cleans-up zombie processes that may linger
_cleanup_zombies()
def shutdown(self):
'''Removes all running/assigned jobs from the queue and shuts the worker down
This method should be used with care as it may potentially cancel all
assigned splits for the current worker.
'''
self.terminate()
message = 'Cancelled on forced worker shutdown (maintenance)' \ message = 'Cancelled on forced worker shutdown (maintenance)' \
' - you may retry submitting your experiment shortly' ' - you may retry submitting your experiment shortly'
...@@ -374,14 +398,6 @@ class Worker(models.Model): ...@@ -374,14 +398,6 @@ class Worker(models.Model):
end_date__isnull=True, process_id__isnull=False): end_date__isnull=True, process_id__isnull=False):
j._cancel() j._cancel()
# cancel job splits which should be cancelled
for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL,
end_date__isnull=True, process_id__isnull=False):
if psutil.pid_exists(j.process_id):
os.kill(j.process_id, signal.SIGTERM)
# cleans-up zombie processes that may linger
_cleanup_zombies()
def work(self, environments, cpulimit, process): def work(self, environments, cpulimit, process):
...@@ -452,7 +468,11 @@ class Worker(models.Model): ...@@ -452,7 +468,11 @@ class Worker(models.Model):
continue continue
# if we get to this point, then we launch the user process # if we get to this point, then we launch the user process
subprocess.Popen(cmdline + [execute, str(split.pk)]) # -> see settings.WORKER_DETACH_CHILDREN for more info
kwargs = dict()
if settings.WORKER_DETACH_CHILDREN:
kwargs['preexec_fn'] = os.setpgrp
subprocess.Popen(cmdline + [execute, str(split.pk)], **kwargs)
split.status = Job.PROCESSING #avoids re-running split.status = Job.PROCESSING #avoids re-running
split.save() split.save()
...@@ -466,7 +486,7 @@ class Worker(models.Model): ...@@ -466,7 +486,7 @@ class Worker(models.Model):
def __exit__(self, *exc): def __exit__(self, *exc):
self.shutdown() self.terminate()
return False #propagate exceptions return False #propagate exceptions
......
...@@ -286,6 +286,12 @@ DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \ ...@@ -286,6 +286,12 @@ DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \
"algorithm.\nThe administrators have been informed.\nYou may try to run " \ "algorithm.\nThe administrators have been informed.\nYou may try to run " \
"the experiment again at another moment." "the experiment again at another moment."
# If set, then detach child processes (I/O daemons) from the worker process
# group. In this case, signals sent to the worker process will not be forwarded
# to the processing children. This is desirable in a production environment, so
# that user processes are not terminated when the worker is updated.
WORKER_DETACH_CHILDREN = False
############################################################################## ##############################################################################
# #
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment