test_execution.py 12.2 KB
Newer Older
André Anjos's avatar
André Anjos committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


# Tests for experiment execution

import os
import glob
import logging
logger = logging.getLogger(__name__)

import numpy
37
import unittest
André Anjos's avatar
André Anjos committed
38 39

from ..experiment import Experiment
40
from ..execution import DockerExecutor
41
from ..execution import LocalExecutor
André Anjos's avatar
André Anjos committed
42 43
from ..hash import hashFileContents
from ..data import CachedDataSource
44
from ..dock import Host
André Anjos's avatar
André Anjos committed
45 46

from . import prefix, tmp_prefix
47
from .utils import cleanup
48
from .utils import slow
49 50 51 52


class TestExecution(unittest.TestCase):

Philip ABBET's avatar
Philip ABBET committed
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
    def check_output(self, prefix, path):
        '''Checks if a given output exists, together with its indexes and checksums

        Globs every ``<prefix>/<path>*.data`` and ``<prefix>/<path>*.index`` file,
        requires as many index files as data files and verifies that each of
        them has a ``.checksum`` companion whose stripped contents equal the
        value ``hashFileContents()`` computes for the file.

        Parameters:

          prefix (str): Directory the output path is relative to

          path (str): Path of the output inside ``prefix``, without extension

        Raises:

          AssertionError: If no data file is found, if the number of data and
            index files differ, or if a checksum is missing or does not match
        '''

        finalpath = os.path.join(prefix, path)
        datafiles = glob.glob(finalpath + '*.data')
        indexfiles = glob.glob(finalpath + '*.index')

        # Built once, outside of the loop: membership is tested for every file
        checksum_files = set(glob.glob(finalpath + '*.data.checksum'))
        checksum_files.update(glob.glob(finalpath + '*.index.checksum'))

        assert datafiles, 'no data file found for %r' % finalpath
        self.assertEqual(len(datafiles), len(indexfiles))

        for k in datafiles + indexfiles:
            checksum_file = k + '.checksum'
            assert checksum_file in checksum_files, \
                'missing checksum file %r' % checksum_file
            with open(checksum_file, 'rt') as f:
                stored_checksum = f.read().strip()
            self.assertEqual(hashFileContents(k), stored_checksum)


    def load_result(self, executor):
        '''Loads the result of an experiment, in a single go

        Parameters:

          executor: Executor of the block that produced the result. Its
            ``cache``, ``prefix`` and ``data['result']['path']`` values are used
            to locate and open the ``.data`` file.

        Returns:

          The data returned by the first ``next()`` call on the result file,
          after checking that its start and end indexes are both 0.
        '''

        filename = os.path.join(executor.cache,
                                executor.data['result']['path'] + '.data')

        f = CachedDataSource()

        # setup() is kept out of any ``assert`` so that it still runs under -O
        is_ready = f.setup(filename, executor.prefix)
        self.assertTrue(is_ready)

        # The source gets closed even if one of the checks below fails
        try:
            data, start, end = f.next()
            self.assertEqual(start, 0)
            self.assertEqual(start, end)
        finally:
            f.close()

        return data


    def execute(self, label, expected_result):
        """Executes the full experiment, block after block, returning results. If an
        error occurs, returns information about the err'd block. Otherwise, returns
        ``None``.

        This bit of code mimics the scheduler, but does everything on the local
        machine. It borrows some code from the package ``beat.cmdline``.

        Parameters:

          label (str): Name of the experiment, loaded from the test ``prefix``

          expected_result (list): One entry per analyzer result, in the order
            the results are produced. Each entry is loaded with ``from_dict()``
            into an object of the same class as the result and compared to it
            with ``isclose()``.

        Returns:

          The value returned by ``executor.process()`` for the first block that
          ends with a non-zero status or reports a system error, ``None`` if
          every block ran fine and every result matched.
        """

        dataformat_cache = {}
        database_cache = {}
        algorithm_cache = {}

        experiment = Experiment(prefix, label,
                dataformat_cache, database_cache, algorithm_cache)

        assert experiment.valid, '\n  * %s' % '\n  * '.join(experiment.errors)

        scheduled = experiment.setup()

        # can we execute it?
        results = []
        for _key, value in scheduled.items():
            executor = self.create_executor(prefix, value['configuration'], tmp_prefix,
                                            dataformat_cache, database_cache, algorithm_cache)
            assert executor.valid, '\n  * %s' % '\n  * '.join(executor.errors)

            with executor:
                result = executor.process(timeout_in_minutes=3)
                assert result

                # all the fields must be reported, whatever the outcome is
                for field in ('status', 'stdout', 'stderr', 'timed_out',
                              'system_error', 'user_error'):
                    assert field in result, 'missing %r in result' % field

                if result['status'] != 0:
                    logger.warning("(eventual) system errors: %s", result['system_error'])
                    logger.warning("(eventual) user errors: %s", result['user_error'])
                    logger.warning("stdout: %s", result['stdout'])
                    logger.warning("stderr: %s", result['stderr'])
                    return result
                if result['system_error']:
                    logger.warning("system errors: %s", result['system_error'])
                    return result

                # ``in`` replaces dict.has_key(), which does not exist on Python 3
                if 'statistics' in result:
                    assert isinstance(result['statistics'], dict)

            if executor.analysis:
                self.check_output(tmp_prefix, executor.data['result']['path'])
                results.append(self.load_result(executor))
            else:
                for _name, details in executor.data['outputs'].items():
                    self.check_output(tmp_prefix, details['path'])

        # compares all results
        assert results
        for k, result in enumerate(results):
            expected = result.__class__()
            expected.from_dict(expected_result[k], casting='unsafe') #defaults=False
            assert result.isclose(expected), "%r is not close enough to %r" % (result.as_dict(), expected.as_dict())


    @slow
    def test_integers_addition_1(self):
        # execute() returns None when every block ran fine and the result
        # matches; assertIsNone() keeps the call alive under -O and prints the
        # returned error information on failure
        self.assertIsNone(
            self.execute('user/user/integers_addition/1/integers_addition',
                         [{'sum': 12272, 'nb': 10}]))

    @slow
    def test_integers_addition_2(self):
        # None means success: execute() only returns something for a block
        # that failed. assertIsNone() reports that value if there is one.
        self.assertIsNone(
            self.execute('user/user/integers_addition/2/integers_addition',
                         [{'sum': 18392, 'nb': 10}]))

    @slow
    def test_single_1_single(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/single/1/single', [{'out_data': 42}]))

    @slow
    def test_single_1_add(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/single/1/single_add', [{'out_data': 43}]))

    @slow
    def test_single_1_add2(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/single/1/single_add2', [{'out_data': 44}]))

    @slow
    def test_single_1_error(self):
        # This experiment is expected to fail: execute() then returns the
        # information about the failing block instead of None
        result = self.execute('user/user/single/1/single_error', [None])
        self.assertTrue(result)
        self.assertEqual(result['status'], 1)
        self.assertTrue(result['user_error'])
        # assertIn() shows the actual error text when the check fails
        self.assertIn('NameError', result['user_error'])
        self.assertEqual(result['system_error'], '')

    @slow
    def test_single_1_crash(self):
        # This experiment is expected to fail: execute() then returns the
        # information about the failing block instead of None
        result = self.execute('user/user/single/1/single_crash', [None])
        self.assertTrue(result)
        self.assertEqual(result['status'], 1)
        self.assertTrue(result['user_error'])
        # assertIn() shows the actual error text when the check fails
        self.assertIn('NameError', result['user_error'])
        self.assertEqual(result['system_error'], '')

    @slow
    def test_single_1_db_crash_done(self):
        # This experiment is expected to fail with a user error quoting the
        # statement ``a = b``; any non-zero status is accepted
        result = self.execute('user/user/single/1/single_db_crash_done', [None])
        self.assertTrue(result)
        self.assertNotEqual(result['status'], 0)
        self.assertTrue(result['user_error'])
        # assertIn() shows the actual error text when the check fails
        self.assertIn('a = b', result['user_error'])
        self.assertEqual(result['system_error'], '')

    @slow
    def test_single_1_db_crash_next(self):
        # This experiment is expected to fail with a user error quoting the
        # statement ``a = b``; any non-zero status is accepted
        result = self.execute('user/user/single/1/single_db_crash_next', [None])
        self.assertTrue(result)
        self.assertNotEqual(result['status'], 0)
        self.assertTrue(result['user_error'])
        # assertIn() shows the actual error text when the check fails
        self.assertIn('a = b', result['user_error'])
        self.assertEqual(result['system_error'], '')

    @slow
    def test_single_1_large(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/single/1/single_large', [{'out_data': 2.0}]))

    @slow
    def test_double_1(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/double/1/double', [{'out_data': 42}]))

    @slow
    def test_triangle_1(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/triangle/1/triangle', [{'out_data': 42}]))

    @slow
    def test_too_many_nexts(self):
        # This experiment is expected to fail with a 'no more data' user error;
        # execute() then returns the information about the failing block
        result = self.execute('user/user/triangle/1/too_many_nexts', [None])
        self.assertTrue(result)
        self.assertNotEqual(result['status'], 0)
        self.assertTrue(result['user_error'])
        # assertIn() shows the actual error text when the check fails
        self.assertIn('no more data', result['user_error'])

    @slow
    def test_double_triangle_1(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/double_triangle/1/double_triangle',
                         [{'out_data': 42}]))

    @slow
    def test_inputs_mix_1(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/1/test',
                         [{'sum': 12272, 'nb': 10}]))

    @slow
    def test_inputs_mix_2(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/2/test',
                         [{'sum': 13529, 'nb': 10}]))

    @slow
    def test_inputs_mix_3(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/3/test',
                         [{'sum': 19523, 'nb': 10}]))

    @slow
    def test_inputs_mix_3b(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/3/test2',
                         [{'sum': 19533, 'nb': 10}]))

    @slow
    def test_inputs_mix_4(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/4/test',
                         [{'sum': 25150, 'nb': 10}]))

    @slow
    def test_inputs_mix_4b(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/inputs_mix/4/test2',
                         [{'sum': 25170, 'nb': 10}]))

    @slow
    def test_integers_labelled_1(self):
        expected = [
            {
                'nb_data_units': 3,
                'indices': '0 - 4\n5 - 9\n10 - 14\n'
            }
        ]

        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/integers_labelled/1/test', expected))

    # For benchmark purposes
    # def test_double_1_large(self):
    #   import time
    #   start = time.time()
    #   assert self.execute('user/user/double/1/large', [{'out_data': 49489830}]) is None
    #   print time.time() - start

    # For benchmark purposes
    # def test_double_1_large2(self):
    #   import time
    #   start = time.time()
    #   assert self.execute('user/user/double/1/large2', [{'out_data': 21513820}]) is None
    #   print time.time() - start
