diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py
index 077b39ce93f71cb4f15deb570592ffe4316fcabf..45373e109c1003994036a7d727ebc713819fcc3e 100644
--- a/beat/web/backend/api.py
+++ b/beat/web/backend/api.py
@@ -25,224 +25,20 @@
 #
 #
 ###############################################################################
 
-from django.conf import settings
-from django.db import transaction
-from django.utils import six
-
 from rest_framework.decorators import api_view, permission_classes
 from rest_framework.response import Response
 from rest_framework import permissions
 
 from ..common.permissions import IsSuperuser
-from ..common.responses import BadRequestResponse
-from .permissions import IsScheduler
 
 from ..utils.api import send_email_to_administrators
 from ..utils import scheduler as scheduler_api
 
-from .models import Worker
-from .models import Queue
 from .models import Environment
 
 from ..experiments.models import Experiment
-from ..experiments.models import Block
-from ..experiments.models import Result
-from ..experiments.models import CachedFile
-
-from ..statistics.utils import updateStatistics
-
-import beat.core.stats
-import beat.core.hash
-import beat.core.experiment
-import beat.core.toolchain
-import beat.core.algorithm
-import beat.core.data
-import beat.core.utils
 
 import simplejson as json
 
-from datetime import datetime
-import os
-
-import logging
-logger = logging.getLogger(__name__)
-
-
-#----------------------------------------------------------
-
-
-@api_view(['GET'])
-@permission_classes([IsSuperuser])
-def scheduler(request):
-
-    # Determine if we want the latest known status or a fresh one
-    refresh = False
-    if request.GET.has_key('refresh'):
-        refresh = (request.GET['refresh'] == '1')
-
-    if refresh:
-        timeout = request.GET.get('timeout', '120') #seconds
-        status_and_data = scheduler_api.getMessage(
-            '/state?refresh=1&timeout=%s' % timeout)
-    else:
-        status_and_data = scheduler_api.getMessage('/state?refresh=0')
-
-    if status_and_data is None:
-        return Response('ERROR: Could not connect to the scheduler', status=500)
-
-    status, data = status_and_data
-    if status == 500:
-        send_email_to_administrators('Scheduler internal error', data)
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            'An administrator has been notified about this problem.'
-        return Response(error_message, status=status)
-
-    elif status != 200:
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            '  REASON: %s' % data
-        return Response(error_message, status=status)
-
-
-    # Setup worker information
-    data = json.loads(data)
-
-    workers = Worker.objects.all()
-    known_node_names = [k.name for k in workers]
-    active_node_names = [k.name for k in workers if k.active]
-    reported_names = data['workers'].keys()
-
-    for name in reported_names: #for all nodes described by the scheduler
-
-        if name not in known_node_names:
-            data['workers'][name]['db_status'] = 'Unknown'
-            data['workers'][name]['info'] = 'Worker not described'
-        else:
-            worker = Worker.objects.get(name=name)
-            data['workers'][name]['id'] = worker.id
-
-            if name not in active_node_names:
-                data['workers'][name]['db_status'] = 'Inactive'
-                data['workers'][name]['info'] = 'Worker temporarily deactivated'
-
-            elif data['workers'][name]['cores'] != worker.cores:
-                data['workers'][name]['db_status'] = 'Mismatch'
-                data['workers'][name]['info'] = \
-                    'Number of cores differs (db: %d, scheduler: %d)' % \
-                    (data['workers'][name]['cores'], worker.cores)
-            else:
-                data['workers'][name]['db_status'] = 'Active'
-                data['workers'][name]['info'] = 'Worker is declared and active'
-
-    # for nodes in the database, but not described by the scheduler
-    for name in [x for x in known_node_names if x not in reported_names]:
-        if name not in active_node_names:
-            data['workers'][name] = {
-                'db_status': 'Inactive',
-                'info': 'Worker not reported by the scheduler',
-            }
-        else:
-            data['workers'][name] = {
-                'db_status': 'Active',
-                'info': 'Worker not reported by the scheduler',
-            }
-        data['workers'][name]['id'] = Worker.objects.get(name=name).id
-
-    # Amend the queue statistics
-    queues = Queue.objects.all()
-    known_queue_names = [k.name for k in queues]
-    reported_names = data['scheduler']['queues'].keys()
-
-    for name in reported_names:
-        data['scheduler']['queues'][name]['status'] = 'Active'
-        if name not in known_queue_names:
-            data['scheduler']['queues'][name]['db_status'] = 'Missing'
-            data['scheduler']['queues'][name]['info'] = 'Queue is missing ' \
-                'on the database'
-        else:
-            data['scheduler']['queues'][name]['total-slots'] = \
-                sum(data['scheduler']['queues'][name]['slots'].values())
-            data['scheduler']['queues'][name]['db_status'] = 'Active'
-            data['scheduler']['queues'][name]['info'] = 'Queue is declared ' \
-                'and active'
-            db_queue = Queue.objects.get(name=name)
-
-            slots = {}
-            for slot in db_queue.slots.iterator():
-                if slot.worker.active:
-                    slots[slot.worker.name] = slot.quantity
-
-            environments = []
-            for environment in db_queue.environments.filter(active=True):
-                environments.append({
-                    'name': environment.name,
-                    'version': environment.version,
-                })
-
-            compare = {
-                'memory-in-megabytes': db_queue.memory_limit,
-                'time-limit-in-minutes': db_queue.time_limit,
-                'nb-cores-per-slot': db_queue.cores_per_slot,
-                'max-slots-per-user': db_queue.max_slots_per_user,
-                'slots': slots,
-                'environments': environments,
-            }
-
-            sched = data['scheduler']['queues'][name]
-            for key in compare:
-                same = True
-                if isinstance(compare[key], list):
-                    #compare environments
-                    comp_key = set(['%s-%s' % (k['name'], k['version']) for k in compare[key]])
-                    sched_key = set(['%s-%s' % (k['name'], k['version']) for k in sched[key]])
-                    same = comp_key == sched_key
-                else:
-                    same = compare[key] == sched[key]
-                if not same:
-                    sched['db_status'] = 'Mismatch'
-                    if 'info' in sched:
-                        sched['info'] += '; mismatch at "%s" (%s != %s)' \
-                            % (key, compare[key], sched[key])
-                    else:
-                        sched['info'] += 'Mismatch at "%s" (%s != %s)' \
-                            % (key, compare[key], sched[key])
-
-
-    # attach existing database ids to the queues
-    for name in filter(lambda x: x in reported_names, known_queue_names):
-        db_queue = filter(lambda x: x.name == name, queues)[0]
-        data['scheduler']['queues'][name]['id'] = db_queue.id
-
-    for name in filter(lambda x: x not in reported_names, known_queue_names):
-        db_queue = filter(lambda x: x.name == name, queues)[0]
-
-        slots = {}
-        for slot in db_queue.slots.iterator():
-            if slot.worker.active:
-                slots[slot.worker.name] = slot.quantity
-
-        environments = []
-        for environment in db_queue.environments.filter(active=True):
-            environments.append({
-                'name': environment.name,
-                'version': environment.version,
-            })
-
-        data['scheduler']['queues'][name] = {
-            'db_status': 'Inactive',
-            'info': 'Queue described at database, but not reported by scheduler',
-            'id': db_queue.id,
-            'status': 'Active',
-            'memory-in-megabytes': db_queue.memory_limit,
-            'time-limit-in-minutes': db_queue.time_limit,
-            'nb-cores-per-slot': db_queue.cores_per_slot,
-            'max-slots-per-user': db_queue.max_slots_per_user,
-            'slots': slots,
-            'total-slots': db_queue.total_slots(),
-            'environments': environments,
-        }
-
-
-    # Send the response
-    return Response(data)
 
 
 #----------------------------------------------------------
@@ -252,24 +48,8 @@ def scheduler(request):
 @permission_classes([IsSuperuser])
 def cancel_all_experiments(request):
 
-    # Send the command to the Scheduler
-    status_and_data = scheduler_api.postMessage('/cancel-all-experiments')
-
-    if status_and_data is None:
-        return Response('ERROR: Could not connect to the scheduler', status=500)
-
-    status, data = status_and_data
-    if status == 500:
-        send_email_to_administrators('Scheduler internal error', data)
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            'An administrator has been notified about this problem.\n\n' \
-            '  REASON:\n%s' % data
-        return Response(error_message, status=status)
-
-    elif status != 200:
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            '  REASON: %s' % data
-        return Response(error_message, status=status)
+    for q in Experiment.objects.filter(status__in=(Experiment.RUNNING, Experiment.SCHEDULED)):
+        q.cancel()
 
     return Response(status=200)
 
@@ -277,380 +57,14 @@ def cancel_all_experiments(request):
 #----------------------------------------------------------
 
 
-@api_view(['POST'])
-@permission_classes([IsSuperuser])
-def scheduler_configuration(request):
-
-    configuration = {}
-    for queue in Queue.objects.all():
-        configuration[queue.name] = queue.as_json()
-
-    # Send the configuration to the Scheduler
-    status_and_data = scheduler_api.putMessage('/queue-configuration', data=configuration)
-
-    if status_and_data is None:
-        return Response('ERROR: Could not connect to the scheduler', status=500)
-
-    status, data = status_and_data
-    if status == 500:
-        send_email_to_administrators('Scheduler internal error', data)
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            'An administrator has been notified about this problem.\n\n' \
-            '  REASON:\n%s' % data
-        return Response(error_message, status=status)
-
-    elif status != 200:
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            '  REASON: %s' % data
-        return Response(error_message, status=status)
-
-    return Response(status=200)
-
-
-#----------------------------------------------------------
-
-
-@api_view(['POST'])
-@permission_classes([IsSuperuser])
-def cache_cleanup(request):
-
-    data = request.data
-
-    if data.has_key('olderthan'):
-        olderthan = data['olderthan']
-    else:
-        olderthan = 0
-
-
-    # Send the configuration to the Scheduler
-    params = {
-        'olderthan': olderthan,
-        'nolist': 0,
-    }
-
-    status_and_data = scheduler_api.postMessage('/cache-cleanup', params=params)
-    if status_and_data is None:
-        return Response('ERROR: Could not connect to the scheduler', status=500)
-
-    status, data = status_and_data
-    if status == 500:
-        send_email_to_administrators('Scheduler internal error', data)
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            'An administrator has been notified about this problem.\n\n' \
-            '  REASON:\n%s' % data
-        return Response(error_message, status=status)
-
-    elif status != 200:
-        error_message = 'ERROR: The scheduler did not accept the request.\n' \
-            '  REASON: %s' % data
-        return Response(error_message, status=status)
-
-
-    # Reset the DB representation of the cache
-    data = json.loads(data)
-    if len(data) > 0:
-        blocks = Block.objects.filter(outputs__hash__in=data)
-        for block in blocks:
-            block.status = Block.NOT_CACHED
-            block.save()
-
-    return Response(data)
-
-
-#----------------------------------------------------------
-
-
-@api_view(['PUT'])
-@permission_classes([permissions.IsAuthenticated, IsScheduler])
-@transaction.atomic
-def block_started(request):
-
-    # Check the validity of the request
-    data = request.data
-
-    if not(data.has_key('experiment-name')) or \
-        not(isinstance(data['experiment-name'], six.string_types)):
-        return BadRequestResponse('ERROR: Experiment name not provided')
-
-    if not(data.has_key('block-name')) or \
-        not(isinstance(data['block-name'], six.string_types)):
-        return BadRequestResponse('ERROR: Block name not provided')
-
-
-    # Retrieve the experiment
-    try:
-        configuration_id = beat.core.experiment.Storage(settings.PREFIX,
-            data['experiment-name'])
-        toolchain_id = beat.core.toolchain.Storage(settings.PREFIX,
-            configuration_id.toolchain)
-
-        experiment = Experiment.objects.select_for_update().get(
-            author__username=configuration_id.username,
-            toolchain__author__username=toolchain_id.username,
-            toolchain__name=toolchain_id.name,
-            toolchain__version=toolchain_id.version,
-            name=configuration_id.name,
-        )
-    except Exception:
-        return Response(status=404)
-
-
-    if experiment.status == Experiment.PENDING:
-        return BadRequestResponse("ERROR: The experiment '%s' is still marked as 'pending' in the database" % data['experiment-name'])
-    elif experiment.status not in (Experiment.SCHEDULED, Experiment.RUNNING):
-        return BadRequestResponse("ERROR: The experiment '%s' is already marked as '%s' in the database" % (data['experiment-name'], experiment.get_status_display()))
-
-
-    # Retrieve the block
-    try:
-        block = experiment.blocks.get(name=data['block-name'])
-    except Exception:
-        return Response(status=404)
-
-
-    # Modify the status of the experiment (if necessary)
-    if experiment.status == Experiment.SCHEDULED:
-        experiment.start_date = datetime.now()
-        experiment.status = Experiment.RUNNING
-        experiment.save()
-
-
-    # Modify the status of the block
-    block.status = Block.PROCESSING
-    block.save()
-
-
-    # Update all the other similar not-cached blocks and associated scheduled
-    # experiments. Note we don't updated failed blocks or unscheduled
-    # experiments as not to reset experiments that have already been run.
-    similar_blocks = Block.objects.filter(outputs__in=block.outputs.all()).exclude(pk=block.pk).order_by('pk').distinct()
-    similar_blocks.filter(status=Block.NOT_CACHED).update(status=Block.PROCESSING)
-    Experiment.objects.filter(blocks__in=similar_blocks, status=Experiment.SCHEDULED).update(start_date=datetime.now(), status=Experiment.RUNNING)
-
-    # Send the response
-    return Response(status=204)
-
-
-#----------------------------------------------------------
-
-
-@api_view(['PUT'])
-@permission_classes([permissions.IsAuthenticated, IsScheduler])
-@transaction.atomic
-def block_finished(request):
-    # Check the validity of the request
-    data = request.data
-
-    if not(data.has_key('experiment-name')) or \
-        not(isinstance(data['experiment-name'], six.string_types)):
-        return BadRequestResponse('ERROR: Experiment name not provided')
-
-    if not(data.has_key('block-name')) or \
-        not(isinstance(data['block-name'], six.string_types)):
-        return BadRequestResponse('ERROR: Block name not provided')
-
-    if not(data.has_key('state')) or \
-        not(isinstance(data['state'], six.string_types)):
-        return BadRequestResponse('ERROR: Block state not provided')
-
-    if not(data.has_key('outputs')) or \
-        not(isinstance(data['outputs'], (list, tuple))):
-        return BadRequestResponse('ERROR: Block outputs not provided')
-
-    # Alert system administrators if weird errors occur
-    if data.has_key('system-message') and \
-        isinstance(data['system-message'], six.string_types) and \
-        len(data['system-message'].strip()) != 0:
-        send_email_to_administrators('Worker environment error', data['system-message'])
-
-    block_state = data['state']
-    if block_state not in ['processed', 'failed', 'cancelled']:
-        return BadRequestResponse('ERROR: Invalid block state value: ' + block_state)
-
-    # Retrieve the experiment, block all other experiment retrieval operations
-    try:
-        configuration_id = beat.core.experiment.Storage(settings.PREFIX,
-            data['experiment-name'])
-        toolchain_id = beat.core.toolchain.Storage(settings.PREFIX,
-            configuration_id.toolchain)
-
-        experiment = Experiment.objects.select_for_update().get(
-            author__username=configuration_id.username,
-            toolchain__author__username=toolchain_id.username,
-            toolchain__name=toolchain_id.name,
-            toolchain__version=toolchain_id.version,
-            name=configuration_id.name,
-        )
-    except Exception as e:
-        logger.error("Could not retrieve experiment '%s/%s/%s/%s/%s' from " \
-            "database: %s", configuration_id.username,
-            toolchain_id.username, toolchain_id.name, toolchain_id.version,
-            configuration_id.name, str(e))
-        return Response(status=404)
-
-
-    if experiment.status == Experiment.PENDING:
-        return BadRequestResponse("ERROR: The experiment '%s' is still marked as 'pending' in the database" % data['experiment-name'])
-
-
-    # Retrieve the block
-    try:
-        block = experiment.blocks.get(name=data['block-name'])
-    except Exception:
-        return Response(status=404)
-
-
-    if (block.status == Block.NOT_CACHED) and (block_state != 'processed'):
-        if block_state == 'cancelled' or block_state == 'failed':
-            pass
-        else:
-            return BadRequestResponse("ERROR: The block '%s' isn't marked as 'running' in the database" % data['block-name'])
-    elif block.status == Block.FAILED:
-        return BadRequestResponse("ERROR: The block '%s' is already marked as 'failed' in the database" % data['block-name'])
-
-
-    # Updates all sorts of statistics on these caches (typically only one)
-    statistics = None
-    if block_state != 'cancelled':
-        if not(data.has_key('statistics')):
-            return BadRequestResponse('ERROR: Invalid statistics')
-
-        if data['statistics']:
-            statistics = beat.core.stats.Statistics(data['statistics'])
-
-    update_info = dict()
-    if statistics is not None:
-        update_info.update(dict(
-            cpu_time = statistics.cpu['user'] + statistics.cpu['system'],
-            max_memory = statistics.memory['rss'],
-            data_read_size = statistics.data['volume']['read'],
-            data_read_nb_blocks = statistics.data['blocks']['read'],
-            data_read_time = statistics.data['time']['read'],
-            data_written_size = statistics.data['volume']['write'],
-            data_written_nb_blocks = statistics.data['blocks']['write'],
-            data_written_time = statistics.data['time']['write'],
-        ))
-
-    if data.has_key('execution_info') and \
-        (data['execution_info'] is not None):
-
-        execution_infos = data['execution_info']
-
-        if execution_infos.has_key('linear_execution_time') and \
-            (execution_infos['linear_execution_time'] is not None):
-            update_info['linear_execution_time'] = \
-                execution_infos['linear_execution_time']
-
-        if execution_infos.has_key('speed_up_real') and \
-            (execution_infos['speed_up_real'] is not None):
-            update_info['speed_up_real'] = execution_infos['speed_up_real']
-
-        if execution_infos.has_key('speed_up_maximal') and \
-            (execution_infos['speed_up_maximal'] is not None):
-            update_info['speed_up_maximal'] = \
-                execution_infos['speed_up_maximal']
-
-        if execution_infos.has_key('queuing_time') and \
-            (execution_infos['queuing_time'] is not None):
-            update_info['queuing_time'] = execution_infos['queuing_time']
-
-    # Logged messages
-    if data.has_key('stdout') and isinstance(data['stdout'], six.string_types):
-        update_info['stdout'] = data['stdout']
-    if data.has_key('stderr') and isinstance(data['stderr'], six.string_types):
-        update_info['stderr'] = data['stderr']
-    if data.has_key('error-message') and \
-        isinstance(data['error-message'], six.string_types):
-        update_info['error_message'] = data['error-message']
-
-    # Saves all cached files to the database
-    CachedFile.objects.filter(hash__in=data['outputs']).update(**update_info)
-
-    # Save the results in the database (if applicable)
-    if block.analyzer and (block_state == 'processed'):
-        data_source = beat.core.data.CachedDataSource()
-        data_source.setup(os.path.join(settings.CACHE_ROOT,
-            beat.core.hash.toPath(block.outputs.all()[0].hash)),
-            settings.PREFIX)
-        output_data = data_source.next()[0]
-        if output_data is not None:
-            algorithm = beat.core.algorithm.Algorithm(settings.PREFIX,
-                block.algorithm.fullname())
-            for field, value in output_data.as_dict().items():
-                cached_file = block.first_cache()
-                result_entry, created = Result.objects.get_or_create(
-                    name=field,
-                    cache=cached_file,
-                )
-                result_entry.primary = algorithm.results[field]['display']
-                result_entry.type = algorithm.results[field]["type"]
-
-                if result_entry.type in ['int32', 'float32', 'bool', 'string']:
-                    result_entry.data_value = str(value)
-                else:
-                    result_entry.data_value = json.dumps(value, indent=4, cls=beat.core.utils.NumpyJSONEncoder)
-
-                result_entry.save()
-
-        data_source.close()
-
-    # Modify the status of the block
-    if block_state == 'processed':
-        block.status = Block.CACHED
-        block.save()
-
-        if (experiment.status in [Experiment.SCHEDULED, Experiment.RUNNING]) \
-            and \
-            (experiment.blocks.filter(analyzer=True).exclude(status=Block.CACHED).count() == 0):
-            experiment.end_date = datetime.now()
-            experiment.status = Experiment.DONE
-            experiment.save()
-        elif experiment.status == Experiment.SCHEDULED:
-            experiment.start_date = datetime.now()
-            experiment.status = Experiment.RUNNING
-            experiment.save()
-
-    elif block_state == 'failed':
-        block.status = Block.FAILED
-        block.save()
-
-        if experiment.status != Experiment.FAILED:
-            experiment.end_date = datetime.now()
-            experiment.status = Experiment.FAILED
-            experiment.save()
-
-    elif block_state == 'cancelled':
-        block.status=Block.NOT_CACHED
-        block.save()
-
-        if experiment.status not in [Experiment.CANCELING, Experiment.FAILED]:
-            experiment.end_date = datetime.now()
-            experiment.status = Experiment.CANCELING
-            experiment.save()
-
-    if experiment.status == Experiment.CANCELING:
-        if experiment.blocks.filter(status=Block.PROCESSING).count() == 0:
-            experiment.end_date = datetime.now()
-            experiment.status = Experiment.FAILED
-            experiment.save()
-
-    # Update central statistics
-    if statistics: updateStatistics(statistics)
-
-    # Send the response
-    return Response(status=204)
-
-
-#----------------------------------------------------------
-
-
 @api_view(['GET'])
 @permission_classes([permissions.AllowAny])
 def accessible_environments_list(request):
     """Returns all accessible environments for a given user"""
 
     # Retrieve the list of environments
-    environments = Environment.objects.filter(active=True).order_by('name', 'version')
+    environments = Environment.objects.filter(active=True).order_by('name',
+        'version')
 
     result = []
     for environment in environments.iterator():
diff --git a/beat/web/backend/api_urls.py b/beat/web/backend/api_urls.py
index 3e76dd472a521a21499effc41826a773b6dc1654..d7b2197695b90bd3d89efc8af8745206c5619c3c 100644
--- a/beat/web/backend/api_urls.py
+++ b/beat/web/backend/api_urls.py
@@ -29,11 +29,6 @@ from django.conf.urls import url
 from . import api
 
 urlpatterns = [
-    url(
-        r'^scheduler/$',
-        api.scheduler,
-        name='backend-api-scheduler',
-        ),
 
     url(
         r'^cancel-all-experiments/$',
@@ -41,30 +36,6 @@ urlpatterns = [
         name='backend-api-cancel-all-experiments',
         ),
 
-    url(
-        r'^scheduler-configuration/$',
-        api.scheduler_configuration,
-        name='backend-api-scheduler-configuration',
-        ),
-
-    url(
-        r'^cache-cleanup/$',
-        api.cache_cleanup,
-        name='backend-api-cache-cleanup',
-        ),
-
-    url(
-        r'^block-started/$',
-        api.block_started,
-        name='backend-api-block-started',
-        ),
-
-    url(
-        r'^block-finished/$',
-        api.block_finished,
-        name='backend-api-block-finished',
-        ),
-
     url(
         r'^environments/$',
         api.accessible_environments_list,
diff --git a/beat/web/backend/templatetags/backend_tags.py b/beat/web/backend/templatetags/backend_tags.py
index dc44d65edc1f1967a1c19e491db25a915b384f58..3e94d1d45445968b4f117055a5774f905d28cfda 100644
--- a/beat/web/backend/templatetags/backend_tags.py
+++ b/beat/web/backend/templatetags/backend_tags.py
@@ -81,3 +81,14 @@ def environment_actions(context, object, display_count):
 def visible_queues(context, object):
     '''Calculates the visible queues for an environment and requestor'''
     return object.queues_for(context['request'].user)
+
+
+@register.filter
+def count_job_splits(xp, state=None):
+    """Returns the number of job splits for an experiment in a certain state"""
+    return xp.job_splits(state=state).count()
+
+
+#--------------------------------------------------
+
+
diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py
index dde215fb781e8091d51799cae49c4a8b4527dfae..b4dd5c7d96a4e61fc5a1f1bc13c478a3f521e25f 100644
--- a/beat/web/backend/views.py
+++ b/beat/web/backend/views.py
@@ -37,7 +37,8 @@ from django.contrib.auth.decorators import login_required
 from django.http import HttpResponseForbidden
 from django.contrib import messages
 
-from .models import Environment
+from .models import Environment, Worker, Queue
+from . import state
 
 
 #------------------------------------------------
@@ -45,45 +46,35 @@ from .models import Environment
 @login_required
 def scheduler(request):
 
-    if not(request.user.is_superuser):
-        return HttpResponseForbidden()
-
-    from .api import scheduler as scheduler_api
-
-    response = scheduler_api(request)
-
-    if response.status_code != 200:
-        messages.error(request, "Error contacting the scheduler at {}:{} " \
-            "[{}/{}] {}".format(settings.SCHEDULER_ADDRESS,
-            settings.SCHEDULER_PORT, response.status_text,
-            response.status_code, response.data))
-        return render_to_response('backend/scheduler.html', dict(),
-            context_instance=RequestContext(request))
+    if not(request.user.is_superuser): return HttpResponseForbidden()
 
     # data for the cache plot
+    cache = state.cache()
     cache_chart_data = [
         dict(
-            label= 'Used (%d%%)' % round(100*float(response.data['cache']['size-in-megabytes'])/response.data['cache']['capacity-in-megabytes']),
-            value= response.data['cache']['size-in-megabytes'],
+            label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']),
+            value= cache['size-in-megabytes'],
            color= '#F7464A',
            highlight= '#FF5A5E',
            ),
        dict(
-            label= 'Free (%d%%)' % round(100*float(response.data['cache']['capacity-in-megabytes'] - response.data['cache']['size-in-megabytes'])/response.data['cache']['capacity-in-megabytes']),
-            value= response.data['cache']['capacity-in-megabytes'] - response.data['cache']['size-in-megabytes'],
+            label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']),
+            value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'],
            color= '#46BFBD',
            highlight= '#5AD3D1',
            ),
        ]
 
-    cache_gb = int(response.data['cache']['capacity-in-megabytes'] / 1024.0)
+    cache_gb = int(cache['capacity-in-megabytes'] / 1024.0)
 
-    # Organize the data a bit
-    response.data['workers'] = collections.OrderedDict(sorted([(k,v) for k,v in response.data['workers'].items()], key=lambda x: (not x[1].has_key('active'), x[1]['db_status']!='Active', x[0])))
-
     return render_to_response('backend/scheduler.html', dict(
-        data=response.data,
+        jobs=state.jobs(),
+        experiments=state.experiments(),
+        workers=Worker.objects.all(),
+        queues=Queue.objects.all(),
         cache_chart_data=simplejson.dumps(cache_chart_data),
         cache_gb=cache_gb,
         ),
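
Note: the state module imported in views.py is new in this change, but its implementation is not part of the diff shown here; only the calls state.cache(), state.jobs() and state.experiments() are visible. A minimal sketch of the interface the rewritten view appears to assume, where every name and field below is an illustrative assumption rather than the project's actual code, might look like this:

    # beat/web/backend/state.py -- hypothetical sketch, not the real module
    import os

    from django.conf import settings

    from ..experiments.models import Experiment


    def cache():
        """Cache usage, in the megabyte-based keys the view reads."""
        size = 0
        # walk the cache tree and add up file sizes
        for path, dirs, files in os.walk(settings.CACHE_ROOT):
            size += sum(os.path.getsize(os.path.join(path, f)) for f in files)
        return {
            'size-in-megabytes': size // (1024 * 1024),
            # assumed setting: wherever the configured capacity actually lives
            'capacity-in-megabytes': getattr(settings, 'CACHE_LIMIT_MB', 102400),
        }


    def experiments():
        """Experiments the scheduler page reports on."""
        return Experiment.objects.filter(
            status__in=(Experiment.SCHEDULED, Experiment.RUNNING))

    # state.jobs() would presumably query the backend's job model in the same
    # way, but that model is not shown in this diff.

In a template, the new count_job_splits filter would read like {{ xp|count_job_splits }} to count all splits of an experiment, or {{ xp|count_job_splits:"queued" }} to count splits in one specific (assumed) state value.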