Commit 782f1d87 authored by André Anjos

[backend] Provision to avoid cache sync errors

parent f32f79a4
1 merge request: !194 Scheduler
In the backend's initial migration, the JobSplit table gains a cache_errors counter:

@@ -173,6 +173,7 @@ class Migration(migrations.Migration):
                ('split_index', models.PositiveIntegerField()),
                ('start_index', models.PositiveIntegerField(null=True)),
                ('end_index', models.PositiveIntegerField(null=True)),
+               ('cache_errors', models.PositiveIntegerField(default=0)),
                ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Kill')])),
                ('start_date', models.DateTimeField(null=True)),
                ('end_date', models.DateTimeField(null=True)),
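Editing the field list of an already-applied initial migration only helps freshly-created databases. For instances that have already run it, the usual route would be a separate AddField migration; a minimal sketch, assuming the app label is backend and that 0001_initial is the previous migration (both names are assumptions):

# Hypothetical stand-alone migration; app label and dependency name are assumptions.
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('backend', '0001_initial'),  # assumed name of the previous migration
    ]

    operations = [
        migrations.AddField(
            model_name='jobsplit',
            name='cache_errors',
            field=models.PositiveIntegerField(default=0),
        ),
    ]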
In the models module, Job's split-error guard becomes strictly greater-than, granting one extra attempt before aborting:

@@ -674,7 +674,7 @@ class Job(models.Model):
         self.split_errors += 1
         self.save()

-        if self.split_errors >= settings.MAXIMUM_SPLIT_ERRORS: #stop
+        if self.split_errors > settings.MAXIMUM_SPLIT_ERRORS: #stop
             from traceback import format_exc
             message = "Index splitting for block `%s' of experiment " \
                 "`%s' could not be completed due to an index split " \
The JobSplit model gains the matching counter field:

@@ -893,6 +893,8 @@ class JobSplit(models.Model):
     end_index = models.PositiveIntegerField(null=True)

+    cache_errors = models.PositiveIntegerField(default=0)
+
     status = models.CharField(max_length=1, choices=Job.STATUS,
         default=Job.QUEUED)
A new JobSplit.signal_io_error() method records cache-related IOErrors, taking a row lock so concurrent scheduler/worker updates cannot race:

@@ -951,6 +953,41 @@ class JobSplit(models.Model):
             self, self.job.block.queue, self.worker)

+    def signal_io_error(self):
+        '''Marks the split as having an IOError (cache sync issues, likely)
+        '''
+
+        tries = 0
+        while True:
+
+            try:
+
+                with transaction.atomic():
+
+                    # lock self - avoids concurrent update from
+                    # scheduler/worker subsystem
+                    self_ = JobSplit.objects.select_for_update().get(pk=self.pk)
+
+                    if self_.start_date is not None: return
+
+                    self.cache_errors += 1
+                    self.save()
+
+                break
+
+            except utils.OperationalError:
+                tries += 1
+                if tries > settings.MAXIMUM_SPLIT_SAVE_RETRIES:
+                    raise
+                else:
+                    logger.info("Database error caught starting `%s': retrying " \
+                        "in 1 second (%d/%d)...", self, tries,
+                        settings.MAXIMUM_SPLIT_SAVE_RETRIES)
+                    # wait a second and retry
+                    time.sleep(1)
+
     def start(self):
         '''Marks the job as started, acknowledging scheduler assignment
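Two defensive patterns are combined in this method: a SELECT ... FOR UPDATE row lock, and a bounded retry on django.db.utils.OperationalError (which SQLite, notably, raises while the database file is locked by another writer). A minimal generic sketch of the same pattern, with hypothetical names (update_with_row_lock, MAX_RETRIES):

import time
import logging

from django.db import transaction, utils

logger = logging.getLogger(__name__)

MAX_RETRIES = 5  # hypothetical bound, plays the role of MAXIMUM_SPLIT_SAVE_RETRIES


def update_with_row_lock(model, pk, mutate, retries=MAX_RETRIES):
    """Applies ``mutate(obj)`` under a row lock, retrying on transient errors."""
    tries = 0
    while True:
        try:
            with transaction.atomic():
                # the row stays locked until the transaction block exits
                obj = model.objects.select_for_update().get(pk=pk)
                mutate(obj)
                obj.save()
            return obj
        except utils.OperationalError:
            tries += 1
            if tries > retries:
                raise  # give up and propagate the database error
            logger.info("database busy, retrying (%d/%d)...", tries, retries)
            time.sleep(1)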
In the worker's process() function, the split is no longer acknowledged as started immediately after retrieval:

@@ -308,7 +308,6 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):

     try:
         split = JobSplit.objects.get(pk=split_pk)
-        split.start()
     except JobSplit.DoesNotExist:
         logger.info("Job split(pk=%d) does not exist. Likely cancelled, " \
             "so, ignoring.", split_pk)
The "indexes are missing" message now reports the worker's own PID rather than split.process_id:

@@ -324,7 +323,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
             "block `%s' of experiment `%s' could not " \
             "be completed: indexes are missing!" % \
             (split.split_index+1, split.job.block.required_slots,
-             split.process_id, split.worker, split.job.block.name,
+             os.getpid(), split.worker, split.job.block.name,
             split.job.block.experiment.fullname())
         logger.warn(message)
         _split_end(split, Result(status=1, syserr=message,
start() is instead called only once the executor context has been entered successfully:

@@ -366,6 +365,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):

         # n.b.: with executor may crash on the database view setup
         with executor:
+            split.start()
             result = executor.process(
                 execute_path=execute,
                 virtual_memory_in_megabytes=queue.memory_limit,
Normal termination now goes through the module-level _split_end() helper instead of calling split.end() directly:

@@ -375,7 +375,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
                 daemon=0,
             )

-        split.end(Result(
+        _split_end(split, Result(
             status=result['status'],
             stdout=result['stdout'],
             stderr=result['stderr'],
Finally, IOErrors get a dedicated handler with a bounded retry budget, separate from the generic handler (which also narrows from a bare except: to except Exception:):

@@ -383,13 +383,24 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
             syserr=result['system_error'],
             _stats=simplejson.dumps(result['statistics'], indent=2),
         ))

-        logger.info("Process `%s' for split `%s' ended gracefully",
-            split.process_id, split)
+        logger.info("Split `%s' (pid=%d) ended gracefully", split, os.getpid())

-    except:
+    except IOError:
         from traceback import format_exc
-        logger.warn("Process `%s' for split `%s' ended with an error: %s",
-            split.process_id, split, format_exc())
+        logger.warn("Split `%s' (pid=%d) execution raised an IOError: %s",
+            split, os.getpid(), format_exc())
+        split.signal_io_error()
+        if split.cache_errors > settings.MAXIMUM_IO_ERRORS:
+            _split_end(split, Result(status=1,
+                usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),))
+        else:
+            logger.info("Split `%s' will be retried (%d/%d)",
+                split, split.cache_errors, settings.MAXIMUM_IO_ERRORS)
+
+    except Exception:
+        from traceback import format_exc
+        logger.warn("Split `%s' (pid=%d) ended with an error: %s",
+            split, os.getpid(), format_exc())
         _split_end(split, Result(status=1,
             usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),))
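The net effect: an IOError during processing (most likely a shared cache that has not synchronized yet, per the commit message) no longer fails the split outright. The worker counts it via signal_io_error() and only gives up once the count exceeds MAXIMUM_IO_ERRORS; below that, the split is left unfinished, presumably so the scheduler can dispatch it again. A condensed sketch of that decision (handle_io_error is a hypothetical name; the other identifiers mirror the diff above):

# Condensed sketch of the worker-side decision; not verbatim from the diff.
def handle_io_error(split, traceback_text):
    split.signal_io_error()  # row-locks the split, bumps split.cache_errors
    if split.cache_errors > settings.MAXIMUM_IO_ERRORS:
        # retry budget exhausted: terminate the split with an error result
        _split_end(split, Result(status=1,
            usrerr=settings.DEFAULT_USER_ERROR, syserr=traceback_text))
    else:
        # leave the split unfinished so it can be picked up again
        logger.info("Split `%s' will be retried (%d/%d)", split,
            split.cache_errors, settings.MAXIMUM_IO_ERRORS)

Note that signal_io_error() returns without counting when the locked copy already has a start_date, which dovetails with moving split.start() into the executor context: the errors being budgeted appear to be those raised while the executor sets up its database views from the cache, i.e. before the split is acknowledged as started.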