diff --git a/beat/web/backend/migrations/0002_scheduler_addons.py b/beat/web/backend/migrations/0002_scheduler_addons.py
index ee14da860549143d2ff2c478d93ebe05eddbbead..8bda50ee8a09beb5c089160911345d89830970e8 100644
--- a/beat/web/backend/migrations/0002_scheduler_addons.py
+++ b/beat/web/backend/migrations/0002_scheduler_addons.py
@@ -173,6 +173,7 @@ class Migration(migrations.Migration):
                 ('split_index', models.PositiveIntegerField()),
                 ('start_index', models.PositiveIntegerField(null=True)),
                 ('end_index', models.PositiveIntegerField(null=True)),
+                ('cache_errors', models.PositiveIntegerField(default=0)),
                 ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Kill')])),
                 ('start_date', models.DateTimeField(null=True)),
                 ('end_date', models.DateTimeField(null=True)),
diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py
index 4839e9ea62154d09a46100c28d063516153da307..5c828de6116e11444ed795fe80076ded5beb18dc 100644
--- a/beat/web/backend/models.py
+++ b/beat/web/backend/models.py
@@ -674,7 +674,7 @@ class Job(models.Model):
             self.split_errors += 1
             self.save()
 
-            if self.split_errors >= settings.MAXIMUM_SPLIT_ERRORS: #stop
+            if self.split_errors > settings.MAXIMUM_SPLIT_ERRORS: #stop
                 from traceback import format_exc
                 message = "Index splitting for block `%s' of experiment " \
                     "`%s' could not be completed due to an index split " \
@@ -893,6 +893,8 @@
 
     end_index = models.PositiveIntegerField(null=True)
 
+    cache_errors = models.PositiveIntegerField(default=0)
+
     status = models.CharField(max_length=1, choices=Job.STATUS,
         default=Job.QUEUED)
 
@@ -951,6 +953,45 @@
             self,
             self.job.block.queue,
             self.worker)
 
+    def signal_io_error(self):
+        '''Marks the split as having an IOError (cache sync issues, likely)
+        '''
+
+        tries = 0
+
+        while True:
+
+            try:
+
+                with transaction.atomic():
+
+                    # lock self - avoids concurrent update from
+                    # scheduler/worker subsystem
+                    self_ = JobSplit.objects.select_for_update().get(pk=self.pk)
+
+                    if self_.start_date is not None: return
+
+                    # update the *locked* row (not ``self``) so the lock
+                    # actually protects the counter; then mirror the fresh
+                    # value on this instance for our caller to inspect
+                    self_.cache_errors += 1
+                    self_.save()
+                    self.cache_errors = self_.cache_errors
+
+                    break
+
+            except utils.OperationalError:
+                tries += 1
+                if tries > settings.MAXIMUM_SPLIT_SAVE_RETRIES:
+                    raise
+                else:
+                    logger.info("Database error caught starting `%s': retrying " \
+                        "in 1 second (%d/%d)...", self, tries,
+                        settings.MAXIMUM_SPLIT_SAVE_RETRIES)
+                    # wait a second and retry
+                    time.sleep(1)
+
+
     def start(self):
         '''Marks the job as started, acknowledging scheduler assignment
 
diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py
index e515e92307eb187409058c6397254c78e36e8834..aa18347bb4084e9beddefb60a3b231e2301fe07a 100644
--- a/beat/web/backend/schedule.py
+++ b/beat/web/backend/schedule.py
@@ -308,7 +308,6 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
 
     try:
         split = JobSplit.objects.get(pk=split_pk)
-        split.start()
     except JobSplit.DoesNotExist:
         logger.info("Job split(pk=%d) does not exist. Likely cancelled, " \
             "so, ignoring.", split_pk)
@@ -324,7 +323,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
                 "block `%s' of experiment `%s' could not " \
                 "be completed: indexes are missing!" % \
                 (split.split_index+1, split.job.block.required_slots,
-                split.process_id, split.worker, split.job.block.name,
+                os.getpid(), split.worker, split.job.block.name,
                 split.job.block.experiment.fullname())
             logger.warn(message)
             _split_end(split, Result(status=1, syserr=message,
@@ -366,6 +365,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
 
         # n.b.: with executor may crash on the database view setup
         with executor:
+            split.start()
             result = executor.process(
                 execute_path=execute,
                 virtual_memory_in_megabytes=queue.memory_limit,
@@ -375,7 +375,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
                 daemon=0,
                 )
 
-        split.end(Result(
+        _split_end(split, Result(
             status=result['status'],
             stdout=result['stdout'],
             stderr=result['stderr'],
@@ -383,13 +383,24 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
             syserr=result['system_error'],
             _stats=simplejson.dumps(result['statistics'], indent=2),
             ))
-        logger.info("Process `%s' for split `%s' ended gracefully",
-            split.process_id, split)
+        logger.info("Split `%s' (pid=%d) ended gracefully", split, os.getpid())
 
-    except:
+    except IOError:
         from traceback import format_exc
-        logger.warn("Process `%s' for split `%s' ended with an error: %s",
-            split.process_id, split, format_exc())
+        logger.warn("Split `%s' (pid=%d) execution raised an IOError: %s",
+            split, os.getpid(), format_exc())
+        split.signal_io_error()
+        if split.cache_errors > settings.MAXIMUM_IO_ERRORS:
+            _split_end(split, Result(status=1,
+                usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),))
+        else:
+            logger.info("Split `%s' will be retried (%d/%d)",
+                split, split.cache_errors, settings.MAXIMUM_IO_ERRORS)
+
+    except Exception:
+        from traceback import format_exc
+        logger.warn("Split `%s' (pid=%d) ended with an error: %s",
+            split, os.getpid(), format_exc())
         _split_end(split, Result(status=1,
             usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),))
 