Commit 48d6b63b authored by Philip ABBET's avatar Philip ABBET
Browse files

[backend] Refactoring of the Job mechanism

parent b3c6d35b
......@@ -201,10 +201,10 @@ class JobSplitInline(admin.TabularInline):
class Job(admin.ModelAdmin):
list_display = ('id', 'status', 'runnable_date', 'block', 'splits')
list_display = ('id', 'key', 'runnable_date', 'start_date', 'block', 'splits')
search_fields = ['block__name', 'block__experiment__name']
list_display_links = ('id', 'block')
ordering = ('runnable_date', 'id')
list_display_links = ('id', 'block', 'key')
ordering = ('runnable_date', 'start_date', 'id')
inlines = [JobSplitInline]
# to avoid very slow loading of cached files
......
This diff is collapsed.
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2017-09-29 16:54
from __future__ import unicode_literals
from django.db import migrations, models
# Schema migration for the Job-mechanism refactoring: per-job scheduling
# state moves onto JobSplit, Job gains a `key` and a `mirror` flag, and
# fields no longer used by the new mechanism are dropped.
class Migration(migrations.Migration):

    dependencies = [
        ('backend', '0004_environmentlanguage'),
    ]

    operations = [
        # jobs no longer form a parent/child hierarchy
        migrations.RemoveField(
            model_name='job',
            name='parent',
        ),
        migrations.RemoveField(
            model_name='job',
            name='split_errors',
        ),
        # `key` identifies a job; existing rows get '' once, then the
        # default is removed (preserve_default=False)
        migrations.AddField(
            model_name='job',
            name='key',
            field=models.CharField(default='', max_length=64),
            preserve_default=False,
        ),
        # `mirror` flags jobs that shadow another job's outputs
        # (presumed from the name - confirm against the model definition)
        migrations.AddField(
            model_name='job',
            name='mirror',
            field=models.BooleanField(default=False),
        ),
        # status tracking now lives exclusively on JobSplit
        migrations.RemoveField(
            model_name='job',
            name='status',
        ),
        migrations.RemoveField(
            model_name='jobsplit',
            name='cache_errors',
        ),
        migrations.RemoveField(
            model_name='jobsplit',
            name='process_id',
        ),
        # split status choices now include explicit Cancelled/Cancelling
        # states; bytes literals kept for Python 2 compatibility
        migrations.AlterField(
            model_name='jobsplit',
            name='status',
            field=models.CharField(choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'L', b'Cancelled'), (b'K', b'Cancelling')], default=b'N', max_length=1),
        ),
    ]
This diff is collapsed.
......@@ -120,7 +120,7 @@ class Queue(models.Model):
from ..models import Job
running = JobSplit.objects.filter(job__block__in=self.blocks.all(),
status=Job.PROCESSING).count()
status=JobSplit.PROCESSING).count()
return max(self.number_of_slots() - running, 0)
......
......@@ -128,7 +128,7 @@ class Worker(models.Model):
def current_load(self):
'''Calculates the number of cores being used currently'''
return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)])
return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=JobSplit.PROCESSING)])
def available_cores(self):
......@@ -137,18 +137,18 @@ class Worker(models.Model):
return max(self.cores - self.load(), 0)
def deactivate(self, reason):
    '''Takes this worker out of service, keeping *reason* in its info field.

    The change is only made in memory; callers are expected to persist it.
    '''
    self.active = False
    self.info = reason
def activate(self, reason=None):
    '''Puts this worker back in service, resetting its info to *reason*.

    With the default ``reason=None`` any previously stored information is
    cleared. The change is only made in memory; callers must persist it.
    '''
    self.active = True
    self.info = reason
# def deactivate(self, reason):
# '''Deactivates the current worker for a reason, that is registered'''
#
# self.info = reason
# self.active = False
#
#
# def activate(self, reason=None):
# '''Reactivates the worker, deletes any associated information'''
#
# self.info = reason
# self.active = True
# Export this worker's declared capacity (cores, memory) as a plain dict.
# NOTE(review): the `......@@` line below is diff-scrape residue splitting
# this method across two hunks - it is not part of the code.
def as_dict(self):
......@@ -157,220 +157,220 @@ class Worker(models.Model):
    return dict(cores=self.cores, memory=self.memory)
def check_environments(self, environments):
    '''Compares the environments this worker needs against those found.

    Every active environment reachable from this worker (through its slots
    and their queues) must appear among the environments discovered on the
    machine, i.e. among the keys of ``environments``.


    Parameters:

      environments (dict): A dictionary of environments found by using
        :py:func:`utils.find_environments` in which, keys represent the
        natural keys of Django database environments.


    Returns:

      list: A list of missing environments this worker can be assigned to
        work with, but where not found

      list: A list of unused environments this worker cannot be assigned to
        work with, but where nevertheless found

    '''
    worker_slots = Slot.objects.filter(worker=self)
    worker_queues = Queue.objects.filter(slots__in=worker_slots)
    needed = Environment.objects.filter(queues__in=worker_queues,
        active=True).order_by('id').distinct()

    required = [env.fullname() for env in needed]

    missing = []
    for name in required:
        if name not in environments:
            missing.append(name)

    unused = []
    for name in environments:
        if name not in required:
            unused.append(name)

    return missing, unused
def update_state(self):
    '''Updates state on the database based on current machine readings.

    Compares the machine's actual core count and RAM against the values
    declared on the database (warning on any shortfall, and noting it in
    ``self.info``), records current CPU/memory usage percentages, marks
    the worker active and clears the ``update`` request flag.
    '''
    # check I have at least all cores and memory I'm supposed to have
    cores = psutil.cpu_count()
    ram = psutil.virtual_memory().total / (1024 * 1024)
    self.info = ''

    if cores < self.cores:
        # logger.warn() is a deprecated alias - use warning()
        logger.warning("Worker `%s' only has %d cores which is less then " \
            "the value declared on the database - it's not a problem, " \
            "but note this self may get overloaded", self, cores)
        self.info += 'only %d cores;' % cores

    if ram < self.memory:
        logger.warning("Worker `%s' only has %d Mb of RAM which is less " \
            "then the value declared on the database - it's not a " \
            "problem, but note this self may get overloaded", self,
            ram)
        self.info += 'only %d Mb of RAM;' % ram

    with transaction.atomic():
        # row-level lock on this worker's record while usage is updated
        # NOTE(review): the locked copy ``self_`` is fetched but ``self``
        # is the instance modified and saved - the lock still covers the
        # same row; confirm this aliasing is intentional
        self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock

        # update process and memory usage
        self.used_cores = int(psutil.cpu_percent())
        self.used_memory = int(psutil.virtual_memory().percent)

        # save current self state
        self.active = True
        self.update = False
        self.save()
def terminate(self):
    '''Cleanly terminates a particular worker at the database

    .. note::

      This method does not destroy running or assigned processes that may
      be running or assigned to this worker. This is implemented in this
      way to allow for a clean replacement of the worker program w/o an
      interruption of the backend service.

    '''
    # imported locally to avoid a circular import at module load time
    from ..models import JobSplit
    from ..models import Job

    # disables worker, so no more splits can be assigned to it
    with transaction.atomic():
        # row lock so concurrent schedulers observe a consistent state
        self_ = Worker.objects.select_for_update().get(pk=self.pk)
        self_.active = False
        self_.used_cores = 0
        self_.used_memory = 0
        self_.info = 'Worker deactivated by system administrator'
        self_.save()

    # cancel job splits which should be cancelled anyways
    # NOTE(review): this changeset moves split statuses onto JobSplit
    # (cf. JobSplit.PROCESSING elsewhere in the diff) - ``Job.CANCEL``
    # here may be a stale reference; confirm against the models
    for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL,
            end_date__isnull=True, process_id__isnull=False):
        # only signal processes that still exist on this host
        if psutil.pid_exists(j.process_id):
            os.kill(j.process_id, signal.SIGTERM)

    # cleans-up zombie processes that may linger
    _cleanup_zombies()
def shutdown(self):
    '''Removes all running/assigned jobs from the queue, shuts down

    This method should be used with care as it may potentially cancel all
    assigned splits for the current worker.

    '''
    # imported locally to avoid a circular import at module load time
    from ..models import JobSplit
    from ..models import Job

    # first take the worker offline and signal splits already flagged
    # for cancellation (see terminate())
    self.terminate()

    message = 'Cancelled on forced worker shutdown (maintenance)' \
        ' - you may retry submitting your experiment shortly'

    # cancel job splits which were not yet started
    for j in JobSplit.objects.filter(worker=self, status=Job.QUEUED,
            start_date__isnull=True, process_id__isnull=True):
        # status=1 fails the split with a user-visible message
        j.end(Result(status=1, usrerr=message))

    # cancel job splits which are running
    for j in JobSplit.objects.filter(worker=self, status=Job.PROCESSING,
            end_date__isnull=True, process_id__isnull=False):
        j._cancel()
def work(self, environments, process):
    '''Launches user code on isolated processes

    This function is supposed to be called asynchronously, by a
    scheduled agent, every few seconds. It examines job splits assigned
    to the current host and launches an individual process to handle
    these splits. The process is started locally and the process ID
    stored with the split.

    Job split cancelling is executed by setting the split state as
    ``CANCEL`` and waiting for this function to handle it.


    Parameters:

      environments (dict): A dictionary containing installed
        environments, their description and execute-file paths.

      process (str): The path to the ``process.py`` program to use for
        running the user code on isolated processes.

    '''
    # imported locally to avoid a circular import at module load time
    from ..models import JobSplit
    from ..models import Job

    # refresh state from database and update state if required
    self.refresh_from_db()
    if self.update: self.update_state()

    # cancel job splits by killing associated processes
    # NOTE(review): statuses elsewhere in this changeset are read from
    # JobSplit - the ``Job.CANCEL``/``Job.QUEUED``/``Job.PROCESSING``
    # references below may be stale; confirm against the models
    for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL,
            end_date__isnull=True):
        if j.process_id is not None and psutil.pid_exists(j.process_id):
            os.kill(j.process_id, signal.SIGTERM)
        else: # process went away without any apparent reason
            with transaction.atomic():
                message = "Split %d/%d running at worker `%s' for " \
                    "block `%s' of experiment `%s' finished before " \
                    "even starting. Force-cancelling job split at " \
                    "database..." % (j.split_index+1,
                        j.job.block.required_slots,
                        self,
                        j.job.block.name,
                        j.job.block.experiment.fullname(),
                        )
                logger.error(message)
                j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR))

    # cmdline base argument
    cmdline = [process]
    if settings.DEBUG:
        cmdline += ['-vv']
    else:
        cmdline += ['-v']

    # start newly assigned job splits
    with transaction.atomic():
        # lock the queued splits so a concurrent agent cannot double-launch
        splits = JobSplit.objects.select_for_update().filter(worker=self,
            status=Job.QUEUED, start_date__isnull=True,
            process_id__isnull=True)
        for split in splits:
            # if we get to this point, then we launch the user process
            # -> see settings.WORKER_DETACH_CHILDREN for more info
            kwargs = dict()
            if settings.WORKER_DETACH_CHILDREN:
                kwargs['preexec_fn'] = os.setpgrp
            subprocess.Popen(cmdline + [str(split.pk)], **kwargs)
            split.status = Job.PROCESSING #avoids re-running
            split.save()

    # cleans-up zombie processes that may linger
    _cleanup_zombies()
def __enter__(self):
    # context-manager entry: sync this worker's DB record with the live
    # machine state before handing the worker to the ``with`` body
    self.update_state()
    return self
def __exit__(self, *exc):
    # context-manager exit: always take the worker offline, even when the
    # body raised
    self.terminate()
    return False #propagate exceptions
