"""Module containing dynamic thresholding models.
"""
from __future__ import annotations
import abc
from typing import Optional, Union
import numpy as np
import scipy.ndimage as ndi
import skimage
from scipy.signal import find_peaks
import darsia
[docs]
class HistogrammBasedThresholding:
def __init__(self) -> None:
# Define tuning parameters for defining histograms,
# and smooth them. NOTE: They should be in general chosen
# tailored to the situation. However, these values should
# also work for most cases.
self._bins = 200
self._sigma = 10
def __call__(
self, signal: np.ndarray, roi: np.ndarray
) -> tuple[Optional[float], bool]:
"""
Dynamic thresholding of signal in provided roi.
Args:
signal (np.ndarray): signal
roi (np.ndarray): boolean array identifying the considered region
Returns:
float: new threshold value.
bool: identifier for success.
"""
# Reduce the signal to the effective mask
active_signal_values = np.ravel(signal)[np.ravel(roi)]
# Continue by analysing the histogram corresponding to the active signal.
# Smooth the histogram of the signal, and compute derivates
hist = ndi.gaussian_filter1d(
np.histogram(active_signal_values, bins=self._bins)[0],
sigma=self._sigma,
)
# Run provided analysis
new_threshold, have_new_value = self._analysis(active_signal_values, hist)
return new_threshold, have_new_value
@abc.abstractmethod
def _analysis(
self, active_signal_values: np.ndarray, hist: np.ndarray
) -> tuple[Optional[float], bool]:
"""
Abstract method for some histogram analysis.
Args:
active_signal_values (np.ndarray): 1d array with all signal values
hist (np.ndarray): 1d array, histogram
Returns:
float, optional: determined threshold value
bool: flag controlling whether the analysis has been successful.
"""
pass
[docs]
class StandardOtsu(HistogrammBasedThresholding):
"""
Wrapper for standard Otsu thresholding.
"""
def _analysis(
self, active_signal_values: np.ndarray, hist: np.ndarray
) -> tuple[Optional[float], bool]:
"""
Standard Otsu method for provided histogram.
Args:
active_signal_values (np.ndarray): 1d array with all signal values
hist (np.ndarray): 1d array, histogram
Returns:
float, optional: determined threshold value
bool: flag controlling whether the analysis has been successful.
"""
otsu_index = skimage.filters.threshold_otsu(hist=hist)
otsu_threshold = np.min(active_signal_values) + otsu_index / self._bins * (
np.max(active_signal_values) - np.min(active_signal_values)
)
return otsu_threshold, True
[docs]
class TwoPeakHistogrammAnalysis(HistogrammBasedThresholding):
"""
Class for histogramm analysis aiming at separating two signal peaks.
"""
def _analysis(
self, active_signal_values: np.ndarray, hist: np.ndarray
) -> tuple[Optional[float], bool]:
"""
Histogram analysis.
Return first local minimum after descending the first
peak, when moving from left to right. Provide global
minima if existing and local true minima as well.
Main goal:
Find separator of signal peaks.
Strategy:
1. Determine the minimum between two significant peaks.
2. If only one peak present or peaks not significant,
determine something like a "first local min" based
on relative criteria.
Args:
active_signal_values (np.ndarray): 1d array with all signal values
hist (np.ndarray): 1d array, histogram
Returns:
float, optional: determined threshold value
bool: flag controlling whether the analysis has been successful.
"""
# Initialize output
new_threshold = None
have_new_value = False
hist_1st_derivative = np.gradient(hist)
hist_2nd_derivative = np.gradient(hist_1st_derivative)
## For tuning the parameters, plot the histogram and its derivatives
# if self.verbosity >= 2:
# plt.figure("Histogram analysis")
# plt.plot(
# np.linspace(
# np.min(active_signal_values),
# np.max(active_signal_values),
# hist.shape[0],
# ),
# hist,
# label=f"Label {label}",
# )
# plt.legend()
# Prioritize global minima on the interval between the two
# largest peaks. If only a single peak exists, continue
# as in 'first local min'.
# Define tuning parameters for defining histograms,
# To allow edge values being peaks as well, add low
# numbers to the sides of the smooth histogram.
enriched_hist = np.hstack(
(
np.array([np.min(hist)]),
hist,
np.array([np.min(hist)]),
)
)
# Peak analysis.
# Find all peaks of the enriched smooth histogram,
# allowing end values to be identified as peaks.
peaks, _ = find_peaks(enriched_hist)
##################################################################
# Only continue if at least one peak is present
if len(peaks) > 0:
# Relate the indices with the original histogram
# And continue analysis with the original one.
peaks_indices = peaks - 1
# Cache the peak heights
peaks_heights = hist[peaks_indices]
# # Check whether peaks have passed
# if self.threshold_safety == "peaks passed":
# self.pre_peaks_passed[label] = False
# if (
# peaks_heights[0] < np.max(peaks_heights)
# and not self.peaks_passed[label]
# ):
# self.pre_peaks_passed[label] = True
# Fetch the modulus of the second derivative for all peaks
peaks_2nd_derivative = np.absolute(hist_2nd_derivative[peaks_indices])
##################################################################
# Track the feasibility of peaks. Initialize all peaks as feasible.
# Feasibility is considered only in the presence of multiple peaks.
# Determine feasibility. A peak is considered feasible if
# it is sufficiently far away from the global minimum.
min_height = np.min(hist)
peaks_are_distinct = peaks_heights - min_height > 0.2 * np.max(
peaks_heights - min_height
)
# Determine feasibility. A peak is considered feasible if
# the modulus of the second derivative is sufficiently large,
# relatively to the most prominent peak.
peaks_have_large_2nd_der = peaks_2nd_derivative > 0.2 * np.max(
peaks_2nd_derivative
)
peaks_are_feasible = np.logical_or(
peaks_are_distinct, peaks_have_large_2nd_der
)
# Cache the number of feasible peaks
num_feasible_peaks = np.count_nonzero(peaks_are_feasible)
##################################################################
# Determine the two feasible peaks with largest height.
# For this, first, restrict peaks to feasible ones.
feasible_peaks_indices = peaks_indices[peaks_are_feasible]
feasible_peaks_heights = peaks_heights[peaks_are_feasible]
# Sort the peak values from large to small, and restrict to
# the two largest
relative_max_indices = np.flip(np.argsort(feasible_peaks_heights))[
: min(2, num_feasible_peaks)
]
max_indices = feasible_peaks_indices[relative_max_indices]
sorted_max_indices = np.sort(max_indices)
##################################################################
# Continue only if there exist two feasible peaks, and the peaks
# are of similar size.
if num_feasible_peaks > 1:
##################################################################
# Determine whether the two peaks are significant.
# Consider the restricted histogram
restricted_hist = hist[np.arange(*sorted_max_indices)]
# Identify the global minimum as separator of signals
restricted_global_min_index = np.argmin(restricted_hist)
# Map the relative index from the restricted to the full
# (not-enriched) histogram.
global_min_index = sorted_max_indices[0] + restricted_global_min_index
min_value = hist[global_min_index]
# Check whether the both peaks values actually are sufficiently
# and relatively different from the min value. Discard the value
# otherwise.
peaks_significant = (hist[max_indices[1]] - min_value) > 0.1 * (
hist[max_indices[0]] - min_value
)
##################################################################
# Determine the global minimum (in terms of signal values),
# determining the candidate for the threshold value
# Thresh mapped onto range of values
# Cache some of the general variables needed for a two peak analysis.
self._hist = hist
self._restricted_hist = restricted_hist
self._sorted_max_indices = sorted_max_indices
# Apply specific analysis.
threshold_index = self._two_peak_analysis()
if peaks_significant:
new_threshold = np.min(
active_signal_values
) + threshold_index / self._bins * (
np.max(active_signal_values) - np.min(active_signal_values)
)
# Identify success of the method
have_new_value = True
# In case the above analysis has not been accepted (peaks not
# significant) there exists only one peak, perform an alternative
# step.
if not have_new_value and num_feasible_peaks == 0:
# Situation occurs when no peaks present.
pass
elif not have_new_value and (
num_feasible_peaks == 1 or not (peaks_significant)
):
# Restrict analysis to the signal right from the most
# significant peak.
# restricted_hist = hist[max_indices[0] :]
restricted_hist_1st_derivative = hist_1st_derivative[max_indices[0] :]
restricted_hist_2nd_derivative = hist_2nd_derivative[max_indices[0] :]
# max_peak_value = np.max(restricted_hist)
max_peak_derivative = np.max(
np.absolute(restricted_hist_1st_derivative)
)
# Restrict to positive 2nd derivative and small 1st derivative
max_value = 0.01 * max_peak_derivative
tol = 1e-6
feasible_restricted_indices = np.logical_and(
-max_value < restricted_hist_1st_derivative,
restricted_hist_2nd_derivative > tol,
)
# Only continue if non-empty
if np.count_nonzero(feasible_restricted_indices) > 0:
# Pick the first value in the feasible interval
min_restricted_index = np.min(
np.argwhere(feasible_restricted_indices)
)
# Relate to the full signal
min_index = max_indices[0] + min_restricted_index
# Threshold value in the mapped onto range of values
new_threshold = np.min(
active_signal_values
) + min_index / self._bins * (
np.max(active_signal_values) - np.min(active_signal_values)
)
# Identify success of the method
have_new_value = True
return new_threshold, have_new_value
@abc.abstractmethod
def _two_peak_analysis(self) -> int:
"""
Abstract method for determining the index corresponding
to the full histogramm, to define the threshold value.
Returns:
int: index in histogram self._hist corresponding
to considered threshold value.
"""
pass
[docs]
class GlobalMinTwoPeakHistogrammAnalysis(TwoPeakHistogrammAnalysis):
"""
Class defining a two peak analysis for dynamically
determining a threshold parameter used on a global
minimum analysis of the signal histogram.
"""
def _two_peak_analysis(self) -> int:
"""
Method to determine the threshold parameter based
on the global minimum attained between two peaks,
i.e., operating on the restricted histogram.
Returns:
int: index in histogram self._hist corresponding
to considered threshold value.
"""
# Identify the global minimum as separator of signals
restricted_global_min_index = np.argmin(self._restricted_hist)
# Map the relative index from the restricted to the full
# (not-enriched) histogram.
global_min_index = self._sorted_max_indices[0] + restricted_global_min_index
return global_min_index
[docs]
class OtsuTwoPeakHistogrammAnalysis(TwoPeakHistogrammAnalysis):
"""
Class defining a two peak analysis for dynamically
determining a threshold parameter used on a Otsu
analysis of the signal histogram.
"""
def _two_peak_analysis(self) -> int:
"""
Method to determine the threshold parameter based
on the global minimum attained between two peaks,
i.e., operating on the restricted histogram.
Returns:
int: index in histogram self._hist corresponding
to considered threshold value.
"""
otsu_index = skimage.filters.threshold_otsu(hist=self._hist)
return otsu_index
[docs]
class DynamicThresholdModel(darsia.StaticThresholdModel):
"""
Class for dynamic thresholding.
"""
def __init__(
self,
method: Optional[str] = None,
threshold_lower: Optional[Union[float, list[float]]] = None,
threshold_upper: Optional[Union[float, list[float]]] = None,
labels: Optional[np.ndarray] = None,
key: str = "",
**kwargs,
) -> None:
"""
Constructor of DynamicThresholdModel.
Args:
method (str): method name
threshold_lower (float or list of float): lower threshold value boundary
threshold_upper (float or list of float): upper threshold value boundary
labels (array): labeled domain
key (str): prefix for options
"""
# Determine threshold strategy and lower and upper bounds.
threshold_method = (
kwargs.get(key + "threshold method", "tailored global min")
if method is None
else method
)
threshold_lower = (
kwargs.get(key + "threshold value min", 0.0)
if threshold_lower is None
else threshold_lower
)
threshold_upper = (
kwargs.get(key + "threshold value max", None)
if threshold_upper is None
else threshold_upper
)
# Call the constructor if the static threshold model.
super().__init__(threshold_lower, threshold_upper, labels)
# Identify method and define corresponding thresholding object
if threshold_method == "tailored global min":
self.method = GlobalMinTwoPeakHistogrammAnalysis()
elif threshold_method == "tailored otsu":
self.method = OtsuTwoPeakHistogrammAnalysis()
elif threshold_method == "otsu":
self.method = StandardOtsu()
else:
raise ValueError(f"Method {method} not supported for dynamic thresholding")
# Identify the provided thresholds as fixed bounds, allowing to modify the
# threshold values.
self._threshold_lower_bound = (
self._threshold_lower.copy() if self._threshold_lower is not None else None
)
self._threshold_upper_bound = (
self._threshold_upper.copy() if self._threshold_upper is not None else None
)
# Only allow for lower threshold values. Deactive upper thresholding
self._threshold_upper = None
# Verbosity
self.verbosity = kwargs.get("verbosity", 0)
def __call__(
self, img: np.ndarray, mask: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Main method. Adapt thresholds and apply thresholding.
Args:
img (np.ndarray): image
mask (np.ndarray, optional): boolean mask of interest
Returns:
np.ndarray: booelean mask identifying signal according to current threshold values.
"""
self.calibrate([img], mask)
return super().__call__(img, mask)
[docs]
def calibrate(self, img: list[np.ndarray], mask: Optional[np.ndarray]) -> None:
"""
Adapt threshold values using a dynamic strategies.
"""
if self._is_homogeneous:
self._calibrate_homogeneous(img, mask)
else:
self._calibrate_heterogeneous(img, mask)
def _calibrate_homogeneous(
self, img: list[np.ndarray], mask: Optional[np.ndarray]
) -> None:
"""
Adapt threshold values globally using a dynamic stratgey.
Args:
img (list of np.ndarray): image(s)
"""
raise NotImplementedError(
"Currently the dynamic thresholding is only implemented for heterogeneous media."
)
pass
def _calibrate_heterogeneous(
self, img: list[np.ndarray], mask: Optional[np.ndarray]
) -> None:
"""
Adapt threshold values for each label using a dynamic strategy.
Args:
img (list of np.ndarray): image(s)
"""
# Extract main image for calibration
assert len(img) == 1
signal = img[0]
for label_count, label in enumerate(np.unique(self._labels)):
# Determine mask of interest, i.e., consider single label,
# the interval of interest and the provided mask.
label_mask = self._labels == label
effective_mask = (
label_mask if mask is None else np.logical_and(label_mask, mask)
)
# Only continue if mask not empty
if np.count_nonzero(effective_mask) > 0:
new_threshold, have_new_value = self.method(signal, effective_mask)
# Setting values
if have_new_value:
# Clip the new value to the appropriate interval
bounded_threshold_value = np.clip(
new_threshold,
self._threshold_lower_bound[label_count],
self._threshold_upper_bound[label_count],
)
# Set self.threshold_lower is used in self.__call__ as effective lower
# threshold value
self._threshold_lower[label_count] = bounded_threshold_value
# Display the threshold value
if self.verbosity >= 1:
print(f"Threshold value: {self._threshold_lower}")