diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index dbf6d185a18798a6f7c779cf5cea61fb605267e3..4839e9ea62154d09a46100c28d063516153da307 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -26,6 +26,7 @@ ############################################################################### import os +import time import datetime import operator @@ -35,6 +36,7 @@ logger = logging.getLogger(__name__) import psutil import simplejson +from django.db import utils from django.db import models from django.db import transaction from django.conf import settings @@ -569,13 +571,12 @@ class Job(models.Model): - @transaction.atomic def _copy(self, other): '''Copy state from another block''' - self_ = Job.objects.select_for_update().get(pk=self.pk) + self.refresh_from_db() - if self_.done(): return + if self.done(): return self.start_date = other.start_date self.end_date = other.end_date @@ -587,8 +588,7 @@ class Job(models.Model): self.block._update_state(None) - @transaction.atomic - def make_runnable(self): + def _make_runnable(self): '''Tries to make this job runnable - if it is cached, we skip it''' # lock self - avoids concurrent update from scheduler/worker subsystem @@ -698,7 +698,7 @@ class Job(models.Model): child = self.child self.child.parent = None # does this unblock the child to run? - if child.block.is_runnable(): child.make_runnable() + if child.block.is_runnable(): child._make_runnable() else: self.child._copy(self) @@ -714,85 +714,83 @@ class Job(models.Model): called by children splits or itself. ''' - with transaction.atomic(): - - # lock - self_ = Job.objects.select_for_update().get(pk=self.pk) + # lock + self_ = Job.objects.select_for_update().get(pk=self.pk) - if self_.done(): return + if self_.done(): return - # If this process has a parent, then don't try to get split - # statuses - if not self.parent: + # If this process has a parent, then don't try to get split + # statuses + if not self.parent: - split_statuses = self.splits.values_list('status', flat=True) + split_statuses = self.splits.values_list('status', flat=True) - if self.start_date is None: - qs = self.splits.filter(start_date__isnull=False).\ - order_by('start_date') - if qs: - self.start_date = qs.first().start_date - else: - self.start_date = datetime.datetime.now() + if self.start_date is None: + qs = self.splits.filter(start_date__isnull=False).\ + order_by('start_date') + if qs: + self.start_date = qs.first().start_date + else: + self.start_date = datetime.datetime.now() - # Process main status and status from job results - if Job.FAILED in split_statuses: - self.status = Job.FAILED + # Process main status and status from job results + if Job.FAILED in split_statuses: + self.status = Job.FAILED - elif Job.CANCELLED in split_statuses: - self.status = Job.CANCELLED + elif Job.CANCELLED in split_statuses: + self.status = Job.CANCELLED - elif (Job.PROCESSING in split_statuses) or \ - (Job.QUEUED in split_statuses and \ - Job.COMPLETED in split_statuses) or \ - (Job.KILL in split_statuses): - self.status = Job.PROCESSING + elif (Job.PROCESSING in split_statuses) or \ + (Job.QUEUED in split_statuses and \ + Job.COMPLETED in split_statuses) or \ + (Job.KILL in split_statuses): + self.status = Job.PROCESSING - elif all([s == Job.SKIPPED for s in split_statuses]): - self.status = Job.SKIPPED + elif all([s == Job.SKIPPED for s in split_statuses]): + self.status = Job.SKIPPED - elif Job.QUEUED not in split_statuses: - self.status = Job.COMPLETED + elif Job.QUEUED not in split_statuses: + self.status = Job.COMPLETED + else: + self.status = Job.QUEUED + + # if required, erase dangling files, update own results + timings = None + if self.done() and self.status != Job.CANCELLED: + # compute final timings and update parent block + if self.status != Job.SKIPPED: + diff_timings = self._merge_results() + # delete all splits w/o results (still queued) + self.splits.filter(result__isnull=True).delete() + self.end_date = self.splits.order_by('-end_date').\ + first().end_date + updateStatistics(self.result.stats, self.end_date) + Result.objects.filter(split__in=self.splits.all()).delete() + seqtime = sum(diff_timings) + if self.start_date is None: + queuing = 0 else: - self.status = Job.QUEUED - - # if required, erase dangling files, update own results - timings = None - if self.done() and self.status != Job.CANCELLED: - # compute final timings and update parent block - if self.status != Job.SKIPPED: - diff_timings = self._merge_results() - # delete all splits w/o results (still queued) - self.splits.filter(result__isnull=True).delete() - self.end_date = self.splits.order_by('-end_date').\ - first().end_date - updateStatistics(self.result.stats, self.end_date) - Result.objects.filter(split__in=self.splits.all()).delete() - seqtime = sum(diff_timings) - if self.start_date is None: - queuing = 0 - else: - queuing = (self.start_date - \ - self.runnable_date).total_seconds() - if not seqtime: - speed_up_real = 1.0 - speed_up_maximal = 1.0 - else: - speed_up_real = float(seqtime) / \ - (self.end_date - self.start_date).total_seconds() - speed_up_maximal = float(seqtime) / max(diff_timings) - timings = dict( - queuing = queuing, - linear_execution = seqtime, - speed_up_real = speed_up_real, - speed_up_maximal = speed_up_maximal, - ) - self.runnable_date = None - self.erase_dangling_files() - - # updates the dependents and child state - self.save() + queuing = (self.start_date - \ + self.runnable_date).total_seconds() + if not seqtime: + speed_up_real = 1.0 + speed_up_maximal = 1.0 + else: + speed_up_real = float(seqtime) / \ + (self.end_date - self.start_date).total_seconds() + speed_up_maximal = float(seqtime) / max(diff_timings) + timings = dict( + queuing = queuing, + linear_execution = seqtime, + speed_up_real = speed_up_real, + speed_up_maximal = speed_up_maximal, + ) + self.runnable_date = None + self.erase_dangling_files() + + # updates the dependents and child state + self.save() self._cascade_updates() self.block._update_state(timings) @@ -813,7 +811,6 @@ class Job(models.Model): os.remove(f) - @transaction.atomic def _cancel(self, usrerr=None, syserr=None): '''Cancel the execution of this job @@ -931,6 +928,7 @@ class JobSplit(models.Model): ) + @transaction.atomic def schedule(self, worker): '''Schedules this split to be executed on a given worker @@ -941,13 +939,13 @@ class JobSplit(models.Model): ''' - with transaction.atomic(): - # lock self - avoids concurrent update from scheduler/worker - # subsystem - self_ = JobSplit.objects.select_for_update().get(pk=self.pk) + # lock self - avoids concurrent update from scheduler/worker + # subsystem + self_ = JobSplit.objects.select_for_update().get(pk=self.pk) + worker_ = Worker.objects.select_for_update().get(pk=worker.pk) - self.worker = worker - self.save() + self.worker = worker + self.save() logger.info("Job split %s scheduled at `%s' was assigned to `%s'", self, self.job.block.queue, self.worker) @@ -957,29 +955,45 @@ class JobSplit(models.Model): '''Marks the job as started, acknowledging scheduler assignment Once this function is called, a second call no longer alters anything. + ''' + tries = 0 - Parameters: + while True: - worker (:py:class:Worker): The worker this job split was actually - submitted to, if there is one. + try: - ''' + with transaction.atomic(): - if self.start_date is not None: return + # lock self - avoids concurrent update from + # scheduler/worker subsystem + self_ = JobSplit.objects.select_for_update().get(pk=self.pk) - with transaction.atomic(): - # lock self - avoids concurrent update from scheduler/worker - # subsystem - self_ = JobSplit.objects.select_for_update().get(pk=self.pk) + if self_.start_date is not None: return - self.start_date = datetime.datetime.now() - self.process_id = os.getpid() + self.start_date = datetime.datetime.now() + self.process_id = os.getpid() - self.status = Job.PROCESSING - self.save() + self.status = Job.PROCESSING - self.job._update_state() + self.save() + + logger.info("Job split `%s' was just started.", self) + + self.job._update_state() + + break + + except utils.OperationalError: + tries += 1 + if tries > settings.MAXIMUM_SPLIT_SAVE_RETRIES: + raise + else: + logger.info("Database error caught starting `%s': retrying " \ + "in 1 second (%d/%d)...", self, tries, + settings.MAXIMUM_SPLIT_SAVE_RETRIES) + # wait a second and retry + time.sleep(1) def _cancel(self): @@ -1002,51 +1016,73 @@ class JobSplit(models.Model): def end(self, result, status=None): - '''Marks the job as finished on the state database + '''Marks the job as finished on the state database + + Disassociates the worker from this job. Once this function is called, a + second call no longer alters anything. + + + Parameters: - Disassociates the worker from this job. Once this function is called, a - second call no longer alters anything. + result (:py:class:`Result`): The result of the task + status (str): One of the possible (single character) Job statuses, in + case ``result`` is not provided. Notice that, if ``result`` is + provided, this variable is **ignored** and the state + (``Job.COMPLETED`` or ``Job.FAILED``) is filled in from + ``result.status``. A ``result.status`` of 0 (zero) indicates a + successful task (set job status to ``Job.COMPLETED``), whereas if the + status is different than zero, the job status is set to + ``Job.FAILED``. - Parameters: + ''' + + tries = 0 - result (:py:class:`Result`): The result of the task + while True: - status (str): One of the possible (single character) Job statuses, in - case ``result`` is not provided. Notice that, if ``result`` is - provided, this variable is **ignored** and the state (``Job.COMPLETED`` - or ``Job.FAILED``) is filled in from ``result.status``. A - ``result.status`` of 0 (zero) indicates a successful task (set job - status to ``Job.COMPLETED``), whereas if the status is different than - zero, the job status is set to ``Job.FAILED``. + try: - ''' + with transaction.atomic(): - with transaction.atomic(): + # lock self - avoids concurrent update from + # scheduler/worker subsystem + self_ = JobSplit.objects.select_for_update().get(pk=self.pk) - # lock self - avoids concurrent update from scheduler/worker - # subsystem - self_ = JobSplit.objects.select_for_update().get(pk=self.pk) + if self_.done(): return - if self_.done(): return + if status: + logger.info("Marking job split `%s' as '%s'", self, + status) - if status: - logger.info("Marking job split `%s' as '%s'", self, status) + if self.start_date is None: + self.start_date = datetime.datetime.now() + self.end_date = datetime.datetime.now() + self.worker = None #frees worker slot + + if result: + self.status = Job.COMPLETED if result.status == 0 \ + else Job.FAILED + if result.id is None: result.save() + self.result = result + else: + self.status = status - if self.start_date is None: - self.start_date = datetime.datetime.now() - self.end_date = datetime.datetime.now() - self.worker = None #frees worker slot + self.save() - if result: - self.status = Job.COMPLETED if result.status == 0 else Job.FAILED - if result.id is None: result.save() - self.result = result - else: - self.status = status + logger.info("Job split `%s' finished executing", self) - self.save() + self.job._update_state() - logger.info("Job split `%s' finished executing", self) + break - self.job._update_state() + except utils.OperationalError: + tries += 1 + if tries > settings.MAXIMUM_SPLIT_SAVE_RETRIES: + raise + else: + logger.info("Database error caught ending `%s': retrying " \ + "in 1 second (%d/%d)...", self, tries, + settings.MAXIMUM_SPLIT_SAVE_RETRIES) + # wait a second and retry + time.sleep(1) diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py index 5d9f3cbe7be5ca582ef1a943ec743831c6c6795a..e515e92307eb187409058c6397254c78e36e8834 100644 --- a/beat/web/backend/schedule.py +++ b/beat/web/backend/schedule.py @@ -111,7 +111,6 @@ def send_experiment_emails(): qs.update(email_dispatch=False) -@transaction.atomic def schedule(): '''Schedules job splits that can run now, respecting user/queue usage @@ -200,7 +199,7 @@ def schedule(): ''' # updates jobs with split errors, cancel experiments if problems occur - for j in Job.objects.select_for_update().filter(split_errors__gt=0): + for j in Job.objects.filter(split_errors__gt=0): j._split_indices() # get queues in a good order @@ -217,7 +216,7 @@ def schedule(): # workers that can run job splits whitelist = {} - for worker in Worker.objects.select_for_update().filter(active=True): + for worker in Worker.objects.filter(active=True): availability = worker.available_cores() if availability <= 0: continue whitelist[worker] = availability @@ -258,16 +257,15 @@ def schedule(): return assigned_splits -@transaction.atomic -def _atomic_split_end(split_pk, result): - '''Tries to, atomically end the split by calling split.end()''' +def _split_end(split, result): + '''Tries to end the split - ignores if the split was deleted''' try: - split = JobSplit.objects.select_for_update().get(pk=split_pk) + split.refresh_from_db() split.end(result) except JobSplit.DoesNotExist: logger.warn("Job split(pk=%d) does not exist. Likely cancelled, " \ - "so ignoring result `%s'", split_pk, result) + "so ignoring result `%s'", split.pk, result) def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): @@ -308,15 +306,13 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): ''' - with transaction.atomic(): - # lock split - try: - split = JobSplit.objects.select_for_update().get(pk=split_pk) - split.start() - except JobSplit.DoesNotExist: - logger.info("Job split(pk=%d) does not exist. Likely cancelled, " \ - "so, ignoring.", split_pk) - return + try: + split = JobSplit.objects.get(pk=split_pk) + split.start() + except JobSplit.DoesNotExist: + logger.info("Job split(pk=%d) does not exist. Likely cancelled, " \ + "so, ignoring.", split_pk) + return config = simplejson.loads(split.job.block.command) @@ -331,14 +327,13 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): split.process_id, split.worker, split.job.block.name, split.job.block.experiment.fullname()) logger.warn(message) - _atomic_split_end(split_pk, Result(status=1, syserr=message, + _split_end(split, Result(status=1, syserr=message, usrerr=settings.DEFAULT_USER_ERROR)) config['range'] = [split.start_index, split.end_index] # For reference, this bit of code should match (or be very similar) to the # one at beat.cmdline.experiments:run_experiment() - split = JobSplit.objects.get(pk=split_pk) #notice: no locking from beat.core.execution import Executor try: @@ -380,7 +375,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): daemon=0, ) - _atomic_split_end(split_pk, Result( + split.end(Result( status=result['status'], stdout=result['stdout'], stderr=result['stderr'], @@ -395,7 +390,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): from traceback import format_exc logger.warn("Process `%s' for split `%s' ended with an error: %s", split.process_id, split, format_exc()) - _atomic_split_end(split_pk, Result(status=1, + _split_end(split, Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),)) @@ -409,7 +404,6 @@ def worker_update(): worker.update_state() -@transaction.atomic def work(environments, cpulimit, process, django_settings): '''Launches user code on isolated processes @@ -443,8 +437,8 @@ def work(environments, cpulimit, process, django_settings): worker = Worker.objects.get(name=socket.gethostname()) # cancel job splits by killing associated processes - for j in JobSplit.objects.select_for_update().filter(worker=worker, - status=Job.KILL, end_date__isnull=True, process_id__isnull=False): + for j in JobSplit.objects.filter(worker=worker, status=Job.KILL, + end_date__isnull=True, process_id__isnull=False): os.kill(j.process_id, signal.SIGKILL) j.end(None, Job.CANCELLED) @@ -454,8 +448,8 @@ def work(environments, cpulimit, process, django_settings): if settings.DEBUG: cmdline += ['-vv'] # start newly assigned job splits - for j in JobSplit.objects.select_for_update().filter(worker=worker, - status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True): + for j in JobSplit.objects.filter(worker=worker, status=Job.QUEUED, + start_date__isnull=True, process_id__isnull=True): execute = pick_execute(j, environments) if execute is None: message = "Environment `%s' is not available for split %d/%d " \ @@ -479,7 +473,7 @@ def work(environments, cpulimit, process, django_settings): def cleanup_zombies(): '''Cleans-up eventual zombie subprocesses launched by the call above''' - for child in psutil.Process().children(): + for child in psutil.Process().children(recursive=True): if child.status == psutil.STATUS_ZOMBIE: child.wait() @@ -539,7 +533,6 @@ def find_environments(paths=None): return discover_environments([path]) -@transaction.atomic def worker_shutdown(): """Standard worker shutdown procedure @@ -547,30 +540,30 @@ def worker_shutdown(): """ # myself, raises if I cannot find me - worker = Worker.objects.select_for_update().get(name=socket.gethostname()) + worker = Worker.objects.get(name=socket.gethostname()) message = 'Cancelled on forced worker shutdown (maintenance)' \ ' - you may retry submitting your experiment shortly' # cancel job splits which are running - for j in JobSplit.objects.select_for_update().filter(worker=worker, + for j in JobSplit.objects.filter(worker=worker, status=(Job.KILL, Job.PROCESSING), end_date__isnull=True, process_id__isnull=False): signal.signal(signal.SIGKILL, j.process_id) j.end(Result(status=1, usrerr=message)) # cancel job splits which were not yet started - for j in JobSplit.objects.select_for_update().filter(worker=worker, + for j in JobSplit.objects.filter(worker=worker, status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True): j.end(Result(status=1, usrerr=message)) - # lock worker and modify it - worker.active = False - worker.used_cores = 0 - worker.used_memory = 0 - worker.info = 'Worker shutdown by system administrator' - worker.save() - + with transaction.atomic(): + worker = Worker.objects.select_for_update().get(pk=worker.pk) + worker.active = False + worker.used_cores = 0 + worker.used_memory = 0 + worker.info = 'Worker shutdown by system administrator' + worker.save() def pick_execute(split, environments): diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py index b0313276e6def9f01c6943f2dea86d525297d5f2..8001c7122dc9056d4947087cc55f321622f6a4e8 100644 --- a/beat/web/experiments/models.py +++ b/beat/web/experiments/models.py @@ -739,9 +739,12 @@ class Experiment(Shareable): return storage.get_file_content(self, 'declaration_file') - @transaction.atomic - def update_state(self): - '''Update self state based on associated block states''' + def _update_state(self): + '''Update self state based on associated block states + + This method is called by the underlying block. It is not part of the + Experiment's public API and must not be called by any other user code. + ''' self_ = Experiment.objects.select_for_update().get(pk=self.pk) @@ -784,6 +787,7 @@ class Experiment(Shareable): self.save() + @transaction.atomic def schedule(self): '''Schedules this experiment for execution at the backend @@ -798,7 +802,7 @@ class Experiment(Shareable): creating one :py:class:`..backend.models.JobSplit` per split. ''' - self_ = Experiment.objects.get(pk=self.pk) + self_ = Experiment.objects.select_for_update().get(pk=self.pk) if self_.status != Experiment.PENDING: return @@ -806,13 +810,13 @@ class Experiment(Shareable): # notice that the previous call may decide all is done already # so, we must respect that before setting the SCHEDULED status - with transaction.atomic(): - self_ = Experiment.objects.select_for_update().get(pk=self.pk) - if not self_.is_done(): - self.status = Experiment.SCHEDULED - self.save() + self.refresh_from_db() + if not self.is_done(): + self.status = Experiment.SCHEDULED + self.save() + @transaction.atomic def cancel(self): '''Cancels the execution of this experiment on the backend. @@ -830,7 +834,8 @@ class Experiment(Shareable): if self_.status not in (Experiment.SCHEDULED, Experiment.RUNNING): return - for b in self.blocks.all(): b._cancel() + with transaction.atomic(): + for b in self.blocks.all(): b._cancel() def fork(self, username=None, name=None): @@ -985,13 +990,12 @@ class Block(models.Model): results = property(lambda self: self.__return_first__('results')) - @transaction.atomic def _schedule(self): '''Schedules this block for execution at the backend To "schedule" means solely creating a :py:class:`..backend.models.Job` - pointing to this object. This method should only be called by the - owning experiment. + pointing to this object. This method **should only be called by the + owning experiment**. It is not part of the Block's public API. ''' # lock self - avoids concurrent update from scheduler/worker subsystem @@ -1025,7 +1029,7 @@ class Block(models.Model): # checks if the job is immediately runnable - if so, tries to # make it runnable (check caches and other) - if self.is_runnable(): self.job.make_runnable() + if self.is_runnable(): self.job._make_runnable() def done(self): @@ -1034,11 +1038,11 @@ class Block(models.Model): return self.status not in (Block.NOT_CACHED, Block.PROCESSING) - @transaction.atomic def _cancel(self): '''Cancels the execution of this block on the backend. - This method should only be called from the experiment equivalent. + This method should only be called from the experiment equivalent. It is + not part of the Block's public API. ''' # lock self - avoids concurrent update from scheduler/worker subsystem @@ -1051,7 +1055,7 @@ class Block(models.Model): else: self.status = Block.CANCELLED self.save() - self.experiment.update_state() + self.experiment._update_state() def is_runnable(self): @@ -1070,10 +1074,9 @@ class Block(models.Model): if any([k.status in (Block.FAILED, Block.CANCELLED) \ for k in b.dependencies.all()]): b._cancel() - if b.is_runnable(): b.job.make_runnable() + if b.is_runnable(): b.job._make_runnable() - @transaction.atomic def _update_state(self, timings=None): '''Updates self state as a result of backend running @@ -1090,7 +1093,7 @@ class Block(models.Model): This method is supposed to be called only by the underlying job - instance. + instance. It is not part of the Block's public API. ''' @@ -1173,7 +1176,7 @@ class Block(models.Model): self.save() self._cascade_updates() - self.experiment.update_state() + self.experiment._update_state() #---------------------------------------------------------- diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py index 731e0928a0509d0745b4159d30d1302643f11b01..7e673efcf7002c158642d84828e0c1a6c341a01b 100644 --- a/beat/web/settings/settings.py +++ b/beat/web/settings/settings.py @@ -261,6 +261,14 @@ SCHEDULING_PANEL = True # block execution (and, by consequence), the experiment. MAXIMUM_SPLIT_ERRORS = 1 #attempts to split without errors +# The maximum number of retries for saving (mostly at start() and end()), job +# splits from remote processes. If you're using a SQLite database backend, this +# number should be higher than 1 (recommended value is 3 to 5). In case you're +# using another database, then this can value can be ignored. If set to a value +# of one or more and there are "database is locked" errors, then job split +# saving at ``start()`` or ``end()`` will be retried with a 1-second interval. +MAXIMUM_SPLIT_SAVE_RETRIES = 5 + # The default user error is a message string that is set upon the failure of # execution of a particular block if the root cause of the problem is NOT a # user error, but the systems'. In this case, the system administrators receive