# def check_environments(self, environments):
# '''Checks that this worker has access to all environments it needs
#
# This method will check if the found set of environments (in the
# dictionary ``environments``) contains, at least, one environment for
# each environment object this worker is supposed to be able to execute
# user algorithms for.
#
#
# Parameters:
#
# environments (dict): A dictionary of environments found by using
# :py:func:`utils.find_environments` in which, keys represent the
# natural keys of Django database environments.
#
#
# Returns:
#
# list: A list of missing environments this worker can be assigned to
# work with, but where not found
#
# list: A list of unused environments this worker cannot be assigned to
# work with, but where nevertheless found
#
# '''
#
# slots = Slot.objects.filter(worker=self)
# queues = Queue.objects.filter(slots__in=slots)
# wishlist = Environment.objects.filter(queues__in=queues, active=True)
# wishlist = wishlist.order_by('id').distinct()
#
# required = [k.fullname() for k in wishlist]
# missing = [k for k in required if k not in environments]
# unused = [k for k in environments if k not in required]
#
# return missing, unused
#
#
# def update_state(self):
# '''Updates state on the database based on current machine readings'''
#
# # check I have at least all cores and memory I'm supposed to have
# cores = psutil.cpu_count()
# ram = psutil.virtual_memory().total / (1024 * 1024)
# self.info = ''
#
# if cores < self.cores:
# logger.warn("Worker `%s' only has %d cores which is less then " \
# "the value declared on the database - it's not a problem, " \
# "but note this self may get overloaded", self, cores)
# self.info += 'only %d cores;' % cores
#
# if ram < self.memory:
# logger.warn("Worker `%s' only has %d Mb of RAM which is less " \
# "then the value declared on the database - it's not a " \
# "problem, but note this self may get overloaded", self,
# ram)
# self.info += 'only %d Mb of RAM;' % ram
#
# with transaction.atomic():
# self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock
#
# # update process and memory usage
# self.used_cores = int(psutil.cpu_percent())
# self.used_memory = int(psutil.virtual_memory().percent)
#
# # save current self state
# self.active = True
# self.update = False
# self.save()
#
#
# def terminate(self):
# '''Cleanly terminates a particular worker at the database
#
# .. note::
#
# This method does not destroy running or assigned processes that may
# be running or assigned to this worker. This is implemented in this
# way to allow for a clean replacement of the worker program w/o an
# interruption of the backend service.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# # disables worker, so no more splits can be assigned to it
# with transaction.atomic():
# self_ = Worker.objects.select_for_update().get(pk=self.pk)
# self_.active = False
# self_.used_cores = 0
# self_.used_memory = 0
# self_.info = 'Worker deactivated by system administrator'
# self_.save()
#
# # cancel job splits which should be cancelled anyways
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
# end_date__isnull=True, process_id__isnull=False):
# if psutil.pid_exists(j.process_id):
# os.kill(j.process_id, signal.SIGTERM)
#
# # cleans-up zombie processes that may linger
# _cleanup_zombies()
#
#
# def shutdown(self):
# '''Removes all running/assigned jobs from the queue, shuts down
#
# This method should be used with care as it may potentially cancel all
# assigned splits for the current worker.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# self.terminate()
#
# message = 'Cancelled on forced worker shutdown (maintenance)' \
# ' - you may retry submitting your experiment shortly'
#
# # cancel job splits which were not yet started
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.QUEUED,
# start_date__isnull=True, process_id__isnull=True):
# j.end(Result(status=1, usrerr=message))
#
# # cancel job splits which are running
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.PROCESSING,
# end_date__isnull=True, process_id__isnull=False):
# j._cancel()
#
#
#
# def work(self, environments, process):
# '''Launches user code on isolated processes
#
# This function is supposed to be called asynchronously, by a
# scheduled agent, every few seconds. It examines job splits assigned
# to the current host and launches an individual process to handle
# these splits. The process is started locally and the process ID
# stored with the split.
#
# Job split cancelling is executed by setting the split state as
# ``CANCEL`` and waiting for this function to handle it.
#
#
# Parameters:
#
# environments (dict): A dictionary containing installed
# environments, their description and execute-file paths.
#
# process (str): The path to the ``process.py`` program to use for
# running the user code on isolated processes.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# # refresh state from database and update state if required
# self.refresh_from_db()
# if self.update: self.update_state()
#
# # cancel job splits by killing associated processes
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
# end_date__isnull=True):
# if j.process_id is not None and psutil.pid_exists(j.process_id):
# os.kill(j.process_id, signal.SIGTERM)
# else: # process went away without any apparent reason
# with transaction.atomic():
# message = "Split %d/%d running at worker `%s' for " \
# "block `%s' of experiment `%s' finished before " \
# "even starting. Force-cancelling job split at " \
# "database..." % (j.split_index+1,
# j.job.block.required_slots,
# self,
# j.job.block.name,
# j.job.block.experiment.fullname(),
# )
# logger.error(message)
# j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR))
#
# # cmdline base argument
# cmdline = [process]
# if settings.DEBUG:
# cmdline += ['-vv']
# else:
# cmdline += ['-v']
#
# # start newly assigned job splits
# with transaction.atomic():
# splits = JobSplit.objects.select_for_update().filter(worker=self,
# status=JobSplit.QUEUED, start_date__isnull=True,
# process_id__isnull=True)
# for split in splits:
# # if we get to this point, then we launch the user process
# # -> see settings.WORKER_DETACH_CHILDREN for more info
# kwargs = dict()
# if settings.WORKER_DETACH_CHILDREN:
# kwargs['preexec_fn'] = os.setpgrp
# subprocess.Popen(cmdline + [str(split.pk)], **kwargs)
# split.status = JobSplit.PROCESSING #avoids re-running
# split.save()
#
# # cleans-up zombie processes that may linger
# _cleanup_zombies()
#
#
# def __enter__(self):
# self.update_state()
# return self
#
#
# def __exit__(self, *exc):
# self.terminate()
# return False #propagate exceptions
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
'''Scheduling functions and utilities'''
import logging
logger = logging.getLogger(__name__)
from django.db import transaction
from .models import Job, JobSplit, Queue, Worker
def _select_splits_for_queue(queue):
'''Returns a list of job splits that can run now, at a certain queue
Here is the work done:
1. Find the queue availability. This is a bit tricky as queues are only
allowed to consume a limited (configurable) number of slots in each
worker, per user
2. Calculate runnable job splits
3. TODO: Calculates the list of job splits that can potentially run now
(for which there is space in the current queue being analyzed), taking