Commit 2b02eac5 authored by Philip ABBET, committed by Samuel GAIST

Update to beat.core 1.6.x and beat.backend.python 1.5.x

parent fcea6b29
@@ -943,7 +943,7 @@ def load_results_from_cache(block, cached_file):
                                       beat.core.hash.toPath(cached_file.hash)),
                          settings.PREFIX)
-    output_data = data_source.next()[0]
+    (output_data, start_index, end_index) = data_source[0]

     if output_data is not None:
         algorithm = beat.core.algorithm.Algorithm(
             settings.PREFIX, block.algorithm.fullname())
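Note: the hunk above is the core API shift of this commit; a sketch based only on the calls visible here, not on full beat.core documentation:

    # beat.core 1.5.x pulled cached data sequentially:
    #     output_data = data_source.next()[0]
    #
    # beat.core 1.6.x gives random access by position, and every access
    # also reports the [start_index, end_index] data-index range that the
    # returned object covers in the cache:
    #     (output_data, start_index, end_index) = data_source[0]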
......
@@ -42,7 +42,7 @@ import socket

 CORES = psutil.cpu_count()
 RAM = psutil.virtual_memory().total/(1024*1024)
-ENVIRONMENT = {'name': 'Python 2.7', 'version': '1.2.0'}
+ENVIRONMENT = {'name': 'Python 2.7', 'version': '1.3.0'}
 CXX_ENVIRONMENT = {'name': 'Cxx backend', 'version': '1.1.0'}
 ENVKEY = '%(name)s (%(version)s)' % ENVIRONMENT
 CXX_ENVKEY = '%(name)s (%(version)s)' % CXX_ENVIRONMENT
......
@@ -125,8 +125,12 @@ class JobSplitManager(models.Manager):

         # Load the list of indices for each input
-        indices = [ beat.core.data.load_data_index(settings.CACHE_ROOT, x['path'])
-                    for x in inputs ]
+        indices = []
+        for input_cfg in inputs:
+            if 'database' in input_cfg:
+                indices.extend(beat.core.data.load_data_index_db(settings.CACHE_ROOT, input_cfg['path']))
+            else:
+                indices.append(beat.core.data.load_data_index(settings.CACHE_ROOT, input_cfg['path']))

         # Attempt to split the indices
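Note: the branch above distinguishes database-backed inputs (indexed in a '.db' file that can describe several outputs at once, hence extend) from plain cached inputs (a single index list, hence append). A self-contained sketch with stand-in loaders; the return shapes are an assumption drawn from this hunk:

    def load_data_index(cache_root, path):
        return [0, 1, 2]                     # one index list per cached file

    def load_data_index_db(cache_root, path):
        return [[0, 1, 2], [3, 4, 5]]        # one index list per database output

    inputs = [{'path': 'cached.data'},                        # names hypothetical
              {'database': 'simple/1', 'path': 'indexed.db'}]

    indices = []
    for input_cfg in inputs:
        if 'database' in input_cfg:
            indices.extend(load_data_index_db('/cache', input_cfg['path']))
        else:
            indices.append(load_data_index('/cache', input_cfg['path']))

    assert len(indices) == 3                 # 1 appended + 2 extended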
......
@@ -45,6 +45,7 @@ from ..management.commands import qsetup

 from beat.core.dataformat import DataFormat
 from beat.core.data import CachedDataSink
 from beat.core.database import Database
+import beat.core.hash

 import os
@@ -62,7 +63,7 @@ ONE_QUEUE_TWO_WORKERS = {
       "cores-per-slot": 1,
       "max-slots-per-user": 2,
       "environments": [
-        'Python 2.7 (1.2.0)'
+        'Python 2.7 (1.3.0)'
       ],
       "slots": {
         'node1': {
@@ -90,12 +91,12 @@ ONE_QUEUE_TWO_WORKERS = {
       }
     },
   "environments": {
-    "Python 2.7 (1.2.0)": {
+    "Python 2.7 (1.3.0)": {
       "name": 'Python 2.7',
-      "version": '1.2.0',
+      "version": '1.3.0',
       "short_description": "Test",
       "description": "Test environment",
-      "languages": "python",
+      "languages": ["python"],
     },
   },
 }
@@ -185,7 +186,11 @@ class BackendUtilitiesMixin(object):

         for index, split in enumerate(splits):
             sink = CachedDataSink()
-            sink.setup(path, dataformat, process_id=index)
+
+            start_data_index = split[0][0] if isinstance(split[0], tuple) else split[0]
+            end_data_index = split[-1][1] if isinstance(split[-1], tuple) else split[-1]
+
+            sink.setup(path, dataformat, start_data_index, end_data_index)

             for indices in split:
                 if not isinstance(indices, tuple):
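Note: with beat.backend.python 1.5.x a CachedDataSink is bound to the [start, end] data-index range it will write, instead of to a process id. A split may mix plain indices and (start, end) tuples, hence the isinstance() checks above. A runnable sketch of that index arithmetic:

    split = [(0, 2), (3, 5)]     # two blocks covering data indices 0..5
    start_data_index = split[0][0] if isinstance(split[0], tuple) else split[0]
    end_data_index = split[-1][1] if isinstance(split[-1], tuple) else split[-1]
    assert (start_data_index, end_data_index) == (0, 5)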
@@ -196,8 +201,8 @@ class BackendUtilitiesMixin(object):
                     end = indices[1]

                 sink.write({
-                        'value': value,
-                    },
+                               'value': value,
+                           },
                     start_data_index = start,
                     end_data_index = end
                 )
@@ -207,6 +212,17 @@ class BackendUtilitiesMixin(object):

             sink.close()

+    def prepare_databases(self, configuration):
+        for _, cfg in configuration['datasets'].items():
+            path = beat.core.hash.toPath(beat.core.hash.hashDataset(
+                cfg['database'], cfg['protocol'], cfg['set']), suffix='.db')
+
+            if not os.path.exists(os.path.join(settings.CACHE_ROOT, path)):
+                database = Database(settings.PREFIX, cfg['database'])
+                view = database.view(cfg['protocol'], cfg['set'])
+                view.index(os.path.join(settings.CACHE_ROOT, path))
+

 #----------------------------------------------------------
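Note: the prepare_databases() helper added above implements the new dataset-indexing convention of this commit: hash the (database, protocol, set) triple, turn the hash into a '.db' path inside the cache, and index the database view there once. A sketch using the exact calls from the hunk; the configuration values are illustrative:

    import beat.core.hash

    cfg = {'database': 'simple/1', 'protocol': 'protocol', 'set': 'set'}  # hypothetical
    h = beat.core.hash.hashDataset(cfg['database'], cfg['protocol'], cfg['set'])
    path = beat.core.hash.toPath(h, suffix='.db')
    # -> a hash-derived relative path ending in '.db', resolved against CACHE_ROOT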
......
@@ -663,6 +663,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         b0 = xp.blocks.all()[0]

         schedule_experiment(xp)
@@ -670,10 +672,6 @@ class SplitNewJobsTest(BaseBackendTestCase):

         self.assertEqual(Job.objects.count(), 2)
         self.assertEqual(JobSplit.objects.count(), 0)

-        self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-            [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-        ])
-
         split_new_jobs()
         self.assertEqual(JobSplit.objects.count(), 2)
@@ -698,6 +696,9 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
@@ -729,6 +730,9 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
@@ -737,10 +741,6 @@ class SplitNewJobsTest(BaseBackendTestCase):

         b0 = xp1.blocks.all()[0]

-        self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-            [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-        ])
-
         split_new_jobs()
         self.assertEqual(JobSplit.objects.count(), 3)
@@ -768,10 +768,12 @@ class SplitNewJobsTest(BaseBackendTestCase):

     def test_one_experiment_two_uneven_slots(self):
-        fullname = 'user/user/single/1/single_split_2'
+        fullname = 'user/user/duo/1/split_2'

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         b0 = xp.blocks.all()[0]

         schedule_experiment(xp)
@@ -779,10 +781,6 @@ class SplitNewJobsTest(BaseBackendTestCase):

         self.assertEqual(Job.objects.count(), 2)
         self.assertEqual(JobSplit.objects.count(), 0)

-        self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-            [(0, 2), (3, 5), (6, 8)]
-        ])
-
         split_new_jobs()
         self.assertEqual(JobSplit.objects.count(), 2)
@@ -796,6 +794,9 @@ class SplitNewJobsTest(BaseBackendTestCase):

         split1 = b0.job.splits.all()[0]
         split2 = b0.job.splits.all()[1]

+        print split2.start_index
+        print split2.end_index
+
         self.check_split(split1, split_index=0, start_index=0, end_index=5)
         self.check_split(split2, split_index=1, start_index=6, end_index=8)
@@ -805,6 +806,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         b0 = xp.blocks.all()[0]

         schedule_experiment(xp)
@@ -812,27 +815,27 @@ class SplitNewJobsTest(BaseBackendTestCase):

         self.assertEqual(Job.objects.count(), 2)
         self.assertEqual(JobSplit.objects.count(), 0)

-        self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-            [(0, 2), (3, 5), (6, 8)]
-        ])
-
         split_new_jobs()
-        self.assertEqual(JobSplit.objects.count(), 3)
+        self.assertEqual(JobSplit.objects.count(), 5)

         xp.refresh_from_db()
         b0 = xp.blocks.all()[0]

-        self.assertEqual(b0.job.splits.count(), 3)
+        self.assertEqual(b0.job.splits.count(), 5)

         split1 = b0.job.splits.all()[0]
         split2 = b0.job.splits.all()[1]
         split3 = b0.job.splits.all()[2]
+        split4 = b0.job.splits.all()[3]
+        split5 = b0.job.splits.all()[4]

-        self.check_split(split1, split_index=0, start_index=0, end_index=2)
-        self.check_split(split2, split_index=1, start_index=3, end_index=5)
-        self.check_split(split3, split_index=2, start_index=6, end_index=8)
+        self.check_split(split1, split_index=0, start_index=0, end_index=0)
+        self.check_split(split2, split_index=1, start_index=1, end_index=1)
+        self.check_split(split3, split_index=2, start_index=2, end_index=2)
+        self.check_split(split4, split_index=3, start_index=3, end_index=3)
+        self.check_split(split5, split_index=4, start_index=4, end_index=4)
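Note: the expected counts change above because splitting now operates on the individual data indices recorded in the database index file, rather than on the pre-grouped (start, end) blocks of a hand-generated cache. A sketch of the resulting per-index splits (an interpretation of the new assertions, not library documentation):

    indices = [0, 1, 2, 3, 4]               # five independent data indices
    splits = [(i, i) for i in indices]      # one single-index split each
    assert splits == [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]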
     def test_similar_experiment_after_assignation(self):
@@ -844,6 +847,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname.split('/')[-1])
         xp2 = xp1.fork(name='single_fork')

+        self.prepare_databases(xp1.declaration)
+
         schedule_experiment(xp1)
         split_new_jobs()
@@ -882,6 +887,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname.split('/')[-1])
         xp2 = xp1.fork(name='single_fork')

+        self.prepare_databases(xp1.declaration)
+
         schedule_experiment(xp1)
         split_new_jobs()
@@ -921,6 +928,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)

         self.assertEqual(Job.objects.count(), 2)
@@ -962,6 +971,8 @@ class SplitNewJobsTest(BaseBackendTestCase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)

         self.assertEqual(Job.objects.count(), 2)
@@ -1013,6 +1024,8 @@ class AssignSplitsToWorkersTest(BaseBackendTestCase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         split_new_jobs()
@@ -1040,6 +1053,9 @@ class AssignSplitsToWorkersTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
         split_new_jobs()
@@ -1068,6 +1084,9 @@ class AssignSplitsToWorkersTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
         split_new_jobs()
@@ -1105,6 +1124,9 @@ class AssignSplitsToWorkersTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
         split_new_jobs()
@@ -1132,6 +1154,9 @@ class AssignSplitsToWorkersTest(BaseBackendTestCase):

         xp1 = Experiment.objects.get(name=fullname.split('/')[-1])
         xp2 = xp1.fork(name='single_fork')

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
         split_new_jobs()
@@ -1180,14 +1205,12 @@ class GetConfigurationForSplitTest(BaseBackendTestCase):

     def prepare_experiment(self, name):
         xp = Experiment.objects.get(name=name.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)

         b0 = xp.blocks.all()[0]

-        self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-            [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-        ])
-
         split_new_jobs()
         xp.refresh_from_db()
@@ -1277,15 +1300,13 @@ class SplitHelpersBaseTest(BaseBackendTestCase):

     def prepare_experiment(self, name, generate_cache=True):
         xp = Experiment.objects.get(name=name.split('/')[-1])

+        if generate_cache:
+            self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)

         b0 = xp.blocks.all()[0]

-        if generate_cache:
-            self.generate_cached_files(b0.inputs.all()[0].database.hash, [
-                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-            ])
-
         split_new_jobs()
         xp.refresh_from_db()
@@ -1717,6 +1738,7 @@ class OnSplitStartedTest(SplitHelpersBaseTest):

     def test_one_split(self):
         xp = self.prepare_experiment('user/user/single/1/single')
+
         assigned_splits = assign_splits_to_workers()

         split = assigned_splits[0]
@@ -1730,6 +1752,7 @@ class OnSplitStartedTest(SplitHelpersBaseTest):

     def test_two_splits(self):
         xp = self.prepare_experiment('user/user/single/1/single_split_2')
+
         assigned_splits = assign_splits_to_workers()

         split0 = assigned_splits[0]
......
@@ -93,7 +93,7 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):

     def start_scheduler(self):
         (pid, self.scheduler_thread) = start_scheduler(settings_module='beat.web.settings.test',
                                                        interval=1, address='127.0.0.1',
-                                                       port=52000)
+                                                       port=50800)

     def stop_scheduler(self):
@@ -105,7 +105,7 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):

     def start_worker(self, name):
         (pid, worker_thread) = start_worker(name, settings.PREFIX, settings.CACHE_ROOT,
-                                            'tcp://127.0.0.1:52000')
+                                            'tcp://127.0.0.1:50800')

         self.worker_threads[name] = worker_thread
@@ -258,6 +258,8 @@ class TestExecution(TestSchedulerBase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         xp.refresh_from_db()
@@ -280,6 +282,8 @@ class TestExecution(TestSchedulerBase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         xp.refresh_from_db()
@@ -304,6 +308,9 @@ class TestExecution(TestSchedulerBase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
@@ -351,6 +358,9 @@ class TestExecution(TestSchedulerBase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
@@ -374,6 +384,9 @@ class TestExecution(TestSchedulerBase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         xp1.refresh_from_db()
@@ -398,6 +411,8 @@ class TestExecution(TestSchedulerBase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         xp.refresh_from_db()
@@ -418,6 +433,8 @@ class TestExecution(TestSchedulerBase):

         xp = Experiment.objects.get(name=fullname.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         xp.refresh_from_db()
@@ -442,6 +459,9 @@ class TestExecution(TestSchedulerBase):

         xp1 = Experiment.objects.get(name=fullname1.split('/')[-1])
         xp2 = Experiment.objects.get(name=fullname2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
@@ -490,6 +510,8 @@ class TestCancellation(TestSchedulerBase):

     def process(self, experiment_name, block_name=None):
         xp = Experiment.objects.get(name=experiment_name.split('/')[-1])

+        self.prepare_databases(xp.declaration)
+
         schedule_experiment(xp)
         xp.refresh_from_db()
@@ -523,6 +545,9 @@ class TestCancellation(TestSchedulerBase):

         xp1 = Experiment.objects.get(name=experiment_name1.split('/')[-1])
         xp2 = Experiment.objects.get(name=experiment_name2.split('/')[-1])

+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
         schedule_experiment(xp1)
         schedule_experiment(xp2)
         xp1.refresh_from_db()
......
@@ -52,7 +52,7 @@ QUEUES_WITHOUT_PRIORITY = {
       "time-limit": 180, #3 hours
       "cores-per-slot": 1,
       "max-slots-per-user": 4,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
       "groups": [
         "Default",
       ],
@@ -69,7 +69,7 @@ QUEUES_WITHOUT_PRIORITY = {
      "time-limit": 360, #6 hours
      "cores-per-slot": 2,
      "max-slots-per-user": 2,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
      "groups": [
        "Default",
      ],
@@ -86,7 +86,7 @@ QUEUES_WITHOUT_PRIORITY = {
      "time-limit": 720, #12 hours
      "cores-per-slot": 4,
      "max-slots-per-user": 1,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
      "groups": [
        "Default",
      ],
@@ -106,9 +106,9 @@ QUEUES_WITHOUT_PRIORITY = {
       }
     },
   "environments": {
-    'Python 2.7 (1.2.0)': {
+    'Python 2.7 (1.3.0)': {
       "name": 'Python 2.7',
-      "version": '1.2.0',
+      "version": '1.3.0',
       "short_description": "Test",
       "description": "Test environment",
       "languages": "python",
@@ -124,7 +124,7 @@ PRIORITY_QUEUES = {
      "time-limit": 180, #3 hours
      "cores-per-slot": 1,
      "max-slots-per-user": 2,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
      "groups": [
        "Default",
      ],
@@ -145,7 +145,7 @@ PRIORITY_QUEUES = {
      "time-limit": 360, #6 hours
      "cores-per-slot": 2,
      "max-slots-per-user": 1,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
      "groups": [
        "Default",
      ],
@@ -166,7 +166,7 @@ PRIORITY_QUEUES = {
      "time-limit": 180, #3 hours
      "cores-per-slot": 1,
      "max-slots-per-user": 8,
-      "environments": ['Python 2.7 (1.2.0)'],
+      "environments": ['Python 2.7 (1.3.0)'],
      "groups": [
        "Default",
      ],
@@ -196,9 +196,9 @@ PRIORITY_QUEUES = {
         )
     ]),
   "environments": {
-    'Python 2.7 (1.2.0)': {
+    'Python 2.7 (1.3.0)': {
       "name": 'Python 2.7',
-      "version": '1.2.0',
+      "version": '1.3.0',
       "short_description": "Test",
       "description": "Test environment",
       "languages": "python",
@@ -303,7 +303,7 @@ class BackendSetup(BaseBackendTestCase):

         env = q1.environments.first()
         self.assertEqual(env.name, 'Python 2.7')
-        self.assertEqual(env.version, '1.2.0')
+        self.assertEqual(env.version, '1.3.0')

         self.assertEqual(q1.slots.count(), 1)
         self.assertEqual(q2.slots.count(), 1)
@@ -412,7 +412,7 @@ class BackendSetup(BaseBackendTestCase):

         env = q1.environments.first()
         self.assertEqual(env.name, 'Python 2.7')
-        self.assertEqual(env.version, '1.2.0')
+        self.assertEqual(env.version, '1.3.0')

         self.assertEqual(q1.slots.count(), 2)
         self.assertEqual(q1_special.slots.count(), 2)
......
@@ -277,8 +277,8 @@ class DatabaseSetOutputInline(admin.TabularInline):
     model = DatabaseSetOutputModel
     extra = 0

-    ordering = ('hash',)
-    readonly_fields = ('hash', 'template')
+    ordering = ('template__name',)
+    readonly_fields = ('template',)

     def has_delete_permission(self, request, obj=None):
         return False
@@ -289,13 +289,13 @@ class DatabaseSetOutputInline(admin.TabularInline):
 class DatabaseSet(admin.ModelAdmin):

-    list_display = ('id', 'protocol', 'name', 'template')
+    list_display = ('id', 'protocol', 'name', 'template', 'hash')
     search_fields = ['name',
                      'template__name',
                      'protocol__database__name',
                      'protocol__name']
     list_display_links = ('id', 'name')
-    readonly_fields = ('name', 'template', 'protocol')
+    readonly_fields = ('name', 'template', 'protocol', 'hash')

     inlines = [
         DatabaseSetOutputInline,
......
+# -*- coding: utf-8 -*-
+# Generated by Django 1.9.13 on 2018-01-25 09:06
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+from beat.backend.python.hash import hashDataset
+
+
+def compute_hashes(apps, schema_editor):
+    '''Refreshes each database so datasets/outputs are recreated'''
+
+    DatabaseSet = apps.get_model("databases", "DatabaseSet")
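Note: the migration body is truncated here. For context only, a Django data migration of this shape is typically wired up as in the sketch below; the dependency, the field definition, and its parameters are assumptions (the admin changes above suggest a new 'hash' field on DatabaseSet), not the actual file content:

    class Migration(migrations.Migration):

        dependencies = [
            ('databases', '0001_initial'),   # assumed dependency
        ]

        operations = [
            migrations.AddField(
                model_name='databaseset',    # assumed: DatabaseSet gains a
                name='hash',                 # 'hash' field, filled in by
                field=models.CharField(max_length=64, null=True),  # assumed
            ),
            migrations.RunPython(compute_hashes),
        ]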