#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

import glob
import logging
import os
from datetime import datetime

import simplejson
from django.conf import settings
from django.db import transaction
from django.db.models import Count
from django.db.models import Q

import beat.core.algorithm
import beat.core.data
import beat.core.hash
from beat.core.utils import NumpyJSONEncoder

from ..experiments.models import Block
from ..experiments.models import CachedFile
from ..experiments.models import Experiment
from ..experiments.models import Result as CacheResult
from .models import Job
from .models import JobSplit
from .models import Queue
from .models import Result
from .models import Worker

logger = logging.getLogger(__name__)


@transaction.atomic
def schedule_experiment(experiment):
    """Schedules the experiment for execution at the backend

    Scheduling an experiment only means creating one :py:class:`.models.Job`
    instance for each block of the experiment.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-scheduled experiment and execute it.
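
    Example (a minimal sketch; ``xp`` is assumed to be an existing
    ``Experiment`` instance in the ``PENDING`` state)::

        schedule_experiment(xp)

    Blocks whose outputs are already cached are marked as done immediately;
    each remaining block gets a pending :py:class:`.models.Job`.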
    """

    # Lock the experiment, so nobody else can modify it
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Can't schedule an experiment not in the PENDING state
    if experiment.status != Experiment.PENDING:
        return

    # Check that the queues and environments of all the blocks are still valid
    for block in experiment.blocks.all():
        if block.queue is None:
            raise RuntimeError(
                "Block `%s' does not have a queue assigned "
                "- this normally indicates the originally selected "
                "queue was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific queue" % block.name
            )

        if block.environment is None:
            raise RuntimeError(
                "Block `%s' does not have an environment "
                "assigned - this normally indicates the originally selected "
                "environment was deleted since the experiment was first "
                "configured. Re-configure this experiment and select a new "
                "default or block-specific environment" % block.name
            )

    # Process all the blocks of the experiment
    already_done = True

    for block in experiment.blocks.all():
        # Lock the block, so nobody else can modify it
        block = Block.objects.select_for_update().get(pk=block.pk)

        # Check whether the block outputs are already in the cache
        must_skip = all(
            [
                cached_file.status == CachedFile.CACHED
                for cached_file in block.outputs.all()
            ]
        )

        if must_skip:
            block.status = Block.DONE
            block.creation_date = datetime.now()
            block.start_date = block.creation_date
            block.end_date = block.creation_date
            block.save()

            if block.analyzer:
                for cached_file in block.outputs.all():
                    load_results_from_cache(block, cached_file)

        else:
            Job.objects.create_job(block)
            block.status = Block.PENDING
            block.creation_date = datetime.now()
            block.save()
            already_done = False

    # Mark the experiment as scheduled (or done)
    if already_done:
        experiment.start_date = datetime.now()
        experiment.end_date = experiment.start_date
        experiment.status = Experiment.DONE
    else:
        experiment.status = Experiment.SCHEDULED

    experiment.save()


# ----------------------------------------------------------


@transaction.atomic
def cancel_experiment(experiment):
    """Cancel the execution of the experiment on the backend

    Cancelling an experiment only means marking the experiment as 'cancelling'.

    This function is expected to be called on the web server. The Scheduler
    is tasked to notice the newly-cancelled experiment and take appropriate action.
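
    Example (a minimal sketch; ``xp`` is assumed to be an ``Experiment``
    instance currently in the ``SCHEDULED`` or ``RUNNING`` state)::

        cancel_experiment(xp)  # marks the experiment as CANCELLING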
    """

    # Lock the experiment, so nobody else can modify it
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Can't cancel an experiment not started or already finished
    if experiment.status not in [Experiment.SCHEDULED, Experiment.RUNNING]:
        return

    # Mark the experiment as cancelling
    experiment.status = Experiment.CANCELLING
    experiment.save()


# ----------------------------------------------------------


def split_new_jobs():
    """Retrieve all the jobs not already splitted, and create the appropriate splits"""

    def _process(candidate_jobs):
        additional_jobs = []

        # Iterate through all the candidate jobs
        for job in candidate_jobs:

            # Check whether the outputs were generated since the job was scheduled
            must_skip = all(
                [
                    cached_file.status == CachedFile.CACHED
                    for cached_file in job.block.outputs.all()
                ]
            )

            if must_skip:
                job.block.status = Block.DONE
                job.block.start_date = datetime.now()
                job.block.end_date = job.block.start_date
                job.block.save()

                additional_jobs.extend(update_dependent_jobs(job))
                if len(additional_jobs) == 0:
                    update_experiment(job.block.experiment)

                job.delete()
                continue

            # Check that the job isn't a mirror of a currently running one
            nb_existing_splits = JobSplit.objects.filter(
                ~Q(status=JobSplit.QUEUED) | Q(worker__isnull=False),
                job__key=job.key,
                job__runnable_date__isnull=False,
                job__mirror=False,
            ).count()

            if nb_existing_splits > 0:
                job.mirror = True
                job.save()
                continue

            # Create the splits
            JobSplit.objects.create_splits(job)

        return additional_jobs

    # First retrieve all the candidate jobs from the database and process them;
    # if the processing marks any other jobs as candidates, process those too,
    # recursively, until no candidate job is left
    candidate_jobs = Job.objects.annotate(nb_splits=Count("splits")).filter(
        runnable_date__isnull=False, mirror=False, nb_splits=0
    )

    while len(candidate_jobs) > 0:
        candidate_jobs = _process(candidate_jobs)


# ----------------------------------------------------------


def process_newly_cancelled_experiments():
    """Retrieve all the experiments that must be cancelled, and do it"""

    # Retrieve all the experiments marked as cancelling
    cancelling_experiments = Experiment.objects.filter(status=Experiment.CANCELLING)

    splits_to_cancel = []

    for experiment in cancelling_experiments:
        # Only process experiments that still have pending or processing blocks
        if (
            experiment.blocks.filter(
                Q(status=Block.PENDING) | Q(status=Block.PROCESSING)
            ).count()
            == 0
        ):
            continue

        new_splits_to_cancel = cancel_all_blocks(experiment)

        if len(new_splits_to_cancel) == 0:
            update_experiment(experiment)

        splits_to_cancel.extend(new_splits_to_cancel)

    return splits_to_cancel


# ----------------------------------------------------------


def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT):
    """Check that an entry of the cache is complete

    Due to the distributed nature of the platform, with volumes shared by
    several different machines, a (hopefully) small delay might occur between
    the writing of a file in the cache on a processing node and its availability
    on the current machine.

    This function checks that all the necessary files are there, and complete.
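
    As an illustration, a cache entry split into two chunks would look like
    this (the path prefix is illustrative)::

        <path>.0.99.index
        <path>.0.99.index.checksum
        <path>.0.99.data
        <path>.0.99.data.checksum
        <path>.100.199.index
        <path>.100.199.index.checksum
        <path>.100.199.data
        <path>.100.199.data.checksum

    The index ranges of consecutive files must be contiguous, every file must
    match its recorded checksum, and the total number of entries in the index
    files must equal ``nb_expected_blocks``.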
    """

    def _extract_indices_from_filename(filename):
        parts = filename.split(".")
        return (int(parts[-3]), int(parts[-2]), filename)

    def _verify_checksum(filename):
        checksum_file = filename + ".checksum"

        try:
            with open(checksum_file, "rt") as f:
                recorded = f.read().strip()

            actual = beat.core.hash.hashFileContents(filename)
        except Exception:
            return False

        return actual == recorded

    # Retrieve all the index files
    abs_path = os.path.join(cache, path)

    index_files = glob.glob(abs_path + "*.index")
    index_files = sorted([_extract_indices_from_filename(x) for x in index_files])

    # Check that there is no missing index
    if len(index_files) > 1:
        for i in range(1, len(index_files)):
            if index_files[i][0] != index_files[i - 1][1] + 1:
                return False

    # Sum the number of blocks represented by each index file
    nb_blocks = 0
    for start, end, index_file in index_files:

        # Check that the file is complete
        if not _verify_checksum(index_file):
            return False

        # Check that the data file is complete
        data_file = index_file.replace(".index", ".data")
        if not _verify_checksum(data_file):
            return False

        # Retrieve the number of blocks from the file
        with open(index_file, "rt") as f:
            lines = f.readlines()
            nb_blocks += len(lines)

    return nb_blocks == nb_expected_blocks


# ----------------------------------------------------------


def assign_splits_to_workers():
    """Assign existing job splits to available workers from the appropriate queues"""

    # Retrieve the queues, most cores per slot first, then fewest slots per user
    queues = Queue.objects.order_by("-cores_per_slot", "max_slots_per_user")

    # Retrieve the candidate splits on each queue
    candidate_splits_per_queue = [
        (q, retrieve_candidate_splits_for_queue(q)) for q in queues
    ]
    candidate_splits_per_queue = [x for x in candidate_splits_per_queue if x[1]]

    if not candidate_splits_per_queue:
        return []

    logger.debug("Considering splits: %s", candidate_splits_per_queue)

    # Build a "white list" of available workers
    whitelist = dict(
        [
            (worker, worker.available_cores())
            for worker in Worker.objects.filter(active=True)
        ]
    )

    logger.debug("Worker availability: %s", whitelist)

    # Process the candidates of each queue
    assigned_splits = []

    for queue, candidate_splits in candidate_splits_per_queue:

        candidate_workers = queue.worker_availability()
        required_cores = queue.cores_per_slot

        for candidate_split in candidate_splits:

            # Check that the job wasn't marked as a mirror during a previous
            # iteration
            candidate_split.job.refresh_from_db()
            if candidate_split.job.mirror:
                continue

            # Search an available worker
            for candidate_worker in candidate_workers:

                # Check that there are enough available cores on the worker
                available_cores = whitelist.get(candidate_worker, 0)
                if available_cores < required_cores:
                    continue

                logger.debug(
                    "Assigning `%s' to worker `%s'", candidate_split, candidate_worker
                )

                assign_split_to_worker(candidate_split, candidate_worker)
                assigned_splits.append(candidate_split)

                mark_similar_jobs_as_mirror(candidate_split.job)

                whitelist[candidate_worker] -= required_cores
                logger.debug(
                    "`%s' cores available: %d",
                    candidate_worker,
                    whitelist[candidate_worker],
                )
                break

    return JobSplit.objects.filter(id__in=[x.id for x in assigned_splits])


# ----------------------------------------------------------


def get_configuration_for_split(split):
    """Retrieve the configuration to be used to execute the provided job split
    on a worker node
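
    The returned dictionary is the block's stored command, possibly extended
    with a few extra entries, for example (values are illustrative)::

        {
            # ... contents of block.command ...
            "share_databases": True,
            "datasets_uid": 1000,
            "datasets_root_path": "/path/to/databases",
            "range": [0, 99],
        }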
    """

    # Retrieve the block configuration
    configuration = simplejson.loads(str(split.job.block.command))

    if split.job.block.experiment.author.has_perm("can_share_databases"):
        configuration["share_databases"] = True

    # (If necessary) Add the information needed to access the database files
    if settings.DATASETS_UID is not None:
        configuration["datasets_uid"] = settings.DATASETS_UID

    if settings.DATASETS_ROOT_PATH is not None:
        configuration["datasets_root_path"] = settings.DATASETS_ROOT_PATH

    # (If necessary) Add the range of indices to process
    if (split.start_index is not None) and (split.end_index is not None):
        configuration["range"] = [split.start_index, split.end_index]

    return configuration


# ----------------------------------------------------------


def on_split_started(split):
    """Must be called each time a split job is started"""

    now = datetime.now()

    # Mark the split job as running
    split.status = JobSplit.PROCESSING
    split.start_date = now
    split.save()

    # (If necessary) Mark the job and block as running
    split.job.refresh_from_db()
    if split.job.start_date is None:
        split.job.start_date = now
        split.job.save()

        split.job.block.status = Block.PROCESSING
        split.job.block.start_date = now
        split.job.block.save()

        # (If necessary) Mark the experiment as running
        split.job.block.experiment.refresh_from_db()
        if split.job.block.experiment.status == Experiment.SCHEDULED:
            split.job.block.experiment.status = Experiment.RUNNING
            split.job.block.experiment.start_date = now
            split.job.block.experiment.save()

        # Mark the mirror jobs and their blocks as running
        mirror_jobs = Job.objects.filter(key=split.job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.start_date = now
            mirror_job.save()

            mirror_job.block.status = Block.PROCESSING
            mirror_job.block.start_date = now
            mirror_job.block.save()

            # (If necessary) Mark the experiment as running
            if mirror_job.block.experiment.status == Experiment.SCHEDULED:
                mirror_job.block.experiment.status = Experiment.RUNNING
                mirror_job.block.experiment.start_date = now
                mirror_job.block.experiment.save()


# ----------------------------------------------------------


def on_split_done(split, result):
    """Must be called each time a split job is successfully completed"""

    result = Result(
        status=result["status"],
        stdout=result["stdout"],
        stderr=result["stderr"],
        usrerr=result["user_error"],
        _stats=simplejson.dumps(result["statistics"], indent=2),
    )
    result.save()

    split.status = JobSplit.COMPLETED
    split.end_date = datetime.now()
    split.result = result
    split.worker = None
    split.save()

    update_job(split.job)


# ----------------------------------------------------------


def on_split_fail(split, result):
    """Must be called each time a split job is successfully completed"""

    if isinstance(result, dict):
        result = Result(
            status=result["status"],
            stdout=result["stdout"],
            stderr=result["stderr"],
            usrerr=result["user_error"],
            _stats=simplejson.dumps(result["statistics"], indent=2),
        )
    else:
        result = Result(status=1, stdout="", stderr=result, usrerr="")

    result.save()

    split.status = JobSplit.FAILED
    split.end_date = datetime.now()
    split.result = result
    split.worker = None
    split.save()

    return update_job(split.job)


# ----------------------------------------------------------


def on_split_cancelled(split):
    """Must be called each time a split job is successfully cancelled"""

    split.status = JobSplit.CANCELLED
    split.end_date = datetime.now()
    split.worker = None
    split.save()

    return update_job(split.job)


# ----------------------------------------------------------


def retrieve_candidate_splits_for_queue(queue):
    """Retrieve the splits assigned to the given queue that could be considered
    for execution
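
    For example (illustrative numbers): on a queue with ``max_slots_per_user``
    set to 5, a user already running 3 splits will have at most 2 additional
    splits selected by this call, oldest first.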
    """

    # Retrieve the queued splits assigned to the queue, from oldest to newest
    splits = JobSplit.objects.filter(
        job__block__queue=queue, status=JobSplit.QUEUED, worker__isnull=True
    ).order_by("job__runnable_date")

    # Retrieve the list of the users that submitted those jobs
    users = set(splits.values_list("job__block__experiment__author", flat=True))

    # Determine how many slots each user is already using on the queue
    user_current_slots = [
        JobSplit.objects.filter(
            job__block__experiment__author=k,
            job__block__queue=queue,
            status=JobSplit.PROCESSING,
        ).count()
        for k in users
    ]

    # Determine how many slots each user is still allowed on the queue
    allowance = [queue.max_slots_per_user - k for k in user_current_slots]
    allowance = dict(zip(users, allowance))

    # Limit the runnable splits so that each user stays within the allowed slots
    candidates = []
    for split in splits:
        author = split.job.block.experiment.author.id
        if allowance[author] > 0:
            candidates.append(split)
            allowance[author] -= 1

    # Return the list of candidate splits
    return candidates


# ----------------------------------------------------------


@transaction.atomic
def assign_split_to_worker(split, worker):
    """Schedules the split to be executed on a given worker"""

    split = JobSplit.objects.select_for_update().get(pk=split.pk)
    worker = Worker.objects.select_for_update().get(pk=worker.pk)

    split.worker = worker
    split.save()

    logger.info(
        "Job split %s scheduled at `%s' was assigned to `%s'",
        split,
        split.job.block.queue,
        worker,
    )


# ----------------------------------------------------------


@transaction.atomic
def mark_similar_jobs_as_mirror(job):
    """Mark all similar jobs as mirror, and delete their job splits"""

    similar_jobs = (
        Job.objects.select_for_update().filter(key=job.key).exclude(pk=job.pk)
    )

    for similar_job in similar_jobs:
        similar_job.mirror = True
        similar_job.save()

        for split in similar_job.splits.all():
            split.delete()

        logger.info("Job `%s' is now marked as a mirror of `%s'", similar_job, job)


# ----------------------------------------------------------


def update_job(job):
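    """Update the state of a job according to the status of its splits

    Aggregates the split results into the job's cached files, updates the
    block (and, when relevant, mirror jobs and the experiment), and returns
    the list of splits that still have to be cancelled, if any.
    """
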
    def _collect_results(splits):
        cached_files_infos = dict(
            cpu_time=0.0,
            max_memory=0,
            stdout="",
            stderr="",
            error_report="",
            data_read_size=0,
            data_written_size=0,
            data_read_nb_blocks=0,
            data_written_nb_blocks=0,
            data_read_time=0.0,
            data_written_time=0.0,
            queuing_time=0.0,
            linear_execution_time=0.0,
            speed_up_real=1.0,
            speed_up_maximal=1.0,
        )

        split_durations = []

        for split in splits:
            split_durations.append((split.end_date - split.start_date).total_seconds())

            statistics = split.result.stats

            cached_files_infos["cpu_time"] += (
                statistics.cpu["user"] + statistics.cpu["system"]
            )
            cached_files_infos["max_memory"] += statistics.memory["rss"]

            header = ""
            if split.start_index is not None:
                header = "Split #%d (from indices %d to %d):" % (
                    split.split_index,
                    split.start_index,
                    split.end_index,
                )
                header += "\n" + ("=" * len(header)) + "\n"

            stdout = split.result.stdout if split.result.stdout != "\n" else ""
            stderr = split.result.stderr if split.result.stderr != "\n" else ""

            if stdout != "":
                cached_files_infos["stdout"] += header + stdout + "\n"

            if stderr != "":
                cached_files_infos["stderr"] += header + stderr + "\n"

            if split.result.usrerr != "":
                cached_files_infos["error_report"] += (
                    header + split.result.usrerr + "\n"
                )

            if "volume" in statistics.data:
                cached_files_infos["data_read_size"] += statistics.data["volume"].get(
                    "read", 0
                )
                cached_files_infos["data_written_size"] += statistics.data[
                    "volume"
                ].get("write", 0)

            if "blocks" in statistics.data:
                cached_files_infos["data_read_nb_blocks"] += statistics.data[
                    "blocks"
                ].get("read", 0)
                cached_files_infos["data_written_nb_blocks"] += statistics.data[
                    "blocks"
                ].get("write", 0)

            if "time" in statistics.data:
                cached_files_infos["data_read_time"] += statistics.data["time"].get(
                    "read", 0
                )
                cached_files_infos["data_written_time"] += statistics.data["time"].get(
                    "write", 0
                )

        job = splits[0].job

        cached_files_infos["queuing_time"] = (
            job.start_date - job.runnable_date
        ).total_seconds()
        cached_files_infos["linear_execution_time"] = sum(split_durations)

        if job.block.required_slots > 1:
            cached_files_infos["speed_up_real"] = (
                float(cached_files_infos["linear_execution_time"])
                / (job.end_date - job.start_date).total_seconds()
            )
            cached_files_infos["speed_up_maximal"] = float(
                cached_files_infos["linear_execution_time"]
            ) / max(split_durations)

        return cached_files_infos

    splits_to_cancel = []

    # If the job is failed
    if job.splits.filter(status=JobSplit.FAILED).count() > 0:

        # Mark queued splits of the same job as cancelled
        for split in job.splits.filter(status=JobSplit.QUEUED):
            split.status = JobSplit.CANCELLED
            split.start_date = datetime.now()
            split.end_date = split.start_date
            split.save()

        # Cancel running splits
        splits_to_cancel = list(job.splits.filter(status=JobSplit.PROCESSING).all())
        for split in splits_to_cancel:
            split.status = JobSplit.CANCELLING
            split.save()

    # Check that all the splits are done
    if (
        job.splits.filter(
            Q(status=JobSplit.QUEUED)
            | Q(status=JobSplit.PROCESSING)
            | Q(status=JobSplit.CANCELLING)
        ).count()
        > 0
    ):
        return splits_to_cancel

    # Save the end date
    job.end_date = job.splits.order_by("-end_date")[0].end_date
    job.save()

    # Did the job fail?
    if job.splits.filter(status=JobSplit.FAILED).count() > 0:

        # (If necessary) Update the cached files
        splits = job.splits.filter(
            Q(status=JobSplit.FAILED) | Q(status=JobSplit.COMPLETED)
        )
        cached_files_infos = _collect_results(splits)
        job.block.outputs.update(**cached_files_infos)

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.FAILED)

        # Update the block
        job.block.set_failed(job.end_date)

        # Cancel all the remaining blocks of the experiment
        splits_to_cancel.extend(cancel_all_blocks(job.block.experiment))

        # Update the experiment
        update_experiment(job.block.experiment)

        # Mark the blocks of the mirror jobs as failed
        mirror_jobs = Job.objects.filter(key=job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.end_date = job.end_date
            mirror_job.save()

            mirror_job.block.set_failed(job.end_date)

            # Cancel all the remaining blocks of the experiment
            splits_to_cancel.extend(cancel_all_blocks(mirror_job.block.experiment))

            # Update the experiment
            update_experiment(mirror_job.block.experiment)

        mirror_jobs.delete()

        # Delete the job
        job.delete()

    # Did the job succeed?
    elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0:

        # Update the cached files
        cached_files_infos = _collect_results(job.splits.all())
        job.block.outputs.update(**cached_files_infos)

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.DONE)

            if job.block.analyzer:
                load_results_from_cache(job.block, cached_file)

        # Update the block
        job.block.status = Block.DONE
        job.block.end_date = job.end_date
        job.block.save()

        # Update the dependent jobs
        additional_jobs = update_dependent_jobs(job)

        # (If necessary) Update the experiment
        if len(additional_jobs) == 0:
            update_experiment(job.block.experiment)

        # Mark the blocks of the mirror jobs as completed
        mirror_jobs = Job.objects.filter(key=job.key, mirror=True)
        for mirror_job in mirror_jobs:
            mirror_job.block.status = Block.DONE
            mirror_job.block.end_date = job.end_date
            mirror_job.block.save()

            # Update the dependent jobs
            additional_jobs = update_dependent_jobs(mirror_job)

            # (If necessary) Update the experiment
            if len(additional_jobs) == 0:
                update_experiment(mirror_job.block.experiment)

        mirror_jobs.delete()

        # Delete the job
        job.delete()

    # Was the job cancelled?
    elif job.splits.filter(status=JobSplit.CANCELLED).count() > 0:

        for cached_file in job.block.outputs.all():
            cached_file.update(Block.CANCELLED)

        # Update the block
        job.block.set_canceled(job.end_date)

        # Update the experiment
        update_experiment(job.block.experiment)

        # Delete the job
        job.delete()

    return splits_to_cancel


# ----------------------------------------------------------


def update_dependent_jobs(job):
    """Mark the dependent jobs of the provided one as runnable

    Intended to be called after a job is done
    """

    updated_jobs = []

    for dependent_block in job.block.dependents.all():
        if dependent_block.is_runnable():
            dependent_block.job.runnable_date = datetime.now()
            dependent_block.job.save()
            updated_jobs.append(dependent_block.job)

    return updated_jobs


# ----------------------------------------------------------


def cancel_all_blocks(experiment):
    """Mark the all the blocks of the provided experiment as cancelled

    Intended to be called after a job has failed
    """

    splits_to_cancel = []

    # Retrieve all the blocks to cancel
    blocks_to_cancel = experiment.blocks.filter(
        Q(status=Block.PROCESSING) | Q(status=Block.PENDING)
    ).exclude(job__mirror=True)

    for block in blocks_to_cancel:

        # If a mirror job exists, reassign any existing split
        mirror_jobs = Job.objects.filter(key=block.job.key, mirror=True)
        if len(mirror_jobs) > 0:
            mirror_job = mirror_jobs[0]
            mirror_job.mirror = False
            mirror_job.save()

            for split in block.job.splits.all():
                split.job = mirror_job
                split.save()

        else:
            # Queued splits: Mark them as cancelled
            for split in block.job.splits.filter(status=JobSplit.QUEUED):
                split.status = JobSplit.CANCELLED
                split.start_date = datetime.now()
                split.end_date = split.start_date
                split.save()

            # Processing splits: cancel them
            for split in block.job.splits.filter(status=JobSplit.PROCESSING):
                split.status = JobSplit.CANCELLING
                split.save()
                splits_to_cancel.append(split)

        # (If possible) Mark the block as cancelled
        if block.job.splits.filter(status=JobSplit.CANCELLING).count() == 0:
            block.set_canceled()
            block.job.delete()

    # Retrieve all the mirror blocks
    mirror_blocks_to_cancel = experiment.blocks.filter(
        Q(status=Block.PROCESSING) | Q(status=Block.PENDING)
    ).filter(job__mirror=True)

    for block in mirror_blocks_to_cancel:
        block.set_canceled()
        block.job.delete()

    return splits_to_cancel


# ----------------------------------------------------------


@transaction.atomic
def update_experiment(experiment):
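    """Update the status of an experiment according to the status of its blocks

    The experiment is marked as done when all of its blocks are done, as
    failed when a block failed and no block is still processing, or back to
    pending when a block was cancelled and no block is still processing.
    """
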
    experiment = Experiment.objects.select_for_update().get(pk=experiment.pk)

    # Experiment done?
    if experiment.blocks.exclude(status=Block.DONE).count() == 0:
        experiment.status = Experiment.DONE
        experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
        experiment.save()

    # Experiment failed?
    elif experiment.blocks.filter(status=Block.FAILED).count() > 0:
        if experiment.blocks.filter(status=Block.PROCESSING).count() == 0:
            experiment.status = Experiment.FAILED
            experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
            experiment.save()

    # Experiment cancelled?
    elif experiment.blocks.filter(status=Block.CANCELLED).count() > 0:
        if experiment.blocks.filter(status=Block.PROCESSING).count() == 0:
            experiment.status = Experiment.PENDING
            experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date
            experiment.save()


# ----------------------------------------------------------


def load_results_from_cache(block, cached_file):
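    """Load the results of an analyzer block from the cache into the database

    Each field of the cached output is stored as a ``Result`` entry (imported
    here as ``CacheResult``) attached to the provided ``cached_file``. Nothing
    is done if the block is not an analyzer or if results already exist.
    """
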
    if not block.analyzer:
        return

    if cached_file.results.count() > 0:
        return

    data_source = beat.core.data.CachedDataSource()
    data_source.setup(
        os.path.join(settings.CACHE_ROOT, beat.core.hash.toPath(cached_file.hash)),
        settings.PREFIX,
    )

    (output_data, start_index, end_index) = data_source[0]
    if output_data is not None:
        algorithm = beat.core.algorithm.Algorithm(
            settings.PREFIX, block.algorithm.fullname()
        )

        for field, value in output_data.as_dict().items():
            res, _ = CacheResult.objects.get_or_create(name=field, cache=cached_file)
            res.primary = algorithm.results[field]["display"]
            res.type = algorithm.results[field]["type"]

            if res.type in ["int32", "float32", "bool", "string"]:
                res.data_value = str(value)
            else:
                res.data_value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder)

            res.save()

    data_source.close()