# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""

import contextlib
import glob
import os
import re

import pytest
from click.testing import CliRunner


@contextlib.contextmanager
def stdout_logging():
    # capture messages sent to the ptbench logger into a string buffer
import io
import logging
buf = io.StringIO()
ch = logging.StreamHandler(buf)
ch.setFormatter(logging.Formatter("%(message)s"))
ch.setLevel(logging.INFO)
logger = logging.getLogger("ptbench")
logger.addHandler(ch)
yield buf
logger.removeHandler(ch)
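

# Asserts that a click CLI invocation exited with code 0, showing the captured
# output in the failure message otherwise.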
def _assert_exit_0(result):
assert (
result.exit_code == 0
), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"
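

# Checks that invoking a click entry point with --help exits cleanly and prints
# a usage message.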
def _check_help(entry_point):
runner = CliRunner()
result = runner.invoke(entry_point, ["--help"])
_assert_exit_0(result)
assert result.output.startswith("Usage:")
def test_config_help():
from ptbench.scripts.config import config
_check_help(config)
def test_config_list_help():
from ptbench.scripts.config import list
_check_help(list)
def test_config_list():
from ptbench.scripts.config import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_list_v():
from ptbench.scripts.config import list
result = CliRunner().invoke(list, ["--verbose"])
_assert_exit_0(result)
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_describe_help():
from ptbench.scripts.config import describe
_check_help(describe)
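

# Tests marked with skip_if_rc_var_not_set only run when the respective
# dataset path (e.g. datadir.montgomery) is configured in the RC file.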
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
from ptbench.scripts.config import describe
runner = CliRunner()
result = runner.invoke(describe, ["montgomery"])
_assert_exit_0(result)
assert "Montgomery datamodule for TB detection." in result.output
def test_database_help():
from ptbench.scripts.database import database
_check_help(database)
def test_database_list_help():
    from ptbench.scripts.database import list

    _check_help(list)


def test_database_list():
    from ptbench.scripts.database import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert result.output.startswith("Available databases:")
def test_database_check_help():
    from ptbench.scripts.database import check

    _check_help(check)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
    from ptbench.scripts.database import check

    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
_assert_exit_0(result)
def test_main_help():
from ptbench.scripts.cli import cli
_check_help(cli)
def test_train_help():
from ptbench.scripts.train import train
_check_help(train)
def _str_counter(substr, s):
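    # counts non-overlapping matches of the regular expression `substr` in `s`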
return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
def test_predict_help():
from ptbench.scripts.predict import predict
_check_help(predict)
def test_calculate_road_help():
from ptbench.scripts.calculate_road import calculate_road
_check_help(calculate_road)
def test_evaluate_saliencymaps_help():
from ptbench.scripts.evaluate_saliencymaps import evaluate_saliencymaps
_check_help(evaluate_saliencymaps)
def test_generate_saliencymaps_help():
from ptbench.scripts.generate_saliencymaps import generate_saliencymaps
_check_help(generate_saliencymaps)
def test_visualize_help():
from ptbench.scripts.visualize import visualize
_check_help(visualize)
def test_evaluate_help():
from ptbench.scripts.evaluate import evaluate
_check_help(evaluate)
def test_evaluatevis_help():
from ptbench.scripts.evaluatevis import evaluatevis
_check_help(evaluatevis)
def test_compare_vis_help():
    from ptbench.scripts.compare_vis import compare_vis

    _check_help(compare_vis)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
from ptbench.scripts.train import train
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
runner = CliRunner()
with stdout_logging() as buf:
output_folder = temporary_basedir / "results"
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",
],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1
        )
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 1 epochs.$": 1,
r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
}
buf.seek(0)
logging_output = buf.read()
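        # each expected log message must appear exactly the stated number of times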
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
from ptbench.scripts.train import train
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
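
    # first, train for a single epoch to produce a checkpoint to resume from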
    output_folder = temporary_basedir / "results" / "pasa_checkpoint"

    runner = CliRunner()
    result0 = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",
],
)
_assert_exit_0(result0)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1
    )
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
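
    # resume training from the previous checkpoint for one additional epoch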
with stdout_logging() as buf:
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=2",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=1" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 2
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 2 epochs.$": 1,
r"^Resuming from epoch 0...$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
r"^Restoring normalizer from checkpoint.$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)

# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
from ptbench.scripts.predict import predict
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
runner = CliRunner()
with stdout_logging() as buf:
output = temporary_basedir / "predictions"
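        # reuse the periodic checkpoint produced by the training test above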
last = _get_checkpoint_from_alias(
temporary_basedir / "results", "periodic"
)
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
result = runner.invoke(
predict,
[
"pasa",
"montgomery",
"-vv",
"--batch-size=1",
f"--weight={str(last)}",
f"--output={output}",
],
)
_assert_exit_0(result)
assert os.path.exists(output)
keywords = {
r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more$": 3,
r"^Loading checkpoint from .*$": 1,
r"^Running prediction on `train` split...$": 1,
r"^Running prediction on `validation` split...$": 1,
r"^Running prediction on `test` split...$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
from ptbench.scripts.evaluate import evaluate
runner = CliRunner()
with stdout_logging() as buf:
prediction_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "evaluations")
result = runner.invoke(
evaluate,
[
"-vv",
"montgomery",
f"--predictions={prediction_folder}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
assert os.path.exists(os.path.join(output_folder, "summary.rst"))

        keywords = {
            r"^Setting --threshold=.*$": 1,
r"^Analyzing split `train`...$": 1,
r"^Analyzing split `validation`...$": 1,
r"^Analyzing split `test`...$": 1,
r"^Saving measures at .*$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)
def test_compare_vis(temporary_basedir, datadir):
from ptbench.scripts.compare_vis import compare_vis
runner = CliRunner()
input_dir = str(
datadir / "test_visualization_images" / "indirect-model" / "tbx11k"
)
    # the assertions below expect outputs under temporary_basedir, so use it
    # directly as the output directory
    output_dir = str(temporary_basedir)

    result = runner.invoke(
        compare_vis, ["-vv", "-i", str(input_dir), "-o", str(output_dir)]
    )
assert result.exit_code == 0
# Check that the output image was created
assert (
temporary_basedir
/ "targeted_class"
/ "test"
/ "tb0004.png"
).exists()
assert (
temporary_basedir
/ "targeted_class"
/ "train"
/ "tb0005.png"
).exists()
# This script no longer works; either fix it or remove the script and this test.
# def test_evaluatevis(temporary_basedir):
# import pandas as pd
# # Create a sample directory structure and CSV files
# input_folder = temporary_basedir / "camutils_cli" / "gradcam"
# input_folder.mkdir(parents=True, exist_ok=True)
# class1_dir = input_folder / "class1"
# class1_dir.mkdir(parents=True, exist_ok=True)
# class2_dir = input_folder / "class2"
# class2_dir.mkdir(parents=True, exist_ok=True)
# data = {
# "MoRF": [1, 2, 3],
# "LeRF": [2, 4, 6],
# "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
# "IoU": [1, 2, 3],
# "IoDA": [2, 4, 6],
# "propEnergy": [1.5, 3, 4.5],
# "ASF": [1, 2, 3],
# }
# df = pd.DataFrame(data)
# df.to_csv(class1_dir / "file1.csv", index=False)
# df.to_csv(class2_dir / "file1.csv", index=False)
# df.to_csv(class1_dir / "file2.csv", index=False)
# df.to_csv(class2_dir / "file2.csv", index=False)
# result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])
# assert result.exit_code == 0
# assert (input_folder / "file1_summary.csv").exists()
# assert (input_folder / "file2_summary.csv").exists()
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from ptbench.scripts.predict import predict
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )