Newer
Older
# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""
import contextlib
import glob
import os
import re

import pytest
from click.testing import CliRunner
@contextlib.contextmanager
def stdout_logging():
    """Capture messages emitted on the ``mednet`` logger into a text buffer.

    A temporary :py:class:`logging.StreamHandler` writing to an in-memory
    :py:class:`io.StringIO` is attached to the ``mednet`` logger for the
    duration of the ``with`` block.  The handler formats records as the bare
    message (``%(message)s``) and accepts records at ``INFO`` level or above;
    the level of the logger itself is not modified here.

    Yields
    ------
    io.StringIO
        The buffer receiving the formatted log messages.
    """
    import io
    import logging

    buf = io.StringIO()
    ch = logging.StreamHandler(buf)
    ch.setFormatter(logging.Formatter("%(message)s"))
    ch.setLevel(logging.INFO)
    logger = logging.getLogger("mednet")
    logger.addHandler(ch)
    try:
        yield buf
    finally:
        # Always detach the handler, even when the body of the ``with`` block
        # raises (e.g. a failing assertion), so it does not stay attached to
        # the shared logger and pollute subsequent tests.
        logger.removeHandler(ch)
def _assert_exit_0(result):
    """Assert that a CLI invocation result finished with exit code zero.

    On failure, the assertion message reports the actual exit code together
    with the output captured in ``result.output``.
    """
    code = result.exit_code
    assert code == 0, f"Exit code {code} != 0 -- Output:\n{result.output}"
def _check_help(entry_point):
    """Invoke ``entry_point`` with ``--help`` and check the outcome.

    The invocation must exit with code zero and its output must begin with
    the text ``Usage:``.
    """
    outcome = CliRunner().invoke(entry_point, ["--help"])
    _assert_exit_0(outcome)
    assert outcome.output.startswith("Usage:")
def test_config_help():
    """Check that ``--help`` works on the ``config`` command group."""
    from mednet.scripts.config import config as config_cli

    _check_help(config_cli)
def test_config_list_help():
from mednet.scripts.config import list
from mednet.scripts.config import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert "module: mednet.config.data" in result.output
assert "module: mednet.config.models" in result.output
from mednet.scripts.config import list
result = CliRunner().invoke(list, ["--verbose"])
_assert_exit_0(result)
assert "module: mednet.config.data" in result.output
assert "module: mednet.config.models" in result.output
from mednet.scripts.config import describe
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
from mednet.scripts.config import describe
runner = CliRunner()
result = runner.invoke(describe, ["montgomery"])
_assert_exit_0(result)
assert "Montgomery DataModule for TB detection." in result.output
def test_database_help():
    """Check that ``--help`` works on the ``database`` command group."""
    from mednet.scripts.database import database as database_cli

    _check_help(database_cli)
from mednet.scripts.database import list
from mednet.scripts.database import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert result.output.startswith("Available databases:")
from mednet.scripts.database import check
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
from mednet.scripts.database import check
result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
from mednet.scripts.cli import cli
from mednet.scripts.train import train
_check_help(train)
def _str_counter(substr, s):
    """Count the non-overlapping matches of the regex ``substr`` within ``s``.

    The pattern is applied with :py:data:`re.MULTILINE`, so ``^`` and ``$``
    anchor at the start and end of every line of ``s``.
    """
    pattern = re.compile(substr, re.MULTILINE)
    return len(list(pattern.finditer(s)))
def test_predict_help():
from mednet.scripts.predict import predict
from mednet.scripts.evaluate import evaluate
from mednet.scripts.saliency.generate import generate
from mednet.scripts.saliency.completeness import completeness
from mednet.scripts.saliency.view import view
from mednet.scripts.saliency.evaluate import evaluate
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_train_pasa_montgomery(temporary_basedir):
from mednet.scripts.train import train
from mednet.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)

André Anjos
committed
runner = CliRunner()

André Anjos
committed
with stdout_logging() as buf:
output_folder = temporary_basedir / "results"

André Anjos
committed
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",

André Anjos
committed
],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

André Anjos
committed
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

André Anjos
committed
keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Applying DataModule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 1 epochs.$": 1,
r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,

André Anjos
committed
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,

André Anjos
committed
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
from mednet.scripts.train import train
from mednet.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)
output_folder = temporary_basedir / "results" / "pasa_checkpoint"
result0 = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={str(output_folder)}",
],
)
_assert_exit_0(result0)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert best.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
with stdout_logging() as buf:
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=2",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# asserts checkpoints are there, or raises FileNotFoundError
last = _get_checkpoint_from_alias(output_folder, "periodic")
assert last.name.endswith("epoch=1" + CHECKPOINT_EXTENSION)
best = _get_checkpoint_from_alias(output_folder, "best")
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 2

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM usage: less | Disk I/O: more.$": 1,
r"^Applying DataModule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 2 epochs.$": 1,
r"^Resuming from epoch 0 \(checkpoint file: .*$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
r"^Restoring normalizer from checkpoint.$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_predict_pasa_montgomery(temporary_basedir, datadir):
from mednet.scripts.predict import predict
from mednet.utils.checkpointer import (
CHECKPOINT_EXTENSION,
_get_checkpoint_from_alias,
)

André Anjos
committed
runner = CliRunner()
with stdout_logging() as buf:
output = temporary_basedir / "predictions"
last = _get_checkpoint_from_alias(
temporary_basedir / "results", "periodic"
)
assert last.name.endswith("epoch=0" + CHECKPOINT_EXTENSION)

André Anjos
committed
result = runner.invoke(
predict,
[
"pasa",
"montgomery",
"-vv",
"--batch-size=1",
f"--weight={str(last)}",

Maxime DELITROZ
committed
f"--output={output}",

André Anjos
committed
],
)
_assert_exit_0(result)

Maxime DELITROZ
committed
assert os.path.exists(output)

André Anjos
committed
keywords = {
r"^Loading dataset: * without caching. Trade-off: CPU RAM usage: less | Disk I/O: more$": 3,

Maxime DELITROZ
committed
r"^Loading checkpoint from .*$": 1,

Maxime DELITROZ
committed
r"^Running prediction on `train` split...$": 1,
r"^Running prediction on `validation` split...$": 1,
r"^Running prediction on `test` split...$": 1,

André Anjos
committed
}

Maxime DELITROZ
committed

André Anjos
committed
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_evaluate_pasa_montgomery(temporary_basedir):
from mednet.scripts.evaluate import evaluate

André Anjos
committed
runner = CliRunner()
with stdout_logging() as buf:
prediction_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "evaluations")
result = runner.invoke(
evaluate,
[
"-vv",
"montgomery",

Maxime DELITROZ
committed
f"--predictions={prediction_folder}",

André Anjos
committed
f"--output-folder={output_folder}",

André Anjos
committed
],
)
_assert_exit_0(result)
assert os.path.exists(os.path.join(output_folder, "plots.pdf"))

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "summary.rst"))

Maxime DELITROZ
committed
r"^Setting --threshold=.*$": 1,
r"^Analyzing split `train`...$": 1,
r"^Analyzing split `validation`...$": 1,
r"^Analyzing split `test`...$": 1,
r"^Saving measures at .*$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (

André Anjos
committed
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)
# TODO: the script exercised by the disabled tests below no longer works;
# either fix it or remove both the script and these tests.
# def test_evaluatevis(temporary_basedir):
# import pandas as pd
# from mednet.scripts.evaluatevis import evaluatevis
# # Create a sample directory structure and CSV files
# input_folder = temporary_basedir / "camutils_cli" / "gradcam"
# input_folder.mkdir(parents=True, exist_ok=True)
# class1_dir = input_folder / "class1"
# class1_dir.mkdir(parents=True, exist_ok=True)
# class2_dir = input_folder / "class2"
# class2_dir.mkdir(parents=True, exist_ok=True)
# data = {
# "MoRF": [1, 2, 3],
# "LeRF": [2, 4, 6],
# "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
# "IoU": [1, 2, 3],
# "IoDA": [2, 4, 6],
# "propEnergy": [1.5, 3, 4.5],
# "ASF": [1, 2, 3],
# }
# df = pd.DataFrame(data)
# df.to_csv(class1_dir / "file1.csv", index=False)
# df.to_csv(class2_dir / "file1.csv", index=False)
# df.to_csv(class1_dir / "file2.csv", index=False)
# df.to_csv(class2_dir / "file2.csv", index=False)
# result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])
# assert result.exit_code == 0
# assert (input_folder / "file1_summary.csv").exists()
# assert (input_folder / "file2_summary.csv").exists()
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from mednet.scripts.predict import predict

André Anjos
committed
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )