# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""

import contextlib
import glob
import os
import re

import pytest
from click.testing import CliRunner


@contextlib.contextmanager
def stdout_logging():
    """Capture messages sent to the "ptbench" logger in a string buffer."""

    import io
    import logging

    buf = io.StringIO()
    ch = logging.StreamHandler(buf)
    ch.setFormatter(logging.Formatter("%(message)s"))
    ch.setLevel(logging.INFO)
    logger = logging.getLogger("ptbench")
    logger.addHandler(ch)
    yield buf
    logger.removeHandler(ch)
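
# Typical use of ``stdout_logging``: wrap the code under test in
# ``with stdout_logging() as buf:``, then ``buf.seek(0); buf.read()`` to
# inspect the captured log lines, as the tests below do.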


def _assert_exit_0(result):
    assert (
        result.exit_code == 0
    ), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"


def _check_help(entry_point):
    runner = CliRunner()
    result = runner.invoke(entry_point, ["--help"])
    _assert_exit_0(result)
    assert result.output.startswith("Usage:")


def test_config_help():
    from ptbench.scripts.config import config

    _check_help(config)


def test_config_list_help():
    from ptbench.scripts.config import list

    _check_help(list)


def test_config_list():
    from ptbench.scripts.config import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output


def test_config_list_v():
    from ptbench.scripts.config import list

    result = CliRunner().invoke(list, ["--verbose"])
    _assert_exit_0(result)
    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output


def test_config_describe_help():
    from ptbench.scripts.config import describe

    _check_help(describe)


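# NOTE: ``skip_if_rc_var_not_set`` is a custom marker assumed to be registered
# in this test suite's ``conftest.py``.  A minimal sketch of such a hook
# (hypothetical -- the shipped implementation may differ) could be:
#
#     def pytest_runtest_setup(item):
#         rc = load_rc()  # hypothetical helper returning the user RC dict
#         for mark in item.iter_markers(name="skip_if_rc_var_not_set"):
#             if rc.get(mark.args[0]) is None:
#                 pytest.skip(f"RC variable {mark.args[0]!r} is not set")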
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
    from ptbench.scripts.config import describe

    runner = CliRunner()
    result = runner.invoke(describe, ["montgomery"])
    _assert_exit_0(result)
    assert "Montgomery datamodule for TB detection." in result.output


def test_database_help():
    from ptbench.scripts.database import database

    _check_help(database)


def test_datamodule_list_help():
    from ptbench.scripts.database import list

    _check_help(list)


def test_datamodule_list():
    from ptbench.scripts.database import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert result.output.startswith("Available databases:")


def test_datamodule_check_help():
    from ptbench.scripts.database import check

    _check_help(check)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
    from ptbench.scripts.database import check

    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
    _assert_exit_0(result)


def test_main_help():
    from ptbench.scripts.cli import cli

    _check_help(cli)


def test_train_help():
    from ptbench.scripts.train import train

    _check_help(train)


def _str_counter(substr, s):
    return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
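
# For instance, ``_str_counter(r"^foo$", "foo\nbar\nfoo")`` returns 2:
# ``re.MULTILINE`` makes ``^`` and ``$`` match at each line boundary.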


def test_predict_help():
    from ptbench.scripts.predict import predict

    _check_help(predict)


def test_visualize_help():
    from ptbench.scripts.visualize import visualize

    _check_help(visualize)


def test_evaluate_help():
    from ptbench.scripts.evaluate import evaluate

    _check_help(evaluate)


def test_evaluatevis_help():
    from ptbench.scripts.evaluatevis import evaluatevis

    _check_help(evaluatevis)


def test_comparevis_help():
    from ptbench.scripts.comparevis import comparevis

    _check_help(comparevis)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
    from ptbench.scripts.train import train

    runner = CliRunner()

    with stdout_logging() as buf:
        output_folder = str(temporary_basedir / "results")
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=1",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(
            os.path.join(output_folder, "model_final_epoch.ckpt")
        )
        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 1
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 1 epochs.$": 1,
            r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
    from ptbench.scripts.train import train

    runner = CliRunner()

    output_folder = str(temporary_basedir / "results/pasa_checkpoint")
    result0 = runner.invoke(
        train,
        [
            "pasa",
            "montgomery",
            "-vv",
            "--epochs=1",
            "--batch-size=1",
            f"--output-folder={output_folder}",
        ],
    )
    _assert_exit_0(result0)

    assert os.path.exists(os.path.join(output_folder, "model_final_epoch.ckpt"))
    assert os.path.exists(
        os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
    )
    assert os.path.exists(os.path.join(output_folder, "constants.csv"))
    assert (
        len(
            glob.glob(
                os.path.join(output_folder, "logs", "events.out.tfevents.*")
            )
        )
        == 1
    )
    assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

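    # Invoking ``train`` again on the same output folder should resume from
    # the saved checkpoint -- note the "Resuming from epoch 0..." and
    # "Restoring normalizer from checkpoint." keywords asserted below.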
    with stdout_logging() as buf:
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=2",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(
            os.path.join(output_folder, "model_final_epoch.ckpt")
        )
        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))

        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 2
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 2 epochs.$": 1,
            r"^Resuming from epoch 0...$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
    from ptbench.scripts.predict import predict

    runner = CliRunner()

    with stdout_logging() as buf:
        output = str(temporary_basedir / "predictions")
        result = runner.invoke(
            predict,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--batch-size=1",
                f"--weight={str(temporary_basedir / 'results' / 'model_final_epoch.ckpt')}",
                f"--output={output}",
            r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more$": 3,
            r"^Loading checkpoint from .*$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
            r"^Running prediction on `train` split...$": 1,
            r"^Running prediction on `validation` split...$": 1,
            r"^Running prediction on `test` split...$": 1,
            r"^Predictions saved to .*$": 1,
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
    from ptbench.scripts.evaluate import evaluate

    runner = CliRunner()

    with stdout_logging() as buf:
        prediction_folder = str(temporary_basedir / "predictions")
        output_folder = str(temporary_basedir / "evaluations")
        result = runner.invoke(
            evaluate,
            [
                "-vv",
                "montgomery",
                "--threshold=test",
        assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
        assert os.path.exists(os.path.join(output_folder, "summary.rst"))

        keywords = {
            r"^Setting --threshold=.*$": 1,
            r"^Analyzing split `train`...$": 1,
            r"^Analyzing split `validation`...$": 1,
            r"^Analyzing split `test`...$": 1,
            r"^Saving measures at .*$": 1,
            r"^Saving figures at .*$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"Count for string '{k}' appeared "
                f"({_str_counter(k, logging_output)}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


def test_comparevis(temporary_basedir, datadir):
    from ptbench.scripts.comparevis import comparevis

    runner = CliRunner()

    input_dir = str(
        datadir / "test_visualization_images" / "indirect-model" / "tbx11k"
    )
    output_dir = str(temporary_basedir / "comparevis")

    result = runner.invoke(
        comparevis, ["-vv", "-i", str(input_dir), "-o", str(output_dir)]
    )

    assert result.exit_code == 0

    # Check that the output image was created
    assert (
        temporary_basedir
        / "comparevis"
        / "targeted_class"
        / "test"
        / "tb0004.png"
    ).exists()
    assert (
        temporary_basedir
        / "comparevis"
        / "targeted_class"
        / "train"
        / "tb0005.png"
    ).exists()


def test_evaluatevis(temporary_basedir):
    import pandas as pd

    from ptbench.scripts.evaluatevis import evaluatevis

    runner = CliRunner()

    # Create a sample directory structure and CSV files
    input_folder = temporary_basedir / "camutils_cli" / "gradcam"
    input_folder.mkdir(parents=True, exist_ok=True)
    class1_dir = input_folder / "class1"
    class1_dir.mkdir(parents=True, exist_ok=True)
    class2_dir = input_folder / "class2"
    class2_dir.mkdir(parents=True, exist_ok=True)

    data = {
        "MoRF": [1, 2, 3],
        "LeRF": [2, 4, 6],
        "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
        "IoU": [1, 2, 3],
        "IoDA": [2, 4, 6],
        "propEnergy": [1.5, 3, 4.5],
        "ASF": [1, 2, 3],
    }
    df = pd.DataFrame(data)
    df.to_csv(class1_dir / "file1.csv", index=False)
    df.to_csv(class2_dir / "file1.csv", index=False)
    df.to_csv(class1_dir / "file2.csv", index=False)
    df.to_csv(class2_dir / "file2.csv", index=False)

    result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])

    assert result.exit_code == 0

    assert (input_folder / "file1_summary.csv").exists()
    assert (input_folder / "file2_summary.csv").exists()


# Not enough RAM available to do this test
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):

#    from ptbench.scripts.predict import predict

#    runner = CliRunner()

#    with stdout_logging() as buf:

#        output_folder = str(temporary_basedir / "predictions")
#        result = runner.invoke(
#            predict,
#            [
#                "densenet_rs",
#                "montgomery_f0_rgb",
#                "-vv",
#                "--batch-size=1",
#                f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
#                f"--output-folder={output_folder}",
#                "--grad-cams"
#            ],
#        )
#        _assert_exit_0(result)

#        # check predictions are there
#        predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
#        predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
#        predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
#        assert os.path.exists(predictions_file1)
#        assert os.path.exists(predictions_file2)
#        assert os.path.exists(predictions_file3)
#        # check some grad cams are there
#        cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
#        cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
#        cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
#        cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
#        cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
#        cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
#        cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
#        cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
#        cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
#        assert os.path.exists(cam1)
#        assert os.path.exists(cam2)
#        assert os.path.exists(cam3)
#        assert os.path.exists(cam4)
#        assert os.path.exists(cam5)
#        assert os.path.exists(cam6)
#        assert os.path.exists(cam7)
#        assert os.path.exists(cam8)
#        assert os.path.exists(cam9)

#        keywords = {
#            r"^Loading checkpoint from.*$": 1,
#            r"^Total time:.*$": 3,
#            r"^Grad cams folder:.*$": 3,
#        }
#        buf.seek(0)
#        logging_output = buf.read()

#        for k, v in keywords.items():
#            assert _str_counter(k, logging_output) == v, (
#                f"Count for string '{k}' appeared "
#                f"({_str_counter(k, logging_output)}) "
#                f"instead of the expected {v}:\nOutput:\n{logging_output}"
#            )