Newer
Older
# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tests for our CLI applications."""
import contextlib
import pytest
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from click.testing import CliRunner
@contextlib.contextmanager
def stdout_logging():
# copy logging messages to std out
import io
import logging
buf = io.StringIO()
ch = logging.StreamHandler(buf)
ch.setFormatter(logging.Formatter("%(message)s"))
ch.setLevel(logging.INFO)
logger = logging.getLogger("ptbench")
logger.addHandler(ch)
yield buf
logger.removeHandler(ch)
def _assert_exit_0(result):
assert (
result.exit_code == 0
), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"
def _check_help(entry_point):
runner = CliRunner()
result = runner.invoke(entry_point, ["--help"])
_assert_exit_0(result)
assert result.output.startswith("Usage:")
def test_config_help():
from ptbench.scripts.config import config
_check_help(config)
def test_config_list_help():
from ptbench.scripts.config import list
_check_help(list)
def test_config_list():
from ptbench.scripts.config import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)

André Anjos
committed
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_list_v():
from ptbench.scripts.config import list
result = CliRunner().invoke(list, ["--verbose"])
_assert_exit_0(result)

André Anjos
committed
assert "module: ptbench.config.data" in result.output
assert "module: ptbench.config.models" in result.output
def test_config_describe_help():
from ptbench.scripts.config import describe
_check_help(describe)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
from ptbench.scripts.config import describe
runner = CliRunner()
result = runner.invoke(describe, ["montgomery"])
_assert_exit_0(result)
assert "Montgomery datamodule for TB detection." in result.output
def test_database_help():
from ptbench.scripts.database import database
_check_help(database)
from ptbench.scripts.database import list
from ptbench.scripts.database import list
runner = CliRunner()
result = runner.invoke(list)
_assert_exit_0(result)
assert result.output.startswith("Available databases:")
from ptbench.scripts.database import check
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
from ptbench.scripts.database import check
result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
_assert_exit_0(result)
def test_main_help():
from ptbench.scripts.cli import cli
_check_help(cli)
def test_train_help():
from ptbench.scripts.train import train
_check_help(train)
def _str_counter(substr, s):
return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))
def test_predict_help():
from ptbench.scripts.predict import predict
_check_help(predict)
def test_visualize_help():
from ptbench.scripts.visualize import visualize
_check_help(visualize)
def test_evaluate_help():
from ptbench.scripts.evaluate import evaluate
_check_help(evaluate)
def test_evaluatevis_help():
from ptbench.scripts.evaluatevis import evaluatevis
_check_help(evaluatevis)
def test_comparevis_help():
from ptbench.scripts.comparevis import comparevis
_check_help(comparevis)
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_train_pasa_montgomery(temporary_basedir):
from ptbench.scripts.train import train

André Anjos
committed
runner = CliRunner()

André Anjos
committed
with stdout_logging() as buf:
output_folder = str(temporary_basedir / "results")
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)

André Anjos
committed
assert os.path.exists(

André Anjos
committed
)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")

André Anjos
committed
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

André Anjos
committed
keywords = {
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 1 epochs.$": 1,
r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,

André Anjos
committed
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,

André Anjos
committed
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
from ptbench.scripts.train import train
runner = CliRunner()
output_folder = str(temporary_basedir / "results/pasa_checkpoint")
result0 = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=1",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result0)
assert os.path.exists(os.path.join(output_folder, "model_final_epoch.ckpt"))
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 1

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
with stdout_logging() as buf:
result = runner.invoke(
train,
[
"pasa",
"montgomery",
"-vv",
"--epochs=2",
"--batch-size=1",
f"--output-folder={output_folder}",
],
)
_assert_exit_0(result)
assert os.path.exists(
os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
)
assert os.path.exists(os.path.join(output_folder, "constants.csv"))
assert (
len(
glob.glob(
os.path.join(output_folder, "logs", "events.out.tfevents.*")
)
)
== 2

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))
r"^Writing command-line for reproduction at .*$": 1,
r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
r"^Applying datamodule train sampler balancing...$": 1,
r"^Balancing samples from dataset using metadata targets `label`$": 1,
r"^Training for at most 2 epochs.$": 1,
r"^Resuming from epoch 0...$": 1,
r"^Saving model summary at.*$": 1,
r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
r"^Restoring normalizer from checkpoint.$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)

Maxime DELITROZ
committed
# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_predict_pasa_montgomery(temporary_basedir, datadir):
from ptbench.scripts.predict import predict
runner = CliRunner()
with stdout_logging() as buf:

Maxime DELITROZ
committed
output = str(temporary_basedir / "predictions")

André Anjos
committed
result = runner.invoke(
predict,
[
"pasa",
"montgomery",
"-vv",
"--batch-size=1",

Maxime DELITROZ
committed
f"--weight={str(temporary_basedir / 'results' / 'model_final_epoch.ckpt')}",
f"--output={output}",

André Anjos
committed
],
)
_assert_exit_0(result)

Maxime DELITROZ
committed
assert os.path.exists(output)

André Anjos
committed
keywords = {

Maxime DELITROZ
committed
r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more$": 3,
r"^Loading checkpoint from .*$": 1,

Maxime DELITROZ
committed
r"^Running prediction on `train` split...$": 1,
r"^Running prediction on `validation` split...$": 1,
r"^Running prediction on `test` split...$": 1,

André Anjos
committed
}

Maxime DELITROZ
committed

André Anjos
committed
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"

André Anjos
committed

Maxime DELITROZ
committed
# @pytest.mark.skip(reason="Test need to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
def test_evaluate_pasa_montgomery(temporary_basedir):
from ptbench.scripts.evaluate import evaluate
runner = CliRunner()
with stdout_logging() as buf:
prediction_folder = str(temporary_basedir / "predictions")
output_folder = str(temporary_basedir / "evaluations")
result = runner.invoke(
evaluate,
[
"-vv",
"montgomery",

Maxime DELITROZ
committed
f"--predictions={prediction_folder}",

André Anjos
committed
f"--output-folder={output_folder}",

André Anjos
committed
],
)
_assert_exit_0(result)
assert os.path.exists(os.path.join(output_folder, "plots.pdf"))

Maxime DELITROZ
committed
assert os.path.exists(os.path.join(output_folder, "summary.rst"))

Maxime DELITROZ
committed
r"^Setting --threshold=.*$": 1,
r"^Analyzing split `train`...$": 1,
r"^Analyzing split `validation`...$": 1,
r"^Analyzing split `test`...$": 1,
r"^Saving measures at .*$": 1,
}
buf.seek(0)
logging_output = buf.read()
for k, v in keywords.items():
assert _str_counter(k, logging_output) == v, (

André Anjos
committed
f"Count for string '{k}' appeared "
f"({_str_counter(k, logging_output)}) "
f"instead of the expected {v}:\nOutput:\n{logging_output}"
)
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def test_comparevis(temporary_basedir, datadir):
from ptbench.scripts.comparevis import comparevis
runner = CliRunner()
input_dir = str(
datadir / "test_visualization_images" / "indirect-model" / "tbx11k"
)
output_dir = str(temporary_basedir / "comparevis")
result = runner.invoke(
comparevis, ["-vv", "-i", str(input_dir), "-o", str(output_dir)]
)
assert result.exit_code == 0
# Check that the output image was created
assert (
temporary_basedir
/ "comparevis"
/ "targeted_class"
/ "test"
/ "tb0004.png"
).exists()
assert (
temporary_basedir
/ "comparevis"
/ "targeted_class"
/ "train"
/ "tb0005.png"
).exists()
def test_evaluatevis(temporary_basedir):
import pandas as pd
from ptbench.scripts.evaluatevis import evaluatevis
runner = CliRunner()
# Create a sample directory structure and CSV files
input_folder = temporary_basedir / "camutils_cli" / "gradcam"
input_folder.mkdir(parents=True, exist_ok=True)
class1_dir = input_folder / "class1"
class1_dir.mkdir(parents=True, exist_ok=True)
class2_dir = input_folder / "class2"
class2_dir.mkdir(parents=True, exist_ok=True)
data = {
"MoRF": [1, 2, 3],
"LeRF": [2, 4, 6],
"Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
"IoU": [1, 2, 3],
"IoDA": [2, 4, 6],
"propEnergy": [1.5, 3, 4.5],
"ASF": [1, 2, 3],
}
df = pd.DataFrame(data)
df.to_csv(class1_dir / "file1.csv", index=False)
df.to_csv(class2_dir / "file1.csv", index=False)
df.to_csv(class1_dir / "file2.csv", index=False)
df.to_csv(class2_dir / "file2.csv", index=False)
result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])
assert result.exit_code == 0
assert (input_folder / "file1_summary.csv").exists()
assert (input_folder / "file2_summary.csv").exists()
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")

André Anjos
committed
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
# from ptbench.scripts.predict import predict
# runner = CliRunner()
# with stdout_logging() as buf:
# output_folder = str(temporary_basedir / "predictions")
# result = runner.invoke(
# predict,
# [
# "densenet_rs",
# "montgomery_f0_rgb",
# "-vv",
# "--batch-size=1",
# f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
# f"--output-folder={output_folder}",
# "--grad-cams"
# ],
# )
# _assert_exit_0(result)
# # check predictions are there
# predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
# predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
# predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
# assert os.path.exists(predictions_file1)
# assert os.path.exists(predictions_file2)
# assert os.path.exists(predictions_file3)
# # check some grad cams are there
# cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
# cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
# cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
# cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
# cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
# cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
# cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
# cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
# cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
# assert os.path.exists(cam1)
# assert os.path.exists(cam2)
# assert os.path.exists(cam3)
# assert os.path.exists(cam4)
# assert os.path.exists(cam5)
# assert os.path.exists(cam6)
# assert os.path.exists(cam7)
# assert os.path.exists(cam8)
# assert os.path.exists(cam9)
# keywords = {
# r"^Loading checkpoint from.*$": 1,
# r"^Total time:.*$": 3,
# r"^Grad cams folder:.*$": 3,
# }
# buf.seek(0)
# logging_output = buf.read()
# for k, v in keywords.items():
# assert _str_counter(k, logging_output) == v, (
# f"Count for string '{k}' appeared "
# f"({_str_counter(k, logging_output)}) "
# f"instead of the expected {v}:\nOutput:\n{logging_output}"
# )