279

280 281


282 283
class TestDockerExecution(TestExecution):

Philip ABBET's avatar
Philip ABBET committed
284 285 286
    @classmethod
    def setUpClass(cls):
        """Creates the ``Host`` that is handed to every ``DockerExecutor``"""
        shared_host = Host(raise_on_errors=False)
        cls.host = shared_host
287 288


Philip ABBET's avatar
Philip ABBET committed
289 290 291 292
    @classmethod
    def tearDownClass(cls):
        """Tears the class-wide host down, then calls ``cleanup()``"""
        try:
            cls.host.teardown()
        finally:
            # cleanup() still runs if the host teardown raises
            cleanup()
293

294

Philip ABBET's avatar
Philip ABBET committed
295 296 297
    def setUp(self):
        super(TestDockerExecution, self).setUp()
        self.proxy_mode = True
298 299


Philip ABBET's avatar
Philip ABBET committed
300 301 302
    def tearDown(self):
        """Tears the class-wide host down after each test"""
        super(TestDockerExecution, self).tearDown()
        # NOTE(review): presumably stops whatever the test left running on the
        # host, which is shared by all tests of the class - confirm against Host
        self.host.teardown()
303 304


Philip ABBET's avatar
Philip ABBET committed
305 306 307 308 309
    def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
                        database_cache, algorithm_cache):
        """Builds the ``DockerExecutor`` used by ``execute()`` to run one block

        All arguments are forwarded as they are, after the class-wide host;
        ``proxy_mode`` takes the value stored on the test instance.
        """
        executor = DockerExecutor(
            self.host,
            prefix,
            configuration,
            tmp_prefix,
            dataformat_cache,
            database_cache,
            algorithm_cache,
            proxy_mode=self.proxy_mode,
        )
        return executor
