diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 5cb4951c39d4853fcb5a4ff7c63c07f2bca0f570..052390ffb302f53c2f368dca741e8f3ac5b39bca 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -344,11 +344,15 @@ class Worker(models.Model): self.save() - def shutdown(self): - '''Cleanly shuts down a particular worker at the database + def terminate(self): + '''Cleanly terminates a particular worker at the database + + .. note:: - Stop all running/assigned splits and then mark the worker as - inactive. + This method does not destroy running or assigned processes that may + be running or assigned to this worker. This is implemented in this + way to allow for a clean replacement of the worker program w/o an + interruption of the backend service. ''' @@ -358,9 +362,29 @@ class Worker(models.Model): self_.active = False self_.used_cores = 0 self_.used_memory = 0 - self_.info = 'Worker shutdown by system administrator' + self_.info = 'Worker deactivated by system administrator' self_.save() + # cancel job splits which should be cancelled anyways + for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, + end_date__isnull=True, process_id__isnull=False): + if psutil.pid_exists(j.process_id): + os.kill(j.process_id, signal.SIGTERM) + + # cleans-up zombie processes that may linger + _cleanup_zombies() + + + def shutdown(self): + '''Removes all running/assigned jobs from the queue, shuts down + + This method should be used with care as it may potentially cancel all + assigned splits for the current worker. + + ''' + + self.terminate() + message = 'Cancelled on forced worker shutdown (maintenance)' \ ' - you may retry submitting your experiment shortly' @@ -374,14 +398,6 @@ class Worker(models.Model): end_date__isnull=True, process_id__isnull=False): j._cancel() - # cancel job splits which should be cancelled - for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, - end_date__isnull=True, process_id__isnull=False): - if psutil.pid_exists(j.process_id): - os.kill(j.process_id, signal.SIGTERM) - - # cleans-up zombie processes that may linger - _cleanup_zombies() def work(self, environments, cpulimit, process): @@ -452,7 +468,11 @@ class Worker(models.Model): continue # if we get to this point, then we launch the user process - subprocess.Popen(cmdline + [execute, str(split.pk)]) + # -> see settings.WORKER_DETACH_CHILDREN for more info + kwargs = dict() + if settings.WORKER_DETACH_CHILDREN: + kwargs['preexec_fn'] = os.setpgrp + subprocess.Popen(cmdline + [execute, str(split.pk)], **kwargs) split.status = Job.PROCESSING #avoids re-running split.save() @@ -466,7 +486,7 @@ class Worker(models.Model): def __exit__(self, *exc): - self.shutdown() + self.terminate() return False #propagate exceptions diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py index 357a8efb8ee1611bdf9791c8473da13591fd791a..2583579d640a60d0cce4edaf46a984796d090594 100644 --- a/beat/web/settings/settings.py +++ b/beat/web/settings/settings.py @@ -286,6 +286,12 @@ DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \ "algorithm.\nThe administrators have been informed.\nYou may try to run " \ "the experiment again at another moment." +# If set, then detach children processes (I/O daemons) from the worker process +# group. In this case, signals sent to the worker process will not be forwarded +# to the processing children. This is desirable in a production environment, to +# avoid user processes to be terminated in case the worker is updated. +WORKER_DETACH_CHILDREN = False + ############################################################################## #