helpers.py 5.46 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

from django.db import transaction

30
31
from datetime import datetime

32
33
from ..experiments.models import Experiment
from ..experiments.models import Block
34
from ..experiments.models import CachedFile
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from .models import Job


@transaction.atomic
def schedule_experiment(experiment):
    '''Schedules the experiment for execution at the backend

    Scheduling an experiment only means creating one :py:class:`.models.Job`
    instance for each block of the experiment.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-scheduled experiment and execute it.
    '''

    # Lock the experiment, so nobody else can modify it
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Can't schedule an experiment not in the PENDING state
    if experiment.status != Experiment.PENDING:
        return


    # Check that the queues and environments of all the blocks are still valid
    for block in experiment.blocks.all():
        if block.queue is None:
            raise RuntimeError("Block `%s' does not have a queue assigned " \
                "- this normally indicates the originally selected " \
                "queue was deleted since the experiment was first " \
                "configured. Re-configure this experiment and select a new " \
                "default or block-specific queue" % block.name)

        if block.environment is None:
            raise RuntimeError("Block `%s' does not have an environment " \
                "assigned - this normally indicates the originally selected " \
                "environment was deleted since the experiment was first " \
                "configured. Re-configure this experiment and select a new " \
                "default or block-specific environment" % block.name)


    # Process all the blocks of the experiment
75
76
    already_done = True

77
78
79
80
    for block in experiment.blocks.all():
        # Lock the block, so nobody else can modify it
        block = Block.objects.select_for_update().get(pk=block.pk)

81
82
83
        # Check if the block outputs aren't already in the cache
        must_skip = all([cached_file.status == CachedFile.CACHED
                         for cached_file in block.outputs.all()])
84

85
86
87
88
89
        if must_skip:
            block.status = Block.DONE
            block.start_date = datetime.now()
            block.end_date = block.start_date
            block.save()
90

91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
        else:
            # search for other jobs with similar outputs that have no children yet
            # do this carefully, as other experiments may be scheduled at the same
            # time, invalidating our "parent" choice
            parent = Job.objects.filter(block__outputs__in=block.outputs.all(),
                                        child=None).first()

            if parent is not None: #(candidate only) try to lock it
                while True:
                    parent = Job.objects.select_for_update().get(pk=parent.pk)
                    if parent.child_ is not None: #was taken meanwhile, retry
                        parent = parent.child
                        continue
                    job = Job(block=block, parent=parent)
                    break
            else:
                job = Job(block=block)

            job.save()

            already_done = False


    # Mark the experiment as scheduled (or done)
    if already_done:
        experiment.start_date = datetime.now()
        experiment.end_date = experiment.start_date
        experiment.status = Experiment.DONE
    else:
        experiment.status = Experiment.SCHEDULED
121
122
123
124
125
126
127

    experiment.save()


#----------------------------------------------------------