310 311


Philip ABBET's avatar
Philip ABBET committed
312 313 314
    @slow
    def test_cxx_double_1(self):
        # execute() must return None (no failing block); the call is kept out
        # of a bare ``assert`` so that it is not dropped under -O
        self.assertIsNone(
            self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]))
315 316 317 318 319 320




class TestDockerExecutionNoProxy(TestDockerExecution):

Philip ABBET's avatar
Philip ABBET committed
321 322 323
    def setUp(self):
        """Runs the inherited tests with proxy mode turned off"""
        super(TestDockerExecutionNoProxy, self).setUp()
        # overrides the value set by the parent class' setUp()
        self.proxy_mode = False
324 325 326 327 328



class TestLocalExecution(TestExecution):

Philip ABBET's avatar
Philip ABBET committed
329 330 331 332
    def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
                        database_cache, algorithm_cache):
        """Builds the ``LocalExecutor`` used by ``execute()`` to run one block

        All arguments are forwarded as they are, in the same order.
        """
        executor = LocalExecutor(
            prefix,
            configuration,
            tmp_prefix,
            dataformat_cache,
            database_cache,
            algorithm_cache,
        )
        return executor
333 334 335 336 337



# Don't run tests from the base class: it defines no ``create_executor()``,
# only its subclasses above do. Removing the module-level name keeps the base
# class out of this module's namespace, while the subclasses still hold a
# reference to it.
del TestExecution