# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Tests for our CLI applications."""

import contextlib
import glob
import os
import re

import pytest

from click.testing import CliRunner


@contextlib.contextmanager
def stdout_logging():
    """Capture messages sent to the ``ptbench`` logger in a string buffer."""

    import io
    import logging

    buf = io.StringIO()
    ch = logging.StreamHandler(buf)
    ch.setFormatter(logging.Formatter("%(message)s"))
    ch.setLevel(logging.INFO)
    logger = logging.getLogger("ptbench")
    logger.addHandler(ch)
    try:
        yield buf
    finally:
        # always detach the handler, even if the test body raises
        logger.removeHandler(ch)


def _assert_exit_0(result):
    assert (
        result.exit_code == 0
    ), f"Exit code {result.exit_code} != 0 -- Output:\n{result.output}"


def _check_help(entry_point):
    runner = CliRunner()
    result = runner.invoke(entry_point, ["--help"])
    _assert_exit_0(result)
    assert result.output.startswith("Usage:")


def test_config_help():
    from ptbench.scripts.config import config

    _check_help(config)


def test_config_list_help():
    from ptbench.scripts.config import list

    _check_help(list)


def test_config_list():
    from ptbench.scripts.config import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output


def test_config_list_v():
    from ptbench.scripts.config import list

    result = CliRunner().invoke(list, ["--verbose"])
    _assert_exit_0(result)
    assert "module: ptbench.config.data" in result.output
    assert "module: ptbench.config.models" in result.output


def test_config_describe_help():
    from ptbench.scripts.config import describe

    _check_help(describe)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_config_describe_montgomery():
    from ptbench.scripts.config import describe

    runner = CliRunner()
    result = runner.invoke(describe, ["montgomery"])
    _assert_exit_0(result)
    assert "Montgomery datamodule for TB detection." in result.output


def test_database_help():
    from ptbench.scripts.database import database

    _check_help(database)


def test_datamodule_list_help():
    from ptbench.scripts.database import list

    _check_help(list)


def test_datamodule_list():
    from ptbench.scripts.database import list

    runner = CliRunner()
    result = runner.invoke(list)
    _assert_exit_0(result)
    assert result.output.startswith("Available databases:")


def test_datamodule_check_help():
    from ptbench.scripts.database import check

    _check_help(check)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_database_check():
    from ptbench.scripts.database import check

    runner = CliRunner()
    result = runner.invoke(check, ["--verbose", "--limit=1", "montgomery"])
    _assert_exit_0(result)


def test_main_help():
    from ptbench.scripts.cli import cli

    _check_help(cli)


def test_train_help():
    from ptbench.scripts.train import train

    _check_help(train)


def _str_counter(substr, s):
    return sum(1 for _ in re.finditer(substr, s, re.MULTILINE))


def test_predict_help():
    from ptbench.scripts.predict import predict

    _check_help(predict)


def test_calculate_road_help():
    from ptbench.scripts.calculate_road import calculate_road

    _check_help(calculate_road)


def test_evaluate_saliencymaps_help():
    from ptbench.scripts.evaluate_saliencymaps import evaluate_saliencymaps

    _check_help(evaluate_saliencymaps)


def test_generate_saliencymaps_help():
    from ptbench.scripts.generate_saliencymaps import generate_saliencymaps

    _check_help(generate_saliencymaps)


def test_visualize_help():
    from ptbench.scripts.visualize import visualize

    _check_help(visualize)


def test_evaluate_help():
    from ptbench.scripts.evaluate import evaluate

    _check_help(evaluate)


def test_evaluatevis_help():
    from ptbench.scripts.evaluatevis import evaluatevis

    _check_help(evaluatevis)


def test_compare_vis_help():
    from ptbench.scripts.compare_vis import compare_vis

    _check_help(compare_vis)


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery(temporary_basedir):
    from ptbench.scripts.train import train

    runner = CliRunner()

    with stdout_logging() as buf:
        output_folder = str(temporary_basedir / "results")
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=1",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(
            os.path.join(output_folder, "model_final_epoch.ckpt")
        )
        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 1
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 1 epochs.$": 1,
            r"^Uninitialised pasa model - computing z-norm factors from train dataloader.$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"String '{k}' appeared "
                f"{_str_counter(k, logging_output)} time(s) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_train_pasa_montgomery_from_checkpoint(temporary_basedir):
    from ptbench.scripts.train import train

    runner = CliRunner()

    output_folder = str(temporary_basedir / "results/pasa_checkpoint")
    result0 = runner.invoke(
        train,
        [
            "pasa",
            "montgomery",
            "-vv",
            "--epochs=1",
            "--batch-size=1",
            f"--output-folder={output_folder}",
        ],
    )
    _assert_exit_0(result0)

    assert os.path.exists(os.path.join(output_folder, "model_final_epoch.ckpt"))
    assert os.path.exists(
        os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
    )
    assert os.path.exists(os.path.join(output_folder, "constants.csv"))
    assert (
        len(
            glob.glob(
                os.path.join(output_folder, "logs", "events.out.tfevents.*")
            )
        )
        == 1
    )
    assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

    with stdout_logging() as buf:
        result = runner.invoke(
            train,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--epochs=2",
                "--batch-size=1",
                f"--output-folder={output_folder}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(
            os.path.join(output_folder, "model_final_epoch.ckpt")
        )
        assert os.path.exists(
            os.path.join(output_folder, "model_lowest_valid_loss.ckpt")
        )
        assert os.path.exists(os.path.join(output_folder, "constants.csv"))
        assert (
            len(
                glob.glob(
                    os.path.join(output_folder, "logs", "events.out.tfevents.*")
                )
            )
            == 2
        )
        assert os.path.exists(os.path.join(output_folder, "model-summary.txt"))

        keywords = {
            r"^Writing command-line for reproduction at .*$": 1,
            r"^Loading dataset:`train` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Loading dataset:`validation` without caching. Trade-off: CPU RAM: less | Disk: more.$": 1,
            r"^Applying datamodule train sampler balancing...$": 1,
            r"^Balancing samples from dataset using metadata targets `label`$": 1,
            r"^Training for at most 2 epochs.$": 1,
            r"^Resuming from epoch 0...$": 1,
            r"^Saving model summary at.*$": 1,
            r"^Dataset `train` is already setup. Not re-instantiating it.$": 1,
            r"^Dataset `validation` is already setup. Not re-instantiating it.$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"String '{k}' appeared "
                f"{_str_counter(k, logging_output)} time(s) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_predict_pasa_montgomery(temporary_basedir, datadir):
    from ptbench.scripts.predict import predict

    runner = CliRunner()

    with stdout_logging() as buf:
        output = str(temporary_basedir / "predictions")
        result = runner.invoke(
            predict,
            [
                "pasa",
                "montgomery",
                "-vv",
                "--batch-size=1",
                f"--weight={str(temporary_basedir / 'results' / 'model_final_epoch.ckpt')}",
                f"--output={output}",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(output)

        keywords = {
            r"^Loading dataset: * without caching. Trade-off: CPU RAM: less | Disk: more$": 3,
            r"^Loading checkpoint from .*$": 1,
            r"^Restoring normalizer from checkpoint.$": 1,
            r"^Running prediction on `train` split...$": 1,
            r"^Running prediction on `validation` split...$": 1,
            r"^Running prediction on `test` split...$": 1,
            r"^Predictions saved to .*$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"String '{k}' appeared "
                f"{_str_counter(k, logging_output)} time(s) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


# @pytest.mark.skip(reason="Test needs to be updated")
@pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
def test_evaluate_pasa_montgomery(temporary_basedir):
    from ptbench.scripts.evaluate import evaluate

    runner = CliRunner()

    with stdout_logging() as buf:
        prediction_folder = str(temporary_basedir / "predictions")
        output_folder = str(temporary_basedir / "evaluations")
        result = runner.invoke(
            evaluate,
            [
                "-vv",
                "montgomery",
                f"--predictions={prediction_folder}",
                f"--output-folder={output_folder}",
                "--threshold=test",
            ],
        )
        _assert_exit_0(result)

        assert os.path.exists(os.path.join(output_folder, "plots.pdf"))
        assert os.path.exists(os.path.join(output_folder, "summary.rst"))

        keywords = {
            r"^Setting --threshold=.*$": 1,
            r"^Analyzing split `train`...$": 1,
            r"^Analyzing split `validation`...$": 1,
            r"^Analyzing split `test`...$": 1,
            r"^Saving measures at .*$": 1,
            r"^Saving figures at .*$": 1,
        }
        buf.seek(0)
        logging_output = buf.read()

        for k, v in keywords.items():
            assert _str_counter(k, logging_output) == v, (
                f"String '{k}' appeared "
                f"{_str_counter(k, logging_output)} time(s) "
                f"instead of the expected {v}:\nOutput:\n{logging_output}"
            )


def test_compare_vis(temporary_basedir, datadir):
    from ptbench.scripts.compare_vis import compare_vis

    runner = CliRunner()

    input_dir = str(
        datadir / "test_visualization_images" / "indirect-model" / "tbx11k"
    )
    output_dir = str(temporary_basedir / "compare_vis")

    result = runner.invoke(
        compare_vis, ["-vv", "-i", str(input_dir), "-o", str(output_dir)]
    )
    assert result.exit_code == 0

    # Check that the output images were created
    assert (
        temporary_basedir
        / "compare_vis"
        / "targeted_class"
        / "test"
        / "tb0004.png"
    ).exists()
    assert (
        temporary_basedir
        / "compare_vis"
        / "targeted_class"
        / "train"
        / "tb0005.png"
    ).exists()


# This script does not work anymore; either fix it or remove the script
# together with this test.
# def test_evaluatevis(temporary_basedir):
#     import pandas as pd
#
#     from ptbench.scripts.evaluatevis import evaluatevis
#
#     runner = CliRunner()
#
#     # Create a sample directory structure and CSV files
#     input_folder = temporary_basedir / "camutils_cli" / "gradcam"
#     input_folder.mkdir(parents=True, exist_ok=True)
#     class1_dir = input_folder / "class1"
#     class1_dir.mkdir(parents=True, exist_ok=True)
#     class2_dir = input_folder / "class2"
#     class2_dir.mkdir(parents=True, exist_ok=True)
#
#     data = {
#         "MoRF": [1, 2, 3],
#         "LeRF": [2, 4, 6],
#         "Combined Score ((LeRF-MoRF) / 2)": [1.5, 3, 4.5],
#         "IoU": [1, 2, 3],
#         "IoDA": [2, 4, 6],
#         "propEnergy": [1.5, 3, 4.5],
#         "ASF": [1, 2, 3],
#     }
#
#     df = pd.DataFrame(data)
#
#     df.to_csv(class1_dir / "file1.csv", index=False)
#     df.to_csv(class2_dir / "file1.csv", index=False)
#     df.to_csv(class1_dir / "file2.csv", index=False)
#     df.to_csv(class2_dir / "file2.csv", index=False)
#
#     result = runner.invoke(evaluatevis, ["-vv", "-i", str(input_folder)])
#     assert result.exit_code == 0
#
#     assert (input_folder / "file1_summary.csv").exists()
#     assert (input_folder / "file2_summary.csv").exists()


# Not enough RAM available to do this test
# @pytest.mark.skip_if_rc_var_not_set("datadir.montgomery")
# def test_predict_densenetrs_montgomery(temporary_basedir, datadir):
#     from ptbench.scripts.predict import predict
#
#     runner = CliRunner()
#
#     with stdout_logging() as buf:
#         output_folder = str(temporary_basedir / "predictions")
#         result = runner.invoke(
#             predict,
#             [
#                 "densenet_rs",
#                 "montgomery_f0_rgb",
#                 "-vv",
#                 "--batch-size=1",
#                 f"--weight={str(datadir / 'lfs' / 'models' / 'densenetrs.pth')}",
#                 f"--output-folder={output_folder}",
#                 "--grad-cams",
#             ],
#         )
#         _assert_exit_0(result)
#
#         # check predictions are there
#         predictions_file1 = os.path.join(output_folder, "train/predictions.csv")
#         predictions_file2 = os.path.join(output_folder, "validation/predictions.csv")
#         predictions_file3 = os.path.join(output_folder, "test/predictions.csv")
#         assert os.path.exists(predictions_file1)
#         assert os.path.exists(predictions_file2)
#         assert os.path.exists(predictions_file3)
#
#         # check some grad cams are there
#         cam1 = os.path.join(output_folder, "train/cams/MCUCXR_0002_0_cam.png")
#         cam2 = os.path.join(output_folder, "train/cams/MCUCXR_0126_1_cam.png")
#         cam3 = os.path.join(output_folder, "train/cams/MCUCXR_0275_1_cam.png")
#         cam4 = os.path.join(output_folder, "validation/cams/MCUCXR_0399_1_cam.png")
#         cam5 = os.path.join(output_folder, "validation/cams/MCUCXR_0113_1_cam.png")
#         cam6 = os.path.join(output_folder, "validation/cams/MCUCXR_0013_0_cam.png")
#         cam7 = os.path.join(output_folder, "test/cams/MCUCXR_0027_0_cam.png")
#         cam8 = os.path.join(output_folder, "test/cams/MCUCXR_0094_0_cam.png")
#         cam9 = os.path.join(output_folder, "test/cams/MCUCXR_0375_1_cam.png")
#         assert os.path.exists(cam1)
#         assert os.path.exists(cam2)
#         assert os.path.exists(cam3)
#         assert os.path.exists(cam4)
#         assert os.path.exists(cam5)
#         assert os.path.exists(cam6)
#         assert os.path.exists(cam7)
#         assert os.path.exists(cam8)
#         assert os.path.exists(cam9)
#
#         keywords = {
#             r"^Loading checkpoint from.*$": 1,
#             r"^Total time:.*$": 3,
#             r"^Grad cams folder:.*$": 3,
#         }
#         buf.seek(0)
#         logging_output = buf.read()
#
#         for k, v in keywords.items():
#             assert _str_counter(k, logging_output) == v, (
#                 f"Count for string '{k}' appeared "
#                 f"({_str_counter(k, logging_output)}) "
#                 f"instead of the expected {v}:\nOutput:\n{logging_output}"
#             )
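

# Note: the ``temporary_basedir`` and ``datadir`` fixtures and the
# ``skip_if_rc_var_not_set`` marker used throughout this module are provided by
# the test suite's ``conftest.py`` (not shown here).  Purely as an illustration
# -- a hedged sketch under assumptions, not the project's actual implementation
# -- such a marker could be wired up with standard pytest hooks, assuming a
# hypothetical ``load_rc()`` helper that returns the user's runtime
# configuration as a mapping keyed by variable name:
#
#     def pytest_collection_modifyitems(config, items):
#         rc = load_rc()  # hypothetical: reads the package's RC/config file
#         for item in items:
#             for mark in item.iter_markers(name="skip_if_rc_var_not_set"):
#                 var = mark.args[0]  # e.g. "datadir.montgomery"
#                 if rc.get(var) is None:
#                     item.add_marker(
#                         pytest.mark.skip(reason=f"RC variable '{var}' is not set")
#                     )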