Commit 706bdf51 authored by Philip ABBET
Browse files

[experiments, backend] Move all scheduling-related code into helper functions of the backend module

parent 6ef27ec3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import transaction
from ..experiments.models import Experiment
from ..experiments.models import Block
from .models import Job
@transaction.atomic
def schedule_experiment(experiment):
    '''Creates the backend jobs needed to execute an experiment

    Scheduling consists of creating one :py:class:`.models.Job` per block of
    the experiment and then flagging the experiment as ``SCHEDULED``. The
    whole operation runs inside a single database transaction.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-scheduled experiment and execute it.

    Nothing happens (and nothing is raised) if the experiment is not in the
    ``PENDING`` state.


    Parameters:

      experiment: The experiment to schedule. It is re-read from the
        database under a row lock; only its ``pk`` is used from the object
        passed in.


    Raises:

      RuntimeError: If any block of the experiment has no queue or no
        environment assigned. All blocks are validated before the first
        job is created.

    '''

    # Work on a freshly-loaded, row-locked copy so nobody else can modify
    # the experiment while we are scheduling it
    locked_experiment = \
        Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Only experiments in the PENDING state can be scheduled
    if locked_experiment.status != Experiment.PENDING:
        return

    # First pass: make sure every block still has a valid queue and
    # environment before creating anything
    for candidate in locked_experiment.blocks.all():

        if candidate.queue is None:
            raise RuntimeError(
                "Block `%s' does not have a queue assigned "
                "- this normally indicates the originally selected "
                "queue was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific queue" % candidate.name
            )

        if candidate.environment is None:
            raise RuntimeError(
                "Block `%s' does not have an environment "
                "assigned - this normally indicates the originally selected "
                "environment was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific environment" % candidate.name
            )

    # Second pass: create one job per block
    for entry in locked_experiment.blocks.all():

        # Lock the block as well, so nobody else can modify it
        locked_block = Block.objects.select_for_update().get(pk=entry.pk)

        # Look for an existing job producing the same outputs that has no
        # child yet. This is only a candidate: other experiments may be
        # scheduled at the same time and take it before we lock it.
        parent = Job.objects.filter(
            block__outputs__in=locked_block.outputs.all(),
            child=None,
        ).first()

        if parent is None:
            new_job = Job(block=locked_block)
        else:
            # Walk down the chain, locking each job in turn, until we hold
            # the lock on a job that (still) has no child
            while True:
                parent = Job.objects.select_for_update().get(pk=parent.pk)
                if parent.child_ is None:
                    break
                parent = parent.child  # was taken meanwhile, retry
            new_job = Job(block=locked_block, parent=parent)

        new_job.save()

    # Every block now has a job: mark the experiment as scheduled
    locked_experiment.status = Experiment.SCHEDULED
    locked_experiment.save()
#----------------------------------------------------------
......@@ -196,61 +196,6 @@ class Block(models.Model):
results = property(lambda self: self.__return_first__('results'))
def _schedule(self):
    '''Creates the backend job that represents this block

    To "schedule" means solely creating a :py:class:`..backend.models.Job`
    pointing to this object. This method **should only be called by the
    owning experiment**. It is not part of the Block's public API.

    Nothing happens if the (freshly re-read) block reports itself as done.
    After the job is saved, if this block is runnable, the job is asked to
    make itself runnable.


    Raises:

      RuntimeError: If this block has no queue or no environment assigned.

    '''

    # Re-read this block under a row lock - avoids concurrent updates from
    # the scheduler/worker subsystem
    locked = Block.objects.select_for_update().get(pk=self.pk)

    # We may have been cancelled in the meantime
    if locked.done():
        return

    # Queue and environment are checked on this instance (not on the locked
    # copy)
    if self.queue is None:
        raise RuntimeError(
            "Block `%s' does not have a queue assigned "
            "- this normally indicates the originally selected "
            "queue was deleted since the experiment was first "
            "configured. Re-configure this experiment and select a new "
            "default or block-specific queue" % self.name
        )

    if self.environment is None:
        raise RuntimeError(
            "Block `%s' does not have an environment "
            "assigned - this normally indicates the originally selected "
            "environment was deleted since the experiment was first "
            "configured. Re-configure this experiment and select a new "
            "default or block-specific environment" % self.name
        )

    # Look for an existing job producing the same outputs that has no child
    # yet. This is only a candidate: other experiments may be scheduled at
    # the same time and take it before we lock it.
    parent = Job.objects.filter(
        block__outputs__in=self.outputs.all(),
        child=None,
    ).first()

    if parent is None:
        new_job = Job(block=self)
    else:
        # Walk down the chain, locking each job in turn, until we hold the
        # lock on a job that (still) has no child
        while True:
            parent = Job.objects.select_for_update().get(pk=parent.pk)
            if parent.child_ is None:
                break
            parent = parent.child  # was taken meanwhile, retry
        new_job = Job(block=self, parent=parent)

    new_job.save()

    # If the job can run immediately, try to make it runnable (check caches
    # and other)
    if self.is_runnable():
        self.job._make_runnable()
def done(self):
'''Says whether the block has finished or not'''
......
......@@ -818,35 +818,11 @@ class Experiment(Shareable):
self.save()
@transaction.atomic
def schedule(self):
    '''Schedules this experiment for execution at the backend

    Because the experiment is fully built on ``save()`` (including block
    interdependence and cache requirements), to "schedule" means solely
    creating :py:class:`..backend.models.Job`'s to address all
    algorithm-equipped blocks in the experiment. A ``Job`` is the
    reflection of the experiment's block for the backend and makes the
    schedule aware of execution units that must be processed. Each ``Job``
    is then split on the scheduler process, for as many times as required
    by the :py:class:`Block`'s ``required_slots`` entry, effectively
    creating one :py:class:`..backend.models.JobSplit` per split.

    Nothing happens if the experiment is not in the ``PENDING`` state.
    '''

    # NOTE(review): this body looks like the old and the new side of a diff
    # rendered together - it both schedules every block inline and then
    # delegates to ``schedule_experiment()``, and carries a second,
    # docstring-like string below. Confirm against the repository which
    # statements are actually live before relying on this method.

    # re-read the experiment under a row lock before checking its status
    self_ = Experiment.objects.select_for_update().get(pk=self.pk)
    '''Schedules this experiment for execution at the backend'''

    # only experiments in the PENDING state are scheduled
    if self_.status != Experiment.PENDING:
        return

    # ask every block to create its own job
    for block in self.blocks.all():
        block._schedule()

    # notice that the previous call may decide all is done already
    # so, we must respect that before setting the SCHEDULED status
    self.refresh_from_db()
    if not self.is_done():
        self.status = Experiment.SCHEDULED
        self.save()

    # imported here rather than at module level - presumably to avoid a
    # circular import between the experiments and backend apps (TODO confirm)
    from ...backend.helpers import schedule_experiment
    schedule_experiment(self)
@transaction.atomic
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment