#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software Foundation and   #
# appearing in the file LICENSE.AGPL included in the packaging of this file.  #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

import glob
import logging
import os
from datetime import datetime

import simplejson
from django.conf import settings
from django.db import transaction
from django.db.models import Count
from django.db.models import Q

import beat.core.algorithm
import beat.core.data
import beat.core.hash
from beat.core.utils import NumpyJSONEncoder

from ..experiments.models import Block
from ..experiments.models import CachedFile
from ..experiments.models import Experiment
from ..experiments.models import Result as CacheResult
from .models import Job
from .models import JobSplit
from .models import Queue
from .models import Result
from .models import Worker

logger = logging.getLogger(__name__)


@transaction.atomic
def schedule_experiment(experiment):
    """Schedules the experiment for execution at the backend

    Scheduling an experiment only means creating one :py:class:`.models.Job`
    instance for each block of the experiment.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-scheduled experiment and execute it.
    """

    # Lock the experiment, so nobody else can modify it
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Can't schedule an experiment not in the PENDING state
    if experiment.status != Experiment.PENDING:
        return

    # Check that the queues and environments of all the blocks are still valid
    for block in experiment.blocks.all():
        if block.queue is None:
            raise RuntimeError(
                "Block `%s' does not have a queue assigned "
                "- this normally indicates the originally selected "
                "queue was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific queue" % block.name
            )

        if block.environment is None:
            raise RuntimeError(
                "Block `%s' does not have an environment "
                "assigned - this normally indicates the originally selected "
                "environment was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific environment" % block.name
            )

    # Process all the blocks of the experiment
    already_done = True

    for block in experiment.blocks.all():
        # Lock the block, so nobody else can modify it
        block = Block.objects.select_for_update().get(pk=block.pk)

        # Check whether the block outputs are already available in the cache
        must_skip = all(
            [
                cached_file.status == CachedFile.CACHED
                for cached_file in block.outputs.all()
            ]
        )

        if must_skip:
            block.status = Block.DONE
            block.creation_date = datetime.now()
            block.start_date = block.creation_date
            block.end_date = block.creation_date
            block.save()

            if block.analyzer:
                for cached_file in block.outputs.all():
                    load_results_from_cache(block, cached_file)

        else:
            Job.objects.create_job(block)
            block.status = Block.PENDING
            block.creation_date = datetime.now()
            block.save()
            already_done = False

    # Mark the experiment as scheduled (or done)
    if already_done:
        experiment.start_date = datetime.now()
        experiment.end_date = experiment.start_date
        experiment.status = Experiment.DONE
    else:
        experiment.status = Experiment.SCHEDULED

    experiment.save()
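
# Illustrative usage (a sketch, not part of the original module): a view or a
# management command holding a PENDING experiment would typically call
#
#   schedule_experiment(experiment)
#
# after which the experiment status is SCHEDULED (or DONE when every block
# output was already cached).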


# ----------------------------------------------------------


@transaction.atomic
def cancel_experiment(experiment):
    """Cancel the execution of the experiment on the backend

    Cancelling an experiment only means marking the experiment as 'cancelling'.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-cancelled experiment and to act accordingly.
    """

    # Lock the experiment, so nobody else can modify it
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Can't cancel an experiment that hasn't started or has already finished
    if experiment.status not in [Experiment.SCHEDULED, Experiment.RUNNING]:
        return

    # Mark the experiment as cancelling
    experiment.status = Experiment.CANCELLING
    experiment.save()


# ----------------------------------------------------------


def split_new_jobs():
    """Retrieve all the jobs not already splitted, and create the appropriate splits"""

    def _process(candidate_jobs):
        additional_jobs = []

        # Iterate through all the candidate jobs
        for job in candidate_jobs:

            # Check whether the output files were generated since the job was scheduled
            must_skip = all(
                [
                    cached_file.status == CachedFile.CACHED
                    for cached_file in job.block.outputs.all()
                ]
            )

            if must_skip:
                job.block.status = Block.DONE
                job.block.start_date = datetime.now()
                job.block.end_date = job.block.start_date
                job.block.save()

                additional_jobs.extend(update_dependent_jobs(job))
                if len(additional_jobs) == 0:
                    update_experiment(job.block.experiment)

                job.delete()
                continue

            # Check that the job isn't a mirror of a currently running one
            nb_existing_splits = JobSplit.objects.filter(
                ~Q(status=JobSplit.QUEUED) | Q(worker__isnull=False),
                job__key=job.key,
                job__runnable_date__isnull=False,
                job__mirror=False,
            ).count()

            if nb_existing_splits > 0:
                job.mirror = True
                job.save()
                continue

            # Create the splits
            JobSplit.objects.create_splits(job)

        return additional_jobs

    # First retrieve all the candidate jobs from the database and process them;
    # if that processing marks any other job as a candidate, process it too,
    # recursively, until no candidate job is left
    candidate_jobs = Job.objects.annotate(nb_splits=Count("splits")).filter(
        runnable_date__isnull=False, mirror=False, nb_splits=0
    )

    while len(candidate_jobs) > 0:
        candidate_jobs = _process(candidate_jobs)
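
# Illustrative scheduler iteration (a sketch, not the original scheduler loop):
# a periodic task would typically chain the helpers of this module and then
# forward the assigned splits to the worker processes:
#
#   splits_to_cancel = process_newly_cancelled_experiments()
#   split_new_jobs()
#   assigned_splits = assign_splits_to_workers()
#   # ... notify each worker about its newly assigned/cancelled splits ...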


# ----------------------------------------------------------


def process_newly_cancelled_experiments():
    """Retrieve all the experiments that must be cancelled, and do it"""

    # Retrieve all the experiments marked as cancelling
    cancelling_experiments = Experiment.objects.filter(status=Experiment.CANCELLING)

    splits_to_cancel = []

    for experiment in cancelling_experiments:
        # Skip experiments that have no pending or processing blocks left
        if (
            experiment.blocks.filter(
                Q(status=Block.PENDING) | Q(status=Block.PROCESSING)
            ).count()
            == 0
        ):
            continue

        new_splits_to_cancel = cancel_all_blocks(experiment)

        if len(new_splits_to_cancel) == 0:
            update_experiment(experiment)

        splits_to_cancel.extend(new_splits_to_cancel)

    return splits_to_cancel


# ----------------------------------------------------------


def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT):
    """Check that an entry of the cache is complete

    Due to the distributed nature of the platform, with volumes shared by
    several different machines, a (hopefully) small delay might occur between
    the writing of a file in the cache on a processing node and its availability
    on the current machine.

    This function checks that all the necessary files are present and complete.
    """

    def _extract_indices_from_filename(filename):
        parts = filename.split(".")
        return (int(parts[-3]), int(parts[-2]), filename)

    def _verify_checksum(filename):
        checksum_file = filename + ".checksum"

        try:
            with open(checksum_file, "rt") as f:
                recorded = f.read().strip()

            actual = beat.core.hash.hashFileContents(filename)
        except Exception:
            return False

        return actual == recorded

    # Retrieve all the index files
    abs_path = os.path.join(cache, path)

    index_files = glob.glob(abs_path + "*.index")
    index_files = sorted([_extract_indices_from_filename(x) for x in index_files])

    # Check that there is no missing index
    if len(index_files) > 1:
        for i in range(1, len(index_files)):
            if index_files[i][0] != index_files[i - 1][1] + 1:
                return False

    # Sum the number of blocks represented by each index file
    nb_blocks = 0
    for start, end, index_file in index_files:

        # Check that the file is complete
        if not _verify_checksum(index_file):
            return False

        # Check that the data file is complete
        data_file = index_file.replace(".index", ".data")
        if not _verify_checksum(data_file):
            return False

        # Retrieve the number of blocks from the file
        with open(index_file, "rt") as f:
            lines = f.readlines()
            nb_blocks += len(lines)

    return nb_blocks == nb_expected_blocks
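
# Sketch of the cache layout this function expects for a complete entry (the
# path and indices are made up):
#
#   <CACHE_ROOT>/<path>.0.99.index             one line per data block
#   <CACHE_ROOT>/<path>.0.99.index.checksum    hash of the index file
#   <CACHE_ROOT>/<path>.0.99.data              corresponding data chunk
#   <CACHE_ROOT>/<path>.0.99.data.checksum     hash of the data file
#   <CACHE_ROOT>/<path>.100.199.index          next contiguous chunk, etc.
#
# The entry is complete when the index ranges are contiguous, every checksum
# matches and the total number of index lines equals `nb_expected_blocks`.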


# ----------------------------------------------------------


def assign_splits_to_workers():
    """Assign existing job splits to available workers from the appropriate queues"""

    # Retrieve the queues in priority order: most cores per slot first,
    # then lowest per-user slot limit
    queues = Queue.objects.order_by("-cores_per_slot", "max_slots_per_user")

    # Retrieve the candidate splits for each queue
    candidate_splits_per_queue = [
        (q, retrieve_candidate_splits_for_queue(q)) for q in queues
    ]
    candidate_splits_per_queue = [x for x in candidate_splits_per_queue if x[1]]

    if not candidate_splits_per_queue:
        return []

    logger.debug("Considering splits: %s", candidate_splits_per_queue)

    # Build a "white list" of available workers
    whitelist = dict(
        [
            (worker, worker.available_cores())
            for worker in Worker.objects.filter(active=True)
        ]
    )

    logger.debug("Worker availability: %s", whitelist)

    # Process the candidates of each queue
    assigned_splits = []

    for queue, candidate_splits in candidate_splits_per_queue:

        candidate_workers = queue.worker_availability()
        required_cores = queue.cores_per_slot

        for candidate_split in candidate_splits:

            # Check that the job wasn't marked as a mirror during a previous
            # iteration
            candidate_split.job.refresh_from_db()
            if candidate_split.job.mirror:
                continue

            # Search for an available worker
            for candidate_worker in candidate_workers:

                # Check that there are enough available cores on the worker
                available_cores = whitelist.get(candidate_worker, 0)
                if available_cores < required_cores:
                    continue

                logger.debug(
                    "Assigning `%s' to worker `%s'", candidate_split, candidate_worker
                )

                assign_split_to_worker(candidate_split, candidate_worker)
                assigned_splits.append(candidate_split)

                mark_similar_jobs_as_mirror(candidate_split.job)

                whitelist[candidate_worker] -= required_cores
                logger.debug(
                    "`%s' cores available: %d",
                    candidate_worker,
                    whitelist[candidate_worker],
                )
                break

    return JobSplit.objects.filter(id__in=[x.id for x in assigned_splits])


# ----------------------------------------------------------


def get_configuration_for_split(split):
    """Retrieve the configuration to be used to execute the provided job split
    on a worker node
    """

    # Retrieve the block configuration
    configuration = simplejson.loads(str(split.job.block.command))

    # (If necessary) Add the information needed to access the database files
    if settings.DATASETS_UID is not None:
        configuration["datasets_uid"] = settings.DATASETS_UID

    if settings.DATASETS_ROOT_PATH is not None:
        configuration["datasets_root_path"] = settings.DATASETS_ROOT_PATH

    # (If necessary) Add the range of indices to process
    if (split.start_index is not None) and (split.end_index is not None):
        configuration["range"] = [split.start_index, split.end_index]

    return configuration
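
# Example of a configuration returned by this function (a sketch; the values
# are made up and the remaining keys depend on the stored block command):
#
#   {
#       ...keys from the block command...,
#       "datasets_uid": 1000,
#       "datasets_root_path": "/path/to/datasets",
#       "range": [0, 99],
#   }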


# ----------------------------------------------------------


def on_split_started(split):
    """Must be called each time a split job is started"""

    now = datetime.now()

    # Mark the split job as running
    split.status = JobSplit.PROCESSING
    split.start_date = now
    split.save()

    # (If necessary) Mark the job and block as running
    split.job.refresh_from_db()
    if split.job.start_date is None:
        split.job.start_date = now
        split.job.save()

        split.job.block.status = Block.PROCESSING
        split.job.block.start_date = now
        split.job.block.save()

        # (If necessary) Mark the experiment as running
        split.job.block.experiment.refresh_from_db()
        if split.job.block.experiment.status == Experiment.SCHEDULED:
            split.job.block.experiment.status = Experiment.RUNNING
            split.job.block.experiment.start_date = now
            split.job.block.experiment.save()

        # Mark the mirror jobs and their blocks as running
        mirror_jobs = Job.objects.filter(key=split.job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.start_date = now
            mirror_job.save()

            mirror_job.block.status = Block.PROCESSING
            mirror_job.block.start_date = now
            mirror_job.block.save()

            # (If necessary) Mark the experiment as running
            if mirror_job.block.experiment.status == Experiment.SCHEDULED:
                mirror_job.block.experiment.status = Experiment.RUNNING
                mirror_job.block.experiment.start_date = now
                mirror_job.block.experiment.save()


# ----------------------------------------------------------


def on_split_done(split, result):
    """Must be called each time a split job is successfully completed"""

    result = Result(
        status=result["status"],
        stdout=result["stdout"],
        stderr=result["stderr"],
        usrerr=result["user_error"],
        _stats=simplejson.dumps(result["statistics"], indent=2),
    )
    result.save()

    split.status = JobSplit.COMPLETED
    split.end_date = datetime.now()
    split.result = result
    split.worker = None
    split.save()

    update_job(split.job)


# ----------------------------------------------------------


def on_split_fail(split, result):
    """Must be called each time a split job is successfully completed"""

    if isinstance(result, dict):
        result = Result(
            status=result["status"],
            stdout=result["stdout"],
            stderr=result["stderr"],
            usrerr=result["user_error"],
            _stats=simplejson.dumps(result["statistics"], indent=2),
        )
    else:
        result = Result(status=1, stdout="", stderr=result, usrerr="")

    result.save()

    split.status = JobSplit.FAILED
    split.end_date = datetime.now()
    split.result = result
    split.worker = None
    split.save()

    return update_job(split.job)


# ----------------------------------------------------------


def on_split_cancelled(split):
    """Must be called each time a split job is successfully cancelled"""

    split.status = JobSplit.CANCELLED
    split.end_date = datetime.now()
    split.worker = None
    split.save()

    return update_job(split.job)
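
# Illustrative worker-side lifecycle (a sketch; `execute_split` is a made-up
# placeholder for whatever actually runs the split and returns the result
# dictionary expected by on_split_done):
#
#   on_split_started(split)
#   try:
#       result = execute_split(split)
#   except Exception as e:
#       splits_to_cancel = on_split_fail(split, str(e))
#   else:
#       on_split_done(split, result)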


# ----------------------------------------------------------


def retrieve_candidate_splits_for_queue(queue):
    """Retrieve the splits assigned to the given queue that could be considered
    for execution
    """

    # Retrieve the queued splits assigned to the queue, from oldest to newest
    splits = JobSplit.objects.filter(
        job__block__queue=queue, status=JobSplit.QUEUED, worker__isnull=True
    ).order_by("job__runnable_date")

    # Retrieve the list of the users that submitted those jobs
    users = set(splits.values_list("job__block__experiment__author", flat=True))

    # Determine how many slots each user is already using on the queue
    user_current_slots = [
        JobSplit.objects.filter(
            job__block__experiment__author=k,
            job__block__queue=queue,
            status=JobSplit.PROCESSING,
        ).count()
        for k in users
    ]

    # Determine how many slots each user is still allowed on the queue
    allowance = [queue.max_slots_per_user - k for k in user_current_slots]
    allowance = dict(zip(users, allowance))

    # Limit the runnable splits so that no user exceeds their allowed number of slots
    candidates = []
    for split in splits:
        author = split.job.block.experiment.author.id
        if allowance[author] > 0:
            candidates.append(split)
            allowance[author] -= 1

    # Return the list of candidate splits
    return candidates
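
# Worked example of the allowance computation above (made-up numbers): with
# queue.max_slots_per_user == 4 and a user already holding 3 PROCESSING splits
# on this queue, only one more of that user's queued splits is returned as a
# candidate this round; other users are unaffected.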


# ----------------------------------------------------------


@transaction.atomic
def assign_split_to_worker(split, worker):
    """Schedules the split to be executed on a given worker"""

    split = JobSplit.objects.select_for_update().get(pk=split.pk)
    worker = Worker.objects.select_for_update().get(pk=worker.pk)

    split.worker = worker
    split.save()

    logger.info(
        "Job split %s scheduled at `%s' was assigned to `%s'",
        split,
        split.job.block.queue,
        worker,
    )


# ----------------------------------------------------------


@transaction.atomic
def mark_similar_jobs_as_mirror(job):
    """Mark all similar jobs as mirror, and delete their job splits"""

    similar_jobs = (
        Job.objects.select_for_update().filter(key=job.key).exclude(pk=job.pk)
    )

    for similar_job in similar_jobs:
        similar_job.mirror = True
        similar_job.save()

        for split in similar_job.splits.all():
            split.delete()

        logger.info("Job `%s' is now marked as a mirror of `%s'", similar_job, job)


# ----------------------------------------------------------


def update_job(job):
    """Update the status of a job from the status of its splits and propagate
    the changes to its block, its experiment and any mirror jobs
    """

    def _collect_results(splits):
        cached_files_infos = dict(
            cpu_time=0.0,
            max_memory=0,
            stdout="",
            stderr="",
            error_report="",
            data_read_size=0,
            data_written_size=0,
            data_read_nb_blocks=0,
            data_written_nb_blocks=0,
            data_read_time=0.0,
            data_written_time=0.0,
            queuing_time=0.0,
            linear_execution_time=0.0,
            speed_up_real=1.0,
            speed_up_maximal=1.0,
        )

        split_durations = []

        for split in splits:
            split_durations.append((split.end_date - split.start_date).total_seconds())

            statistics = split.result.stats

            cached_files_infos["cpu_time"] += (
                statistics.cpu["user"] + statistics.cpu["system"]
            )
            cached_files_infos["max_memory"] += statistics.memory["rss"]

            header = ""
            if split.start_index is not None:
                header = "Split #%d (from indices %d to %d):" % (
                    split.split_index,
                    split.start_index,
                    split.end_index,
                )
                header += "\n" + ("=" * len(header)) + "\n"

            stdout = split.result.stdout if split.result.stdout != "\n" else ""
            stderr = split.result.stderr if split.result.stderr != "\n" else ""

            if stdout != "":
                cached_files_infos["stdout"] += header + stdout + "\n"

            if stderr != "":
                cached_files_infos["stderr"] += header + stderr + "\n"

            if split.result.usrerr != "":
                cached_files_infos["error_report"] += (
                    header + split.result.usrerr + "\n"
                )

            if "volume" in statistics.data:
                cached_files_infos["data_read_size"] += statistics.data["volume"].get(
                    "read", 0
                )
                cached_files_infos["data_written_size"] += statistics.data[
                    "volume"
                ].get("write", 0)

            if "blocks" in statistics.data:
                cached_files_infos["data_read_nb_blocks"] += statistics.data[
                    "blocks"
                ].get("read", 0)
                cached_files_infos["data_written_nb_blocks"] += statistics.data[
                    "blocks"
                ].get("write", 0)

            if "time" in statistics.data:
                cached_files_infos["data_read_time"] += statistics.data["time"].get(
                    "read", 0
                )
                cached_files_infos["data_written_time"] += statistics.data["time"].get(
                    "write", 0
                )

        job = splits[0].job

        cached_files_infos["queuing_time"] = (
            job.start_date - job.runnable_date
        ).total_seconds()
        cached_files_infos["linear_execution_time"] = sum(split_durations)

        if job.block.required_slots > 1:
            cached_files_infos["speed_up_real"] = (
                float(cached_files_infos["linear_execution_time"])
                / (job.end_date - job.start_date).total_seconds()
            )
            cached_files_infos["speed_up_maximal"] = float(
                cached_files_infos["linear_execution_time"]
            ) / max(split_durations)

        return cached_files_infos

    splits_to_cancel = []

    # If the job has failed
    if job.splits.filter(status=JobSplit.FAILED).count() > 0:

        # Mark queued splits of the same job as cancelled
        for split in job.splits.filter(status=JobSplit.QUEUED):
            split.status = JobSplit.CANCELLED
            split.start_date = datetime.now()
            split.end_date = split.start_date
            split.save()

        # Cancel running splits
        splits_to_cancel = list(job.splits.filter(status=JobSplit.PROCESSING).all())
        for split in splits_to_cancel:
            split.status = JobSplit.CANCELLING
            split.save()

    # Check that all the splits are done
    if (
        job.splits.filter(
            Q(status=JobSplit.QUEUED)
            | Q(status=JobSplit.PROCESSING)
            | Q(status=JobSplit.CANCELLING)
        ).count()
        > 0
    ):
        return splits_to_cancel

    # Save the end date
    job.end_date = job.splits.order_by("-end_date")[0].end_date
    job.save()

    # Did the job fail?
    if job.splits.filter(status=JobSplit.FAILED).count() > 0:

        # (If necessary) Update the cached files
        splits = job.splits.filter(
            Q(status=JobSplit.FAILED) | Q(status=JobSplit.COMPLETED)
        )
        cached_files_infos = _collect_results(splits)
        job.block.outputs.update(**cached_files_infos)

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.FAILED)

        # Update the block
        job.block.set_failed(job.end_date)

        # Cancel all the remaining blocks of the experiment
        splits_to_cancel.extend(cancel_all_blocks(job.block.experiment))

        # Update the experiment
        update_experiment(job.block.experiment)

        # Mark the blocks of the mirror jobs as failed
        mirror_jobs = Job.objects.filter(key=job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.end_date = job.end_date
            mirror_job.save()

            mirror_job.block.set_failed(job.end_date)

            # Cancel all the remaining blocks of the experiment
            splits_to_cancel.extend(cancel_all_blocks(mirror_job.block.experiment))

            # Update the experiment
            update_experiment(mirror_job.block.experiment)

        mirror_jobs.delete()

        # Delete the job
        job.delete()

    # Did the job succeed?
    elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0:

        # Update the cached files
        cached_files_infos = _collect_results(job.splits.all())
        job.block.outputs.update(**cached_files_infos)

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.DONE)

            if job.block.analyzer:
                load_results_from_cache(job.block, cached_file)

        # Update the block
        job.block.status = Block.DONE
        job.block.end_date = job.end_date
        job.block.save()

        # Update the dependent jobs
        additional_jobs = update_dependent_jobs(job)

        # (If necessary) Update the experiment
        if len(additional_jobs) == 0:
            update_experiment(job.block.experiment)

        # Mark the blocks of the mirror jobs as completed
        mirror_jobs = Job.objects.filter(key=job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.block.status = Block.DONE
            mirror_job.block.end_date = job.end_date
            mirror_job.block.save()

            # Update the dependent jobs
            additional_jobs = update_dependent_jobs(mirror_job)

            # (If necessary) Update the experiment
            if len(additional_jobs) == 0:
                update_experiment(mirror_job.block.experiment)

        mirror_jobs.delete()

        # Delete the job
        job.delete()

    # Was the job cancelled?
    elif job.splits.filter(status=JobSplit.CANCELLED).count() > 0:

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.CANCELLED)

        # Update the block
        job.block.set_canceled(job.end_date)

        # Update the experiment
        update_experiment(job.block.experiment)

        # Delete the job
        job.delete()

    return splits_to_cancel
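
# Note on the speed-up figures computed in _collect_results (made-up numbers):
# three splits lasting 10s, 12s and 8s give linear_execution_time == 30s; if
# they ran within a 12s wall-clock window, speed_up_real == 30 / 12 == 2.5 and
# speed_up_maximal == 30 / max(10, 12, 8) == 2.5 as well.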


# ----------------------------------------------------------


def update_dependent_jobs(job):
    """Mark the dependent jobs of the provided one as runnable

    Intended to be called after a job is done
    """

    updated_jobs = []

    for dependent_block in job.block.dependents.all():
        if dependent_block.is_runnable():
            dependent_block.job.runnable_date = datetime.now()
            dependent_block.job.save()
            updated_jobs.append(dependent_block.job)

    return updated_jobs


# ----------------------------------------------------------


def cancel_all_blocks(experiment):
    """Mark the all the blocks of the provided experiment as cancelled

    Intended to be called after a job has failed
    """

    splits_to_cancel = []

    # Retrieve all the blocks to cancel
    blocks_to_cancel = experiment.blocks.filter(
        Q(status=Block.PROCESSING) | Q(status=Block.PENDING)
    ).exclude(job__mirror=True)

    for block in blocks_to_cancel:

        # If a mirror job exists, reassign any existing split
        mirror_jobs = Job.objects.filter(key=block.job.key, mirror=True)
        if len(mirror_jobs) > 0:
            mirror_job = mirror_jobs[0]
            mirror_job.mirror = False
            mirror_job.save()

            for split in block.job.splits.all():
                split.job = mirror_job
                split.save()

        else:
            # Queued splits: Mark them as cancelled
            for split in block.job.splits.filter(status=JobSplit.QUEUED):
                split.status = JobSplit.CANCELLED
                split.start_date = datetime.now()
                split.end_date = split.start_date
                split.save()

            # Processing splits: Cancel them
            for split in block.job.splits.filter(status=JobSplit.PROCESSING):
                split.status = JobSplit.CANCELLING
                split.save()
                splits_to_cancel.append(split)

        # (If possible) Mark the block as cancelled
        if block.job.splits.filter(status=JobSplit.CANCELLING).count() == 0:
            block.set_canceled()
            block.job.delete()

    # Retrieve all the mirror blocks
    mirror_blocks_to_cancel = experiment.blocks.filter(
        Q(status=Block.PROCESSING) | Q(status=Block.PENDING)
    ).filter(job__mirror=True)

    for block in mirror_blocks_to_cancel:
        block.set_canceled()
        block.job.delete()

    return splits_to_cancel


# ----------------------------------------------------------


@transaction.atomic
def update_experiment(experiment):
    """Update the status of an experiment from the status of its blocks"""

    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Experiment done?
    if experiment.blocks.exclude(status=Block.DONE).count() == 0:
        experiment.status = Experiment.DONE
        experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
        experiment.save()

    # Experiment failed?
    elif experiment.blocks.filter(status=Block.FAILED).count() > 0:
        if experiment.blocks.filter(status=Block.PROCESSING).count() == 0:
            experiment.status = Experiment.FAILED
            experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
            experiment.save()

    # Experiment cancelled?
    elif experiment.blocks.filter(status=Block.CANCELLED).count() > 0:
        if experiment.blocks.filter(status=Block.PROCESSING).count() == 0:
            experiment.status = Experiment.PENDING
            experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
            experiment.save()


# ----------------------------------------------------------


def load_results_from_cache(block, cached_file):
    """Load the results produced by an analyzer block from the cache into the
    database
    """

    if not block.analyzer:
        return

    if cached_file.results.count() > 0:
        return

    data_source = beat.core.data.CachedDataSource()
    data_source.setup(
        os.path.join(settings.CACHE_ROOT, beat.core.hash.toPath(cached_file.hash)),
        settings.PREFIX,
    )

    (output_data, start_index, end_index) = data_source[0]
    if output_data is not None:
        algorithm = beat.core.algorithm.Algorithm(
            settings.PREFIX, block.algorithm.fullname()
        )

        for field, value in output_data.as_dict().items():
            res, _ = CacheResult.objects.get_or_create(name=field, cache=cached_file)
            res.primary = algorithm.results[field]["display"]
            res.type = algorithm.results[field]["type"]

            if res.type in ["int32", "float32", "bool", "string"]:
                res.data_value = str(value)
            else:
                res.data_value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder)

            res.save()

    data_source.close()
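
# Illustrative outcome (a sketch): for an analyzer block, one CacheResult row
# is created or updated per output field; scalar types (int32, float32, bool,
# string) are stored as plain strings, anything else is serialised to JSON
# with NumpyJSONEncoder.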