diff --git a/beat/web/backend/helpers.py b/beat/web/backend/helpers.py
new file mode 100755
index 0000000000000000000000000000000000000000..d2e01cbba326dcab2834facae64b535c35617f00
--- /dev/null
+++ b/beat/web/backend/helpers.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+# #
+# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
+# Contact: beat.support@idiap.ch #
+# #
+# This file is part of the beat.web module of the BEAT platform. #
+# #
+# Commercial License Usage #
+# Licensees holding valid commercial BEAT licenses may use this file in #
+# accordance with the terms contained in a written agreement between you #
+# and Idiap. For further information contact tto@idiap.ch #
+# #
+# Alternatively, this file may be used under the terms of the GNU Affero #
+# Public License version 3 as published by the Free Software and appearing #
+# in the file LICENSE.AGPL included in the packaging of this file. #
+# The BEAT platform is distributed in the hope that it will be useful, but #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
+# or FITNESS FOR A PARTICULAR PURPOSE. #
+# #
+# You should have received a copy of the GNU Affero Public License along #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
+# #
+###############################################################################
+
+from django.db import transaction
+
+from ..experiments.models import Experiment
+from ..experiments.models import Block
+from .models import Job
+
+
+@transaction.atomic
+def schedule_experiment(experiment):
+    '''Schedules the experiment for execution at the backend
+
+    Creates one :py:class:`.models.Job` per block and marks the experiment
+    ``SCHEDULED``; returns early unless the experiment is ``PENDING``. Raises
+    ``RuntimeError`` if any block has no queue or no environment assigned.
+    This function is expected to be called on the web server. The Scheduler
+    is tasked to notice the newly-scheduled experiment and execute it.
+    '''
+
+    # Lock the experiment, so nobody else can modify it
+    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)
+
+    # Can't schedule an experiment not in the PENDING state
+    if experiment.status != Experiment.PENDING:
+        return
+
+
+    # Check that the queues and environments of all the blocks are still valid
+    for block in experiment.blocks.all():
+        if block.queue is None:
+            raise RuntimeError("Block `%s' does not have a queue assigned " \
+                "- this normally indicates the originally selected " \
+                "queue was deleted since the experiment was first " \
+                "configured. Re-configure this experiment and select a new " \
+                "default or block-specific queue" % block.name)
+
+        if block.environment is None:
+            raise RuntimeError("Block `%s' does not have an environment " \
+                "assigned - this normally indicates the originally selected " \
+                "environment was deleted since the experiment was first " \
+                "configured. Re-configure this experiment and select a new " \
+                "default or block-specific environment" % block.name)
+
+
+    # Process all the blocks of the experiment
+    for block in experiment.blocks.all():
+        # Lock the block, so nobody else can modify it
+        block = Block.objects.select_for_update().get(pk=block.pk)
+
+        # search for other jobs with similar outputs that have no children yet
+        # do this carefully, as other experiments may be scheduled at the same
+        # time, invalidating our "parent" choice
+        parent = Job.objects.filter(block__outputs__in=block.outputs.all(),
+            child=None).first()
+
+        if parent is not None: # candidate only: re-read it under a row lock
+            while True:
+                parent = Job.objects.select_for_update().get(pk=parent.pk)
+                if parent.child_ is not None: # taken meanwhile: try its child
+                    parent = parent.child
+                    continue
+                job = Job(block=block, parent=parent)
+                break
+        else:
+            job = Job(block=block)
+
+        job.save()
+
+
+    # Mark the experiment as scheduled
+    experiment.status = Experiment.SCHEDULED
+    experiment.save()
+
+
+#----------------------------------------------------------
+
+
diff --git a/beat/web/experiments/models/block.py b/beat/web/experiments/models/block.py
index 8c21605e418f6adc0480edb2c34a566b22d5ad33..78bd834e0a747743200afdaf22e9122c368a45c9 100755
--- a/beat/web/experiments/models/block.py
+++ b/beat/web/experiments/models/block.py
@@ -196,61 +196,6 @@ class Block(models.Model):
     results = property(lambda self: self.__return_first__('results'))
 
 
-    def _schedule(self):
-        '''Schedules this block for execution at the backend
-
-        To "schedule" means solely creating a :py:class:`..backend.models.Job`
-        pointing to this object. This method **should only be called by the
-        owning experiment**. It is not part of the Block's public API.
-        '''
-
-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        self_ = Block.objects.select_for_update().get(pk=self.pk)
-
-        # checks we have not, meanwhile, been cancelled
-        if self_.done():
-            return
-
-        # checks queue and environment
-        if self.queue is None:
-            raise RuntimeError("Block `%s' does not have a queue assigned " \
-                "- this normally indicates the originally selected " \
-                "queue was deleted since the experiment was first " \
-                "configured. Re-configure this experiment and select a new " \
-                "default or block-specific queue" % self.name)
-
-        if self.environment is None:
-            raise RuntimeError("Block `%s' does not have an environment " \
-                "assigned - this normally indicates the originally selected " \
-                "environment was deleted since the experiment was first " \
-                "configured. Re-configure this experiment and select a new " \
-                "default or block-specific environment" % self.name)
-
-        # search for other jobs with similar outputs that have no children yet
-        # do this carefully, as other experiments may be scheduled at the same
-        # time, invalidating our "parent" choice
-        parent = Job.objects.filter(block__outputs__in=self.outputs.all(),
-            child=None).first()
-
-        if parent is not None: #(candidate only) try to lock it
-            while True:
-                parent = Job.objects.select_for_update().get(pk=parent.pk)
-                if parent.child_ is not None: #was taken meanwhile, retry
-                    parent = parent.child
-                    continue
-                job = Job(block=self, parent=parent)
-                break
-        else:
-            job = Job(block=self)
-
-        job.save()
-
-        # checks if the job is immediately runnable - if so, tries to
-        # make it runnable (check caches and other)
-        if self.is_runnable():
-            self.job._make_runnable()
-
-
     def done(self):
         '''Says whether the block has finished or not'''
 
diff --git a/beat/web/experiments/models/experiment.py b/beat/web/experiments/models/experiment.py
index 6b4f35ccbc8f09544f347e96dc332c5c6fa33570..7fe06d1a495a774282b090cbdd029aa1993159f4 100755
--- a/beat/web/experiments/models/experiment.py
+++ b/beat/web/experiments/models/experiment.py
@@ -818,35 +818,11 @@ class Experiment(Shareable):
         self.save()
 
-
-    @transaction.atomic
     def schedule(self):
-        '''Schedules this experiment for execution at the backend
-
-        Because the experiment is fully built on ``save()`` (including block
-        interdependence and cache requirements), to "schedule" means solely
-        creating :py:class:`..backend.models.Job`'s to address all
-        algorithm-equipped blocks in the experiment. A ``Job`` is the
-        reflection of the experiment's block for the backend and makes the
-        schedule aware of execution units that must be processed. Each ``Job``
-        is then split on the scheduler process, for as many times as required
-        by the :py:class:`Block`'s ``required_slots`` entry, effectively
-        creating one :py:class:`..backend.models.JobSplit` per split.
-        '''
-
-        self_ = Experiment.objects.select_for_update().get(pk=self.pk)
+        '''Schedules this experiment for execution at the backend'''
 
-        if self_.status != Experiment.PENDING:
-            return
-
-        for block in self.blocks.all():
-            block._schedule()
-
-        # notice that the previous call may decide all is done already
-        # so, we must respect that before setting the SCHEDULED status
-        self.refresh_from_db()
-        if not self.is_done():
-            self.status = Experiment.SCHEDULED
-            self.save()
+        from ...backend.helpers import schedule_experiment
+        schedule_experiment(self)
 
 
     @transaction.atomic