# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""
import contextlib
import glob
import os
import re
import pytest
from click.testing import CliRunner
@contextlib.contextmanager
def stdout_logging():
    """Copies messages sent to the ptbench logger into a StringIO buffer."""
import io
import logging
buf = io.StringIO()
ch = logging.StreamHandler(buf)
ch.setFormatter(logging.Formatter("%(message)s"))
ch.setLevel(logging.INFO)
logger = logging.getLogger("ptbench")
logger.addHandler(ch)
yield buf
logger.removeHandler(ch)
def _assert_exit_0(result):
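    """Asserts that a CLI runner result exited with code 0, or shows its output."""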
assert (
result.exit_code == 0
), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"
def _check_help(entry_point):
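    """Checks that a CLI command exits with code 0 and prints usage when passed --help."""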
runner = CliRunner()
result = runner.invoke(entry_point, ["--help"])
_assert_exit_0(result)
assert result.output.startswith("Usage:")
def test_config_help():
from ptbench.scripts.config import config
_check_help(config)
def test_config_list_help():
from ptbench.scripts.config import list
_check_help(list)
def test_config_list():
from ptbench.scripts.config import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_list_v():
from ptbench.scripts.config import list
result = CliRunner().invoke(list, ["--verbose"])
_assert_exit_0(result)
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_describe_help():
from ptbench.scripts.config import describe
_check_help(describe)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
from ptbench.scripts.config import describe
runner = CliRunner()
result = runner.invoke(describe, ["montgomery"])
_assert_exit_0(result)
assert "Montgomery datamodule for TB detection." in result.output
def test_database_help():
from ptbench.scripts.database import database
_check_help(database)
def test_datamodule_list_help():
from ptbench.scripts.database import list
_check_help(list)
def test_datamodule_list():
from ptbench.scripts.database import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert result.output.startswith("Available databases:")
def test_datamodule_check_help():
from ptbench.scripts.database import check
_check_help(check)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
from ptbench.scripts.database import check
runner = CliRunner()
result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
_assert_exit_0(result)
def test_main_help():
from ptbench.scripts.cli import cli
_check_help(cli)
def test_train_help():
from ptbench.scripts.train import train
_check_help(train)
def _str_counter(substr, s):
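    """Counts (multi-line) occurrences of the regular expression ``substr`` in ``s``."""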
return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
def test_predict_help():
from ptbench.scripts.predict import predict
_check_help(predict)
def test_predtojson_help():
from ptbench.scripts.predtojson import predtojson
_check_help(predtojson)
def test_aggregpred_help():
from ptbench.scripts.aggregpred import aggregpred
_check_help(aggregpred)
def test_evaluate_help():
from ptbench.scripts.evaluate import evaluate
_check_help(evaluate)
def test_compare_help():
from ptbench.scripts.compare import compare
_check_help(compare)
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
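    """Trains the pasa model on montgomery for one epoch and checks its outputs."""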
from ptbench.scripts.train import train
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "results")
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(
os.path.join(output_folder, "model_final_epoch.ckpt")
)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1
)
assert os.path.exists(os.path.join(output_folder, "model_summary.txt"))
keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 1 epochs.$": 1,
r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
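    """Checks that training the pasa model resumes correctly from an existing checkpoint."""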
from ptbench.scripts.train import train
runner = CliRunner()
output_folder = str(temporary_basedir / "results/pasa_checkpoint")
result0 = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result0)
assert os.path.exists(os.path.join(output_folder, "model_final_epoch.ckpt"))
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1
)
assert os.path.exists(os.path.join(output_folder, "model_summary.txt"))
with stdout_logging() as buf:
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=2",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(
os.path.join(output_folder, "model_final_epoch.ckpt")
)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 2
)
assert os.path.exists(os.path.join(output_folder, "model_summary.txt"))
keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 2 epochs.$": 1,
r"^Resuming from epoch 0...$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
r"^Restoring normalizer from checkpoint.$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
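    """Predicts with a pre-trained pasa model on montgomery and checks the outputs."""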
from ptbench.scripts.predict import predict
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "predictions")
result = runner.invoke(
predict,
[
"pasa",
"montgomery",
"-vv",
"--batch-size=1",
f"--weight={str(datadir / 'lfs' / 'models' / 'pasa.ckpt')}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# check predictions are there
train_predictions_file = os.path.join(output_folder, "train.csv")
validation_predictions_file = os.path.join(
output_folder, "validation.csv"
)
test_predictions_file = os.path.join(output_folder, "test.csv")
assert os.path.exists(train_predictions_file)
assert os.path.exists(validation_predictions_file)
assert os.path.exists(test_predictions_file)
keywords = {
r"^Restoring normalizer from checkpoint.$": 1,
r"^Output folder: .*$": 1,
r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more": 3,
r"^Saving predictions in .*$": 3,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predtojson(datadir, temporary_basedir):
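    """Converts prediction CSV files into a single JSON dataset and checks the result."""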
from ptbench.scripts.predtojson import predtojson
runner = CliRunner()
with stdout_logging() as buf:
predictions = str(datadir / "test_predictions.csv")
output_folder = str(temporary_basedir / "pred_to_json")
result = runner.invoke(
predtojson,
[
"-vv",
"train",
f"{predictions}",
"test",
f"{predictions}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# check json file is there
assert os.path.exists(os.path.join(output_folder, "dataset.json"))
keywords = {
f"Output folder: {output_folder}": 1,
r"Saving JSON file...": 1,
r"^Loading predictions from.*$": 2,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
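    """Evaluates pasa predictions on montgomery and checks the generated reports."""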
from ptbench.scripts.evaluate import evaluate
runner = CliRunner()
with stdout_logging() as buf:
prediction_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "evaluations")
result = runner.invoke(
evaluate,
[
"-vv",
"montgomery",
f"--predictions-folder={prediction_folder}",
f"--output-folder={output_folder}",
"--threshold=test",
"--steps=2000",
],
)
_assert_exit_0(result)
assert os.path.exists(os.path.join(output_folder, "scores.pdf"))
assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
assert os.path.exists(os.path.join(output_folder, "table.txt"))
keywords = {
r"^Evaluating threshold on.*$": 1,
r"^Maximum F1-score of.*$": 4,
r"^Set --f1_threshold=.*$": 1,
r"^Set --eer_threshold=.*$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_compare_pasa_montgomery(temporary_basedir):
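    """Compares train and test predictions and checks the generated figure and table."""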
from ptbench.scripts.compare import compare
runner = CliRunner()
with stdout_logging() as buf:
predictions_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "comparisons")
result = runner.invoke(
compare,
[
"-vv",
"train",
f"{predictions_folder}/train/predictions.csv",
"test",
f"{predictions_folder}/test/predictions.csv",
f"--output-figure={output_folder}/compare.pdf",
f"--output-table={output_folder}/table.txt",
"--threshold=0.5",
],
)
_assert_exit_0(result)
# check comparisons are there
assert os.path.exists(os.path.join(output_folder, "compare.pdf"))
assert os.path.exists(os.path.join(output_folder, "table.txt"))
keywords = {
r"^Dataset '\*': threshold =.*$": 1,
r"^Loading predictions from.*$": 2,
r"^Tabulating performance summary...": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_mlp_montgomery_rs(temporary_basedir, datadir):
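    """Trains the mlp model on montgomery_rs for one epoch and checks its outputs."""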
from ptbench.scripts.train import train
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "results/mlp")
result = runner.invoke(
train,
[
"mlp",
"montgomery_rs",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(
os.path.join(output_folder, "model_final_epoch.ckpt")
)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert os.path.exists(
os.path.join(output_folder, "logs_csv", "version_0", "metrics.csv")
)
assert os.path.exists(
os.path.join(output_folder, "logs_tensorboard", "version_0")
)
assert os.path.exists(os.path.join(output_folder, "model_summary.txt"))
keywords = {
r"^Found \(dedicated\) '__train__' set for training$": 1,
r"^Found \(dedicated\) '__valid__' set for validation$": 1,
r"^Continuing from epoch 0$": 1,
r"^Saving model summary at.*$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_mlp_montgomery_rs(temporary_basedir, datadir):
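    """Predicts with a pre-trained mlp model, including relevance analysis plots."""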
from ptbench.scripts.predict import predict
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "predictions")
result = runner.invoke(
predict,
[
"mlp",
"montgomery_rs",
"-vv",
"--batch-size=1",
"--relevance-analysis",
f"--weight={str(datadir / 'lfs' / 'models' / 'mlp.ckpt')}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# check predictions are there
predictions_file = os.path.join(output_folder, "train/predictions.csv")
RA1 = os.path.join(output_folder, "train_RA.pdf")
RA2 = os.path.join(output_folder, "validation_RA.pdf")
RA3 = os.path.join(output_folder, "test_RA.pdf")
assert os.path.exists(predictions_file)
assert os.path.exists(RA1)
assert os.path.exists(RA2)
assert os.path.exists(RA3)
keywords = {
r"^Loading checkpoint from.*$": 1,
r"^Starting relevance analysis for subset.*$": 3,
r"^Creating and saving plot at.*$": 3,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_logreg_montgomery_rs(temporary_basedir, datadir):
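    """Trains the logistic_regression model on montgomery_rs and checks its outputs."""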
from ptbench.scripts.train import train
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "results/logreg")
result = runner.invoke(
train,
[
"logistic_regression",
"montgomery_rs",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(
os.path.join(output_folder, "model_final_epoch.ckpt")
)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert os.path.exists(
os.path.join(output_folder, "logs_csv", "version_0", "metrics.csv")
)
assert os.path.exists(
os.path.join(output_folder, "logs_tensorboard", "version_0")
)
assert os.path.exists(os.path.join(output_folder, "model_summary.txt"))
keywords = {
r"^Found \(dedicated\) '__train__' set for training$": 1,
r"^Found \(dedicated\) '__valid__' set for validation$": 1,
r"^Continuing from epoch 0$": 1,
r"^Saving model summary at.*$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_logreg_montgomery_rs(temporary_basedir, datadir):
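    """Predicts with a pre-trained logistic_regression model and checks the weights plot."""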
from ptbench.scripts.predict import predict
runner = CliRunner()
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "predictions")
result = runner.invoke(
predict,
[
"logistic_regression",
"montgomery_rs",
"-vv",
"--batch-size=1",
f"--weight={str(datadir / 'lfs' / 'models' / 'logreg.ckpt')}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# check predictions are there
predictions_file = os.path.join(output_folder, "train/predictions.csv")
wfile = os.path.join(output_folder, "LogReg_Weights.pdf")
assert os.path.exists(predictions_file)
assert os.path.exists(wfile)
keywords = {
r"^Loading checkpoint from.*$": 1,
r"^Logistic regression identified: saving model weights.*$": 1,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
@pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_aggregpred(temporary_basedir):
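    """Aggregates two prediction files into a single CSV file and checks the result."""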
from ptbench.scripts.aggregpred import aggregpred
runner = CliRunner()
with stdout_logging() as buf:
predictions = str(temporary_basedir / "predictions" / "test.csv")
output_folder = str(temporary_basedir / "aggregpred")
result = runner.invoke(
aggregpred,
[
"-vv",
f"{predictions}",
f"{predictions}",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
# check csv file is there
assert os.path.exists(os.path.join(output_folder, "aggregpred.csv"))
keywords = {
f"Output folder: {output_folder}": 1,
r"Saving aggregated CSV file...": 1,
r"^Loading predictions from.*$": 2,
}
buf.seek(0)
logging_output = buf.read()
        for k, v in keywords.items():
            occurrences = _str_counter(k, logging_output)
            assert occurrences == v, (
                f"String '{k}' appeared {occurrences} time(s) instead of "
                f"the expected {v}:\nOutput:\n{logging_output}"
            )
# Not enough RAM available to run this test
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from ptbench.scripts.predict import predict
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )