From 8109f8bce5d3b4b03ba9990b939a3e4315d9a301 Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.anjos@idiap.ch> Date: Wed, 25 May 2016 10:34:59 +0200 Subject: [PATCH] [backend] Implement optional termination of worker processes --- beat/web/backend/models.py | 50 ++++++++++++++++++++++++----------- beat/web/settings/settings.py | 6 +++++ 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 5cb4951c3..052390ffb 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -344,11 +344,15 @@ class Worker(models.Model): self.save() - def shutdown(self): - '''Cleanly shuts down a particular worker at the database + def terminate(self): + '''Cleanly terminates a particular worker at the database + + .. note:: - Stop all running/assigned splits and then mark the worker as - inactive. + This method does not destroy running or assigned processes that may + be running or assigned to this worker. This is implemented in this + way to allow for a clean replacement of the worker program w/o an + interruption of the backend service. ''' @@ -358,9 +362,29 @@ class Worker(models.Model): self_.active = False self_.used_cores = 0 self_.used_memory = 0 - self_.info = 'Worker shutdown by system administrator' + self_.info = 'Worker deactivated by system administrator' self_.save() + # cancel job splits which should be cancelled anyways + for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, + end_date__isnull=True, process_id__isnull=False): + if psutil.pid_exists(j.process_id): + os.kill(j.process_id, signal.SIGTERM) + + # cleans-up zombie processes that may linger + _cleanup_zombies() + + + def shutdown(self): + '''Removes all running/assigned jobs from the queue, shuts down + + This method should be used with care as it may potentially cancel all + assigned splits for the current worker. 
+ + ''' + + self.terminate() + message = 'Cancelled on forced worker shutdown (maintenance)' \ ' - you may retry submitting your experiment shortly' @@ -374,14 +398,6 @@ class Worker(models.Model): end_date__isnull=True, process_id__isnull=False): j._cancel() - # cancel job splits which should be cancelled - for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, - end_date__isnull=True, process_id__isnull=False): - if psutil.pid_exists(j.process_id): - os.kill(j.process_id, signal.SIGTERM) - - # cleans-up zombie processes that may linger - _cleanup_zombies() def work(self, environments, cpulimit, process): @@ -452,7 +468,11 @@ class Worker(models.Model): continue # if we get to this point, then we launch the user process - subprocess.Popen(cmdline + [execute, str(split.pk)]) + # -> see settings.WORKER_DETACH_CHILDREN for more info + kwargs = dict() + if settings.WORKER_DETACH_CHILDREN: + kwargs['preexec_fn'] = os.setpgrp + subprocess.Popen(cmdline + [execute, str(split.pk)], **kwargs) split.status = Job.PROCESSING #avoids re-running split.save() @@ -466,7 +486,7 @@ class Worker(models.Model): def __exit__(self, *exc): - self.shutdown() + self.terminate() return False #propagate exceptions diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py index 357a8efb8..2583579d6 100644 --- a/beat/web/settings/settings.py +++ b/beat/web/settings/settings.py @@ -286,6 +286,12 @@ DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \ "algorithm.\nThe administrators have been informed.\nYou may try to run " \ "the experiment again at another moment." +# If set, then detach children processes (I/O daemons) from the worker process +# group. In this case, signals sent to the worker process will not be forwarded +# to the processing children. This is desirable in a production environment, to +# prevent user processes from being terminated when the worker is updated.
+WORKER_DETACH_CHILDREN = False + ############################################################################## # -- GitLab