Source code for darsia.multi_image_analysis.balancing_calibration

"""Module collecting several calibration tools for models.

In particular provided are objective functions for calibration in
ConcentrationAnalysis.calibrate_balancing().

"""

from __future__ import annotations

import abc
from itertools import combinations
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import skimage

import darsia


[docs] class AbstractBalancingCalibration: """ Abstract class for defining an objective function to be called in ConcentrationAnalysis.calibrate_balancing(). """
[docs] @abc.abstractmethod def optimize_balancing( self, input_images: list[np.ndarray], images_diff: list[np.ndarray], relative_times: list[float], options: dict, ): """ Abstract method to define an objective function. Returns: callable: objective function. """ pass
[docs] def update_balancing_for_calibration( self, parameters: np.ndarray, options: dict ) -> None: """ Wrapper for updating the balancing (provided as a model), depending on whether it is a single model or a combined model. Args: parameters (np.ndarray): model parameters, options (dict): further tuning parameters and extra info. """ # Check whether the balancing is part of a combined model, # and possibly determine position of the model if isinstance(self.balancing, darsia.CombinedModel): pos_balancing = options.get("balancing position") self.balancing.update_model_parameters(parameters, pos_balancing) else: self.balancing.update_model_parameters(parameters)
[docs] def calibrate_balancing( self, images: list[darsia.Image], options: dict, ) -> bool: """ Utility for calibrating the balancing used in darsia.ConcentrationAnalysis. NOTE: Require to combine darsia.ConcentrationAnalysis with a calibration model mixin via multiple inheritance. Args: images (list of darsia.Image): calibration images options (dict): container holding tuning information for the numerical calibration routine Returns: bool: success of the calibration study. """ # Apply the same steps as in __call__ to all images, until before balancing is applied. # Prepare calibration and determine fixed data images_diff = [self._subtract_background(img) for img in images] # Extract monochromatic version and take difference wrt the baseline image images_signal = [self._extract_scalar_information(diff) for diff in images_diff] # Clean signal images_clean_signal = [self._clean_signal(signal) for signal in images_signal] # The missing steps: balancing, restoration, and the model are not applied. # Instead, the balancing and a lightweight restoration will be part of # a calibration routine. if not hasattr(self, "optimize_balancing"): raise NotImplementedError( """The concentration analysis is not equipped with a calibration model for balancing.""" ) opt_result, opt_success = self.optimize_balancing(images_clean_signal, options) # Report results if opt_success: print( f"Calibration successful with obtained model parameters {opt_result}." ) else: print("Calibration not successful.") # Update model self.update_balancing_for_calibration(opt_result, options) return opt_success
[docs] class ContinuityBasedBalancingCalibrationMixin(AbstractBalancingCalibration): """ Calibration balancing based on reducing jumps over interfaces for a given labeled image. Has to be combined with ConcentrationAnalysis. """ # ! ---- Main routine
[docs] def optimize_balancing( self, images: list[np.ndarray], options: dict, ) -> tuple[np.ndarray, bool]: """ Define objective function such that the root is the min. Args: input_images (list of np.ndarray): input for _convert_signal images_diff (list of np.ndarray): plain differences wrt background image relative_times (list of float): times options (dict): dictionary with objective value, here the injection rate Returns: np.ndarray: optimized model parameters bool: success flag """ # ! ---- Safety check if not isinstance(self.balancing, darsia.HeterogeneousLinearModel): raise NotImplementedError( """Balancing optimization only implemented for darsia.HeterogeneousModel.""" ) # ! ---- Perform (expensive) setup and cache # Find thick contours self._scan_labeled_image(options) # Find the balancing aiming by investigating interfaces # and populate the answer to the segments. optimized_scaling = self._least_squares_minimizer(images, options) # Enhance parameters with the same offset as already set optimized_offset = self.balancing._offset # Collect results optimized_parameters = np.hstack((optimized_scaling, optimized_offset)) success = True return optimized_parameters, success
# ! ---- Auxiliary routines. def _scan_labeled_image(self, options: dict) -> None: """ Find thick contours of labeled image. Args: options (dict): dictionary with possibility to tune the definition and detection of a thick contour. """ # Cache label info self.unique_labels = np.unique(self.labels) self.num_labels = len(self.unique_labels) # Check if required information is stored already segmentation_contour_path = Path( options.get("contour path", "cache/contours.npy") ) if segmentation_contour_path.exists(): # Read data from cache cached_segmentation_contour = np.load( segmentation_contour_path, allow_pickle=True ).item() self.contour_mask = cached_segmentation_contour["contour_mask"] self.label_couplings = cached_segmentation_contour["label_couplings"] self.coupling_strength = cached_segmentation_contour["coupling_strength"] else: # Generate data from scratch # Setup parameters contour_thickness = options.get("contour_thickness", 10) contour_overlap_threshold = options.get("contour_overlap_threshold", 1000) # Final assignment of contours contour_mask = {} for label in self.unique_labels: contour_mask[label] = self._labeled_mask_to_contour_mask( self.labels == label, contour_thickness ) # User interact. Provide possibility to hardcode label_couplings to be optimized. excluded_couplings = options.get("exclude couplings", []) only_couplings = options.get("only couplings", None) # Determine common intersection of thick contours shared by neighboring segments # Find relevant couplings of masked labels. coupling_strength = [] label_couplings = [] for label1 in self.unique_labels: for label2 in self.unique_labels: # Consider only directed pairs if label1 < label2: coupling = (label1, label2) # User interaction. Possibilities: # 1. Restrict to a provided list of label couplings. # 2. Exclude particular couplings. if only_couplings is not None: if ( coupling not in only_couplings and tuple(reversed(coupling)) not in only_couplings ): continue elif coupling in excluded_couplings: continue # Check if labeled regions share significant part of contour elif ( np.count_nonzero( np.logical_and( contour_mask[label1], contour_mask[label2] ) ) > contour_overlap_threshold ): # Track coupling label_couplings.append(coupling) # Determine the coupling strength depending on the size of the # overlap (later adjusted relatively to the global data). # Approximation of the size of the interface in terms of the # number of pixels for the thick contour. coupling_strength.append( np.count_nonzero( np.logical_and( contour_mask[label1], contour_mask[label2] ) ) ) # Cache the central objects self.contour_mask = contour_mask self.label_couplings = label_couplings self.coupling_strength = [ cs / max(coupling_strength) for cs in coupling_strength ] # Store to file segmentation_contour_path.parents[0].mkdir(parents=True, exist_ok=True) np.save( segmentation_contour_path, { "contour_mask": self.contour_mask, "label_couplings": self.label_couplings, "coupling_strength": self.coupling_strength, }, ) # Define thick contours for all labels def _labeled_mask_to_contour_mask( self, labeled_mask: np.ndarray, thickness: int ) -> np.ndarray: """ # Auxiliary method in _scan_labeled_image. Starting from a boolean array identifying a region, find the contours with a user-defined bandwidth. Args: labeled_mask (np.ndarray): boolean array identifying a connected region. thickness (int): contour thickness obtained through dilation Returns: np.ndarray: boolean array identifying a band width of the contours """ # Determine the contours of the labeled mask contours, _ = cv2.findContours( skimage.img_as_ubyte(labeled_mask), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE, ) # Extract the contour as mask contour_mask = np.zeros(self.base.img.shape[:2], dtype=bool) for c in contours: c = (c[:, 0, 1], c[:, 0, 0]) contour_mask[c] = True # Dilate to generate a thick contour contour_mask = skimage.morphology.binary_dilation( contour_mask, np.ones((thickness, thickness), np.uint8) ) # Convert to boolean mask contour_mask = skimage.img_as_bool(contour_mask) return contour_mask def _least_squares_minimizer( self, images: list[np.ndarray], options: dict ) -> np.ndarray: """ Args: images (list of np.ndarray): signals which in principle went through ConcentrationAnalysis.__call__() including ConcentrationAnalysis._convert_signal() Returns: np.ndarray: suggested heterogeneous scaling """ # Strategy: Quantify the discontinuity jump of the signal at # all boundaries between different segments. These are stored # in interace_ratio_container. These ratios will be used to # define a segment-wise scaling factor. To transfer the infos # on interfaces to segments a least-squares problem is solved. # ! ---- Initialization # Initialize collection of interface ratios with empty lists, # as well as flag of trustable information by identifying # none of the coupling as trustworthy. interface_ratio_container: dict = {} trustable_summary = {} for coupling in self.label_couplings: interface_ratio_container[coupling] = [] trustable_summary[coupling] = False # ! ---- Interface analysis # Read tuning parameters from options mean_thresh = options.get("mean thresh") median_disk_radius = options.get("median disk radius") # From the labels determine the region properties regionprops = skimage.measure.regionprops(self.labels) # Find a suitable segmentation_scaling vector for each separate image. for signal in images: # Define the segment-wise median median = np.zeros(signal.shape[:2], dtype=np.uint8) for regionprop in regionprops: # Get mask mask = self.labels == regionprop.label # determine bounding box for labels == label, from regionprops bbox = regionprop.bbox # Define the bbox as roi roi = (slice(bbox[0], bbox[2]), slice(bbox[1], bbox[3])) # Extract image and mask in roi mask_roi = mask[roi] signal_roi = signal[roi] # Determine median on roi median_label_roi = skimage.filters.rank.median( skimage.img_as_ubyte(signal_roi), skimage.morphology.disk(median_disk_radius), mask=mask_roi, ) # Remove signal outside the mask median_label_roi[~mask_roi] = 0 # Extend to full image median[roi] += median_label_roi # Make discontinuity analysis and obtain interface ratios for this image. # Find ratio of mean median values for each boundary. # The idea will be that this ratio will be required # for one-sided scaling in order to correct for # discontinuities. Store the ratios in a dictionary. # To keep track of orientation, consider sorted pairs # of combinations. Also keep track which ratios are # trustable or not (only if sufficient information # provided). interface_ratio = {} trustable = {} for coupling in self.label_couplings: # Fetch the single labels label1, label2 = coupling # Fetch the common boundary common_boundary = np.logical_and( self.contour_mask[label1], self.contour_mask[label2] ) # Restrict the common boundary to the separate subdomains roi1 = np.logical_and(common_boundary, self.labels == label1) roi2 = np.logical_and(common_boundary, self.labels == label2) # Consider the mean of the median of the signal on the separate bands # TODO Uniformly split contours in N segments and determine # jump in an integral sense, instead of throwing away the spatial info. mean1 = np.mean(median[roi1]) mean2 = np.mean(median[roi2]) # Check whether this result can be trusted - require sufficient signal. trustable[coupling] = min(mean1, mean2) >= mean_thresh # Define the ratio / later scaling - if value not trustable (for # better usability) choose ratio equal to 1 interface_ratio[coupling] = mean1 / mean2 if trustable[coupling] else 1 # Keep only trustable information and collect. for coupling, ratio in interface_ratio.items(): if trustable[coupling]: interface_ratio_container[coupling].append(ratio) trustable_summary[coupling] = True # ! ---- Summarize analysis # Reduce multiple data points per interface to single data point # by simply taking the mean. summarized_interface_ratio = {} for coupling in self.label_couplings: if trustable_summary[coupling]: summarized_interface_ratio[coupling] = np.mean( np.array(interface_ratio_container[coupling]) ) else: # Aim at equalizing scaling parameters summarized_interface_ratio[coupling] = 1.0 # Fix the lowest trusted label - requires that the trusted interfaces # define a connected graph. lowest_trusted_label = np.min( [coupling[0] for coupling in trustable_summary.keys()] ) # ! ---- Assembly: Define conditions combining the interfaces # Based on the interface ratios, build a linear (overdetermined) system, # which characterizes the optimal scaling. matrix = np.zeros((0, self.num_labels), dtype=float) rhs = np.zeros((0, 1), dtype=float) num_constraints = 0 # Four main contributions: # 1. Constraint: Add weight on fixing one label # 2. Constraint: Relate label groups # 3. Regularization: Weak # 4. Interface ratios ############################################################################ # 1. Fix reference labels, here chosen to be the label with lowest label id for label in [lowest_trusted_label]: basis_vector = np.zeros((1, self.num_labels), dtype=float) basis_vector[0, label] = 1 matrix = np.vstack((matrix, basis_vector)) rhs = np.vstack((rhs, np.array([[1]]))) num_constraints += 1 ############################################################################ # 2. Add weak constraint and connect similar labels. label_groups = options.get("label groups", None) similarity_weight = 1 # NOTE: Other values could add more weight for group in label_groups: if len(group) > 1: for coupling in list(combinations(group, 2)): label1, label2 = coupling similarity_balance = np.zeros((1, self.num_labels), dtype=float) similarity_balance[0, label1] = similarity_weight similarity_balance[0, label2] = -similarity_weight matrix = np.vstack((matrix, similarity_balance)) rhs = np.vstack((rhs, np.array([0]))) num_constraints += 1 ############################################################################ # 3. Add a weak constraint on all parameters to stay close to 1 # removing possible singularity. regularization_parameter = 0.0 for label in self.unique_labels: basis_vector = np.zeros((1, self.num_labels), dtype=float) basis_vector[0, label] = regularization_parameter matrix = np.vstack((matrix, basis_vector)) rhs = np.vstack((rhs, np.array([[regularization_parameter]]))) num_constraints += 1 ############################################################################ # 4. Add trusted couplings and main information on interface ratios.. for coupling in self.label_couplings: label1, label2 = coupling scaling_balance = np.zeros((1, self.num_labels), dtype=float) scaling_balance[0, label1] = summarized_interface_ratio[coupling] scaling_balance[0, label2] = -1 matrix = np.vstack((matrix, scaling_balance)) rhs = np.vstack((rhs, np.array([0]))) # Scale matrix and rhs with coupling strength to prioritize significant interfaces matrix[num_constraints:, :] = np.matmul( np.diag(self.coupling_strength), matrix[num_constraints:, :] ) rhs[num_constraints:] = np.matmul( np.diag(self.coupling_strength), rhs[num_constraints:] ) # ! ---- Obtain parameters as least squares solution # Determine suitable scaling by solving the overdetermined system using # a least-squares approach. segmentation_scaling = np.linalg.lstsq(matrix, np.ravel(rhs), rcond=None)[0] # ! ---- User interaction verbosity = options.get("verbosity", 0) if verbosity > 0: # Print the solution print(f"Computed segmentation scaling: {segmentation_scaling}") plt.figure() plt.imshow(self.labels) scaling_image = np.zeros(self.labels.shape[:2], dtype=float) for label in range(self.num_labels): mask = self.labels == label scaling_image[mask] = segmentation_scaling[label] plt.figure() plt.imshow(scaling_image) plt.show() return segmentation_scaling