#!/usr/bin/env python # vim: set fileencoding=utf-8 : ############################################################################### # # # Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # # Contact: beat.support@idiap.ch # # # # This file is part of the beat.web module of the BEAT platform. # # # # Commercial License Usage # # Licensees holding valid commercial BEAT licenses may use this file in # # accordance with the terms contained in a written agreement between you # # and Idiap. For further information contact tto@idiap.ch # # # # Alternatively, this file may be used under the terms of the GNU Affero # # Public License version 3 as published by the Free Software and appearing # # in the file LICENSE.AGPL included in the packaging of this file. # # The BEAT platform is distributed in the hope that it will be useful, but # # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # # or FITNESS FOR A PARTICULAR PURPOSE. # # # # You should have received a copy of the GNU Affero Public License along # # with the BEAT platform. If not, see http://www.gnu.org/licenses/. # # # ############################################################################### import glob import logging import os from datetime import datetime import simplejson from django.conf import settings from django.db import transaction from django.db.models import Count from django.db.models import Q import beat.core.algorithm import beat.core.data import beat.core.hash from beat.core.utils import NumpyJSONEncoder from ..experiments.models import Block from ..experiments.models import CachedFile from ..experiments.models import Experiment from ..experiments.models import Result as CacheResult from .models import Job from .models import JobSplit from .models import Queue from .models import Result from .models import Worker logger = logging.getLogger(__name__) @transaction.atomic def schedule_experiment(experiment): """Schedules the experiment for execution at the backend Scheduling an experiment only means creating one :py:class:`.models.Job` instance for each block of the experiment. This function is expected to be called on the web server. The Scheduler is tasked to notice the newly-scheduled experiment and execute it. """ # Lock the experiment, so nobody else can modify it experiment = Experiment.objects.select_for_update().get(pk=experiment.pk) # Can't schedule an experiment not in the PENDING state if experiment.status != Experiment.PENDING: return # Check that the queues and environments of all the blocks are still valid for block in experiment.blocks.all(): if block.queue is None: raise RuntimeError( "Block `%s' does not have a queue assigned " "- this normally indicates the originally selected " "queue was deleted since the experiment was first " "configured. Re-configure this experiment and select a new " "default or block-specific queue" % block.name ) if block.environment is None: raise RuntimeError( "Block `%s' does not have an environment " "assigned - this normally indicates the originally selected " "environment was deleted since the experiment was first " "configured. Re-configure this experiment and select a new " "default or block-specific environment" % block.name ) # Process all the blocks of the experiment already_done = True for block in experiment.blocks.all(): # Lock the block, so nobody else can modify it block = Block.objects.select_for_update().get(pk=block.pk) # Check if the block outputs aren't already in the cache must_skip = all( [ cached_file.status == CachedFile.CACHED for cached_file in block.outputs.all() ] ) if must_skip: block.status = Block.DONE block.creation_date = datetime.now() block.start_date = block.creation_date block.end_date = block.creation_date block.save() if block.analyzer: for cached_file in block.outputs.all(): load_results_from_cache(block, cached_file) else: Job.objects.create_job(block) block.status = Block.PENDING block.creation_date = datetime.now() block.save() already_done = False # Mark the experiment as scheduled (or done) if already_done: experiment.start_date = datetime.now() experiment.end_date = experiment.start_date experiment.status = Experiment.DONE else: experiment.status = Experiment.SCHEDULED experiment.save() # ---------------------------------------------------------- @transaction.atomic def cancel_experiment(experiment): """Cancel the execution of the experiment on the backend Cancelling an experiment only means marking the experiment as 'cancelling'. This function is expected to be called on the web server. The Scheduler is tasked to notice the newly-cancelled experiment and does what it takes. """ # Lock the experiment, so nobody else can modify it experiment = Experiment.objects.select_for_update().get(pk=experiment.pk) # Can't cancel an experiment not started or already finished if experiment.status not in [Experiment.SCHEDULED, Experiment.RUNNING]: return # Mark the experiment as cancelling experiment.status = Experiment.CANCELLING experiment.save() # ---------------------------------------------------------- def split_new_jobs(): """Retrieve all the jobs not already splitted, and create the appropriate splits""" def _process(candidate_jobs): additional_jobs = [] # Iterate through all the candidate jobs for job in candidate_jobs: # Check that the files weren't generated since the scheduling of the job must_skip = all( [ cached_file.status == CachedFile.CACHED for cached_file in job.block.outputs.all() ] ) if must_skip: job.block.status = Block.DONE job.block.start_date = datetime.now() job.block.end_date = job.block.start_date job.block.save() additional_jobs.extend(update_dependent_jobs(job)) if len(additional_jobs) == 0: update_experiment(job.block.experiment) job.delete() continue # Check that the job isn't a mirror of an currently running one nb_existing_splits = JobSplit.objects.filter( ~Q(status=JobSplit.QUEUED) | Q(worker__isnull=False), job__key=job.key, job__runnable_date__isnull=False, job__mirror=False, ).count() if nb_existing_splits > 0: job.mirror = True job.save() continue # Create the splits JobSplit.objects.create_splits(job) return additional_jobs # First retrieve all the candidate jobs from the database, process them and # if the processing mark any other job as a candidate one, process it too, # recursively, until no candidate job is left candidate_jobs = Job.objects.annotate(nb_splits=Count("splits")).filter( runnable_date__isnull=False, mirror=False, nb_splits=0 ) while len(candidate_jobs) > 0: candidate_jobs = _process(candidate_jobs) # ---------------------------------------------------------- def process_newly_cancelled_experiments(): """Retrieve all the experiments that must be cancelled, and do it""" # Retrieve all the experiments marked as cancelling cancelling_experiments = Experiment.objects.filter(status=Experiment.CANCELLING) splits_to_cancel = [] for experiment in cancelling_experiments: # Only process those which have blocks that aren't already cancelling if ( experiment.blocks.filter( Q(status=Block.PENDING) | Q(status=Block.PROCESSING) ).count() == 0 ): continue new_splits_to_cancel = cancel_all_blocks(experiment) if len(new_splits_to_cancel) == 0: update_experiment(experiment) splits_to_cancel.extend(new_splits_to_cancel) return splits_to_cancel # ---------------------------------------------------------- def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT): """Check that an entry of the cache is complete Due to the distributed nature of the platform, with volumes shared by several different machines, a (hopefully) small delay might occur between the writing of a file in the cache on a processing node and its availability on the current machine. This function checks that all the necessary files are there, and complete. """ def _extract_indices_from_filename(filename): parts = filename.split(".") return (int(parts[-3]), int(parts[-2]), filename) def _verify_checksum(filename): checksum_file = filename + ".checksum" try: with open(checksum_file, "rt") as f: recorded = f.read().strip() actual = beat.core.hash.hashFileContents(filename) except Exception: return False return actual == recorded # Retrieve all the index files abs_path = os.path.join(cache, path) index_files = glob.glob(abs_path + "*.index") index_files = sorted([_extract_indices_from_filename(x) for x in index_files]) # Check that there is no missing index if len(index_files) > 1: for i in range(1, len(index_files)): if index_files[i][0] != index_files[i - 1][1] + 1: return False # Sum the number of blocks represented by each index file nb_blocks = 0 for start, end, index_file in index_files: # Check that the file is complete if not _verify_checksum(index_file): return False # Check that the data file is complete data_file = index_file.replace(".index", ".data") if not _verify_checksum(data_file): return False # Retrieve the number of blocks from the file with open(index_file, "rt") as f: lines = f.readlines() nb_blocks += len(lines) return nb_blocks == nb_expected_blocks # ---------------------------------------------------------- def assign_splits_to_workers(): """Assign existing job splits to available workers from the appropriate queues""" # Retrieve the queues in a good order queues = Queue.objects.order_by("-cores_per_slot", "max_slots_per_user") # Retrieve the candidate jobs on each queue candidate_splits_per_queue = [ (q, retrieve_candidate_splits_for_queue(q)) for q in queues ] candidate_splits_per_queue = [x for x in candidate_splits_per_queue if x[1]] if not candidate_splits_per_queue: return [] logger.debug("Considering splits: %s", candidate_splits_per_queue) # Build a "white list" of available workers whitelist = dict( [ (worker, worker.available_cores()) for worker in Worker.objects.filter(active=True) ] ) logger.debug("Worker availability: %s", whitelist) # Process the candidates of each queue assigned_splits = [] for queue, candidate_splits in candidate_splits_per_queue: candidate_workers = queue.worker_availability() required_cores = queue.cores_per_slot for candidate_split in candidate_splits: # Check that the job wasn't marked as a mirror during a previous # iteration candidate_split.job.refresh_from_db() if candidate_split.job.mirror: continue # Search an available worker for candidate_worker in candidate_workers: # Check that there are enough available cores on the worker available_cores = whitelist.get(candidate_worker, 0) if available_cores < required_cores: continue logger.debug( "Assigning `%s' to worker `%s'", candidate_split, candidate_worker ) assign_split_to_worker(candidate_split, candidate_worker) assigned_splits.append(candidate_split) mark_similar_jobs_as_mirror(candidate_split.job) whitelist[candidate_worker] -= required_cores logger.debug( "`%s' cores available: %d", candidate_worker, whitelist[candidate_worker], ) break return JobSplit.objects.filter(id__in=[x.id for x in assigned_splits]) # ---------------------------------------------------------- def get_configuration_for_split(split): """Retrieve the configuration to be used to execute the provided job split on a worker node """ # Retrieve the block configuration configuration = simplejson.loads(str(split.job.block.command)) if split.job.block.experiment.author.has_perm("can_share_databases"): configuration["share_databases"] = True # (If necessary) Add the infos needed to access the database files if settings.DATASETS_UID is not None: configuration["datasets_uid"] = settings.DATASETS_UID if settings.DATASETS_ROOT_PATH is not None: configuration["datasets_root_path"] = settings.DATASETS_ROOT_PATH # (If necessary) Add the range of indices to process if (split.start_index is not None) and (split.end_index is not None): configuration["range"] = [split.start_index, split.end_index] return configuration # ---------------------------------------------------------- def on_split_started(split): """Must be called each time a split job is started""" now = datetime.now() # Mark the split job as running split.status = JobSplit.PROCESSING split.start_date = now split.save() # (If necessary) Mark the job and block as running split.job.refresh_from_db() if split.job.start_date is None: split.job.start_date = now split.job.save() split.job.block.status = Block.PROCESSING split.job.block.start_date = now split.job.block.save() # (If necessary) Mark the experiment as running split.job.block.experiment.refresh_from_db() if split.job.block.experiment.status == Experiment.SCHEDULED: split.job.block.experiment.status = Experiment.RUNNING split.job.block.experiment.start_date = now split.job.block.experiment.save() # Mark the mirror jobs and their blocks as running mirror_jobs = Job.objects.filter(key=split.job.key, mirror=True) for mirror_job in mirror_jobs: mirror_job.start_date = now mirror_job.save() mirror_job.block.status = Block.PROCESSING mirror_job.block.start_date = now mirror_job.block.save() # (If necessary) Mark the experiment as running if mirror_job.block.experiment.status == Experiment.SCHEDULED: mirror_job.block.experiment.status = Experiment.RUNNING mirror_job.block.experiment.start_date = now mirror_job.block.experiment.save() # ---------------------------------------------------------- def on_split_done(split, result): """Must be called each time a split job is successfully completed""" result = Result( status=result["status"], stdout=result["stdout"], stderr=result["stderr"], usrerr=result["user_error"], _stats=simplejson.dumps(result["statistics"], indent=2), ) result.save() split.status = JobSplit.COMPLETED split.end_date = datetime.now() split.result = result split.worker = None split.save() update_job(split.job) # ---------------------------------------------------------- def on_split_fail(split, result): """Must be called each time a split job is successfully completed""" if isinstance(result, dict): result = Result( status=result["status"], stdout=result["stdout"], stderr=result["stderr"], usrerr=result["user_error"], _stats=simplejson.dumps(result["statistics"], indent=2), ) else: result = Result(status=1, stdout="", stderr=result, usrerr="",) result.save() split.status = JobSplit.FAILED split.end_date = datetime.now() split.result = result split.worker = None split.save() return update_job(split.job) # ---------------------------------------------------------- def on_split_cancelled(split): """Must be called each time a split job is successfully cancelled""" split.status = JobSplit.CANCELLED split.end_date = datetime.now() split.worker = None split.save() return update_job(split.job) # ---------------------------------------------------------- def retrieve_candidate_splits_for_queue(queue): """Retrieve the splits assigned to the given queue that could be considered for execution """ # Retrieve the pending jobs assigned to the queue, from oldest to newest splits = JobSplit.objects.filter( job__block__queue=queue, status=JobSplit.QUEUED, worker__isnull=True ).order_by("job__runnable_date") # Retrieve the list of the users that submitted those jobs users = set(splits.values_list("job__block__experiment__author", flat=True)) # Determine how much slots each user is already using on the queue user_current_slots = [ JobSplit.objects.filter( job__block__experiment__author=k, job__block__queue=queue, status=JobSplit.PROCESSING, ).count() for k in users ] # Determine how much slots each user is still afforded on the queue allowance = [queue.max_slots_per_user - k for k in user_current_slots] allowance = dict(zip(users, allowance)) # Limit runnable splits so we reach a maximum of allowed user slots candidates = [] for split in splits: author = split.job.block.experiment.author.id if allowance[author] > 0: candidates.append(split) allowance[author] -= 1 # Return the list of candidates splits return candidates # ---------------------------------------------------------- @transaction.atomic def assign_split_to_worker(split, worker): """Schedules the split to be executed on a given worker""" split = JobSplit.objects.select_for_update().get(pk=split.pk) worker = Worker.objects.select_for_update().get(pk=worker.pk) split.worker = worker split.save() logger.info( "Job split %s scheduled at `%s' was assigned to `%s'", split, split.job.block.queue, worker, ) # ---------------------------------------------------------- @transaction.atomic def mark_similar_jobs_as_mirror(job): """Mark all similar jobs as mirror, and delete their job splits""" similar_jobs = ( Job.objects.select_for_update().filter(key=job.key).exclude(pk=job.pk) ) for similar_job in similar_jobs: similar_job.mirror = True similar_job.save() for split in similar_job.splits.all(): split.delete() logger.info("Job `%s' is now marked as a mirror of `%s'", similar_job, job) # ---------------------------------------------------------- def update_job(job): def _collect_results(splits): cached_files_infos = dict( cpu_time=0.0, max_memory=0, stdout="", stderr="", error_report="", data_read_size=0, data_written_size=0, data_read_nb_blocks=0, data_written_nb_blocks=0, data_read_time=0.0, data_written_time=0.0, queuing_time=0.0, linear_execution_time=0.0, speed_up_real=1.0, speed_up_maximal=1.0, ) split_durations = [] for split in splits: split_durations.append((split.end_date - split.start_date).total_seconds()) statistics = split.result.stats cached_files_infos["cpu_time"] += ( statistics.cpu["user"] + statistics.cpu["system"] ) cached_files_infos["max_memory"] += statistics.memory["rss"] header = "" if split.start_index is not None: header = "Split #%d (from indices %d to %d):" % ( split.split_index, split.start_index, split.end_index, ) header += "\n" + ("=" * len(header)) + "\n" stdout = split.result.stdout if split.result.stdout != "\n" else "" stderr = split.result.stderr if split.result.stderr != "\n" else "" if stdout != "": cached_files_infos["stdout"] += header + stdout + "\n" if stderr != "": cached_files_infos["stderr"] += header + stderr + "\n" if split.result.usrerr != "": cached_files_infos["error_report"] += ( header + split.result.usrerr + "\n" ) if "volume" in statistics.data: cached_files_infos["data_read_size"] += statistics.data["volume"].get( "read", 0 ) cached_files_infos["data_written_size"] += statistics.data[ "volume" ].get("write", 0) if "blocks" in statistics.data: cached_files_infos["data_read_nb_blocks"] += statistics.data[ "blocks" ].get("read", 0) cached_files_infos["data_written_nb_blocks"] += statistics.data[ "blocks" ].get("write", 0) if "time" in statistics.data: cached_files_infos["data_read_time"] += statistics.data["time"].get( "read", 0 ) cached_files_infos["data_written_time"] += statistics.data["time"].get( "write", 0 ) job = splits[0].job cached_files_infos["queuing_time"] = ( job.start_date - job.runnable_date ).total_seconds() cached_files_infos["linear_execution_time"] = sum(split_durations) if job.block.required_slots > 1: cached_files_infos["speed_up_real"] = ( float(cached_files_infos["linear_execution_time"]) / (job.end_date - job.start_date).total_seconds() ) cached_files_infos["speed_up_maximal"] = float( cached_files_infos["linear_execution_time"] ) / max(split_durations) return cached_files_infos splits_to_cancel = [] # If the job is failed if job.splits.filter(status=JobSplit.FAILED).count() > 0: # Mark queued splits of the same job as cancelled for split in job.splits.filter(status=JobSplit.QUEUED): split.status = JobSplit.CANCELLED split.start_date = datetime.now() split.end_date = split.start_date split.save() # Cancel running splits splits_to_cancel = list(job.splits.filter(status=JobSplit.PROCESSING).all()) for split in splits_to_cancel: split.status = JobSplit.CANCELLING split.save() # Check that all the splits are done if ( job.splits.filter( Q(status=JobSplit.QUEUED) | Q(status=JobSplit.PROCESSING) | Q(status=JobSplit.CANCELLING) ).count() > 0 ): return splits_to_cancel # Save the end date job.end_date = job.splits.order_by("-end_date")[0].end_date job.save() # Did the job fail? if job.splits.filter(status=JobSplit.FAILED).count() > 0: # (If necessary) Update the cached files splits = job.splits.filter( Q(status=JobSplit.FAILED) | Q(status=JobSplit.COMPLETED) ) cached_files_infos = _collect_results(splits) job.block.outputs.update(**cached_files_infos) for cached_file in job.block.outputs.all(): cached_file.update(Block.FAILED) # Update the block job.block.set_failed(job.end_date) # Cancel all the remaining blocks of the experiment splits_to_cancel.extend(cancel_all_blocks(job.block.experiment)) # Update the experiment update_experiment(job.block.experiment) # Mark the blocks of the mirror jobs as failed mirror_jobs = Job.objects.filter(key=job.key, mirror=True) for mirror_job in mirror_jobs: mirror_job.end_date = job.end_date mirror_job.save() mirror_job.block.set_failed(job.end_date) # Cancel all the remaining blocks of the experiment splits_to_cancel.extend(cancel_all_blocks(mirror_job.block.experiment)) # Update the experiment update_experiment(mirror_job.block.experiment) mirror_jobs.delete() # Delete the job job.delete() # Did the job succeed? elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0: # Update the cached files cached_files_infos = _collect_results(job.splits.all()) job.block.outputs.update(**cached_files_infos) for cached_file in job.block.outputs.all(): cached_file.update(Block.DONE) if job.block.analyzer: load_results_from_cache(job.block, cached_file) # Update the block job.block.status = Block.DONE job.block.end_date = job.end_date job.block.save() # Update the dependent jobs additional_jobs = update_dependent_jobs(job) # (If necessary) Update the experiment if len(additional_jobs) == 0: update_experiment(job.block.experiment) # Mark the blocks of the mirror jobs as completed mirror_jobs = Job.objects.filter(key=job.key, mirror=True) for mirror_job in mirror_jobs: mirror_job.block.status = Block.DONE mirror_job.block.end_date = job.end_date mirror_job.block.save() # Update the dependent jobs additional_jobs = update_dependent_jobs(mirror_job) # (If necessary) Update the experiment if len(additional_jobs) == 0: update_experiment(mirror_job.block.experiment) mirror_jobs.delete() # Delete the job job.delete() # Was the job cancelled? elif job.splits.filter(status=JobSplit.CANCELLED).count() > 0: for cached_file in job.block.outputs.all(): cached_file.update(Block.CANCELLED) # Update the block job.block.set_canceled(job.end_date) # Update the experiment update_experiment(job.block.experiment) # Delete the job job.delete() return splits_to_cancel # ---------------------------------------------------------- def update_dependent_jobs(job): """Mark the dependent jobs of the provided one as runnable Intended to be called after a job is done """ updated_jobs = [] for dependent_block in job.block.dependents.all(): if dependent_block.is_runnable(): dependent_block.job.runnable_date = datetime.now() dependent_block.job.save() updated_jobs.append(dependent_block.job) return updated_jobs # ---------------------------------------------------------- def cancel_all_blocks(experiment): """Mark the all the blocks of the provided experiment as cancelled Intended to be called after a job has failed """ splits_to_cancel = [] # Retrieve all the blocks to cancel blocks_to_cancel = experiment.blocks.filter( Q(status=Block.PROCESSING) | Q(status=Block.PENDING) ).exclude(job__mirror=True) for block in blocks_to_cancel: # If a mirror job exists, reassign any existing split mirror_jobs = Job.objects.filter(key=block.job.key, mirror=True) if len(mirror_jobs) > 0: mirror_job = mirror_jobs[0] mirror_job.mirror = False mirror_job.save() for split in block.job.splits.all(): split.job = mirror_job split.save() else: # Queued splits: Mark them as cancelled for split in block.job.splits.filter(status=JobSplit.QUEUED): split.status = JobSplit.CANCELLED split.start_date = datetime.now() split.end_date = split.start_date split.save() # Processing splits splits: Cancel them for split in block.job.splits.filter(status=JobSplit.PROCESSING): split.status = JobSplit.CANCELLING split.save() splits_to_cancel.append(split) # (If possible) Mark the block as cancelled if block.job.splits.filter(status=JobSplit.CANCELLING).count() == 0: block.set_canceled() block.job.delete() # Retrieve all the mirror blocks mirror_blocks_to_cancel = experiment.blocks.filter( Q(status=Block.PROCESSING) | Q(status=Block.PENDING) ).filter(job__mirror=True) for block in mirror_blocks_to_cancel: block.set_canceled() block.job.delete() return splits_to_cancel # ---------------------------------------------------------- @transaction.atomic def update_experiment(experiment): experiment = Experiment.objects.select_for_update().get(pk=experiment.pk) # Experiment done? if experiment.blocks.exclude(status=Block.DONE).count() == 0: experiment.status = Experiment.DONE experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() # Experiment failed? elif experiment.blocks.filter(status=Block.FAILED).count() > 0: if experiment.blocks.filter(status=Block.PROCESSING).count() == 0: experiment.status = Experiment.FAILED experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() # Experiment cancelled? elif experiment.blocks.filter(status=Block.CANCELLED).count() > 0: if experiment.blocks.filter(status=Block.PROCESSING).count() == 0: experiment.status = Experiment.PENDING experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() # ---------------------------------------------------------- def load_results_from_cache(block, cached_file): if not block.analyzer: return if cached_file.results.count() > 0: return data_source = beat.core.data.CachedDataSource() data_source.setup( os.path.join(settings.CACHE_ROOT, beat.core.hash.toPath(cached_file.hash)), settings.PREFIX, ) (output_data, start_index, end_index) = data_source[0] if output_data is not None: algorithm = beat.core.algorithm.Algorithm( settings.PREFIX, block.algorithm.fullname() ) for field, value in output_data.as_dict().items(): res, _ = CacheResult.objects.get_or_create(name=field, cache=cached_file) res.primary = algorithm.results[field]["display"] res.type = algorithm.results[field]["type"] if res.type in ["int32", "float32", "bool", "string"]: res.data_value = str(value) else: res.data_value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder) res.save() data_source.close()