Newer
Older
# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""
import contextlib
import glob
import os
import re

import pytest
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from click.testing import CliRunner
@contextlib.contextmanager
def stdout_logging():
    """Capture messages emitted through the ``ptbench`` logger.

    A temporary :class:`logging.StreamHandler` (level ``INFO``, format
    ``"%(message)s"``) writing to an in-memory text buffer is attached to the
    ``ptbench`` logger for the duration of the ``with`` block.  The handler is
    detached again when the block ends, including when it ends with an
    exception, so a failing test cannot leave its handler attached for the
    tests that follow.

    Yields:
        io.StringIO: buffer receiving one line per captured log message.
    """
    import io
    import logging

    buf = io.StringIO()
    ch = logging.StreamHandler(buf)
    ch.setFormatter(logging.Formatter("%(message)s"))
    ch.setLevel(logging.INFO)

    logger = logging.getLogger("ptbench")
    logger.addHandler(ch)
    try:
        yield buf
    finally:
        # always undo the registration, even if the caller's block raised
        logger.removeHandler(ch)
def _assert_exit_0(result):
assert (
result.exit_code == 0
), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"
def _check_help(entry_point):
    """Run ``entry_point`` with ``--help`` and validate the response.

    The invocation must exit with code 0 and its output must begin with
    ``Usage:``.
    """
    outcome = CliRunner().invoke(entry_point, ["--help"])
    _assert_exit_0(outcome)
    assert outcome.output.startswith("Usage:")
def test_config_help():
    """The ``config`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.config import config as entry_point

    _check_help(entry_point)
def test_config_list_help():
    """The ``config`` ``list`` command answers ``--help`` with a usage message."""
    # imported under an alias so the builtin ``list`` stays reachable here
    from ptbench.scripts.config import list as list_command

    _check_help(list_command)
def test_config_list():
    """Invoke ``list`` without arguments and look for both config modules."""
    # imported under an alias so the builtin ``list`` stays reachable here
    from ptbench.scripts.config import list as list_command

    result = CliRunner().invoke(list_command)
    _assert_exit_0(result)

    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output
def test_config_list_v():
    """Invoke ``list --verbose`` and look for both config modules."""
    # imported under an alias so the builtin ``list`` stays reachable here
    from ptbench.scripts.config import list as list_command

    runner = CliRunner()
    result = runner.invoke(list_command, ["--verbose"])
    _assert_exit_0(result)

    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output
def test_config_describe_help():
    """The ``describe`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.config import describe as entry_point

    _check_help(entry_point)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
    """Invoke ``describe montgomery`` and check the description it prints."""
    from ptbench.scripts.config import describe

    result = CliRunner().invoke(describe, ["montgomery"])
    _assert_exit_0(result)

    expected = "Montgomery datamodule for TB detection."
    assert expected in result.output
def test_database_help():
    """The ``database`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.database import database as entry_point

    _check_help(entry_point)
from ptbench.scripts.database import list
from ptbench.scripts.database import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert result.output.startswith("Available databases:")
from ptbench.scripts.database import check
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
    """Run ``check --verbose --limit=1 montgomery`` and expect exit code 0."""
    from ptbench.scripts.database import check

    # ``runner`` was used below without ever being bound in this function,
    # which raises NameError; create it like every sibling test does.
    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
    _assert_exit_0(result)
def test_main_help():
    """The top-level ``cli`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.cli import cli as entry_point

    _check_help(entry_point)
def test_train_help():
    """The ``train`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.train import train as entry_point

    _check_help(entry_point)
def _str_counter(substr, s):
return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
def test_predict_help():
    """The ``predict`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.predict import predict as entry_point

    _check_help(entry_point)
def test_evaluate_help():
    """The ``evaluate`` command answers ``--help`` with a usage message."""
    from ptbench.scripts.evaluate import evaluate as entry_point

    _check_help(entry_point)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
    """Train ``pasa`` on ``montgomery`` for one epoch and inspect the results.

    Checks the exit code of the ``train`` command, the files it leaves under
    ``temporary_basedir / "results"`` and how many times selected messages
    appear in the captured ``ptbench`` log.
    """
    from ptbench.scripts.train import train

    runner = CliRunner()

    with stdout_logging() as buf:
        output_folder = str(temporary_basedir / "results")
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=1",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        # NOTE(review): the argument of this check was missing from the
        # block; "model_final_epoch.ckpt" is restored from the sibling tests
        # (the resume test asserts it, the predict test loads it from this
        # very folder) -- confirm against the project history.
        assert os.path.exists(
            os.path.join(output_folder, "model_final_epoch.ckpt")
        )
        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 1
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        # regular expression -> number of matches expected in the log.
        # NOTE(review): ``|`` is unescaped in the "Loading dataset" patterns,
        # so each of them is an alternation of two sub-patterns rather than a
        # literal pipe; verify the expected counts still mean what is intended.
        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 1 epochs.$": 1,
            r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            # count once and reuse the number in the failure message
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )

André Anjos
committed
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
    """Train ``pasa`` for one epoch, then re-run with ``--epochs=2``.

    Both runs write to ``temporary_basedir / "results/pasa_checkpoint"``.
    After each run the exit code and the produced files are checked; the log
    of the second run is captured and searched for selected messages.
    """
    from ptbench.scripts.train import train

    runner = CliRunner()
    output_folder = str(temporary_basedir / "results/pasa_checkpoint")

    # first run: one epoch into an output folder specific to this test
    result0 = runner.invoke(
        train,
        [
            "pasa",
            "montgomery",
            "-vv",
            "--epochs=1",
            "--batch-size=1",
            f"--output-folder={output_folder}",
        ],
    )
    _assert_exit_0(result0)

    assert os.path.exists(os.path.join(output_folder, "model_final_epoch.ckpt"))
    assert os.path.exists(
        os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
    )
    assert os.path.exists(os.path.join(output_folder, "constants.csv"))
    assert (
        len(
            glob.glob(
                os.path.join(output_folder, "logs", "events.out.tfevents.*")
            )
        )
        == 1
    )
    assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

    # second run: same output folder, one more epoch, log captured this time
    with stdout_logging() as buf:
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=2",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        # a second events file is expected once the second run has finished
        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 2
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        # regular expression -> number of matches expected in the log.
        # NOTE(review): ``|`` is unescaped in the "Loading dataset" patterns,
        # so each of them is an alternation of two sub-patterns rather than a
        # literal pipe; verify the expected counts still mean what is intended.
        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 2 epochs.$": 1,
            r"^Resuming from epoch 0...$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            # count once and reuse the number in the failure message
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )

Maxime DELITROZ
committed
# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
    """Run ``predict`` for ``pasa`` on ``montgomery`` and inspect the results.

    The weights are read from ``results/model_final_epoch.ckpt`` below
    ``temporary_basedir``, so that checkpoint has to exist before this test
    runs.  Checks the exit code, that the output path exists and how many
    times selected messages appear in the captured ``ptbench`` log.

    ``datadir`` is requested but not used by the body.
    """
    from ptbench.scripts.predict import predict

    runner = CliRunner()

    with stdout_logging() as buf:
        output = str(temporary_basedir / "predictions")
        result = runner.invoke(
            predict,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--batch-size=1",
                f"--weight={str(temporary_basedir / 'results' / 'model_final_epoch.ckpt')}",
                f"--output={output}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(output)

        # regular expression -> number of matches expected in the log.
        # NOTE(review): in the first pattern ``: *`` only matches a colon
        # followed by spaces (it is not a wildcard) and the unescaped ``|``
        # splits the pattern into two alternatives; confirm that it really
        # matches the three "Loading dataset" messages it is meant to count.
        keywords = {
            r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more$": 3,
            r"^Loading checkpoint from .*$": 1,
            r"^Running prediction on `train` split...$": 1,
            r"^Running prediction on `validation` split...$": 1,
            r"^Running prediction on `test` split...$": 1,
            r"^Predictions saved to .*$": 1,
        }

        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            # count once and reuse the number in the failure message
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )

André Anjos
committed

Maxime DELITROZ
committed
# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
    """Run ``evaluate`` on ``montgomery`` predictions and inspect the results.

    Predictions are read from ``temporary_basedir / "predictions"`` and the
    results go to ``temporary_basedir / "evaluations"``.  Checks the exit
    code, that ``plots.pdf`` and ``summary.rst`` were written and how many
    times selected messages appear in the captured ``ptbench`` log.
    """
    from ptbench.scripts.evaluate import evaluate

    runner = CliRunner()

    with stdout_logging() as buf:
        prediction_folder = str(temporary_basedir / "predictions")
        output_folder = str(temporary_basedir / "evaluations")
        result = runner.invoke(
            evaluate,
            [
                "-vv",
                "montgomery",
                f"--predictions={prediction_folder}",
                f"--output-folder={output_folder}",
                "--threshold=test",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
        assert os.path.exists(os.path.join(output_folder, "summary.rst"))

        # regular expression -> number of matches expected in the log
        keywords = {
            r"^Setting --threshold=.*$": 1,
            r"^Analyzing split `train`...$": 1,
            r"^Analyzing split `validation`...$": 1,
            r"^Analyzing split `test`...$": 1,
            r"^Saving measures at .*$": 1,
            r"^Saving figures at .*$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            # count once and reuse the number in the failure message
            found = _str_counter(k, logging_output)
            assert found == v, (
                f"Count for string '{k}' appeared "
                f"({found}) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from ptbench.scripts.predict import predict
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )