diff --git a/doc/api.rst b/doc/api.rst index 793ad9e68cc1d69304b194fa95b6fc5241b4df62..8f4f90ae0c3d98afbdf24a60653cbafe68bd8bb4 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -232,10 +232,7 @@ Segmentation-specific utilities. .. autosummary:: :toctree: api/utils - mednet.libs.segmentation.utils.measure - mednet.libs.segmentation.utils.plot mednet.libs.common.utils.rc - mednet.libs.segmentation.utils.table .. include:: links.rst diff --git a/doc/references.rst b/doc/references.rst index 71574c60f27995b1c6eaa20c769260f5bc9ae1b9..44f2a2ff5d2d13509ba82503e5adca3b37778824 100644 --- a/doc/references.rst +++ b/doc/references.rst @@ -102,11 +102,6 @@ Segmentation with Minimalistic Models**, 2020. https://arxiv.org/abs/2009.01907 -.. [GOUTTE-2005] *C. Goutte and E. Gaussier*, **A probabilistic interpretation - of precision, recall and F-score, with implication for evaluation**, - European conference on Advances in Information Retrieval Research, 2005. - https://doi.org/10.1007/978-3-540-31865-1_25 - .. [JSRT-2000] *J. Shiraishi, S. Katsuragawa, J. Ikezoe, T. Matsumoto, T. Kobayashi, K. Komatsu, M. Matsui, H. Fujita, Y. Kodera, K. Doi*, **Development of a digital image database for chest radiographs with and diff --git a/src/mednet/libs/segmentation/engine/evaluator.py b/src/mednet/libs/segmentation/engine/evaluator.py index 6a7f7444a020675629a2da724ab98c9201453548..82ec4bbba6eb56a14fa55551be8a9081e9794e6c 100644 --- a/src/mednet/libs/segmentation/engine/evaluator.py +++ b/src/mednet/libs/segmentation/engine/evaluator.py @@ -6,506 +6,459 @@ import logging import pathlib +import typing import h5py import numpy -import pandas -import PIL -import torch -import torch.nn.functional -import torchvision.transforms.functional as tf +import numpy.typing from tqdm import tqdm -from ..utils.measure import base_measures, bayesian_measures - logger = logging.getLogger(__name__) +SUPPORTED_METRIC_TYPE = typing.Literal[ + "precision", "recall", "specificity", "accuracy", "jaccard", "f1" +] +"""Supported metrics for evaluation of counts.""" -def _posneg( - pred: torch.Tensor, gt: torch.Tensor, threshold: float -) -> tuple[torch.tensor, torch.tensor, torch.tensor, torch.tensor]: - """Calculate true and false positives and negatives. + +def _tricky_division(n: float, d: float): + """Divide n by d, or 0.0 in case of a division by zero. Parameters ---------- - pred - Pixel-wise predictions. - gt - Ground-truth (annotations). - threshold - A particular threshold in which to calculate the performance - measures. + n + The number to divide. + d + The divisor. Returns ------- - tp_tensor - Boolean tensor with true positives, considering all observations. - fp_tensor - Boolean tensor with false positives, considering all observations. - tn_tensor - Boolean tensor with true negatives, considering all observations. - fn_tensor - Boolean tensor with false negatives, considering all observations. + Result of the division. """ - gt = gt.byte() # byte tensor + return n / (d + (d == 0)) - # threshold - binary_pred = torch.gt(pred, threshold).byte() - # equals and not-equals - equals = torch.eq(binary_pred, gt).type(torch.uint8) # tensor - notequals = torch.ne(binary_pred, gt).type(torch.uint8) # tensor +def precision(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the precision given true/false positive/negative counts. - # true positives - tp_tensor = gt * binary_pred + P, AKA positive predictive value (PPV). It corresponds arithmetically to + ``tp/(tp+fp)``. 
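# Illustrative sketch, not part of the patch: the ``(d == 0)`` term in
# ``_tricky_division`` above bumps a zero divisor to one, so the helper
# returns 0.0 instead of raising ZeroDivisionError.
def _tricky_division_demo() -> None:
    assert _tricky_division(3.0, 4.0) == 0.75
    # for the counts used in this module, a zero divisor implies a zero
    # numerator (e.g. tp + fp == 0 forces tp == 0), so the result is 0.0:
    assert _tricky_division(0.0, 0.0) == 0.0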
In the case ``tp+fp == 0``, this function returns zero for + precision. - # false positives - fp_tensor = torch.eq((binary_pred + tp_tensor), 1).byte() - - # true negatives - tn_tensor = equals - tp_tensor + Parameters + ---------- + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". - # false negatives - fn_tensor = notequals - fp_tensor + Returns + ------- + The precision. + """ + return _tricky_division(tp, tp + fp) - return tp_tensor, fp_tensor, tn_tensor, fn_tensor +def recall(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the recall given true/false positive/negative counts. -def sample_measures_for_threshold( - pred: torch.Tensor, gt: torch.Tensor, mask: torch.Tensor, threshold: float -) -> tuple[int, int, int, int]: - """Calculate counts on one single sample, for a specific threshold. + R, AKA sensitivity, hit rate, or true positive rate (TPR). It corresponds + arithmetically to ``tp/(tp+fn)``. In the special case where ``tp+fn == + 0``, this function returns zero for recall. Parameters ---------- - pred - Pixel-wise predictions. - gt - Ground-truth (annotations). - mask - Region mask (used only if available). May be set to ``None``. - - threshold - A particular threshold in which to calculate the performance - measures. + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". Returns ------- - The true positives, false positives, true negatives and false - negatives, in that order. + The recall. """ + return _tricky_division(tp, tp + fn) - tp_tensor, fp_tensor, tn_tensor, fn_tensor = _posneg(pred, gt, threshold) - # if a mask is provided, consider only TP/FP/TN/FN **within** the region of - # interest defined by the mask - if mask is not None: - antimask = torch.le(mask, 0.5) - tp_tensor[antimask] = 0 - fp_tensor[antimask] = 0 - tn_tensor[antimask] = 0 - fn_tensor[antimask] = 0 +def specificity(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the specificity given true/false positive/negative counts. - # calc measures from scalars - tp_count = torch.sum(tp_tensor).item() - fp_count = torch.sum(fp_tensor).item() - tn_count = torch.sum(tn_tensor).item() - fn_count = torch.sum(fn_tensor).item() + S, AKA selectivity or true negative rate (TNR). It corresponds + arithmetically to ``tn/(tn+fp)``. In the special case where ``tn+fp == + 0``, this function returns zero for specificity. - return tp_count, fp_count, tn_count, fn_count + Parameters + ---------- + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". + Returns + ------- + The specificity. + """ + return _tricky_division(tn, fp + tn) -def _sample_measures( - pred: torch.Tensor, gt: torch.Tensor, mask: torch.Tensor, steps: float -) -> pandas.DataFrame: - """Calculate measures on one single sample. + +def accuracy(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the accuracy given true/false positive/negative counts. + + A, see `Accuracy + <https://en.wikipedia.org/wiki/Evaluation_of_binary_classifiers>`_. 
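# Quick sanity check (illustrative, hypothetical counts) for the rate helpers
# defined above in this module:
def _rate_helpers_demo() -> None:
    tp, fp, tn, fn = 90, 10, 80, 20
    assert abs(precision(tp, fp, tn, fn) - 90 / 100) < 1e-12
    assert abs(recall(tp, fp, tn, fn) - 90 / 110) < 1e-12
    assert abs(specificity(tp, fp, tn, fn) - 80 / 90) < 1e-12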
is the + proportion of correct predictions (both true positives and true negatives) + among the total number of pixels examined. It corresponds arithmetically + to ``(tp+tn)/(tp+tn+fp+fn)``. This measure includes both true-negatives + and positives in the numerator, what makes it sensitive to data or regions + without annotations. Parameters ---------- - pred - Pixel-wise predictions. - gt - Ground-truth (annotations). - mask - Region mask (used only if available). May be set to ``None``. - steps - Number of steps to use for threshold analysis. The step size is - calculated from this by dividing ``1.0/steps``. + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". Returns ------- - A pandas dataframe with the following columns: - - * tp: int - * fp: int - * tn: int - * fn: int + The accuracy. """ + return _tricky_division(tp + tn, tp + fp + fn + tn) - step_size = 1.0 / steps - data = [ - (index, threshold) + sample_measures_for_threshold(pred, gt, mask, threshold) - for index, threshold in enumerate(numpy.arange(0.0, 1.0, step_size)) - ] - - retval = pandas.DataFrame( - data, - columns=( - "index", - "threshold", - "tp", - "fp", - "tn", - "fn", - ), - ) - retval.set_index("index", inplace=True) - return retval +def jaccard(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the Jaccard index given true/false positive/negative counts. -def _sample_analysis( - img: torch.Tensor, - pred: torch.Tensor, - gt: torch.Tensor, - mask: torch.Tensor, - threshold: float, - tp_color: tuple[int, int, int] = (0, 255, 0), # (128,128,128) Gray - fp_color: tuple[int, int, int] = (0, 0, 255), # (70, 240, 240) Cyan - fn_color: tuple[int, int, int] = (255, 0, 0), # (245, 130, 48) Orange - overlay: bool = True, -) -> PIL.Image.Image: - """Visualize true positives, false positives and false negatives. + J, see `Jaccard Index or Similarity + <https://en.wikipedia.org/wiki/Jaccard_index>`_. It corresponds + arithmetically to ``tp/(tp+fp+fn)``. In the special case where ``tn+fp+fn + == 0``, this function returns zero for the Jaccard index. The Jaccard index + depends on a TP-only numerator, similarly to the F1 score. For regions + where there are no annotations, the Jaccard index will always be zero, + irrespective of the model output. Accuracy may be a better proxy if one + needs to consider the true abscence of annotations in a region as part of + the measure. Parameters ---------- - img - Original image. - pred - Pixel-wise predictions. - gt - Ground-truth (annotations). - mask - Region mask (used only if available). May be set to ``None``. - threshold - The threshold to be used while analyzing this image's probability map. - tp_color - RGB value for true positives. - fp_color - RGB value for false positives. - fn_color - RGB value for false negatives. - overlay - If set to ``True`` (which is the default), then overlay annotations on - top of the image. Otherwise, represent data on a black canvas. + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". Returns ------- - A PIL image that contains the overlayed analysis of true-positives - (TP), false-positives (FP) and false negatives (FN). + The Jaccard index. 
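# Illustrative contrast (hypothetical counts): in a region with no positive
# annotations (tp == fn == 0) the Jaccard index above is zero regardless of
# the prediction, while accuracy still rewards the true negatives, which is
# the caveat spelled out in the docstrings.
def _jaccard_vs_accuracy_demo() -> None:
    tp, fp, tn, fn = 0, 5, 995, 0
    assert jaccard(tp, fp, tn, fn) == 0.0
    assert accuracy(tp, fp, tn, fn) == 995 / 1000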
""" + return _tricky_division(tp, tp + fp + fn) - tp_tensor, fp_tensor, tn_tensor, fn_tensor = _posneg(pred, gt, threshold) - # if a mask is provided, consider only TP/FP/TN/FN **within** the region of - # interest defined by the mask - if mask is not None: - antimask = torch.le(mask, 0.5) - tp_tensor[antimask] = 0 - fp_tensor[antimask] = 0 - tn_tensor[antimask] = 0 - fn_tensor[antimask] = 0 +def f1_score(tp: int, fp: int, tn: int, fn: int) -> float: + """Calculate the F1 score given true/false positive/negative counts. - # change to PIL representation - tp_pil = tf.to_pil_image(tp_tensor.float()) - tp_pil_colored = PIL.ImageOps.colorize(tp_pil, (0, 0, 0), tp_color) + F1, see `F1-score <https://en.wikipedia.org/wiki/F1_score>`_. It + corresponds arithmetically to ``2*P*R/(P+R)`` or ``2*tp/(2*tp+fp+fn)``. In + the special case where ``P+R == (2*tp+fp+fn) == 0``, this function returns + zero for the Jaccard index. The F1 or Dice score depends on a TP-only + numerator, similarly to the Jaccard index. For regions where there are no + annotations, the F1-score will always be zero, irrespective of the model + output. Accuracy may be a better proxy if one needs to consider the true + abscence of annotations in a region as part of the measure. - fp_pil = tf.to_pil_image(fp_tensor.float()) - fp_pil_colored = PIL.ImageOps.colorize(fp_pil, (0, 0, 0), fp_color) + Parameters + ---------- + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". - fn_pil = tf.to_pil_image(fn_tensor.float()) - fn_pil_colored = PIL.ImageOps.colorize(fn_pil, (0, 0, 0), fn_color) + Returns + ------- + The F1-score. + """ + return _tricky_division(2 * tp, (2 * tp) + fp + fn) - tp_pil_colored.paste(fp_pil_colored, mask=fp_pil) - tp_pil_colored.paste(fn_pil_colored, mask=fn_pil) - if overlay: - img = tf.to_pil_image(img) # PIL Image - # using blend here, to fade original image being overlayed, or - # its brightness may obfuscate colors from the vessel map - tp_pil_colored = PIL.Image.blend(img, tp_pil_colored, 0.5) +_METRIC_MAPPING = { + k: v + for k, v in zip( + typing.get_args(SUPPORTED_METRIC_TYPE), + [precision, recall, specificity, accuracy, jaccard, f1_score], + ) +} +"""Maps supported metric names and callables implementing them.""" - return tp_pil_colored +def name2metric( + name: SUPPORTED_METRIC_TYPE, +) -> typing.Callable[[int, int, int, int], float]: + """Convert a string name to a callable for summarizing counts. -def _summarize(data: dict[str, pandas.DataFrame]) -> pandas.DataFrame: - """Summarize collected dataframes and add bayesian figures. + Parameters + ---------- + name + The name of the metric to be looked up. + + Returns + ------- + A callable that summarizes counts into single floating-point number. + """ + return _METRIC_MAPPING[name] + + +def all_metrics(tp: int, fp: int, tn: int, fn: int) -> list[float]: + """Compute all available metrics at once. Parameters ---------- - data - A tuple containing the unique sample stem and a Dataframe containing - the evaluation performance on this sample. + tp + True positive count, AKA "hit". + fp + False positive count, AKA "false alarm", or "Type I error". + tn + True negative count, AKA "correct rejection". + fn + False Negative count, AKA "miss", or "Type II error". Returns ------- - A pandas Dataframe containing the summarized metrics. 
+ All supported metrics in the order defined by + :py:obj:`SUPPORTED_METRIC_TYPE`. """ + return [k(tp, fp, tn, fn) for k in _METRIC_MAPPING.values()] - _entries = ( - "mean_precision", - "mode_precision", - "lower_precision", - "upper_precision", - "mean_recall", - "mode_recall", - "lower_recall", - "upper_recall", - "mean_specificity", - "mode_specificity", - "lower_specificity", - "upper_specificity", - "mean_accuracy", - "mode_accuracy", - "lower_accuracy", - "upper_accuracy", - "mean_jaccard", - "mode_jaccard", - "lower_jaccard", - "upper_jaccard", - "mean_f1_score", - "mode_f1_score", - "lower_f1_score", - "upper_f1_score", - "frequentist_precision", - "frequentist_recall", - "frequentist_specificity", - "frequentist_accuracy", - "frequentist_jaccard", - "frequentist_f1_score", - ) - def _row_summary(r): - # run bayesian_measures(), flatten tuple of tuples, name entries - bayesian = [ - item - for sublist in bayesian_measures( - r.tp, - r.fp, - r.tn, - r.fn, - lambda_=0.5, - coverage=0.95, - ) - for item in sublist - ] - - # evaluate frequentist measures - frequentist = base_measures(r.tp, r.fp, r.tn, r.fn) - return pandas.Series(bayesian + list(frequentist), index=_entries) - - # Merges all dataframes together - sums = pandas.concat(data.values()).groupby("index").sum() - sums["threshold"] /= len(data) - - # create a new dataframe with these - measures = sums.apply(lambda r: _row_summary(r), axis=1) - - # merge sums and measures into a single dataframe - return pandas.concat([sums, measures.reindex(sums.index)], axis=1).copy() - - -def _evaluate_sample_worker( - sample: tuple[str, str], - name: str, - steps: float, - threshold: float | None, - output_folder: pathlib.Path | None, - overlayed_folder: pathlib.Path | None, -) -> tuple[str, pandas.DataFrame]: - """Run all of the evaluation steps on a single sample. +def tfpn_masks( + pred: numpy.typing.NDArray[numpy.float32], + gt: numpy.typing.NDArray[numpy.bool_], + threshold: float, +) -> tuple[ + numpy.typing.NDArray[numpy.bool_], + numpy.typing.NDArray[numpy.bool_], + numpy.typing.NDArray[numpy.bool_], + numpy.typing.NDArray[numpy.bool_], +]: + """Calculate true and false positives and negatives. + + All input arrays should have matching sizes. Parameters ---------- - sample - Sample to be processed, containing the stem of the filepath - relative to the database root and a path the hdf5 file in which - the image, target and mask are stored. - name - Local name of the dataset (e.g. ``train``, or ``test``) to be - used when saving measures files. - steps - Number of threshold steps to consider when evaluating thresholds. + pred + Pixel-wise predictions as output by your model. + gt + Ground-truth (annotations). threshold - If ``overlayed_folder``, then this should be threshold (floating - point) to apply to prediction maps to decide on positives and - negatives for overlaying analysis (graphical output). This number - should come from the training set or a separate validation set. - Using a test set value may bias your analysis. This number is also - used to print the a priori F1-score on the evaluated set. - output_folder - If not ``None``, then outputs a copy of the evaluation for this - sample in CSV format at this directory, but respecting the sample - ``stem``. - overlayed_folder - If not ``None``, then outputs a version of the input image with - predictions overlayed, in PNG format, but respecting the sample - ``stem``. + A particular threshold in which to calculate the performance + measures. 
Values at this threshold are counted as positives. Returns ------- - A tuple containing the unique sample stem and a Dataframe containing - the evaluation performance on this sample. + tp + Boolean array with true positives, considering all observations. + fp + Boolean array with false positives, considering all observations. + tn + Boolean array with true negatives, considering all observations. + fn + Boolean array with false negatives, considering all observations. """ - stem = sample[0] - prediction_data_path = sample[1] - with h5py.File(prediction_data_path, "r") as f: - prediction = torch.from_numpy(f.get("img")[:]) - target = torch.from_numpy(f.get("target")[:]) - mask = torch.from_numpy(f.get("mask")[:]) - retval = _sample_measures(prediction, target, mask, steps) - - if output_folder is not None: - fullpath = (output_folder / name / f"{stem}").with_suffix(".csv") - tqdm.write(f"Saving {fullpath}...") - fullpath.parent.mkdir(parents=True, exist_ok=True) - retval.to_csv(fullpath) - - # Disabled for now, would require to save the original sample with - # transforms applied in hdf5 - """if overlayed_folder is not None: - overlay_image = _sample_analysis( - image, prediction, target, mask, threshold=threshold, overlay=True - ) - fullpath = os.path.join(overlayed_folder, name, f"{stem}.png") - tqdm.write(f"Saving {fullpath}...") - os.makedirs(os.path.dirname(fullpath), exist_ok=True) - overlay_image.save(fullpath)""" - - return stem, retval + binary_pred = pred >= threshold + tp = numpy.logical_and(binary_pred, gt) + fp = numpy.logical_and(binary_pred, numpy.logical_not(gt)) + tn = numpy.logical_and(numpy.logical_not(binary_pred), numpy.logical_not(gt)) + fn = numpy.logical_and(numpy.logical_not(binary_pred), gt) + return tp, fp, tn, fn -def run( - name: str, - predictions: list[tuple[str, str]], - output_folder: pathlib.Path | None, - overlayed_folder: pathlib.Path | None = None, - threshold: float | None | None = None, - steps: float | None = 1000, - parallel: int | None = -1, -) -> float: - """Run inference and calculate measures. +def get_counts_for_threshold( + pred: numpy.typing.NDArray[numpy.float32], + gt: numpy.typing.NDArray[numpy.bool_], + mask: numpy.typing.NDArray[numpy.bool_], + threshold: float, +) -> tuple[int, int, int, int]: + """Calculate counts on one single sample, for a specific threshold. Parameters ---------- - name - The name of subset to load. - predictions - A list of predictions to consider for measurement. - output_folder - Folder where to store results. If not provided, then do not store any - analysis (useful for quickly calculating overlay thresholds). - overlayed_folder - If not ``None``, then it should be the name of a folder where to store - overlayed versions of the images and ground-truths. + pred + Array with pixel-wise predictions. + gt + Array with ground-truth (annotations). + mask + Array with region mask marking parts to ignore. threshold - If ``overlayed_folder``, then this should be threshold (floating point) - to apply to prediction maps to decide on positives and negatives for - overlaying analysis (graphical output). This number should come from - the training set or a separate validation set. Using a test set value - may bias your analysis. This number is also used to print the a priori - F1-score on the evaluated set. - steps - Number of threshold steps to consider when evaluating thresholds. - parallel - If set to a value different >= 0, uses multiprocessing for estimating - thresholds for each sample through a processing pool. 
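# Tiny illustrative example (hypothetical data): thresholding a 2x2 prediction
# map and checking the four boolean masks produced by tfpn_masks() above.
# Relies on the module-level numpy import.
def _tfpn_masks_demo() -> None:
    pred = numpy.array([[0.9, 0.2], [0.7, 0.1]], dtype=numpy.float32)
    gt = numpy.array([[True, False], [False, True]])
    tp, fp, tn, fn = tfpn_masks(pred, gt, 0.5)
    assert int(tp.sum()) == 1  # pixel (0, 0)
    assert int(fp.sum()) == 1  # pixel (1, 0)
    assert int(tn.sum()) == 1  # pixel (0, 1)
    assert int(fn.sum()) == 1  # pixel (1, 1)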
A value of zero - will create as many processes in the pool as cores in the machine. A - negative value disables multiprocessing altogether. A value greater - than zero will spawn as many processes as requested. + A particular threshold in which to calculate the performance + measures. Returns ------- - Threshold to achieve the highest possible F1-score for this dataset. + The true positives, false positives, true negatives and false + negatives, in that order. """ - # Collect overall measures - data = {} + tp, fp, tn, fn = tfpn_masks(pred, gt, threshold) + tp = numpy.logical_and(tp, mask) + fp = numpy.logical_and(fp, mask) + tn = numpy.logical_and(tn, mask) + fn = numpy.logical_and(fn, mask) + return tp.sum(), fp.sum(), tn.sum(), fn.sum() + + +def load_count( + prediction_path: pathlib.Path, + predictions: typing.Sequence[str], + thresholds: numpy.typing.NDArray[numpy.float64], +) -> numpy.typing.NDArray[numpy.uint64]: + """Count true/false positive/negatives for the subset. + + This function will load predictions from their store location and will + **cumulatively** count the number of true positives, false positives, true + negatives and false negatives across the various ``thresholds``. This + alternative provides a memory-bound way to compute the performance of + splits with potentially very large images or including a large/very large + number of samples. Unfortunately, sklearn does not provide functions to + compute standard metrics from true/false positive/negative counts, which + implies one needs to make use of further functions defined in this module + to compute such metrics. Alternatively, you may look into + :py:func:`load_predictions`, if you want to use sklearn functions to + compute metrics. - # TODO: Fix multiprocessing - # if parallel < 0: # turns off multiprocessing + Parameters + ---------- + prediction_path + Base directory where the prediction files (HDF5) were stored. + predictions + A list of relative sample prediction paths to consider for measurement. + thresholds + A sequence of thresholds to be applied on ``predictions``, when + evaluating true/false positive/negative counts. + + Returns + ------- + A 2-D array with shape ``(len(thresholds), 4)``, where each row + contains to the counts of true positives, false positives, true + negatives and false negatives, for the related threshold, and for the + **whole dataset**. 
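# Illustrative sketch (hypothetical data): pixels excluded by the mask do not
# contribute to any of the four totals returned by get_counts_for_threshold().
def _masked_counts_demo() -> None:
    pred = numpy.array([[0.9, 0.2], [0.7, 0.1]], dtype=numpy.float32)
    gt = numpy.array([[True, False], [False, True]])
    mask = numpy.array([[True, True], [False, False]])  # ignore the bottom row
    tp, fp, tn, fn = get_counts_for_threshold(pred, gt, mask, 0.5)
    assert (int(tp), int(fp), int(tn), int(fn)) == (1, 0, 1, 0)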
+ """ + data = numpy.zeros((len(thresholds), 4), dtype=numpy.uint64) for sample in tqdm(predictions, desc="sample"): - k, v = _evaluate_sample_worker( - sample, - name, - steps, - threshold, - output_folder, - overlayed_folder, + with h5py.File(prediction_path / sample[1], "r") as f: + pred = numpy.array(f.get("img")) # float32 + gt = numpy.array(f.get("target")) # boolean + mask = numpy.array(f.get("mask")) # boolean + data += numpy.array( + [get_counts_for_threshold(pred, gt, mask, k) for k in thresholds], + dtype=numpy.uint64, ) - data[k] = v - - # TODO: Fix multiprocessing - """else: - parallel = parallel or multiprocessing.cpu_count() - with ( - multiprocessing.Pool(processes=parallel) as pool, - tqdm( - total=len(predictions), - desc="sample", - ) as pbar, - ): - for k, v in pool.imap_unordered( - _evaluate_sample_worker, - zip( - predictions, - itertools.repeat(name), - itertools.repeat(steps), - itertools.repeat(threshold), - itertools.repeat(output_folder), - itertools.repeat(overlayed_folder), - ), - ): - pbar.update() - data[k] = v""" - - # Merges all dataframes together - measures = _summarize(data) - - maxf1 = measures["mean_f1_score"].max() - maxf1_index = measures["mean_f1_score"].idxmax() - maxf1_threshold = measures["threshold"][maxf1_index] + return data - logger.info( - f"Maximum F1-score of {maxf1:.5f}, achieved at " - f"threshold {maxf1_threshold:.3f} (chosen *a posteriori*)" - ) - if threshold is not None: - # get the closest possible threshold we have - index = int(round(steps * threshold)) - f1_a_priori = measures["mean_f1_score"][index] - actual_threshold = measures["threshold"][index] +def load_predictions( + prediction_path: pathlib.Path, + predictions: typing.Sequence[str], +) -> tuple[numpy.typing.NDArray[numpy.float32], numpy.typing.NDArray[numpy.bool_]]: + """Load predictions and ground-truth from HDF5 files. - # mark threshold a priori chosen on this dataset - measures["threshold_a_priori"] = False - measures["threshold_a_priori", index] = True + Loading pixel-data as simple binary predictions with associated labels + allows using sklearn library to compute most metrics defined in this + module. Note however that computing metrics this way requires + pre-allocation of a potentially large vector, which depends on the number + of samples and the size of said samples. This may not work well for very + large datasets of large/very large images. Currently, the evaluation + system uses :py:func:`load_count` instead, which loads and pre-computes the + number of true/false positives/negatives using a list of candidate + thresholds. - logger.info( - f"F1-score of {f1_a_priori:.5f}, at threshold " - f"{actual_threshold:.3f} (chosen *a priori*)" - ) + Parameters + ---------- + prediction_path + Base directory where the prediction files (HDF5) were stored. + predictions + A list of relative sample prediction paths to consider for measurement. - if output_folder is not None: - logger.info(f"Output folder: {output_folder}") - output_folder.mkdir(parents=True, exist_ok=True) + Returns + ------- + Two 1-D arrays containing a linearized version of pixel predictions + (probability) and matching ground-truth. 
+ """ + + # peak prediction size and number of samples + with h5py.File(prediction_path / predictions[0][1], "r") as f: + elements = numpy.array(f.get("img").shape).prod() + size = len(predictions) * elements + logger.info( + f"Data loading will require ({elements} x {len(predictions)} x 5 =) " + f"{size*5/(1024*1024):.0f} MB of RAM" + ) - measures_path = output_folder / f"{name}.csv" - logger.info(f"Saving measures over all input images at {measures_path}...") - measures.to_csv(measures_path) + # now load the data + pred_array = numpy.empty((size,), dtype=numpy.float32) + gt_array = numpy.empty((size,), dtype=numpy.bool_) + for i, sample in enumerate(tqdm(predictions, desc="sample")): + with h5py.File(prediction_path / sample[1], "r") as f: + mask = numpy.array(f.get("mask")) # boolean + pred = numpy.array(f.get("img")) # float32 + pred *= mask.astype(numpy.float32) + gt = numpy.array(f.get("target")) # boolean + gt &= mask + pred_array[i * elements : (i + 1) * elements] = pred.flatten() + gt_array[i * elements : (i + 1) * elements] = gt.flatten() + + return pred_array, gt_array + + +def compute_metric( + counts: numpy.typing.NDArray[numpy.uint64], + metric: typing.Callable[[int, int, int, int], float] + | typing.Callable[[int, int, int, int], tuple[float, ...]], +) -> numpy.typing.NDArray[numpy.float64]: + """Compute ``metric`` for every row of ``counts``. - return maxf1_threshold + Parameters + ---------- + counts + A 2-D array with shape ``(*, 4)``, where each row contains to the + counts of true positives, false positives, true negatives and false + negatives, that need to be evaluated. + metric + A callable that takes 4 integers representing true positives, false + positives, true negatives and false negatives, and outputs one or more + floating-point metrics. + + Returns + ------- + An 1-D array containing the provided metric computed alongside the + first dimension and as many columns as ```metric`` provides in each + call. + """ + return numpy.array([metric(*k) for k in counts], dtype=numpy.float64) # def _compare_annotators_worker( diff --git a/src/mednet/libs/segmentation/scripts/evaluate.py b/src/mednet/libs/segmentation/scripts/evaluate.py index 9be445953d558e76d16687a512459b537de60dc6..00b8e8f3715dea0e640a3e33f5c2f55286394fbc 100644 --- a/src/mednet/libs/segmentation/scripts/evaluate.py +++ b/src/mednet/libs/segmentation/scripts/evaluate.py @@ -10,10 +10,44 @@ import click from clapper.click import ResourceOption, verbosity_option from clapper.logging import setup from mednet.libs.common.scripts.click import ConfigCommand +from mednet.libs.segmentation.engine.evaluator import SUPPORTED_METRIC_TYPE logger = setup(__name__.split(".")[0], format="%(levelname)s: %(message)s") +def _validate_threshold(threshold: float | str, splits: list[str]): + """Validate the user threshold selection and returns parsed threshold. + + Parameters + ---------- + threshold + The threshold to validate. + splits + List of available splits. + + Returns + ------- + The validated threshold. 
+ """ + if threshold is None: + return 0.5 + + try: + # we try to convert it to float first + threshold = float(threshold) + if threshold < 0.0 or threshold > 1.0: + raise ValueError("Float thresholds must be within range [0.0, 1.0]") + except ValueError: + if threshold not in splits: + raise ValueError( + f"Text thresholds should match dataset names, " + f"but {threshold} is not available among the datasets provided (" + f"({', '.join(splits)})" + ) + + return threshold + + @click.command( entry_point_group="mednet.libs.segmentation.config", cls=ConfigCommand, @@ -24,32 +58,19 @@ logger = setup(__name__.split(".")[0], format="%(levelname)s: %(message)s") .. code:: sh - $ mednet segmentation evaluate -vv drive --predictions-folder=path/to/predictions --output-folder=path/to/results - -\b - 2. To run evaluation on a folder with your own images and annotations, you - must first specify resizing, cropping, etc, so that the image can be - correctly input to the model. Failing to do so will likely result in - poor performance. To figure out such specifications, you must consult - the dataset configuration used for **training** the provided model. - Once you figured this out, do the following: - - .. code:: sh - - $ mednet segmentation config copy csv-dataset-example mydataset.py - # modify "mydataset.py" to your liking - $ mednet segmentation evaluate -vv mydataset.py --predictions-folder=path/to/predictions --output-folder=path/to/results + $ mednet segmentation evaluate -vv --predictions=path/to/predictions.json --output-folder=path/to/results """, ) @click.option( "--predictions", "-p", - help="Filename in which predictions are currently stored", + help="""Path to the JSON file describing available predictions. The actual + predictions are supposed to lie on the same folder.""", required=True, type=click.Path( file_okay=True, dir_okay=False, - writable=True, + writable=False, path_type=pathlib.Path, ), cls=ResourceOption, @@ -68,69 +89,53 @@ logger = setup(__name__.split(".")[0], format="%(levelname)s: %(message)s") default="results", cls=ResourceOption, ) -@click.option( - "--second-annotator", - "-S", - help="A dataset or dictionary, like in --dataset, with the same " - "sample keys, but with annotations from a different annotator that is " - "going to be compared to the one in --dataset. The same rules regarding " - "dataset naming conventions apply", - required=False, - default=None, - cls=ResourceOption, - show_default=True, -) -@click.option( - "--overlayed", - "-O", - help="Creates overlayed representations of the output probability maps, " - "similar to --overlayed in prediction-mode, except it includes " - "distinctive colours for true and false positives and false negatives. " - "If not set, or empty then do **NOT** output overlayed images. " - "Otherwise, the parameter represents the name of a folder where to " - "store those", - show_default=True, - default=None, - required=False, - cls=ResourceOption, -) +# @click.option( +# "--second-annotator", +# "-a", +# help="""A datamodule containing annotations from another annotator, that +# will be compared to the ground-truth (reference annotator) in each +# sample.""", +# required=False, +# default=None, +# cls=ResourceOption, +# show_default=True, +# ) @click.option( "--threshold", "-t", - help="This number is used to define positives and negatives from " - "probability maps, and report F1-scores (a priori). It " - "should either come from the training set or a separate validation set " - "to avoid biasing the analysis. 
Optionally, if you provide a multi-set " - "dataset as input, this may also be the name of an existing set from " - "which the threshold will be estimated (highest F1-score) and then " - "applied to the subsequent sets. This number is also used to print " - "the test set F1-score a priori performance", - default=None, - show_default=False, + help="""This number is used to define positives and negatives from + probability maps, and used to report metrics based on a threshold chosen *a + priori*. It can be set to a floating-point value, or to the name of dataset + split in ``--predictions``. + """, + default="0.5", + show_default=True, required=False, cls=ResourceOption, ) @click.option( - "--steps", - "-S", - help="This number is used to define the number of threshold steps to " - "consider when evaluating the highest possible F1-score on test data.", - default=1000, + "--metric", + "-m", + help="""If threshold is set to the name of a split in ``--predictions``, + then this parameter defines the metric function to be used to evaluate the + threshold at which the metric reaches its maximum value. All other splits + are evaluated with respect to this threshold.""", + default="f1", + type=click.Choice(typing.get_args(SUPPORTED_METRIC_TYPE), case_sensitive=True), show_default=True, required=True, cls=ResourceOption, ) @click.option( - "--parallel", - "-P", - help="""Use multiprocessing for data processing: if set to -1 (default), - disables multiprocessing. Set to 0 to enable as many data loading - instances as processing cores as available in the system. Set to >= 1 to - enable that many multiprocessing instances for data processing.""", - type=click.IntRange(min=-1), + "--steps", + "-s", + help="""Number of steps for evaluating metrics on various splits. This + value is used when drawing precision-recall plots, or when deciding the + highest metric value on splits.""", + default=100, + type=click.IntRange(10), show_default=True, required=True, - default=-1, cls=ResourceOption, ) @verbosity_option(logger=logger, cls=ResourceOption, expose_value=False) @@ -138,53 +143,32 @@ def evaluate( predictions: pathlib.Path, output_folder: pathlib.Path, threshold: str | float, + metric: str, # second_annotator, - overlayed, - steps, - parallel, + steps: int, **_, # ignored ): # numpydoc ignore=PR01 """Evaluate predictions (from a model) on a segmentation task.""" - import pandas + import credible.curves + import credible.plot + import matplotlib.backends.backend_pdf + import numpy + import tabulate + from mednet.libs.classification.engine.evaluator import NumpyJSONEncoder from mednet.libs.common.scripts.utils import ( execution_metadata, save_json_with_backup, ) - from mednet.libs.segmentation.engine.evaluator import run - from tqdm import tqdm - - def _validate_threshold(threshold: float | str, splits: list[str]): - """Validate the user threshold selection and returns parsed threshold. - - Parameters - ---------- - threshold - The threshold to validate. - splits - List of available splits. - - Returns - ------- - The validated threshold. 
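# Usage sketch (illustrative, split name and paths are hypothetical): pick the
# operating threshold on a validation split by maximising F1, then apply it to
# the remaining splits listed in the predictions JSON file:
#
#   mednet segmentation evaluate -vv \
#       --predictions=results/predictions.json \
#       --output-folder=results/evaluation \
#       --threshold=validation --metric=f1 --steps=100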
- """ - if threshold is None: - return 0.5 - - try: - # we try to convert it to float first - threshold = float(threshold) - if threshold < 0.0 or threshold > 1.0: - raise ValueError("Float thresholds must be within range [0.0, 1.0]") - except ValueError: - if threshold not in splits: - raise ValueError( - f"Text thresholds should match dataset names, " - f"but {threshold} is not available among the datasets provided (" - f"({', '.join(splits)})" - ) - - return threshold + from mednet.libs.segmentation.engine.evaluator import ( + all_metrics, + compute_metric, + load_count, + name2metric, + precision, + recall, + specificity, + ) evaluation_filename = "evaluation.json" evaluation_file = pathlib.Path(output_folder) / evaluation_filename @@ -194,176 +178,141 @@ def evaluate( # register metadata json_data: dict[str, typing.Any] = execution_metadata() + json_data.update( + dict( + predictions=str(predictions), + output_folder=str(output_folder), + threshold=threshold, + metric=metric, + steps=steps, + ), + ) json_data = {k.replace("_", "-"): v for k, v in json_data.items()} save_json_with_backup(evaluation_file.with_suffix(".meta.json"), json_data) threshold = _validate_threshold(threshold, predict_data) + threshold_list = numpy.arange( + 0.0, (1.0 + 1 / steps), 1 / steps, dtype=numpy.float64 + ) - # Compute threshold on specified split - if isinstance(threshold, str): - logger.info(f"Evaluating threshold on '{threshold}' split") - threshold = run(threshold, predict_data[threshold], output_folder, steps=steps) - logger.info(f"Set --threshold={threshold:.5f}") - - for split_name, predictions in predict_data.items(): - logger.info(f"Analyzing split `{split_name}`...") - run( - split_name, - predictions, - output_folder, - overlayed, - threshold, - steps=steps, - parallel=parallel, + # Compute counts for various splits + eval_json_data: dict[str, dict[str, typing.Any]] = {} + for split_name, samples in predict_data.items(): + logger.info( + f"Counting true/false positive/negatives at split `{split_name}`..." ) - - def _load( - data: dict[str, pathlib.Path], threshold: float | str | None - ) -> dict[str, dict[str, typing.Any]]: # consider using a typing.TypedDict - """Plot comparison chart of all evaluated models. - - Parameters - ---------- - data - A dict in which keys are the names of splits and values are - paths to ``measures.csv`` style files. - threshold - A value indicating which threshold to choose for selecting a score. - If set to ``None``, then use the maximum F1-score on that measures file. - If set to a floating-point value, then use the score that is - obtained on that particular threshold. If set to a string, it should - match one of the keys in ``data``. It then first calculate the - threshold reaching the maximum score on that particular dataset and - then applies that threshold to all other sets. Obs: If the task - is segmentation, the score used is the F1-Score. - - Returns - ------- - A dict in which keys are the names of the systems and the values are - dictionaries that contain two keys: - - * ``df``: A :py:class:`pandas.DataFrame` with the measures data loaded - to - * ``threshold``: A threshold to be used for summarization, depending on - the ``threshold`` parameter set on the input - """ - - col_name = "mean_f1_score" - score_name = "F1-score" - - if isinstance(threshold, str): - logger.info( - f"Calculating threshold from maximum {score_name} at " - f"'{threshold}' dataset..." 
+ eval_json_data.setdefault(split_name, {})["counts"] = { + k: v + for k, v in zip( + threshold_list, load_count(predictions.parent, samples, threshold_list) ) - measures_path = data[threshold] - df = pandas.read_csv(measures_path) - use_threshold = df.threshold[df[col_name].idxmax()] - logger.info(f"Dataset '*': threshold = {use_threshold:.3f}'") - - elif isinstance(threshold, float): - use_threshold = threshold - logger.info(f"Dataset '*': threshold = {use_threshold:.3f}'") + } + eval_json_data[split_name]["threshold_a_posteriori"] = True - # loads all data - retval = {} - for name, measures_path in tqdm(data.items(), desc="sample"): - logger.info(f"Loading measures from {measures_path}...") - df = pandas.read_csv(measures_path) - - if threshold is None: - if "threshold_a_priori" in df: - use_threshold = df.threshold[df.threshold_a_priori.idxmax()] - logger.info( - f"Dataset '{name}': threshold (a priori) = " - f"{use_threshold:.3f}'" - ) - else: - use_threshold = df.threshold[df[col_name].idxmax()] - logger.info( - f"Dataset '{name}': threshold (a posteriori) = " - f"{use_threshold:.3f}'" - ) + if isinstance(threshold, str): + # Compute threshold on specified split, if required + logger.info(f"Evaluating threshold on `{threshold}` split using " f"`{metric}`") + metric_list = compute_metric( + eval_json_data[threshold]["counts"].values(), + name2metric(typing.cast(SUPPORTED_METRIC_TYPE, metric)), + ) + threshold_index = metric_list.argmax() + logger.info(f"Set --threshold={threshold_list[threshold_index]:.4f}") - retval[name] = dict(df=df, threshold=use_threshold) + # Reset list of how thresholds are calculated on the recorded split + for split_name in predict_data.keys(): + if split_name == threshold: + continue + eval_json_data[split_name]["threshold_a_posteriori"] = False - return retval + else: + # must figure out the closest threshold from the list we are using + threshold_index = (numpy.abs(threshold_list - threshold)).argmin() + logger.info(f"Set --threshold={threshold_list[threshold_index]:.4f}") - # Hadcoded for testing - plot_limits = (0.0, 1.0, 0.0, 1.0) + logger.info("Tabulating performance summary...") table_format = "rst" - output_figure = output_folder / "comparison.pdf" - output_table = output_folder / "comparison.rst" - - csv_metrics = { - "train": output_folder / "train.csv", - "test": output_folder / "test.csv", - } - - # load all data measures - data = _load(csv_metrics, threshold=threshold) - - from ..utils.plot import precision_recall_f1iso - from ..utils.table import performance_table - - if output_figure is not None: - output_figure = output_figure.resolve() - logger.info(f"Creating and saving plot at {output_figure}...") - output_figure.parent.mkdir(parents=True, exist_ok=True) - fig = precision_recall_f1iso(data, limits=plot_limits) - fig.savefig(output_figure) - fig.clear() + output_table = output_folder / "evaluation.rst" + metrics_available = list(typing.get_args(SUPPORTED_METRIC_TYPE)) + table_headers = ["Dataset", "threshold"] + metrics_available + ["auroc", "avgprec"] + + table_data = [] + for split_name in predict_data.keys(): + counts = list(eval_json_data[split_name]["counts"].values()) + base_metrics = all_metrics(*counts[threshold_index]) + table_data.append([split_name, threshold_list[threshold_index]] + base_metrics) + eval_json_data[split_name].update( + {k: v for k, v in zip(metrics_available, base_metrics)} + ) + fpr_curve = 1.0 - numpy.array([specificity(*k) for k in counts]) + recall_curve = tpr_curve = numpy.array([recall(*k) for k in counts]) 
+ precision_curve = numpy.array([precision(*k) for k in counts]) + table_data[-1] += [ + credible.curves.area_under_the_curve((fpr_curve, tpr_curve)), # auc-roc + credible.curves.average_metric( + (precision_curve, recall_curve) + ), # average precision + ] + eval_json_data[split_name]["auc_score"] = table_data[-1][-2] + eval_json_data[split_name]["average_precision_score"] = table_data[-1][-1] + eval_json_data[split_name]["curves"] = dict( + roc=dict(fpr=fpr_curve, tpr=tpr_curve, thresholds=threshold_list), + precision_recall=dict( + precision=precision_curve, + recall=recall_curve, + thresholds=threshold_list, + ), + ) - logger.info("Tabulating performance summary...") - table = performance_table(data, table_format) + # records full result analysis to a JSON file + evaluation_file = output_folder / "evaluation.json" + logger.info(f"Saving evaluation results at `{evaluation_file}`...") + with evaluation_file.open("w") as f: + json.dump(eval_json_data, f, indent=2, cls=NumpyJSONEncoder) + + table = tabulate.tabulate( + table_data, + table_headers, + tablefmt=table_format, + floatfmt=".3f", + stralign="right", + ) click.echo(table) - if output_table is not None: - output_table = output_table.resolve() - logger.info(f"Saving table at {output_table}...") - output_table.parent.mkdir(parents=True, exist_ok=True) - with output_table.open("w") as f: - f.write(table) - - # threshold = _validate_threshold(threshold, dataset) - - # #if not isinstance(dataset, dict): - # # dataset = {"test": dataset} - - # if second_annotator is None: - # second_annotator = {} - # elif not isinstance(second_annotator, dict): - # second_annotator = {"test": second_annotator} - # # else, second_annotator must be a dict - - # # clean-up the overlayed path - # if overlayed is not None: - # overlayed = overlayed.strip() - - # # now run with the - # for k, v in dataset.items(): - # if k.startswith("_"): - # logger.info(f"Skipping dataset '{k}' (not to be evaluated)") - # continue - # logger.info(f"Analyzing '{k}' set...") - # run( - # v, - # k, - # predictions_folder, - # output_folder, - # overlayed, - # threshold, - # steps=steps, - # parallel=parallel, - # ) - # second = second_annotator.get(k) - # if second is not None: - # if not second.all_keys_match(v): - # logger.warning( - # f"Key mismatch between `dataset[{k}]` and " - # f"`second_annotator[{k}]` - skipping " - # f"second-annotator comparisons for {k} subset" - # ) - # else: - # compare_annotators( - # v, second, k, output_folder, overlayed, parallel=parallel - # ) + logger.info(f"Saving table at {output_table}...") + output_table.parent.mkdir(parents=True, exist_ok=True) + with output_table.open("w") as f: + f.write(table) + + logger.info("Plotting performance curves...") + output_figure = output_folder / "evaluation.pdf" + logger.info(f"Saving figures at {output_figure}...") + with matplotlib.backends.backend_pdf.PdfPages(output_figure) as pdf: + with credible.plot.tight_layout( + ("False Positive Rate", "True Positive Rate"), "ROC" + ) as ( + fig, + ax, + ): + for split_name, data in eval_json_data.items(): + ax.plot( + data["curves"]["roc"]["fpr"], + data["curves"]["roc"]["tpr"], + label=f"{split_name} (AUC: {data['auc_score']:.2f})", + ) + ax.legend(loc="best", fancybox=True, framealpha=0.7) + pdf.savefig(fig) + + with credible.plot.tight_layout_f1iso( + ("Recall", "Precision"), "Precison-Recall" + ) as ( + fig, + ax, + ): + for split_name, data in eval_json_data.items(): + ax.plot( + data["curves"]["precision_recall"]["precision"], + 
data["curves"]["precision_recall"]["recall"], + label=f"{split_name} (AP: {data['average_precision_score']:.2f})", + ) + ax.legend(loc="best", fancybox=True, framealpha=0.7) + pdf.savefig(fig) diff --git a/src/mednet/libs/segmentation/scripts/experiment.py b/src/mednet/libs/segmentation/scripts/experiment.py index a912559bc2e5da79f435f863ea1a4bc213ebde51..f3a6c006dc8927278ce0f4013cbf7222c539a308 100644 --- a/src/mednet/libs/segmentation/scripts/experiment.py +++ b/src/mednet/libs/segmentation/scripts/experiment.py @@ -151,6 +151,8 @@ def experiment( predictions=predictions_file, output_folder=evaluation_output, threshold=evaluation_threshold, + # metric="f1", + # steps=100, ) evaluation_stop_timestamp = datetime.now() diff --git a/src/mednet/libs/segmentation/scripts/predict.py b/src/mednet/libs/segmentation/scripts/predict.py index 04b3d57d6c216d41361d256e91b93130399158e9..9c5c3d44e1b35c569c5f512df11e5619139f7243 100644 --- a/src/mednet/libs/segmentation/scripts/predict.py +++ b/src/mednet/libs/segmentation/scripts/predict.py @@ -94,6 +94,8 @@ def predict( ) -> None: # numpydoc ignore=PR01 """Run inference (generates scores) on all input images, using a pre-trained model.""" + import typing + from mednet.libs.classification.engine.predictor import run from mednet.libs.common.engine.device import DeviceManager from mednet.libs.common.scripts.predict import ( @@ -113,11 +115,19 @@ def predict( # Save image data (sample, target, mask) into an hdf5 file json_predictions = {} + assert isinstance( + predictions, typing.Mapping + ), "predictions must be a dictionary or this program will not work!" for split_name, split in predictions.items(): pred_paths = [] for sample in split: - hdf5_path = (output_folder / f"{sample[0]}").with_suffix(".hdf5") - _save_hdf5(sample[3], sample[1], sample[2], hdf5_path) + hdf5_path = pathlib.Path(f"{sample[0]}").with_suffix(".hdf5") + _save_hdf5( + sample[3].numpy(), # float32 + sample[1].numpy() > 0.5, # boolean + sample[2].numpy() > 0.5, # boolean + output_folder / hdf5_path, + ) pred_paths.append([str(sample[0]), str(hdf5_path)]) json_predictions[split_name] = pred_paths diff --git a/src/mednet/libs/segmentation/utils/__init__.py b/src/mednet/libs/segmentation/utils/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/mednet/libs/segmentation/utils/measure.py b/src/mednet/libs/segmentation/utils/measure.py deleted file mode 100644 index 8cbd22fbd7135c638b8f1c614b14104e62528f34..0000000000000000000000000000000000000000 --- a/src/mednet/libs/segmentation/utils/measure.py +++ /dev/null @@ -1,428 +0,0 @@ -# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch> -# -# SPDX-License-Identifier: GPL-3.0-or-later - - -import numpy -import scipy.special -import torch - - -def tricky_division(n: float, d: float): - """Divide n by d, or 0.0 in case of a division by zero. - - Parameters - ---------- - n - The number to divide. - d - The divisor. - - Returns - ------- - Result of the division. - """ - - return n / (d + (d == 0)) - - -def base_measures( - tp: int, fp: int, tn: int, fn: int -) -> tuple[float, float, float, float, float, float]: - """Calculate frequentist measures from true/false positive and negative - counts. - - This function can return (frequentist versions of) standard machine - learning measures from true and false positive counts of positives and - negatives. 
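# Illustrative sketch (hypothetical path): prediction files written by the
# predict script above can be read back with the same HDF5 keys consumed by
# load_count()/load_predictions() in the evaluation engine.
import h5py
import numpy


def _read_prediction_demo(hdf5_path: str) -> None:
    with h5py.File(hdf5_path, "r") as f:
        pred = numpy.array(f.get("img"))  # float32 probabilities
        target = numpy.array(f.get("target"))  # boolean ground-truth
        mask = numpy.array(f.get("mask"))  # boolean region of interest
    assert pred.shape == target.shape == mask.shape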
For a thorough look into these and alternate names for the - returned values, please check Wikipedia's entry on `Precision and Recall - <https://en.wikipedia.org/wiki/Precision_and_recall>`_. - - Parameters - ---------- - tp - True positive count, AKA "hit". - fp - False positive count, AKA "false alarm", or "Type I error". - tn - True negative count, AKA "correct rejection". - fn - False Negative count, AKA "miss", or "Type II error". - - Returns - ------- - Tuple containing the following metrics: - * precision - P, AKA positive predictive value (PPV). It corresponds arithmetically - to ``tp/(tp+fp)``. In the case ``tp+fp == 0``, this function returns - zero for precision. - - * recall - R, AKA sensitivity, hit rate, or true positive rate (TPR). It - corresponds arithmetically to ``tp/(tp+fn)``. In the special case - where ``tp+fn == 0``, this function returns zero for recall. - - * specificity - S, AKA selectivity or true negative rate (TNR). It - corresponds arithmetically to ``tn/(tn+fp)``. In the special case - where ``tn+fp == 0``, this function returns zero for specificity. - - * accuracy - A, see `Accuracy - <https://en.wikipedia.org/wiki/Evaluation_of_binary_classifiers>`_. is - the proportion of correct predictions (both true positives and true - negatives) among the total number of pixels examined. It corresponds - arithmetically to ``(tp+tn)/(tp+tn+fp+fn)``. This measure includes - both true-negatives and positives in the numerator, what makes it - sensitive to data or regions without annotations. - - * jaccard - J, see `Jaccard Index or Similarity - <https://en.wikipedia.org/wiki/Jaccard_index>`_. It corresponds - arithmetically to ``tp/(tp+fp+fn)``. In the special case where - ``tn+fp+fn == 0``, this function returns zero for the Jaccard index. - The Jaccard index depends on a TP-only numerator, similarly to the F1 - score. For regions where there are no annotations, the Jaccard index - will always be zero, irrespective of the model output. Accuracy may be - a better proxy if one needs to consider the true abscence of - annotations in a region as part of the measure. - - * f1_score - F1, see `F1-score <https://en.wikipedia.org/wiki/F1_score>`_. It - corresponds arithmetically to ``2*P*R/(P+R)`` or ``2*tp/(2*tp+fp+fn)``. - In the special case where ``P+R == (2*tp+fp+fn) == 0``, this function - returns zero for the Jaccard index. The F1 or Dice score depends on a - TP-only numerator, similarly to the Jaccard index. For regions where - there are no annotations, the F1-score will always be zero, - irrespective of the model output. Accuracy may be a better proxy if - one needs to consider the true abscence of annotations in a region as - part of the measure. - """ - - return ( - tricky_division(tp, tp + fp), # precision - tricky_division(tp, tp + fn), # recall - tricky_division(tn, fp + tn), # specificity - tricky_division(tp + tn, tp + fp + fn + tn), # accuracy - tricky_division(tp, tp + fp + fn), # jaccard index - tricky_division(2 * tp, (2 * tp) + fp + fn), # f1-score - ) - - -def beta_credible_region( - k: int, i: int, lambda_: float | None, coverage: float | None -) -> tuple[float, float, float, float]: - r"""Return the mode, upper and lower bounds of the equal-tailed credible - region of a probability estimate following Bernoulli trials. - - This implemetnation is based on [GOUTTE-2005]_. It assumes :math:`k` - successes and :math:`l` failures (:math:`n = k+l` total trials) are issued - from a series of Bernoulli trials (likelihood is binomial). 
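# Note (illustrative): the frequentist figures returned by the deleted
# base_measures() are the same six values, in the same order, as the new
# all_metrics() helper in the evaluation engine.
def _base_measures_equivalence_demo() -> None:
    from mednet.libs.segmentation.engine.evaluator import all_metrics

    tp, fp, tn, fn = 90, 10, 80, 20
    assert len(all_metrics(tp, fp, tn, fn)) == 6  # precision ... f1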
The posterior - is derivated using the Bayes Theorem with a beta prior. As there is no - reason to favour high vs. low precision, we use a symmetric Beta prior - (:math:`\\alpha=\\beta`): - - .. math:: - - P(p|k,n) &= \\frac{P(k,n|p)P(p)}{P(k,n)} \\\\ - P(p|k,n) &= \\frac{\\frac{n!}{k!(n-k)!}p^{k}(1-p)^{n-k}P(p)}{P(k)} \\\\ - P(p|k,n) &= \\frac{1}{B(k+\\alpha, n-k+\beta)}p^{k+\\alpha-1}(1-p)^{n-k+\\beta-1} \\\\ - P(p|k,n) &= \\frac{1}{B(k+\\alpha, n-k+\\alpha)}p^{k+\\alpha-1}(1-p)^{n-k+\\alpha-1} - - The mode for this posterior (also the maximum a posteriori) is: - - .. math:: - - \\text{mode}(p) = \\frac{k+\\lambda-1}{n+2\\lambda-2} - - Concretely, the prior may be flat (all rates are equally likely, - :math:`\\lambda=1`) or we may use Jeoffrey's prior - (:math:`\\lambda=0.5`), that is invariant through re-parameterisation. - Jeffrey's prior indicate that rates close to zero or one are more likely. - - The mode above works if :math:`k+{\\alpha},n-k+{\\alpha} > 1`, which is - usually the case for a resonably well tunned system, with more than a few - samples for analysis. In the limit of the system performance, :math:`k` - may be 0, which will make the mode become zero. - - For our purposes, it may be more suitable to represent :math:`n = k + l`, - with :math:`k`, the number of successes and :math:`l`, the number of - failures in the binomial experiment, and find this more suitable - representation: - - .. math:: - - P(p|k,l) &= \\frac{1}{B(k+\\alpha, l+\\alpha)}p^{k+\\alpha-1}(1-p)^{l+\\alpha-1} \\\\ - \\text{mode}(p) &= \\frac{k+\\lambda-1}{k+l+2\\lambda-2} - - This can be mapped to most rates calculated in the context of binary - classification this way: - - * Precision or Positive-Predictive Value (PPV): p = TP/(TP+FP), so k=TP, l=FP - * Recall, Sensitivity, or True Positive Rate: r = TP/(TP+FN), so k=TP, l=FN - * Specificity or True Negative Rage: s = TN/(TN+FP), so k=TN, l=FP - * F1-score: f1 = 2TP/(2TP+FP+FN), so k=2TP, l=FP+FN - * Accuracy: acc = TP+TN/(TP+TN+FP+FN), so k=TP+TN, l=FP+FN - * Jaccard: j = TP/(TP+FP+FN), so k=TP, l=FP+FN - - Contrary to frequentist approaches, in which one can only - say that if the test were repeated an infinite number of times, - and one constructed a confidence interval each time, then X% - of the confidence intervals would contain the true rate, here - we can say that given our observed data, there is a X% probability - that the true value of :math:`k/n` falls within the provided - interval. - - .. note:: - - For a disambiguation with Confidence Interval, read - https://en.wikipedia.org/wiki/Credible_interval. - - Parameters - ---------- - k - Number of successes observed on the experiment. - i - Number of failures observed on the experiment. - lambda_ - The parameterisation of the Beta prior to consider. Use - :math:`\\lambda=1` for a flat prior. Use :math:`\\lambda=0.5` for - Jeffrey's prior (the default). - - coverage - A floating-point number between 0 and 1.0 indicating the - coverage you're expecting. A value of 0.95 will ensure 95% - of the area under the probability density of the posterior - is covered by the returned equal-tailed interval. - - Returns - ------- - Tuple containing the following metrics: - * mean - The mean of the posterior distribution. - * mode - The mode of the posterior distribution. - * lower, upper - The lower and upper bounds of the credible region. 
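# Worked instance (illustrative) of the posterior mode formula quoted above:
# with a flat prior (lambda = 1), k = 9 successes and l = 1 failure the mode
# reduces to the empirical rate, 9 / 10.
def _beta_mode_demo() -> None:
    k, failures, lambda_ = 9, 1, 1.0
    mode = (k + lambda_ - 1) / (k + failures + 2 * lambda_ - 2)
    assert abs(mode - 0.9) < 1e-12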
- """ - - # we return the equally-tailed range - right = (1.0 - coverage) / 2 # half-width in each side - lower = scipy.special.betaincinv(k + lambda_, i + lambda_, right) - upper = scipy.special.betaincinv(k + lambda_, i + lambda_, 1.0 - right) - - # evaluate mean and mode (https://en.wikipedia.org/wiki/Beta_distribution) - alpha = k + lambda_ - beta = i + lambda_ - - mean = alpha / (alpha + beta) - - # the mode of a beta distribution is a bit tricky - if alpha > 1 and beta > 1: - mode = (alpha - 1) / (alpha + beta - 2) - elif alpha == 1 and beta == 1: - # In the case of precision, if the threshold is close to 1.0, both TP - # and FP can be zero, which may cause this condition to be reached, if - # the prior is exactly 1 (flat prior). This is a weird situation, - # because effectively we are trying to compute the posterior when the - # total number of experiments is zero. So, only the prior counts - but - # the prior is flat, so we should just pick a value. We choose the - # middle of the range. - mode = 0.0 # any value would do, we just pick this one - elif alpha <= 1 and beta > 1: - mode = 0.0 - elif alpha > 1 and beta <= 1: - mode = 1.0 - else: # elif alpha < 1 and beta < 1: - # in the case of precision, if the threshold is close to 1.0, both TP - # and FP can be zero, which may cause this condition to be reached, if - # the prior is smaller than 1. This is a weird situation, because - # effectively we are trying to compute the posterior when the total - # number of experiments is zero. So, only the prior counts - but the - # prior is bimodal, so we should just pick a value. We choose the - # left of the range. - mode = 0.0 # could also be 1.0 as the prior is bimodal - - return mean, mode, lower, upper - - -def bayesian_measures( - tp: int, fp: int, tn: int, fn: int, lambda_: float, coverage: float -): - r"""Calculate mean and mode from true/false positive and negative counts - with credible regions. - - This function can return bayesian estimates of standard machine learning - measures from true and false positive counts of positives and negatives. - For a thorough look into these and alternate names for the returned values, - please check Wikipedia's entry on `Precision and Recall - <https://en.wikipedia.org/wiki/Precision_and_recall>`_. See - :py:func:`beta_credible_region` for details on the calculation of returned - values. - - Parameters - ---------- - tp - True positive count, AKA "hit". - fp - False positive count, AKA "false alarm", or "Type I error". - tn - True negative count, AKA "correct rejection". - fn - False Negative count, AKA "miss", or "Type II error". - lambda_ - The parameterisation of the Beta prior to consider. Use - :math:`\\lambda=1` for a flat prior. Use :math:`\\lambda=0.5` for - Jeffrey's prior. - coverage - A floating-point number between 0 and 1.0 indicating the - coverage you're expecting. A value of 0.95 will ensure 95% - of the area under the probability density of the posterior - is covered by the returned equal-tailed interval. - - Returns - ------- - precision : (float, float, float, float) - P, AKA positive predictive value (PPV), mean, mode and credible - intervals (95% CI). It corresponds arithmetically - to ``tp/(tp+fp)``. - - recall : (float, float, float, float) - R, AKA sensitivity, hit rate, or true positive rate (TPR), mean, mode - and credible intervals (95% CI). It corresponds arithmetically to - ``tp/(tp+fn)``. 
- - specificity : (float, float, float, float) - S, AKA selectivity or true negative rate (TNR), mean, mode and credible - intervals (95% CI). It corresponds arithmetically to ``tn/(tn+fp)``. - - accuracy : (float, float, float, float) - A, mean, mode and credible intervals (95% CI). See `Accuracy - <https://en.wikipedia.org/wiki/Evaluation_of_binary_classifiers>`_. is - the proportion of correct predictions (both true positives and true - negatives) among the total number of pixels examined. It corresponds - arithmetically to ``(tp+tn)/(tp+tn+fp+fn)``. This measure includes - both true-negatives and positives in the numerator, what makes it - sensitive to data or regions without annotations. - - jaccard : (float, float, float, float) - J, mean, mode and credible intervals (95% CI). See `Jaccard Index or - Similarity <https://en.wikipedia.org/wiki/Jaccard_index>`_. It - corresponds arithmetically to ``tp/(tp+fp+fn)``. The Jaccard index - depends on a TP-only numerator, similarly to the F1 score. For regions - where there are no annotations, the Jaccard index will always be zero, - irrespective of the model output. Accuracy may be a better proxy if - one needs to consider the true abscence of annotations in a region as - part of the measure. - - f1_score : (float, float, float, float) - F1, mean, mode and credible intervals (95% CI). See `F1-score - <https://en.wikipedia.org/wiki/F1_score>`_. It corresponds - arithmetically to ``2*P*R/(P+R)`` or ``2*tp/(2*tp+fp+fn)``. The F1 or - Dice score depends on a TP-only numerator, similarly to the Jaccard - index. For regions where there are no annotations, the F1-score will - always be zero, irrespective of the model output. Accuracy may be a - better proxy if one needs to consider the true abscence of annotations - in a region as part of the measure. - """ - - return ( - beta_credible_region(tp, fp, lambda_, coverage), # precision - beta_credible_region(tp, fn, lambda_, coverage), # recall - beta_credible_region(tn, fp, lambda_, coverage), # specificity - beta_credible_region(tp + tn, fp + fn, lambda_, coverage), # accuracy - beta_credible_region(tp, fp + fn, lambda_, coverage), # jaccard index - beta_credible_region(2 * tp, fp + fn, lambda_, coverage), # f1-score - ) - - -def auc(x: numpy.ndarray, y: numpy.ndarray): - """Calculate the area under the precision-recall curve (AUC). - - This function requires a minimum of 2 points and will use the trapezoidal - method to calculate the area under a curve bound between ``[0.0, 1.0]``. - It interpolates missing points if required. The input ``x`` should be - continuously increasing or decreasing. - - Parameters - ---------- - x - A 1D numpy array containing continuously increasing or decreasing - values for the X coordinate. - y - A 1D numpy array containing the Y coordinates of the X values provided - in ``x``. - - Returns - ------- - The AUC. - """ - - x = numpy.array(x) - y = numpy.array(y) - - assert len(x) == len(y), "x and y sequences must have the same length" - - dx = numpy.diff(x) - if numpy.any(dx < 0): - if numpy.all(dx <= 0): - # invert direction - x = x[::-1] - y = y[::-1] - else: - raise ValueError("x is neither increasing nor decreasing " f": {x}.") - - y_interp = numpy.interp( - numpy.arange(0, 1, 0.001), - numpy.array(x), - numpy.array(y), - left=1.0, - right=0.0, - ) - return y_interp.sum() * 0.001 - - -def get_intersection( - pred_box: torch.Tensor, gt_box: torch.Tensor, multiplier: float -) -> float: - """Calculate intersection of boxes. 
- - Parameters - ---------- - pred_box - A 1D numpy array containing predicted box coords. - gt_box - A 1D numpy array containing groud truth box coords. - multiplier - A number to increase the predicted bounding box by. - - Returns - ------- - Intersection of boxes. - """ - x1t, y1t, x2t, y2t = gt_box.numpy() - x1, y1, x2, y2 = pred_box.numpy() - - m = numpy.sqrt(multiplier) - d_x = ((m * (x2 - x1)) - (x2 - x1)) / 2.0 - d_y = ((m * (y2 - y1)) - (y2 - y1)) / 2.0 - x1 = max(x1 - d_x, 0) - x2 = x2 + d_x - y1 = max(y1 - d_y, 0) - y2 = y2 + d_y - - gt_tensor = gt_box.detach().clone().unsqueeze(0) - pred_tensor = torch.tensor([x1, y1, x2, y2]).unsqueeze(0) - - lt = torch.max(pred_tensor[:, None, :2], gt_tensor[:, :2]) # [N,M,2] - rb = torch.min(pred_tensor[:, None, 2:], gt_tensor[:, 2:]) # [N,M,2] - - wh = (rb - lt).clamp(min=0) # [N,M,2] - inter = wh[:, :, 0] * wh[:, :, 1] # [N,M] - - gt_area = (x2t - x1t) * (y2t - y1t) - - if gt_area > 0: - return inter.item() / gt_area - - return 0 diff --git a/src/mednet/libs/segmentation/utils/plot.py b/src/mednet/libs/segmentation/utils/plot.py deleted file mode 100644 index 1cd19b320b7ce740fef05500d17cbe29f0ee7033..0000000000000000000000000000000000000000 --- a/src/mednet/libs/segmentation/utils/plot.py +++ /dev/null @@ -1,274 +0,0 @@ -# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch> -# -# SPDX-License-Identifier: GPL-3.0-or-later - -import contextlib -import logging -import typing -from itertools import cycle - -import matplotlib -import matplotlib.pyplot as plt -import numpy -import pandas - -matplotlib.use("agg") -logger = logging.getLogger(__name__) - - -@contextlib.contextmanager -def _precision_recall_canvas( - title=str | None, - limits: tuple[float, float, float, float] | None = (0.0, 1.0, 0.0, 1.0), -): - """Generate a canvas to draw precision-recall curves. - - Works like a context manager, yielding a figure and an axes set in which - the precision-recall curves should be added to. The figure already - contains F1-ISO lines and is preset to a 0-1 square region. Once the - context is finished, ``fig.tight_layout()`` is called. - - Parameters - ---------- - title - Optional title to add to this plot. - limits - If set, a 4-tuple containing the bounds of the plot for the x and y - axis respectively (format: ``[x_low, x_high, y_low, y_high]``). If not - set, use normal bounds (``[0, 1, 0, 1]``). - - Yields - ------ - figure : matplotlib.figure.Figure - The figure that should be finally returned to the user - - axes : matplotlib.figure.Axes - An axis set where to precision-recall plots should be added to. 
- """ - - fig, axes1 = plt.subplots(1) - - # Names and bounds - axes1.set_xlabel("Recall") - axes1.set_ylabel("Precision") - axes1.set_xlim(limits[:2]) - axes1.set_ylim(limits[2:]) - - if title is not None: - axes1.set_title(title) - - axes1.grid(linestyle="--", linewidth=1, color="gray", alpha=0.2) - axes2 = axes1.twinx() - - # Annotates plot with F1-score iso-lines - f_scores = numpy.linspace(0.1, 0.9, num=9) - tick_locs = [] - tick_labels: list[str] = [] - for f_score in f_scores: - x = numpy.linspace(0.01, 1) - y = f_score * x / (2 * x - f_score) - (_l,) = plt.plot(x[y >= 0], y[y >= 0], color="green", alpha=0.1) - tick_locs.append(y[-1]) - tick_labels.append(f"{f_score:.1f}") - axes2.tick_params(axis="y", which="both", pad=0, right=False, left=False) - axes2.set_ylabel("iso-F", color="green", alpha=0.3) - axes2.set_ylim([0.0, 1.0]) - axes2.yaxis.set_label_coords(1.015, 0.97) - axes2.set_yticks(tick_locs) # notice these are invisible - for k in axes2.set_yticklabels(tick_labels): - k.set_color("green") - k.set_alpha(0.3) - k.set_size(8) - - # we should see some of axes 1 axes - axes1.spines["right"].set_visible(False) - axes1.spines["top"].set_visible(False) - axes1.spines["left"].set_position( - ("data", limits[0] - (0.015 * (limits[1] - limits[0]))) - ) - axes1.spines["bottom"].set_position( - ("data", limits[2] - (0.015 * (limits[3] - limits[2]))) - ) - - # we shouldn't see any of axes 2 axes - axes2.spines["right"].set_visible(False) - axes2.spines["top"].set_visible(False) - axes2.spines["left"].set_visible(False) - axes2.spines["bottom"].set_visible(False) - - # yield execution, lets user draw precision-recall plots, and the legend - # before tighteneing the layout - yield fig, axes1 - - plt.tight_layout() - - -def precision_recall_f1iso( - data: dict[str, dict[str, typing.Any]], - limits: tuple[float, float, float, float], -) -> matplotlib.figure.Figure: - """Create a precision-recall plot. - - This function creates and returns a Matplotlib figure with a - precision-recall plot. The plot will be annotated with F1-score iso-lines - (in which the F1-score maintains the same value). - - This function specially supports "second-annotator" entries by plotting a - line showing the comparison between the default annotator being analyzed - and a second "opinion". Second annotator dataframes contain a single entry - (threshold=0.5), given the nature of the binary map comparisons. - - Parameters - ---------- - data : dict - A dictionary in which keys are strings defining plot labels and values - are dictionaries with two entries: - - * ``df``: :py:class:`pandas.DataFrame` - - A dataframe that is produced by our evaluator engine, indexed by - integer "thresholds", containing the following columns: - ``threshold``, ``tp``, ``fp``, ``tn``, ``fn``, ``mean_precision``, - ``mode_precision``, ``lower_precision``, ``upper_precision``, - ``mean_recall``, ``mode_recall``, ``lower_recall``, ``upper_recall``, - ``mean_specificity``, ``mode_specificity``, ``lower_specificity``, - ``upper_specificity``, ``mean_accuracy``, ``mode_accuracy``, - ``lower_accuracy``, ``upper_accuracy``, ``mean_jaccard``, - ``mode_jaccard``, ``lower_jaccard``, ``upper_jaccard``, - ``mean_f1_score``, ``mode_f1_score``, ``lower_f1_score``, - ``upper_f1_score``, ``frequentist_precision``, - ``frequentist_recall``, ``frequentist_specificity``, - ``frequentist_accuracy``, ``frequentist_jaccard``, - ``frequentist_f1_score``. - - * ``threshold``: :py:class:`list` - - A threshold to graph with a dot for each set. 
Specific - threshold values do not affect "second-annotator" dataframes. - limits - A 4-tuple containing the bounds of the plot for the x and y axis - respectively (format: ``[x_low, x_high, y_low, y_high]``). If not set, - use normal bounds (``[0, 1, 0, 1]``). - - Returns - ------- - A matplotlib figure you can save or display (uses an ``agg`` backend). - """ - - lines = ["-", "--", "-.", ":"] - colors = [ - "#1f77b4", - "#ff7f0e", - "#2ca02c", - "#d62728", - "#9467bd", - "#8c564b", - "#e377c2", - "#7f7f7f", - "#bcbd22", - "#17becf", - ] - colorcycler = cycle(colors) - linecycler = cycle(lines) - - with _precision_recall_canvas(title=None, limits=limits) as (fig, axes): - legend = [] - - for name, value in data.items(): - df = value["df"] - threshold = value["threshold"] - - # plots only from the point where recall reaches its maximum, - # otherwise, we don't see a curve... - max_recall = df.mean_recall.idxmax() - pi = df.mean_precision[max_recall:] - ri = df.mean_recall[max_recall:] - # valid = (pi + ri) > 0 - - # optimal point along the curve - bins = len(df) - index = int(round(bins * threshold)) - index = min(index, len(df) - 1) # avoids out of range indexing - - # plots Recall/Precision as threshold changes - label = f"{name} (F1={df.mean_f1_score[index]:.4f})" - color = next(colorcycler) - - if len(df) == 1: - # plot black dot for F1-score at select threshold - (marker,) = axes.plot( - df.mean_recall[index], - df.mean_precision[index], - marker="*", - markersize=6, - color=color, - alpha=0.8, - linestyle="None", - ) - (line,) = axes.plot( - df.mean_recall[index], - df.mean_precision[index], - linestyle="None", - color=color, - alpha=0.2, - ) - legend.append(([marker, line], label)) - else: - # line first, so marker gets on top - style = next(linecycler) - (line,) = axes.plot( - ri[pi > 0], pi[pi > 0], color=color, linestyle=style - ) - (marker,) = axes.plot( - df.mean_recall[index], - df.mean_precision[index], - marker="o", - linestyle=style, - markersize=4, - color=color, - alpha=0.8, - ) - legend.append(([marker, line], label)) - - if limits: - axes.set_xlim(limits[:2]) - axes.set_ylim(limits[2:]) - - if len(legend) > 1: - axes.legend( - [tuple(k[0]) for k in legend], - [k[1] for k in legend], - loc="lower left", - fancybox=True, - framealpha=0.7, - ) - - return fig - - -def loss_curve(df: pandas.DataFrame) -> matplotlib.figure.Figure: - """Create a loss curve in a Matplotlib figure. - - Parameters - ---------- - df - A dataframe containing, at least, "epoch", "median-loss" and - "learning-rate" columns, that will be plotted. - - Returns - ------- - A figure, that may be saved or displayed. 
- """ - - ax1 = df.plot(x="epoch", y="median-loss", grid=True) - ax1.set_ylabel("Median Loss") - ax1.grid(linestyle="--", linewidth=1, color="gray", alpha=0.2) - ax2 = df["learning-rate"].plot( - secondary_y=True, - legend=True, - grid=True, - ) - ax2.set_ylabel("Learning Rate") - ax1.set_xlabel("Epoch") - plt.tight_layout() - return ax1.get_figure() diff --git a/src/mednet/libs/segmentation/utils/table.py b/src/mednet/libs/segmentation/utils/table.py deleted file mode 100644 index f433835ec04df9461f7dcfdc19d0061bd418b352..0000000000000000000000000000000000000000 --- a/src/mednet/libs/segmentation/utils/table.py +++ /dev/null @@ -1,96 +0,0 @@ -# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch> -# -# SPDX-License-Identifier: GPL-3.0-or-later - -import typing - -import tabulate - -from .measure import auc - - -def performance_table(data: dict[str, dict[str, typing.Any]], fmt: str) -> str: - """Comparison of tables in a given format. - - Parameters - ---------- - data : dict - A dictionary in which keys are strings defining plot labels and values - are dictionaries with two entries: - - * ``df``: :py:class:`pandas.DataFrame` - - A dataframe that is produced by our evaluator engine, indexed by - integer "thresholds", containing the following columns: - ``threshold``, ``tp``, ``fp``, ``tn``, ``fn``, ``mean_precision``, - ``mode_precision``, ``lower_precision``, ``upper_precision``, - ``mean_recall``, ``mode_recall``, ``lower_recall``, ``upper_recall``, - ``mean_specificity``, ``mode_specificity``, ``lower_specificity``, - ``upper_specificity``, ``mean_accuracy``, ``mode_accuracy``, - ``lower_accuracy``, ``upper_accuracy``, ``mean_jaccard``, - ``mode_jaccard``, ``lower_jaccard``, ``upper_jaccard``, - ``mean_f1_score``, ``mode_f1_score``, ``lower_f1_score``, - ``upper_f1_score``, ``frequentist_precision``, - ``frequentist_recall``, ``frequentist_specificity``, - ``frequentist_accuracy``, ``frequentist_jaccard``, - ``frequentist_f1_score``. - - * ``threshold``: :py:class:`list` - - A threshold to graph with a dot for each set. Specific - threshold values do not affect "second-annotator" dataframes. - fmt : str - One of the formats supported by tabulate. - - Returns - ------- - A table in a specific format. 
- """ - - headers = [ - "Dataset", - "T", - "E(F1)", - "CI(F1)", - "AUC", - "CI(AUC)", - ] - - table = [] - for k, v in data.items(): - entry = [ - k, - v["threshold"], - ] - - # statistics based on the "assigned" threshold (a priori, less biased) - bins = len(v["df"]) - index = int(round(bins * v["threshold"])) - index = min(index, len(v["df"]) - 1) # avoids out of range indexing - entry.append(v["df"].mean_f1_score[index]) - entry.append( - f"{v['df'].lower_f1_score[index]:.3f}-{v['df'].upper_f1_score[index]:.3f}" - ) - - # AUC PR curve - entry.append( - auc( - v["df"]["mean_recall"].to_numpy(), - v["df"]["mean_precision"].to_numpy(), - ) - ) - lower_auc = auc( - v["df"]["lower_recall"].to_numpy(), - v["df"]["lower_precision"].to_numpy(), - ) - upper_auc = auc( - v["df"]["upper_recall"].to_numpy(), - v["df"]["upper_precision"].to_numpy(), - ) - entry.append(f"{lower_auc:.3f}-{upper_auc:.3f}") - - table.append(entry) - - return tabulate.tabulate( - table, headers, tablefmt=fmt, floatfmt=".3f", stralign="right" - ) diff --git a/tests/segmentation/test_measures.py b/tests/segmentation/test_measures.py index a9fdceb02a077a7dcd3b59bba91f271f6a45dcb0..cfc9db1d21c83ee1124ae57a80eedbf2bb40b1a2 100644 --- a/tests/segmentation/test_measures.py +++ b/tests/segmentation/test_measures.py @@ -2,280 +2,24 @@ # # SPDX-License-Identifier: GPL-3.0-or-later -import math import random -import unittest import numpy -import pytest -import torch -from mednet.libs.segmentation.engine.evaluator import ( - sample_measures_for_threshold, -) -from mednet.libs.segmentation.utils.measure import ( - auc, - base_measures, - bayesian_measures, - beta_credible_region, -) +from mednet.libs.segmentation.engine.evaluator import all_metrics -class TestFrequentist(unittest.TestCase): - """Unit test for frequentist base measures.""" +def test_all_metrics(): + tp = random.randint(1, 100) + fp = random.randint(1, 100) + tn = random.randint(1, 100) + fn = random.randint(1, 100) - def setUp(self): - self.tp = random.randint(1, 100) - self.fp = random.randint(1, 100) - self.tn = random.randint(1, 100) - self.fn = random.randint(1, 100) + p, r, s, a, j, f1 = all_metrics(tp, fp, tn, fn) - def test_precision(self): - precision = base_measures(self.tp, self.fp, self.tn, self.fn)[0] - self.assertEqual((self.tp) / (self.tp + self.fp), precision) - - def test_recall(self): - recall = base_measures(self.tp, self.fp, self.tn, self.fn)[1] - self.assertEqual((self.tp) / (self.tp + self.fn), recall) - - def test_specificity(self): - specificity = base_measures(self.tp, self.fp, self.tn, self.fn)[2] - self.assertEqual((self.tn) / (self.tn + self.fp), specificity) - - def test_accuracy(self): - accuracy = base_measures(self.tp, self.fp, self.tn, self.fn)[3] - self.assertEqual( - (self.tp + self.tn) / (self.tp + self.tn + self.fp + self.fn), - accuracy, - ) - - def test_jaccard(self): - jaccard = base_measures(self.tp, self.fp, self.tn, self.fn)[4] - self.assertEqual(self.tp / (self.tp + self.fp + self.fn), jaccard) - - def test_f1(self): - p, r, s, a, j, f1 = base_measures(self.tp, self.fp, self.tn, self.fn) - self.assertEqual((2.0 * self.tp) / (2.0 * self.tp + self.fp + self.fn), f1) - self.assertAlmostEqual((2 * p * r) / (p + r), f1) # base definition - - -class TestBayesian: - """Unit test for bayesian base measures.""" - - def mean(self, k, i, lambda_): - return (k + lambda_) / (k + i + 2 * lambda_) - - def mode1(self, k, i, lambda_): # (k+lambda_), (i+lambda_) > 1 - return (k + lambda_ - 1) / (k + i + 2 * lambda_ - 2) - - def 
test_beta_credible_region_base(self): - k = 40 - i = 10 - lambda_ = 0.5 - cover = 0.95 - got = beta_credible_region(k, i, lambda_, cover) - # mean, mode, lower, upper - exp = ( - self.mean(k, i, lambda_), - self.mode1(k, i, lambda_), - 0.6741731038857685, - 0.8922659692341358, - ) - assert numpy.isclose(got, exp).all(), f"{got} <> {exp}" - - def test_beta_credible_region_small_k(self): - k = 4 - i = 1 - lambda_ = 0.5 - cover = 0.95 - got = beta_credible_region(k, i, lambda_, cover) - # mean, mode, lower, upper - exp = ( - self.mean(k, i, lambda_), - self.mode1(k, i, lambda_), - 0.37137359936800574, - 0.9774872340008449, - ) - assert numpy.isclose(got, exp).all(), f"{got} <> {exp}" - - def test_beta_credible_region_precision_jeffrey(self): - # simulation of situation for precision TP == FP == 0, Jeffrey's prior - k = 0 - i = 0 - lambda_ = 0.5 - cover = 0.95 - got = beta_credible_region(k, i, lambda_, cover) - # mean, mode, lower, upper - exp = ( - self.mean(k, i, lambda_), - 0.0, - 0.0015413331334360135, - 0.998458666866564, - ) - assert numpy.isclose(got, exp).all(), f"{got} <> {exp}" - - def test_beta_credible_region_precision_flat(self): - # simulation of situation for precision TP == FP == 0, flat prior - k = 0 - i = 0 - lambda_ = 1.0 - cover = 0.95 - got = beta_credible_region(k, i, lambda_, cover) - # mean, mode, lower, upper - exp = (self.mean(k, i, lambda_), 0.0, 0.025000000000000022, 0.975) - assert numpy.isclose(got, exp).all(), f"{got} <> {exp}" - - def test_bayesian_measures(self): - tp = random.randint(100000, 1000000) - fp = random.randint(100000, 1000000) - tn = random.randint(100000, 1000000) - fn = random.randint(100000, 1000000) - - _prec, _rec, _spec, _acc, _jac, _f1 = base_measures(tp, fp, tn, fn) - prec, rec, spec, acc, jac, f1 = bayesian_measures(tp, fp, tn, fn, 0.5, 0.95) - - # Notice that for very large k and l, the base frequentist measures - # should be approximately the same as the bayesian mean and mode - # extracted from the beta posterior. We test that here. - assert numpy.isclose(_prec, prec[0]), f"freq: {_prec} <> bays: {prec[0]}" - assert numpy.isclose(_prec, prec[1]), f"freq: {_prec} <> bays: {prec[1]}" - assert numpy.isclose(_rec, rec[0]), f"freq: {_rec} <> bays: {rec[0]}" - assert numpy.isclose(_rec, rec[1]), f"freq: {_rec} <> bays: {rec[1]}" - assert numpy.isclose(_spec, spec[0]), f"freq: {_spec} <> bays: {spec[0]}" - assert numpy.isclose(_spec, spec[1]), f"freq: {_spec} <> bays: {spec[1]}" - assert numpy.isclose(_acc, acc[0]), f"freq: {_acc} <> bays: {acc[0]}" - assert numpy.isclose(_acc, acc[1]), f"freq: {_acc} <> bays: {acc[1]}" - assert numpy.isclose(_jac, jac[0]), f"freq: {_jac} <> bays: {jac[0]}" - assert numpy.isclose(_jac, jac[1]), f"freq: {_jac} <> bays: {jac[1]}" - assert numpy.isclose(_f1, f1[0]), f"freq: {_f1} <> bays: {f1[0]}" - assert numpy.isclose(_f1, f1[1]), f"freq: {_f1} <> bays: {f1[1]}" - - # We also test that the interval in question includes the mode and the - # mean in this case. - assert (prec[2] < prec[1]) and ( - prec[1] < prec[3] - ), f"precision is out of bounds {_prec[2]} < {_prec[1]} < {_prec[3]}" - assert (rec[2] < rec[1]) and ( - rec[1] < rec[3] - ), f"recall is out of bounds {_rec[2]} < {_rec[1]} < {_rec[3]}" - assert (spec[2] < spec[1]) and ( - spec[1] < spec[3] - ), f"specif. 
is out of bounds {_spec[2]} < {_spec[1]} < {_spec[3]}" - assert (acc[2] < acc[1]) and ( - acc[1] < acc[3] - ), f"accuracy is out of bounds {_acc[2]} < {_acc[1]} < {_acc[3]}" - assert (jac[2] < jac[1]) and ( - jac[1] < jac[3] - ), f"jaccard is out of bounds {_jac[2]} < {_jac[1]} < {_jac[3]}" - assert (f1[2] < f1[1]) and ( - f1[1] < f1[3] - ), f"f1-score is out of bounds {_f1[2]} < {_f1[1]} < {_f1[3]}" - - -def test_auc(): - # basic tests - assert math.isclose(auc([0.0, 0.5, 1.0], [1.0, 1.0, 1.0]), 1.0) - assert math.isclose(auc([0.0, 0.5, 1.0], [1.0, 0.5, 0.0]), 0.5, rel_tol=0.001) - assert math.isclose(auc([0.0, 0.5, 1.0], [0.0, 0.0, 0.0]), 0.0, rel_tol=0.001) - assert math.isclose(auc([0.0, 0.5, 1.0], [0.0, 1.0, 0.0]), 0.5, rel_tol=0.001) - assert math.isclose(auc([0.0, 0.5, 1.0], [0.0, 0.5, 0.0]), 0.25, rel_tol=0.001) - assert math.isclose(auc([0.0, 0.5, 1.0], [0.0, 0.5, 0.0]), 0.25, rel_tol=0.001) - - # reversing tht is also true - assert math.isclose(auc([0.0, 0.5, 1.0][::-1], [1.0, 1.0, 1.0][::-1]), 1.0) - assert math.isclose( - auc([0.0, 0.5, 1.0][::-1], [1.0, 0.5, 0.0][::-1]), 0.5, rel_tol=0.001 - ) - assert math.isclose( - auc([0.0, 0.5, 1.0][::-1], [0.0, 0.0, 0.0][::-1]), 0.0, rel_tol=0.001 - ) - assert math.isclose( - auc([0.0, 0.5, 1.0][::-1], [0.0, 1.0, 0.0][::-1]), 0.5, rel_tol=0.001 - ) - assert math.isclose( - auc([0.0, 0.5, 1.0][::-1], [0.0, 0.5, 0.0][::-1]), 0.25, rel_tol=0.001 - ) - assert math.isclose( - auc([0.0, 0.5, 1.0][::-1], [0.0, 0.5, 0.0][::-1]), 0.25, rel_tol=0.001 - ) - - -def test_auc_raises_value_error(): - with pytest.raises(ValueError, match=r".*neither increasing nor decreasing.*"): - # x is **not** monotonically increasing or decreasing - assert math.isclose(auc([0.0, 0.5, 0.0], [1.0, 1.0, 1.0]), 1.0) - - -def test_auc_raises_assertion_error(): - with pytest.raises(AssertionError, match=r".*must have the same length.*"): - # x is **not** the same size as y - assert math.isclose(auc([0.0, 0.5, 1.0], [1.0, 1.0]), 1.0) - - -def test_sample_measures_mask_checkerbox(): - prediction = torch.ones((4, 4), dtype=float) - ground_truth = torch.ones((4, 4), dtype=float) - ground_truth[2:, :2] = 0.0 - ground_truth[:2, 2:] = 0.0 - mask = torch.zeros((4, 4), dtype=float) - mask[1:3, 1:3] = 1.0 - threshold = 0.5 - - # with this configuration, this should be the correct count - tp = 2 - fp = 2 - tn = 0 - fn = 0 - - assert (tp, fp, tn, fn) == sample_measures_for_threshold( - prediction, ground_truth, mask, threshold - ) - - -def test_sample_measures_mask_cross(): - prediction = torch.ones((10, 10), dtype=float) - prediction[0, :] = 0.0 - prediction[9, :] = 0.0 - ground_truth = torch.ones((10, 10), dtype=float) - ground_truth[:5,] = 0.0 # lower part is not to be set - mask = torch.zeros((10, 10), dtype=float) - mask[(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)] = 1.0 - mask[(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (9, 8, 7, 6, 5, 4, 3, 2, 1, 0)] = 1.0 - threshold = 0.5 - - # with this configuration, this should be the correct count - tp = 8 - fp = 8 - tn = 2 - fn = 2 - - assert (tp, fp, tn, fn) == sample_measures_for_threshold( - prediction, ground_truth, mask, threshold - ) - - -def test_sample_measures_mask_border(): - prediction = torch.zeros((10, 10), dtype=float) - prediction[:, 4] = 1.0 - prediction[:, 5] = 1.0 - prediction[0, 4] = 0.0 - prediction[8, 4] = 0.0 - prediction[1, 6] = 1.0 - ground_truth = torch.zeros((10, 10), dtype=float) - ground_truth[:, 4] = 1.0 - ground_truth[:, 5] = 1.0 - mask = torch.ones((10, 10), dtype=float) - mask[:, 0] = 0.0 - mask[0, 
:] = 0.0 - mask[:, 9] = 0.0 - mask[9, :] = 0.0 - threshold = 0.5 - - # with this configuration, this should be the correct count - tp = 15 - fp = 1 - tn = 47 - fn = 1 - - assert (tp, fp, tn, fn) == sample_measures_for_threshold( - prediction, ground_truth, mask, threshold - ) + assert (tp / (tp + fp)) == p + assert (tp / (tp + fn)) == r + assert (tn / (tn + fp)) == s + assert ((tp + tn) / (tp + tn + fp + fn)) == a + assert (tp / (tp + fp + fn)) == j + assert ((2.0 * tp) / (2.0 * tp + fp + fn)) == f1 + assert numpy.isclose((2 * p * r) / (p + r), f1)
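
The new ``test_all_metrics`` above takes the tp/fp/tn/fn counts as given. Below is a minimal sketch, under stated assumptions, of how such counts could be derived from a thresholded prediction and passed to ``all_metrics``; only the ``all_metrics`` import and return order come from the updated test, while ``counts_from_arrays`` and the example arrays are illustrative and not part of this change.

import numpy

from mednet.libs.segmentation.engine.evaluator import all_metrics


def counts_from_arrays(pred, gt, threshold=0.5, mask=None):
    """Count tp/fp/tn/fn for a thresholded prediction, optionally within a mask."""
    binary = numpy.asarray(pred) > threshold
    gt = numpy.asarray(gt).astype(bool)
    valid = numpy.ones_like(gt) if mask is None else numpy.asarray(mask).astype(bool)

    tp = int(numpy.sum(binary & gt & valid))
    fp = int(numpy.sum(binary & ~gt & valid))
    tn = int(numpy.sum(~binary & ~gt & valid))
    fn = int(numpy.sum(~binary & gt & valid))
    return tp, fp, tn, fn


pred = numpy.random.rand(16, 16)        # pixel-wise probabilities in [0, 1]
gt = numpy.random.rand(16, 16) > 0.5    # binary ground-truth annotations
tp, fp, tn, fn = counts_from_arrays(pred, gt, threshold=0.5)
precision, recall, specificity, accuracy, jaccard, f1 = all_metrics(tp, fp, tn, fn)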
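
For reference, the equal-tailed credible region deleted by this patch (``beta_credible_region`` in the removed measure code) reduces to a short scipy computation. The sketch below restates that deleted logic for readers of the diff; it is not part of the new evaluator module.

import scipy.special


def beta_credible_region(k, i, lambda_=0.5, coverage=0.95):
    """Mean, mode and equal-tailed credible bounds of a Beta posterior."""
    right = (1.0 - coverage) / 2  # probability mass excluded from each tail
    lower = scipy.special.betaincinv(k + lambda_, i + lambda_, right)
    upper = scipy.special.betaincinv(k + lambda_, i + lambda_, 1.0 - right)

    alpha = k + lambda_
    beta = i + lambda_
    mean = alpha / (alpha + beta)
    if alpha > 1 and beta > 1:
        mode = (alpha - 1) / (alpha + beta - 2)
    elif alpha > 1:  # beta <= 1: the density piles up at 1.0
        mode = 1.0
    else:            # flat, one-sided or bimodal cases: pick 0.0, as the deleted code did
        mode = 0.0
    return mean, mode, lower, upper


# precision with Jeffreys prior: k = tp, i = fp (values from the deleted unit test above)
print(beta_credible_region(40, 10))  # -> (~0.794, ~0.806, 0.674..., 0.892...)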