# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""
import contextlib
import glob
import os
import re

import pytest
from click.testing import CliRunner
@contextlib.contextmanager
def stdout_logging():
    # capture messages logged by the "ptbench" logger in a string buffer, so
    # tests can assert on the messages emitted by CLI commands
import io
import logging
buf = io.StringIO()
ch = logging.StreamHandler(buf)
ch.setFormatter(logging.Formatter("%(message)s"))
ch.setLevel(logging.INFO)
logger = logging.getLogger("ptbench")
logger.addHandler(ch)
yield buf
logger.removeHandler(ch)
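
# assert that a CLI invocation exited with code 0; on failure, include the
# captured output in the assertion message to ease debugging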
def _assert_exit_0(result):
assert (
result.exit_code == 0
), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"
def _check_help(entry_point):
runner = CliRunner()
result = runner.invoke(entry_point, ["--help"])
_assert_exit_0(result)
assert result.output.startswith("Usage:")
def test_config_help():
from ptbench.scripts.config import config
_check_help(config)
def test_config_list_help():
from ptbench.scripts.config import list
_check_help(list)
def test_config_list():
from ptbench.scripts.config import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)

assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_list_v():
from ptbench.scripts.config import list
result = CliRunner().invoke(list, ["--verbose"])
_assert_exit_0(result)

assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_describe_help():
from ptbench.scripts.config import describe
_check_help(describe)
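
# ``skip_if_rc_var_not_set`` is a custom marker (defined in this test suite's
# conftest) that skips the test unless the named runtime-configuration
# variable (here, the Montgomery dataset directory) is set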
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
from ptbench.scripts.config import describe
runner = CliRunner()
result = runner.invoke(describe, ["montgomery"])
_assert_exit_0(result)
assert "Montgomery datamodule for TB detection." in result.output
def test_database_help():
from ptbench.scripts.database import database
_check_help(database)
def test_database_list_help():
    from ptbench.scripts.database import list

    _check_help(list)


def test_database_list():
    from ptbench.scripts.database import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert result.output.startswith("Available databases:")


def test_database_check_help():
    from ptbench.scripts.database import check

    _check_help(check)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
    from ptbench.scripts.database import check

    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
    _assert_exit_0(result)
def test_main_help():
from ptbench.scripts.cli import cli
_check_help(cli)
def test_train_help():
from ptbench.scripts.train import train
_check_help(train)
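
# count non-overlapping matches of the regular expression ``substr`` in ``s``
# (multi-line mode, so ^ and $ match at line boundaries)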
def _str_counter(substr, s):
return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
def test_predict_help():
from ptbench.scripts.predict import predict
_check_help(predict)
def test_evaluate_help():
    from ptbench.scripts.evaluate import evaluate

    _check_help(evaluate)


def test_saliency_generate_help():
    from ptbench.scripts.saliency.generate import generate

    _check_help(generate)


def test_saliency_view_help():
    from ptbench.scripts.saliency.view import view

    _check_help(view)


def test_saliency_evaluate_help():
    from ptbench.scripts.saliency.evaluate import evaluate

    _check_help(evaluate)

@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

def test_train_pasa_montgomery(temporary_basedir):
from ptbench.scripts.train import train
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)

runner = CliRunner()

with stdout_logging() as buf:
output_folder = temporary_basedir / "results"

result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",

],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
            == 1
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 1 epochs.$": 1,
r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,

r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,

}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
from ptbench.scripts.train import train
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
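    # train for a single epoch, then invoke training again with --epochs=2 on
    # the same output folder, to check resuming from a saved checkpoint works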
    runner = CliRunner()

    output_folder = temporary_basedir / "results" / "pasa_checkpoint"
    result0 = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",
],
)
_assert_exit_0(result0)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
        == 1
    )
    assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

with stdout_logging() as buf:
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=2",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=1" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
            == 2
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 2 epochs.$": 1,
r"^Resuming from epoch 0 \(checkpoint file: .*$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
r"^Restoring normalizer from checkpoint.$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)

# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

def test_predict_pasa_montgomery(temporary_basedir, datadir):
from ptbench.scripts.predict import predict
from ptbench.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
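    # this test reuses the checkpoint written by test_train_pasa_montgomery
    # above; tests in this module run in definition order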

runner = CliRunner()
with stdout_logging() as buf:
output = temporary_basedir / "predictions"
last = _get_checkpoint_from_alias(
temporary_basedir / "results", "periodic"
)
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

result = runner.invoke(
predict,
[
"pasa",
"montgomery",
"-vv",
"--batch-size=1",
f"--weight={str(last)}",

f"--output={output}",

],
)
_assert_exit_0(result)

assert os.path.exists(output)
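        # expected log messages: one dataset-loading line per split, one
        # checkpoint load, and one prediction run per split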

keywords = {
r"^Loading dataset: * without caching. Trade-off: CPU RAM usage: less | Disk I/O: more$": 3,

r"^Loading checkpoint from .*$": 1,

r"^Running prediction on `train` split...$": 1,
r"^Running prediction on `validation` split...$": 1,
r"^Running prediction on `test` split...$": 1,

}

buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed

Maxime DELITROZ
committed
# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

def test_evaluate_pasa_montgomery(temporary_basedir):
from ptbench.scripts.evaluate import evaluate
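    # evaluates the predictions written by test_predict_pasa_montgomery above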
runner = CliRunner()
with stdout_logging() as buf:
prediction_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "evaluations")
result = runner.invoke(
evaluate,
[
"-vv",
"montgomery",

f"--predictions={prediction_folder}",

f"--output-folder={output_folder}",

],
)
_assert_exit_0(result)
assert os.path.exists(os.path.join(output_folder, "plots.pdf"))

assert os.path.exists(os.path.join(output_folder, "summary.rst"))

        keywords = {
            r"^Setting --threshold=.*$": 1,
r"^Analyzing split `train`...$": 1,
r"^Analyzing split `validation`...$": 1,
r"^Analyzing split `test`...$": 1,
r"^Saving measures at .*$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (

f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)
# This script no longer works; either fix it or remove the script and this test.
# def test_evaluatevis(temporary_basedir):
# import pandas as pd
# # Create a sample directory structure and CSV files
# input_folder = temporary_basedir / "camutils_cli" / "gradcam"
# input_folder.mkdir(parents=True, exist_ok=True)
# class1_dir = input_folder / "class1"
# class1_dir.mkdir(parents=True, exist_ok=True)
# class2_dir = input_folder / "class2"
# class2_dir.mkdir(parents=True, exist_ok=True)
# data = {
# "MoRF": [1, 2, 3],
# "LeRF": [2, 4, 6],
# "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
# "IoU": [1, 2, 3],
# "IoDA": [2, 4, 6],
# "propEnergy": [1.5, 3, 4.5],
# "ASF": [1, 2, 3],
# }
# df = pd.DataFrame(data)
# df.to_csv(class1_dir / "file1.csv", index=False)
# df.to_csv(class2_dir / "file1.csv", index=False)
# df.to_csv(class1_dir / "file2.csv", index=False)
# df.to_csv(class2_dir / "file2.csv", index=False)
# result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])
# assert result.exit_code == 0
# assert (input_folder / "file1_summary.csv").exists()
# assert (input_folder / "file2_summary.csv").exists()
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from ptbench.scripts.predict import predict
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )