Skip to content
Snippets Groups Projects
test_cli.py 17 KiB
Newer Older
André Anjos's avatar
André Anjos committed
# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""

import contextlib
import glob
André Anjos's avatar
André Anjos committed
import os
import re

André Anjos's avatar
André Anjos committed
from click.testing import CliRunner


@contextlib.contextmanager
def stdout_logging():
    """Capture messages emitted through the ``mednet`` logger in a buffer.

    A temporary stream handler (``INFO`` threshold, bare ``%(message)s``
    format) is attached to the ``mednet`` logger while the ``with`` block
    runs.  The handler is always detached on exit, including when the block
    raises, so a failing test does not leak handlers into later tests.

    The level of the ``mednet`` logger itself is not modified here.

    Yields
    ------
    io.StringIO
        The in-memory buffer that receives the formatted log messages.
    """
    import io
    import logging

    buf = io.StringIO()
    ch = logging.StreamHandler(buf)
    ch.setFormatter(logging.Formatter("%(message)s"))
    ch.setLevel(logging.INFO)
    logger = logging.getLogger("mednet")
    logger.addHandler(ch)
    try:
        yield buf
    finally:
        # Detach even if the body raised (e.g. a failed assertion).
        logger.removeHandler(ch)


def _assert_exit_0(result):
    """Assert that a CLI invocation result finished with exit code zero.

    On failure, the assertion message carries the actual exit code and the
    captured output of the command.
    """
    code = result.exit_code
    assert code == 0, f"Exit code {code} != 0 -- Output:\n{result.output}"


def _check_help(entry_point):
    """Invoke ``entry_point`` with ``--help`` and validate the outcome.

    The command must exit with code zero and its output must begin with
    ``Usage:``.
    """
    outcome = CliRunner().invoke(entry_point, ["--help"])
    _assert_exit_0(outcome)
    assert outcome.output.startswith("Usage:")


def test_config_help():
    """Check that the ``config`` command group answers ``--help``."""
    from mednet.scripts.config import config

    _check_help(config)


def test_config_list_help():
    """Check that the ``config list`` command answers ``--help``."""
    # Aliased so the import does not shadow the builtin ``list``.
    from mednet.scripts.config import list as list_command

    _check_help(list_command)


def test_config_list():
    """Run ``config list`` without arguments and check its report.

    The command must exit with code zero and its output must mention both
    the ``mednet.config.data`` and ``mednet.config.models`` modules.
    """
    # Aliased so the import does not shadow the builtin ``list``.
    from mednet.scripts.config import list as list_command

    result = CliRunner().invoke(list_command)
    _assert_exit_0(result)
    assert "module: mednet.config.data" in result.output
    assert "module: mednet.config.models" in result.output
André Anjos's avatar
André Anjos committed


def test_config_list_v():
    """Run ``config list --verbose`` and check its report.

    The command must exit with code zero and its output must mention both
    the ``mednet.config.data`` and ``mednet.config.models`` modules.
    """
    # Aliased so the import does not shadow the builtin ``list``.
    from mednet.scripts.config import list as list_command

    result = CliRunner().invoke(list_command, ["--verbose"])
    _assert_exit_0(result)
    assert "module: mednet.config.data" in result.output
    assert "module: mednet.config.models" in result.output
André Anjos's avatar
André Anjos committed


def test_config_describe_help():
    """Check that the ``config describe`` command answers ``--help``."""
    from mednet.scripts.config import describe

    _check_help(describe)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
    """Run ``config describe montgomery`` and check the printed description.

    The command must exit with code zero and its output must contain the
    Montgomery DataModule summary sentence.
    """
    from mednet.scripts.config import describe

    result = CliRunner().invoke(describe, ["montgomery"])
    _assert_exit_0(result)
    assert "Montgomery DataModule for TB detection." in result.output
André Anjos's avatar
André Anjos committed


    from mednet.scripts.database import database
André Anjos's avatar
André Anjos committed

André Anjos's avatar
André Anjos committed


def test_datamodule_list_help():
    """Check that the ``database list`` command answers ``--help``."""
    # Aliased so the import does not shadow the builtin ``list``.
    from mednet.scripts.database import list as list_command

    _check_help(list_command)


def test_datamodule_list():
    """Run ``database list`` without arguments and check its banner.

    The command must exit with code zero and its output must start with
    ``Available databases:``.
    """
    # Aliased so the import does not shadow the builtin ``list``.
    from mednet.scripts.database import list as list_command

    result = CliRunner().invoke(list_command)
    _assert_exit_0(result)
    assert result.output.startswith("Available databases:")
def test_datamodule_check_help():
    """Check that the ``database check`` command answers ``--help``."""
    from mednet.scripts.database import check

    _check_help(check)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
    from mednet.scripts.database import check
André Anjos's avatar
André Anjos committed

    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
André Anjos's avatar
André Anjos committed
    _assert_exit_0(result)


def test_main_help():
    """Check that the top-level ``cli`` entry point answers ``--help``."""
    from mednet.scripts.cli import cli

    _check_help(cli)


def test_train_help():
    """Check that the ``train`` command answers ``--help``."""
    from mednet.scripts.train import train

    _check_help(train)


def _str_counter(substr, s):
    """Count non-overlapping matches of the regular expression ``substr`` in ``s``.

    The pattern is applied in multi-line mode, so ``^`` and ``$`` anchor at
    every line boundary of ``s``.
    """
    pattern = re.compile(substr, re.MULTILINE)
    return len(pattern.findall(s))


def test_predict_help():
    """Check that the ``predict`` command answers ``--help``."""
    from mednet.scripts.predict import predict

    _check_help(predict)


def test_evaluate_help():
    """Check that the ``evaluate`` command answers ``--help``."""
    from mednet.scripts.evaluate import evaluate

    _check_help(evaluate)
def test_saliency_generate_help():
    """Check that the saliency ``generate`` command answers ``--help``."""
    from mednet.scripts.saliency.generate import generate

    _check_help(generate)
def test_saliency_completeness_help():
    """Check that the saliency ``completeness`` command answers ``--help``."""
    from mednet.scripts.saliency.completeness import completeness as command

    _check_help(command)


def test_saliency_view_help():
    """Check that the saliency ``view`` command answers ``--help``."""
    from mednet.scripts.saliency.view import view

    _check_help(view)
def test_saliency_evaluate_help():
    """Check that the saliency ``evaluate`` command answers ``--help``."""
    from mednet.scripts.saliency.evaluate import evaluate

    _check_help(evaluate)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
    """Train ``pasa`` on ``montgomery`` for one epoch and check the results.

    The ``train`` command is invoked with ``--epochs=1 --batch-size=1`` and
    an output folder of ``temporary_basedir / "results"``.  The test then
    checks that:

    * the command exits with code zero;
    * the ``periodic`` and ``best`` checkpoints both correspond to epoch 0;
    * ``constants.csv``, ``model-summary.txt`` and exactly one tensorboard
      events file exist in the output folder;
    * each expected log message was emitted the expected number of times.

    Parameters
    ----------
    temporary_basedir
        Base directory under which the ``results`` folder is created.
    """
    from mednet.scripts.train import train
    from mednet.utils.checkpointer import (
        CHECKPOINT_EXTENSION,
        _get_checkpoint_from_alias,
    )

    runner = CliRunner()

    with stdout_logging() as buf:
        output_folder = temporary_basedir / "results"
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=1",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        # asserts checkpoints are there, or raises FileNotFoundError
        last = _get_checkpoint_from_alias(output_folder, "periodic")
        assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
        best = _get_checkpoint_from_alias(output_folder, "best")
        assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        event_files = glob.glob(
            os.path.join(output_folder, "logs", "events.out.tfevents.*")
        )
        assert len(event_files) == 1
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        # Regular expression -> number of times it must match the log output.
        # NOTE(review): the unescaped ``|`` in the "Loading dataset" patterns
        # is a regex alternation, not a literal pipe -- confirm the counts
        # below are matching what is intended.
        # NOTE(review): confirm no expected entry is missing from this table.
        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
            r"^Applying DataModule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 1 epochs.$": 1,
            r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )

@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
    """Train ``pasa`` on ``montgomery`` twice to exercise checkpoint resuming.

    A first run trains for one epoch into
    ``temporary_basedir / "results" / "pasa_checkpoint"``.  A second run on
    the same output folder asks for two epochs.  After each run the test
    checks the exit code, the checkpoint files and the presence of
    ``constants.csv``, ``model-summary.txt`` and the tensorboard events files
    (one after the first run, two after the second).  The log output of the
    second run is matched against a table of expected messages.

    Parameters
    ----------
    temporary_basedir
        Base directory under which the output folder is created.
    """
    from mednet.scripts.train import train
    from mednet.utils.checkpointer import (
        CHECKPOINT_EXTENSION,
        _get_checkpoint_from_alias,
    )

    runner = CliRunner()

    output_folder = temporary_basedir / "results" / "pasa_checkpoint"
    result0 = runner.invoke(
        train,
        [
            "pasa",
            "montgomery",
            "-vv",
            "--epochs=1",
            "--batch-size=1",
            f"--output-folder={output_folder}",
        ],
    )
    _assert_exit_0(result0)

    # asserts checkpoints are there, or raises FileNotFoundError
    last = _get_checkpoint_from_alias(output_folder, "periodic")
    assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
    best = _get_checkpoint_from_alias(output_folder, "best")
    assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

    assert os.path.exists(os.path.join(output_folder, "constants.csv"))
    event_files = glob.glob(
        os.path.join(output_folder, "logs", "events.out.tfevents.*")
    )
    assert len(event_files) == 1
    assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

    with stdout_logging() as buf:
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=2",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        # asserts checkpoints are there, or raises FileNotFoundError
        last = _get_checkpoint_from_alias(output_folder, "periodic")
        assert last.name.endswith("epoch=1" + CHECKPOINT_EXTENSION)
        # Only the existence of the "best" checkpoint is checked here.
        _get_checkpoint_from_alias(output_folder, "best")

        assert os.path.exists(os.path.join(output_folder, "constants.csv"))

        event_files = glob.glob(
            os.path.join(output_folder, "logs", "events.out.tfevents.*")
        )
        assert len(event_files) == 2
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        # Regular expression -> number of times it must match the log output.
        # NOTE(review): the unescaped ``|`` in the "Loading dataset" patterns
        # is a regex alternation, not a literal pipe -- confirm the counts
        # below are matching what is intended.
        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
            r"^Applying DataModule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 2 epochs.$": 1,
            r"^Resuming from epoch 0 \(checkpoint file: .*$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
    from mednet.scripts.predict import predict
    from mednet.utils.checkpointer import (
        CHECKPOINT_EXTENSION,
        _get_checkpoint_from_alias,
    )
        output = temporary_basedir / "predictions"
        last = _get_checkpoint_from_alias(
            temporary_basedir / "results", "periodic"
        )
        assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
        result = runner.invoke(
            predict,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--batch-size=1",
            r"^Loading dataset: * without caching. Trade-off: CPU RAM usage: less | Disk I/O: more$": 3,
            r"^Restoring normalizer from checkpoint.$": 1,
            r"^Running prediction on `train` split...$": 1,
            r"^Running prediction on `validation` split...$": 1,
            r"^Running prediction on `test` split...$": 1,
            r"^Predictions saved to .*$": 1,
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
André Anjos's avatar
André Anjos committed
            )
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
    from mednet.scripts.evaluate import evaluate

    runner = CliRunner()

    with stdout_logging() as buf:
        prediction_folder = str(temporary_basedir / "predictions")
        output_folder = str(temporary_basedir / "evaluations")
        result = runner.invoke(
            evaluate,
            [
                "-vv",
                "montgomery",
                "--threshold=test",
        assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
        assert os.path.exists(os.path.join(output_folder, "summary.rst"))
André Anjos's avatar
André Anjos committed

        keywords = {
            r"^Setting --threshold=.*$": 1,
            r"^Analyzing split `train`...$": 1,
            r"^Analyzing split `validation`...$": 1,
            r"^Analyzing split `test`...$": 1,
            r"^Saving measures at .*$": 1,
            r"^Saving figures at .*$": 1,
André Anjos's avatar
André Anjos committed
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )
ogueler@idiap.ch's avatar
ogueler@idiap.ch committed
# This script does not work anymore, either fix or remove the script + this test
# def test_evaluatevis(temporary_basedir):
#     import pandas as pd
#     from mednet.scripts.evaluatevis import evaluatevis
ogueler@idiap.ch's avatar
ogueler@idiap.ch committed
#     runner = CliRunner()
ogueler@idiap.ch's avatar
ogueler@idiap.ch committed
#     # Create a sample directory structure and CSV files
#     input_folder = temporary_basedir / "camutils_cli" / "gradcam"
#     input_folder.mkdir(parents=True, exist_ok=True)
#     class1_dir = input_folder / "class1"
#     class1_dir.mkdir(parents=True, exist_ok=True)
#     class2_dir = input_folder / "class2"
#     class2_dir.mkdir(parents=True, exist_ok=True)
ogueler@idiap.ch's avatar
ogueler@idiap.ch committed
#     data = {
#         "MoRF": [1, 2, 3],
#         "LeRF": [2, 4, 6],
#         "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
#         "IoU": [1, 2, 3],
#         "IoDA": [2, 4, 6],
#         "propEnergy": [1.5, 3, 4.5],
#         "ASF": [1, 2, 3],
#     }
#     df = pd.DataFrame(data)
#     df.to_csv(class1_dir / "file1.csv", index=False)
#     df.to_csv(class2_dir / "file1.csv", index=False)
#     df.to_csv(class1_dir / "file2.csv", index=False)
#     df.to_csv(class2_dir / "file2.csv", index=False)

#     result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])

#     assert result.exit_code == 0
ogueler@idiap.ch's avatar
ogueler@idiap.ch committed
#     assert (input_folder / "file1_summary.csv").exists()
#     assert (input_folder / "file2_summary.csv").exists()
André Anjos's avatar
André Anjos committed
# Not enough RAM available to do this test
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):

#    from mednet.scripts.predict import predict

#    runner = CliRunner()

#    with stdout_logging() as buf:

#        output_folder = str(temporary_basedir / "predictions")
#        result = runner.invoke(
#            predict,
#            [
#                "densenet_rs",
#                "montgomery_f0_rgb",
#                "-vv",
#                "--batch-size=1",
#                f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
#                f"--output-folder={output_folder}",
#                "--grad-cams"
#            ],
#        )
#        _assert_exit_0(result)

#        # check predictions are there
#        predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
#        predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
#        predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
#        assert os.path.exists(predictions_file1)
#        assert os.path.exists(predictions_file2)
#        assert os.path.exists(predictions_file3)
#        # check some grad cams are there
#        cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
#        cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
#        cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
#        cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
#        cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
#        cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
#        cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
#        cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
#        cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
#        assert os.path.exists(cam1)
#        assert os.path.exists(cam2)
#        assert os.path.exists(cam3)
#        assert os.path.exists(cam4)
#        assert os.path.exists(cam5)
#        assert os.path.exists(cam6)
#        assert os.path.exists(cam7)
#        assert os.path.exists(cam8)
#        assert os.path.exists(cam9)

#        keywords = {
#            r"^Loading checkpoint from.*$": 1,
#            r"^Total time:.*$": 3,
#            r"^Grad cams folder:.*$": 3,
#        }
#        buf.seek(0)
#        logging_output = buf.read()

#        for k, v in keywords.items():
#            assert _str_counter(k, logging_output) == v, (
#                f"Count for string '{k}' appeared "
#                f"({_str_counter(k, logging_output)}) "
#                f"instead of the expected {v}:\nOutput:\n{logging_output}"
#            )