"""Contains the EquationSystem, managing variables and equations for a system modelled
using the AD framework.
"""
from __future__ import annotations
from typing import Any, Callable, Literal, Optional, Sequence, Union
import numpy as np
import scipy.sparse as sps
from typing_extensions import TypeAlias
import porepy as pp
from . import _ad_utils
from .operators import MixedDimensionalVariable, Operator, Variable
__all__ = ["EquationSystem"]
GridLike = Union[pp.Grid, pp.MortarGrid]
"""A union type representing a domain either by a grid or mortar grid.
FIXME: Rename to Domain? Or GridLikeList/GridList below?"""
# For Python 3.8, a direct definition of type aliases with list is apparently not
# possible (DomainList = Union[list[pp.Grid], list[pp.MortarGrid]]); the same applies
# to dict and presumably tuple etc. As a temporary solution, we use a TypeAlias
# together with a string representation of the type. This can be replaced with the
# more straightforward definition when we drop support for Python 3.8.
DomainList: TypeAlias = "Union[list[pp.Grid], list[pp.MortarGrid]]"
"""A union type representing a list of grids or mortar grids.
This is *not* a list of GridLike, as that would allow a list of mixed grids and
mortar grids."""
VariableList: TypeAlias = (
"Union[list[str], list[Variable], list[MixedDimensionalVariable]]"
)
"""A union type representing variables
Variables are defined through either
- names (:class:`str`),
- multiple :class:`~porepy.numerics.ad.operators.Variable` or
- :class:`~porepy.numerics.ad.operators.MixedDimensionalVariable`.
This type is accepted as input to various methods and parsed to a list of
:class:`~porepy.numerics.ad.operators.Variable` using
:meth:`~porepy.numerics.ad.equation_system.EquationSystem._parse_variable_list`.
"""
EquationList: TypeAlias = "Union[list[str], list[Operator]]"
"""A union type representing equations through either names (:class:`str`), or
:class:`~porepy.numerics.ad.operators.Operator`.
This type is accepted as input to various methods and parsed to a list of
:class:`~porepy.numerics.ad.operators.Operator` using
:meth:`~porepy.numerics.ad.equation_system.EquationSystem._parse_equations`.
"""
EquationRestriction: TypeAlias = "dict[Union[str, Operator], DomainList]"
"""A dictionary mapping equations to a list of domains on which the equation should be
applied.
The keys of the dictionary can be either the name of the equation, or the equation
itself represented as an :class:`~porepy.numerics.ad.operators.Operator`. The values of
the dictionary are DomainList, i.e., a list of grids or mortar grids.
This type is accepted as input to various methods and parsed to an index set
representing a restricted image of the equation by
:meth:`~porepy.numerics.ad.equation_system.EquationSystem._parse_equations`.
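Example:
    A hypothetical sketch (``"mass_balance"`` and ``subdomain_list`` are assumed
    names, not defined in this module):

    .. code:: Python

        # Restrict the (assumed) equation "mass_balance" to a subset of subdomains.
        restriction: EquationRestriction = {"mass_balance": subdomain_list}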
"""
# IMPLEMENTATION NOTE: EK could not find an elegant way to represent all types of
# equation input in a single type. The problem is that, while strings and Operators
# are naturally wrapped in lists, even if there is only one item, restrictions of
# equations are most naturally represented as a dictionary. This means iteration over
# equations and restrictions must be handled separately, as is now done in
# _parse_equations(). To avoid this, we could have introduced
#
# EquationType = Union[str, Operator, dict[Union[str, Operator], DomainList]]
#
# and allowed for list[EquationType] as input to various methods. This does however
# require passing a list of dictionaries to _parse_equations(), which was very
# awkward from the user side when EK tried it. The current solution seems like a fair
# compromise, and it has the positive side of being explicit on the difference between
# equations and restrictions of equations, but it does not feel like a fully
# satisfactory solution.
GridEntity = Literal["cells", "faces", "nodes"]
"""A union type representing a grid entity, either a cell, face or node.
This is used to define the domain of a variable or an equation,
i.e. whether it is defined on cells, faces or nodes.
"""
class EquationSystem:
"""Represents an equation system, modelled by AD variables and equations in AD form.
This class provides functionalities to create and manage variables, as well as
managing equations on the form of :class:`~porepy.numerics.ad.operators.Operator`.
It further provides functions to assemble subsystems using subsets of equations
and variables.
Note:
As of now, the system matrix (Jacobian) is assembled with respect to ALL
variables and then the columns belonging to the requested subset of variables
and grids are sliced out and returned. This will be optimized with minor changes
to the AD operator class and its recursive forward AD mode in the future.
Currently, this class optimizes the block structure of the Jacobian only
regarding the subdomains and interfaces. A more localized optimization (e.g.
cell-wise for equations without spatial differential operators) is not
performed.
"""
admissible_dof_types: tuple[
Literal["cells"], Literal["faces"], Literal["nodes"]
] = ("cells", "faces", "nodes")
"""A set denoting admissible types of local DOFs for variables.
- nodes: DOFs per grid node.
- cells: DOFs per grid cell.
- faces: DOFS per grid face.
"""
def __init__(self, mdg: pp.MixedDimensionalGrid) -> None:
### PUBLIC
self.mdg: pp.MixedDimensionalGrid = mdg
"""Mixed-dimensional domain passed at instantiation."""
self.assembled_equation_indices: dict[str, np.ndarray] = dict()
"""Contains the row indices in the last assembled (sub-) system for a given
equation name (key).
This dictionary changes with every call to any assemble-method.
"""
### PRIVATE
self._equations: dict[str, Operator] = dict()
"""Contains references to equations in AD operator form for a given name (key).
Private to avoid users setting equations directly and circumventing the current
set-method which includes information about the image space.
"""
self._equation_image_space_composition: dict[
str, dict[GridLike, np.ndarray]
] = dict()
"""Definition of image space for all equations.
Contains for every equation name (key) a dictionary, which provides again for
every involved grid (key) the indices of equations expressed through the
equation operator. The ordering of the items in the grid-array dictionaries is
consistent with the remaining PorePy framework. The ordering is local to the
equation, so it can be used to slice an equation prior to concatenation of
equations into a global matrix.
"""
self._equation_image_size_info: dict[str, dict[GridEntity, int]] = dict()
"""Contains for every equation name (key) the number of equations per grid
entity.
"""
self._variables: list[Variable] = list()
"""Contains references to Variables.
A Variable is uniquely identified by its name and domain, stored as attributes
of the Variable object.
"""
self._Schur_complement: Optional[tuple] = None
"""Contains block matrices and the rhs of the last assembled Schur complement.
"""
self._variable_numbers: dict[Variable, int] = dict()
"""Dictionary containing the index of the variable in the system vector of the
last assembled system.
"""
self._variable_num_dofs: np.ndarray = np.array([], dtype=int)
"""Array containing the number of DOFS per block number.
The block number corresponds to this array's indexation, see also
attr:`_variable_numbers`.
"""
self._variable_dof_type: dict[Variable, dict[GridEntity, int]] = dict()
"""Dictionary containing the type of DOFs per variable.
The type is given as a dictionary with keys 'cells', 'faces' or 'nodes',
and integer values denoting the number of DOFs per grid entity.
"""
def SubSystem(
self,
equation_names: Optional[EquationList] = None,
variable_names: Optional[VariableList] = None,
) -> EquationSystem:
"""Creates an :class:`EquationSystem` for a given subset of equations and
variables.
Currently only subsystems containing *whole* equations and variables in the
mixed-dimensional sense can be created. Restrictions of equations to subdomains
is not supported.
Parameters:
equation_names: Names of equations for the new subsystem. If None, all
equations known to the :class:`EquationSystem` are used.
variable_names: Names of known variables for the new subsystem. If None, all
variables known to the :class:`EquationSystem` are used.
Returns:
A new instance of :class:`EquationSystem`. The subsystem equations and
variables are ordered as imposed by this system's order.
Raises:
ValueError: if passed names are not among created variables and set
equations.
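Example:
    A minimal sketch (equation and variable names are hypothetical):

    .. code:: Python

        # Extract a flow subsystem from a larger equation system.
        flow_system = equation_system.SubSystem(
            equation_names=["mass_balance"], variable_names=["pressure"]
        )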
"""
# Parsing of input arguments.
equations = list(self._parse_equations(equation_names).keys())
variables = self._parse_variable_type(variable_names)
# Check that the requested equations and variables are known to the system.
known_equations = set(self._equations.keys())
unknown_equations = set(equations).difference(known_equations)
if len(unknown_equations) > 0:
raise ValueError(f"Unknown variable(s) {unknown_equations}.")
unknown_variables = set(variables).difference(self._variables)
if len(unknown_variables) > 0:
raise ValueError(f"Unknown variable(s) {unknown_variables}.")
# Create the new subsystem.
new_equation_system = EquationSystem(self.mdg)
# IMPLEMENTATION NOTE: This method imitates the variable creation and equation
# setting procedures by calling private methods and accessing private
# attributes. This should be acceptable since this is a factory method.
# Loop over known variables to preserve DOF order.
for variable in self._variables:
if variable in variables:
# Update variables in subsystem.
new_equation_system._variables.append(variable)
# Update variable numbers in subsystem.
new_equation_system._variable_dof_type[
variable
] = self._variable_dof_type[variable]
# Create dofs in subsystem.
new_equation_system._append_dofs(variable)
# Loop over known equations to preserve row order.
for name in known_equations:
if name in equations:
equation = self._equations[name]
image_info = self._equation_image_size_info[name]
image_composition = self._equation_image_space_composition[name]
# Set the information produced in set_equations directly.
new_equation_system._equation_image_space_composition.update(
{name: image_composition}
)
new_equation_system._equation_image_size_info.update({name: image_info})
new_equation_system._equations.update({name: equation})
return new_equation_system
@property
def equations(self) -> dict[str, Operator]:
"""Dictionary containing names of operators (keys) and operators (values), which
have been set as equations in this system.
"""
return self._equations
@property
def variables(self) -> list[Variable]:
"""List containing all :class:`~porepy.numerics.ad.Variable`s known to this
system.
"""
return self._variables
@property
def variable_domains(self) -> list[pp.GridLike]:
"""List containing all domains where at least one variable is defined."""
domains = set()
for var in self._variables:
domains.add(var.domain)
return list(domains)
### Variable management ------------------------------------------------------------
def md_variable(
self, name: str, grids: Optional[DomainList] = None
) -> MixedDimensionalVariable:
"""Create a mixed-dimensional variable for a given name-domain list combination.
Parameters:
name (str): Name of the mixed-dimensional variable.
grids (optional): List of grids where the variable is defined. If None
(default), all grids where the variable is defined are used.
Returns:
A mixed-dimensional variable.
Raises:
ValueError: If variables with the given name exist on both subdomains and
interfaces and the domain type is not specified (``grids`` is None).
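Example:
    A minimal sketch (assumes a variable named ``"pressure"`` has been created
    on one or more subdomains):

    .. code:: Python

        # Collect all "pressure" variables into one mixed-dimensional variable.
        p = equation_system.md_variable("pressure")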
"""
if grids is None:
variables = [var for var in self._variables if var.name == name]
# We don't allow combinations of variables with different domain types
# in a md variable.
heterogeneous_domain = False
if isinstance(variables[0].domain, pp.Grid):
heterogeneous_domain = any(
[isinstance(var.domain, pp.MortarGrid) for var in variables]
)
elif isinstance(variables[0].domain, pp.MortarGrid):
heterogeneous_domain = any(
[isinstance(var.domain, pp.Grid) for var in variables]
)
else:
raise ValueError("Unknown domain type for variable")
if heterogeneous_domain:
raise ValueError(
f"Variable {name} is defined on multiple domain types."
)
else:
variables = [
var
for var in self._variables
if var.name == name and var.domain in grids
]
return MixedDimensionalVariable(variables)
def create_variables(
self,
name: str,
dof_info: Optional[dict[GridEntity, int]] = None,
subdomains: Optional[list[pp.Grid]] = None,
interfaces: Optional[list[pp.MortarGrid]] = None,
tags: Optional[dict[str, Any]] = None,
) -> MixedDimensionalVariable:
"""Creates new variables according to specifications.
This method does not assign any values to the variable. This has to be done in a
subsequent step (e.g. using :meth:`set_variable_values`).
Examples:
An example on how to define a pressure variable with cell-wise one DOF
(default) on **all** subdomains and **no** interfaces would be
.. code:: Python
p = ad_system.create_variables('pressure', subdomains=mdg.subdomains())
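A hedged sketch of an interface variable with two DOFs per mortar cell (the
variable name is hypothetical; ``mdg.interfaces()`` lists all interfaces):

.. code:: Python

    lam = ad_system.create_variables(
        'mortar_flux', dof_info={'cells': 2}, interfaces=mdg.interfaces()
    )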
Parameters:
name: Name of the variable.
dof_info: Dictionary containing information about number of DOFs per
admissible type. Defaults to ``{'cells':1}``.
subdomains (optional): List of subdomains on which the variable is defined.
If None, then it will not be defined on any subdomain.
interfaces (optional): list of interfaces on which the variable is defined.
If None, then it will not be defined on any interface.
tags (optional): dictionary containing tags for the variables. The tags are
assigned to all variables created by this method and can be updated
using :meth:`update_variable_tags`.
Returns:
A mixed-dimensional variable with above specifications.
Raises:
ValueError: If non-admissible DOF types are used as local DOFs.
ValueError: If one attempts to create a variable not defined on any grid.
KeyError: If a variable with given name is already defined.
"""
# Set default value for dof_info. This is a mutable object, so we need to
# create a new one each time and not set the default in the signature.
if dof_info is None:
dof_info = {"cells": 1}
# Sanity check for admissible DOF types.
requested_type = set(dof_info.keys())
if not requested_type.issubset(set(self.admissible_dof_types)):
non_admissible = requested_type.difference(self.admissible_dof_types)
raise ValueError(f"Non-admissible DOF types {non_admissible} requested.")
# Sanity check if variable is defined anywhere.
if subdomains is None and interfaces is None:
raise ValueError(
"Cannot create variable not defined on any subdomain or interface."
)
# Container for all grid variables.
variables = []
# Merge subdomains and interfaces into a single list.
assert subdomains is not None or interfaces is not None # for mypy
grids: Sequence[pp.GridLike] = subdomains if subdomains else interfaces # type: ignore
# Check if a md variable was already defined under that name on any of grids.
for var in self.variables:
if var.name == name and var.domain in grids:
raise KeyError(f"Variable {name} already defined on {var.domain}.")
if grids:
for grid in grids:
if subdomains:
assert isinstance(grid, pp.Grid) # mypy
data = self.mdg.subdomain_data(grid)
else:
assert isinstance(grid, pp.MortarGrid) # mypy
data = self.mdg.interface_data(grid)
# Prepare data dictionary if this was not done already.
if pp.STATE not in data:
data[pp.STATE] = dict()
if pp.ITERATE not in data[pp.STATE]:
data[pp.STATE][pp.ITERATE] = dict()
# Create grid variable.
new_variable = Variable(name, dof_info, domain=grid, tags=tags)
# Store it in the system
variables.append(new_variable)
self._variables.append(new_variable)
# Append the new DOFs to the global system.
self._variable_dof_type[new_variable] = dof_info
self._append_dofs(new_variable)
# Create an md variable that wraps all the individual variables created on
# individual grids.
merged_variable = MixedDimensionalVariable(variables)
return merged_variable
def get_variables(
self,
variables: Optional[VariableList] = None,
grids: Optional[list[pp.GridLike]] = None,
tag_name: Optional[str] = None,
tag_value: Optional[Any] = None,
) -> list[Variable]:
"""Filter variables based on grid, tag name and tag value.
Particular usage: calling without arguments will return all variables in the
system.
Parameters:
variables: List of variables to filter. If None, all variables in the system
are included. Variables can be given as a list of variables, mixed-
dimensional variables, or variable names (strings).
grids: List of grids to filter on. If None, all grids are included.
tag_name: Name of the tag to filter on. If None, no filtering on tags.
tag_value: Value of the tag to filter on. If None, no filtering on tag
values. If tag_name is not None, but tag_value is None, all variables
with the given tag_name are returned regardless of value.
Returns:
List of filtered variables.
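Example:
    A minimal sketch of tag-based filtering (the tag name and value are
    hypothetical):

    .. code:: Python

        # All variables carrying the tag "phase" with value "liquid".
        liquid_vars = equation_system.get_variables(
            tag_name="phase", tag_value="liquid"
        )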
"""
# Shortcut for efficiency.
# The same behavior is achieved without this, but it is slower.
if (
variables is None
and grids is None
and tag_name is None
and tag_value is None
):
return self._variables
# If no variables or grids are given, use full sets.
if variables is None:
variables = self._variables
if grids is None:
# Note: This gives all grids known to variables, not all grids in the
# md grid. The result of the filtering will be the same, though.
grids = self.variable_domains
filtered_variables = []
variables = self._parse_variable_type(variables)
for var in variables:
if var.domain in grids:
# Add variable if tag_name is not specified or if the variable has the
# tag and the tag value matches the requested value.
if tag_name is None:
filtered_variables.append(var)
elif tag_name in var.tags:
if tag_value is None or var.tags[tag_name] == tag_value:
filtered_variables.append(var)
return filtered_variables
def get_variable_values(
self, variables: Optional[VariableList] = None, from_iterate: bool = False
) -> np.ndarray:
"""Assembles an array containing values for the passed variable-like argument.
The global order is preserved and independent of the order of the argument.
Parameters:
variables (optional): VariableType input for which the values are
requested. If None (default), the global vector of unknowns is returned.
from_iterate (optional): flag to return values stored as ITERATE,
instead of STATE (default).
Returns:
The respective (sub) vector in numerical format, size anywhere between 0 and
:meth:`num_dofs`.
Raises:
KeyError: If no values are stored for the VariableType input.
ValueError: If unknown VariableType arguments are passed.
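Example:
    A minimal sketch (assumes a variable named ``"pressure"`` with stored STATE
    values):

    .. code:: Python

        # Global-ordering-preserving vector of all pressure values from STATE.
        p_vals = equation_system.get_variable_values(["pressure"])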
"""
variables = self._parse_variable_type(variables)
# Storage for atomic blocks of the sub vector (identified by name-grid pairs).
values = []
# Loop over all blocks and process those requested.
# This ensures uniqueness and correct order.
for variable in self._variable_numbers:
if variable in variables:
name = variable.name
grid = variable.domain
if isinstance(grid, pp.Grid):
data = self.mdg.subdomain_data(grid)
elif isinstance(grid, pp.MortarGrid):
data = self.mdg.interface_data(grid)
# Extract a copy of requested values.
try:
if from_iterate:
values.append(data[pp.STATE][pp.ITERATE][name].copy())
else:
values.append(data[pp.STATE][name].copy())
except KeyError:
raise KeyError(
f"No values stored for variable {name}, "
f"from_iterate={from_iterate}"
f"\non grid {grid}."
)
# If there are matching blocks, concatenate and return.
if values:
return np.concatenate(values)
# Else return an empty vector.
else:
return np.array([])
def set_variable_values(
self,
values: np.ndarray,
variables: Optional[VariableList] = None,
to_state: bool = False,
to_iterate: bool = False,
additive: bool = False,
) -> None:
"""Sets values for a (sub) vector of the global vector of unknowns.
The order of values is assumed to fit the global order.
Note:
The vector is assumed to be of proper size and will be dissected according
to the global order, starting with the index 0.
A mismatch between the size of ``values`` and the size of the subspace specified
by ``variables`` will raise respective errors from numpy.
Parameters:
values: Vector of size corresponding to number of DOFs of the specified
variables.
variables (optional): VariableType input for which the values are
requested. If None (default), the global vector of unknowns will be
set.
to_state (optional): Flag to write values to ``pp.STATE``.
to_iterate (optional): Flag to write values to ``pp.ITERATE``.
additive (optional): Flag to write values additively. To be used in
iterative procedures.
Raises:
ValueError: If unknown VariableType arguments are passed.
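Example:
    A minimal sketch (assumes ``vals`` matches the size of the ``"pressure"``
    subspace):

    .. code:: Python

        # Overwrite both STATE and ITERATE values for the pressure variable.
        equation_system.set_variable_values(
            vals, ["pressure"], to_state=True, to_iterate=True
        )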
"""
# Start of dissection.
dof_start = 0
dof_end = 0
variables = self._parse_variable_type(variables)
for variable, variable_number in self._variable_numbers.items():
if variable in variables:
name = variable.name
grid = variable.domain
num_dofs = int(self._variable_num_dofs[variable_number])
dof_end = dof_start + num_dofs
# Extract local vector.
# This will raise errors if indexation is out of range.
local_vec = values[dof_start:dof_end]
# Fetch the storage from the relevant dictionary in the
# MixedDimensionalGrid.
if isinstance(grid, pp.Grid):
data = self.mdg.subdomain_data(grid)
elif isinstance(grid, pp.MortarGrid):
data = self.mdg.interface_data(grid)
# Data dict will have pp.STATE and pp.ITERATE entries already created
# during create_variables. If an error is returned here, a variable has
# been created in a non-standard way.
# Store new values as requested.
if additive:
if to_iterate:
# No need for a copy here, since we are adding to an existing
# array.
data[pp.STATE][pp.ITERATE][name] += local_vec
if to_state:
data[pp.STATE][name] += local_vec
else:
if to_iterate:
# The copy is critical here.
data[pp.STATE][pp.ITERATE][name] = local_vec.copy()
if to_state:
data[pp.STATE][name] = local_vec.copy()
# Move dissection forward.
dof_start = dof_end
# Last sanity check if the vector was properly sized, or if it was too large.
# This imposes a theoretically unnecessary restriction on the input argument
# since we only require a vector of at least this size.
assert dof_end == values.size
### DOF management -----------------------------------------------------------------
def _append_dofs(self, variable: pp.ad.Variable) -> None:
"""Appends DOFs for a newly created variable.
Must only be called by :meth:`create_variables`.
Parameters:
variable: The newly created variable
"""
# Number of variable blocks created so far.
last_variable_number: int = len(self._variable_numbers)
# Sanity check that no previous data is overwritten. This should not happen,
# if class not used in hacky way.
assert variable not in self._variable_numbers
# Count number of dofs for this variable on this grid and store it.
# The number of dofs for each dof type defaults to zero.
local_dofs = self._variable_dof_type[variable]
# Both subdomains and interfaces have cell variables.
num_dofs = variable.domain.num_cells * local_dofs.get("cells", 0)
# For subdomains, but not interfaces, we also need to account for faces and
# nodes.
if isinstance(variable.domain, pp.Grid):
num_dofs += variable.domain.num_faces * local_dofs.get(
"faces", 0
) + variable.domain.num_nodes * local_dofs.get("nodes", 0)
# Update the global dofs and block numbers
self._variable_numbers.update({variable: last_variable_number})
self._variable_num_dofs = np.concatenate(
[self._variable_num_dofs, np.array([num_dofs], dtype=int)]
)
# first optimization of Jacobian structure
self._cluster_dofs_gridwise()
def _cluster_dofs_gridwise(self) -> None:
"""Re-arranges the DOFs grid-wise s.t. we obtain grid-blocks in the column sense
and reduce the matrix bandwidth.
The aim is to impose a more block-diagonal-like structure on the Jacobian where
blocks in the column sense represent single grids in the following order:
Note:
Off-diagonal blocks will still be present if subdomain-interface variables
are defined.
1. For each grid in ``mdg.subdomains``
1. For each variable defined on that grid
2. For each grid in ``mdg.interfaces``
1. For each variable defined on that mortar grid
The order of variables per grid is given by the order of variable creation
(stored as order of keys in ``self.variables``).
This method is called after each creation of variables and respective DOFs.
TODO: Revisit. I think I have broken it by looping over _variables instead of
subdomains and interfaces.
"""
# Data stracture for the new order of dofs.
new_variable_counter: int = 0
new_variable_numbers: dict[Variable, int] = dict()
new_block_dofs: list[int] = list()
for variable in self._variables:
# If this variable-grid combination is present, add it to the new
# order of dofs.
if variable in self._variable_numbers:
# Extract created number of dofs
local_dofs: int = self._variable_num_dofs[
self._variable_numbers[variable]
]
# Store new block number and dofs in new order.
new_block_dofs.append(local_dofs)
new_variable_numbers.update({variable: new_variable_counter})
new_variable_counter += 1
# Replace old block order
self._variable_num_dofs = np.array(new_block_dofs, dtype=int)
self._variable_numbers = new_variable_numbers
def _parse_variable_type(self, variables: Optional[VariableList]) -> list[Variable]:
"""Parse the input argument for the variable type.
This method is used to parse the input argument for the variable type in
several exposed methods, allowing the user to specify a single variable or a
list of variables more flexibly.
There is no filtering of the variables. In particular:
- No assumptions should be made on the order of the parsed variables.
- The variable list is not uniquified; if the same variable is passed twice
(say, as a Variable and by its string name), it will be duplicated in the list
of parsed variables.
Parameters:
variables: The input argument for the variable type.
The following interpretation rules are applied:
- If None, return all variables.
- If a list of variables, return same.
- If a list of strings, return all variables with those names.
- If mixed-dimensional variable, return sub-variables.
Returns:
List of Variables.
"""
if variables is None:
return self.get_variables()
parsed_variables = []
assert isinstance(variables, list)
for variable in variables:
if isinstance(variable, MixedDimensionalVariable):
parsed_variables += variable.sub_vars
elif isinstance(variable, Variable):
parsed_variables.append(variable)
elif isinstance(variable, str):
# Use _variables to avoid recursion (get_variables() calls this method)
vars = [var for var in self._variables if var.name == variable]
parsed_variables += vars
else:
raise ValueError(
"Variable type must be a string or a Variable, not {}".format(
type(variable)
)
)
return parsed_variables
def _gridbased_variable_complement(self, variables: VariableList) -> list[Variable]:
"""Finds the grid-based complement of a variable-like structure.
The grid-based complement consists of all variables known to this
EquationSystem, but which are not in the passed list ``variables``.
TODO: Revisit. This method is not used anywhere, and I am not sure it is
correct/does what it is supposed to do.
"""
# Strings and md variables always represent a whole variable in the
# mixed-dimensional sense. Hence, the complement is empty.
if isinstance(variables, (str, MixedDimensionalVariable)):
# TODO: Can we drop this, or is it possible that a single variable has made
# it into this subroutine?
return list()
# Sequence of variables; process each entry.
else:
grid_variables = list()
for variable in variables:
# same processing as above, only grid variables are of interest
if isinstance(variable, Variable):
md_variable = self.md_variable(variable.name)
grid_variables += [
var
for var in md_variable.sub_vars
if var.domain != variable.domain
]
# return a unique collection
return list(set(grid_variables))
def num_dofs(self) -> int:
"""Returns the total number of dofs managed by this system."""
return int(sum(self._variable_num_dofs)) # cast numpy.int64 into Python int
def projection_to(self, variables: Optional[VariableList] = None) -> sps.csr_matrix:
"""Create a projection matrix from the global vector of unknowns to a specified
subspace.
The transpose of the returned matrix can be used to slice respective columns out
of the global Jacobian.
The projection preserves the global order defined by the system, i.e. it
includes no permutation.
Parameters:
variables (optional): VariableType input for which the subspace is
requested. If no subspace is specified using ``variables``,
a null-space projection is returned.
Returns:
a sparse projection matrix of shape ``(M, num_dofs)``, where
``0 <= M <= num_dofs``.
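Example:
    A minimal sketch of column slicing (``jac`` and the variable name are
    assumptions):

    .. code:: Python

        # Restrict the columns of an assembled Jacobian to the pressure subspace.
        P = equation_system.projection_to(["pressure"])
        jac_restricted = jac * P.transpose()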
"""
# current number of total dofs
num_dofs = self.num_dofs()
if variables:
# Array for the indices associated with argument.
# The sort is needed so as not to permute the columns of the projection.
indices = np.sort(self.dofs_of(variables))
# Case where no dofs were found for the VariableType input.
if len(indices) == 0:
return sps.csr_matrix((0, num_dofs))
else:
subspace_size = indices.size
return sps.coo_matrix(
(np.ones(subspace_size), (np.arange(subspace_size), indices)),
shape=(subspace_size, num_dofs),
).tocsr()
# Case where the subspace is null, i.e. no variables specified
else:
return sps.csr_matrix((0, num_dofs))
def dofs_of(self, variables: VariableList) -> np.ndarray:
"""Get the indices in the global vector of unknowns belonging to the variable(s).
Parameters:
variables: VariableType input for which the indices are requested.
Returns:
an order-preserving array of indices of DOFs belonging to the VariableType input.
Raises:
ValueError: if unknown VariableType arguments are passed.
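Example:
    A minimal sketch (the variable name is hypothetical):

    .. code:: Python

        # Global indices of all DOFs belonging to the pressure variable.
        pressure_dofs = equation_system.dofs_of(["pressure"])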
"""
variables = self._parse_variable_type(variables)
global_variable_dofs = np.hstack((0, np.cumsum(self._variable_num_dofs)))
# Storage of indices per requested variable.
indices = list()
for variable in variables:
var_number = self._variable_numbers[variable]
var_indices = np.arange(
global_variable_dofs[var_number],
global_variable_dofs[var_number + 1],
dtype=int,
)
indices.append(var_indices)
# Concatenate indices, if any
if len(indices) > 0:
return np.concatenate(indices, dtype=int)
else:
return np.array([], dtype=int)
def identify_dof(self, dof: int) -> Variable:
"""Identifies the variable to which a specific DOF index belongs.
The intended use is to help identify entries in the global vector or the column
of the Jacobian.
Parameters:
dof: a single index in the global vector.
Returns: the identified Variable object.
Raises:
KeyError: if the dof is out of range (larger than ``num_dofs`` or smaller
than 0).
"""
num_dofs = self.num_dofs()
if not (0 <= dof < num_dofs): # indices go from 0 to num_dofs - 1
raise KeyError("Dof index out of range.")
global_variable_dofs = np.hstack((0, np.cumsum(self._variable_num_dofs)))
# Find the variable number belonging to this index
variable_number = np.argmax(global_variable_dofs > dof) - 1
# Get the variable key from _variable_numbers
variable = [
var for var, num in self._variable_numbers.items() if num == variable_number
][0]
return variable
### Equation management -------------------------------------------------------------------
def set_equation(
self,
equation: Operator,
grids: DomainList,
equations_per_grid_entity: dict[GridEntity, int],
) -> None:
"""Sets an equation using the passed operator and uses its name as an identifier.
If an equation is already set under that name, a ValueError is raised.
Information about the image space must be provided for now, such that grid-wise
row slicing is possible. This will hopefully be provided automatically in the
future.
Note:
Regarding the number of equations, this method assumes that the AD framework
assembles row blocks per grid in subdomains, then per grid in interfaces,
for each operator representing an equation. This is assumed to be the way
PorePy AD works.
Parameters:
equation: An equation in AD operator form, assuming the right-hand side is
zero and this instance represents the left-hand side.
grids: A list of subdomain *or* interface grids on which the equation is
defined.
equations_per_grid_entity: a dictionary describing how many equations
``equation_operator`` provides. This is a temporary work-around until
operators are able to provide information on their image space.
The dictionary must contain the number of equations per grid entity
(cells, faces, nodes) for the operator.
Raises:
ValueError: If the equation operator has a name already assigned to a
previously set equation.
ValueError: If the equation is defined on both subdomains and interfaces.
AssertionError: If the equation is defined on an unknown grid.
ValueError: If indicated number of equations does not match the actual
number as per evaluation of operator.
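Example:
    A minimal sketch (assumes ``mass_balance`` is a named Operator representing
    one cell-wise equation on all subdomains):

    .. code:: Python

        equation_system.set_equation(
            mass_balance, mdg.subdomains(), {"cells": 1}
        )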
"""
# The grid list is changed in place, so we need to make a copy
grids = grids[:]
# The function loops over all grids the operator is defined on and calculates the
# number of equations per grid quantity (cell, face, node). This information
# is then stored together with the equation itself.
image_info: dict[GridLike, np.ndarray] = dict()
total_num_equ = 0
# The domain of this equation is the set of grids on which it is defined
name = equation.name
if name in self._equations:
raise ValueError(
"The name of the equation operator is already used by another equation:"
f"\n{self._equations[name]}"
"\n\nMake sure your equations are uniquely named."
)
# If no grids are specified, store the equation with an empty image space.
if not grids:
self._equation_image_space_composition.update({name: image_info})
# Information on the size of the equation, in terms of the grids it is defined
# on.
self._equation_image_size_info.update({name: equations_per_grid_entity})
# Store the equation itself.
self._equations.update({name: equation})
return
# We require that equations are defined either on a set of subdomains, or a set
# of interfaces. The combination of the two is mathematically possible, provided
# a sufficiently general notation is used, but the chances of this being
# misused is considered high compared to the benefits of allowing such combined
# domains, and we therefore disallow it.
all_subdomains = all([isinstance(g, pp.Grid) for g in grids])
all_interfaces = all([isinstance(g, pp.MortarGrid) for g in grids])
# Allow for no subdomains or interfaces here (case < 1). This is relevant for
# equations stated for general md problems, but on domains that happened not to
# have, e.g., fractures.
if not all_interfaces + all_subdomains <= 1:
raise AssertionError(
"An equation should not be defined on both subdomains and interfaces."
)
# We loop over the subdomains and interfaces in that order to assert a correct
# indexation according to the global order (for grid in sds, for grid in intfs).
# The user does not have to care about the order in grids.
for sd in self.mdg.subdomains():
if sd in grids:
# Equations on subdomains can be defined on any grid quantity.
num_equ_per_grid = int(
sd.num_cells * equations_per_grid_entity.get("cells", 0)
+ sd.num_nodes * equations_per_grid_entity.get("nodes", 0)
+ sd.num_faces * equations_per_grid_entity.get("faces", 0)
)
# Row indices for this grid, cast to integers.
block_idx = np.arange(num_equ_per_grid, dtype=int) + total_num_equ
# Cumulate total number of equations.
total_num_equ += num_equ_per_grid
# Store block idx per grid.
image_info.update({sd: block_idx})
# Remove the subdomain from the domain list.
# Ignore mypy error here, since we know that sd is in grids.
grids.remove(sd) # type: ignore
for intf in self.mdg.interfaces():
if intf in grids:
# Equations on interfaces can only be defined on cells.
num_equ_per_grid = int(
intf.num_cells * equations_per_grid_entity.get("cells", 0)
)
# Row indices for this grid, cast to integers.
block_idx = np.arange(num_equ_per_grid, dtype=int) + total_num_equ
# Cumulate total number of equations.
total_num_equ += num_equ_per_grid
# Store block idx per grid
image_info.update({intf: block_idx})
# Remove the grid from the domain list
# Ignore mypy error here, since we know that intf is in grids.
grids.remove(intf) # type: ignore
# Assert the equation is not defined on an unknown domain.
assert len(grids) == 0
# If all good, we store the information:
# The rows (referring to a global indexation) that this equation provides.
self._equation_image_space_composition.update({name: image_info})
# Information on the size of the equation, in terms of the grids it is defined
# on.
self._equation_image_size_info.update({name: equations_per_grid_entity})
# Store the equation itself.
self._equations.update({name: equation})
def remove_equation(self, name: str) -> Operator | None:
"""Removes a previously set equation and all related information.
Returns:
A reference to the equation in operator form or None, if the equation is
unknown.
Raises:
ValueError: If an unknown equation is attempted removed.
"""
if name in self._equations:
# Remove the equation from the storage
equ = self._equations.pop(name)
# Remove the image space information.
# Note that there is no need to modify the numbering of the other equations,
# since this is a local (to the equation) numbering.
del self._equation_image_space_composition[name]
return equ
else:
raise ValueError(f"Cannot remove unknown equation {name}")
### System assembly and discretization ----------------------------------------------------
@staticmethod
def _recursive_discretization_search(operator: Operator, discr: list) -> list:
"""Recursive search in the tree of this operator to identify all discretizations
represented in the operator.
Parameters:
operator: top level operator to be searched.
discr: list storing found discretizations
"""
if len(operator.tree.children) > 0:
# Go further in recursion
for child in operator.tree.children:
discr += EquationSystem._recursive_discretization_search(child, list())
if isinstance(operator, _ad_utils.MergedOperator):
# We have reached the bottom; this is a discretization (example: mpfa.flux)
discr.append(operator)
return discr
def _parse_equations(
self, equations: Optional[EquationList | EquationRestriction] = None
) -> dict[str, None | np.ndarray]:
"""Helper method to parse equations into a properly ordered structure.
The equations will be ordered according to the order in self._equations (which
is the order in which they were added to the equation system manager and which
is also fixed, since dictionaries preserve insertion order).
Parameters:
equations: A list of equations or a dictionary of equation restrictions.
Returns:
A dictionary with equation names as keys and the index sets of the restricted
equation rows as values. If no restriction is given, the value is None.
"""
# The default return value is all equations with no grid restrictions.
if equations is None:
return dict((name, None) for name in self._equations)
# We need to parse the input.
# Storage for requested blocks, unique information per equation name.
requested_row_blocks = dict()
# Storage for restricted equations.
restricted_equations = dict()
# Get the row indices (in the global system) associated with this equation.
# If the equation is restricted (the user has provided a dictionary with
# grids on which the equation should be evaluated), the row blocks
# will contain only the row indices associated with the restricted grids.
for equation in equations:
# Store restrictions, using different storage for restricted and
# unrestricted equations.
if isinstance(equations, dict):
block = self._parse_single_equation({equation: equations[equation]})
# A dictionary means the equation is restricted to a subset of grids.
restricted_equations.update(block)
else:
# This equation is not restricted to a subset of grids.
block = self._parse_single_equation(equation)
requested_row_blocks.update(block)
# Update the requested blocks with the restricted to overwrite the indices if
# an equation was passed in both restricted and unrestricted structure.
requested_row_blocks.update(restricted_equations)
# Build the restricted set of equations, using the order in self._equations.
# The latter is critical for ensuring determinism of the system.
ordered_blocks = dict()
for equation in self._equations:
# By now, all equations are contained in requested_row_blocks.
if equation in requested_row_blocks:
ordered_blocks.update({equation: requested_row_blocks[equation]})
return ordered_blocks
def _parse_single_equation(
self, equation: str | Operator | EquationRestriction
) -> dict[str, None | np.ndarray]:
"""Helper method to identify possible restrictions of a single equation.
Parameters:
equation: Equation to be parsed.
Returns:
A dictionary with the name of the equation as key and the corresponding
restricted indices as values. If no restriction is given, the value is None.
Raises:
ValueError: If an unknown equation name is requested.
ValueError: If an unknown operator is requested.
ValueError: If an equation is requested restricted to a grid on which it is
not defined.
TypeError: If the input is not an equation.
"""
# If the equation is a dictionary, the dictionary values are grids (subdomains
# or interfaces) that defines restrictions of the equation; these must be
# identified. If the equation is not a dictionary, there is no restriction.
# Equation represented by string - return the corresponding equation.
if isinstance(equation, str):
if equation not in self._equations:
raise ValueError(f"Unknown equation name {equation}.")
return {equation: None}
# Equation represented by an Operator. Return its name with no restriction.
elif isinstance(equation, Operator):
if equation.name not in self._equations:
raise ValueError(f"Unknown equation operator {equation}.")
# No restriction.
return {equation.name: None}
# Equations represented by dict with restriction to grids: get target row
# indices.
elif isinstance(equation, dict):
block: dict[str, None | np.ndarray] = dict()
for equ, grids in equation.items():
# equ is an identifier of the equation (either a string or an operator)
# grids is a list of grids (subdomains or interfaces) that defines
# a restriction of the equation domain.
# Translate equ into a name (string).
if isinstance(equ, Operator):
name = equ.name
if name not in self._equations:
raise ValueError(f"Unknown equation name {equation}.")
elif isinstance(equ, str):
name = equ
if name not in self._equations:
raise ValueError(f"Unknown equation operator {equation}.")
else:
raise TypeError(
f"Item ({type(equ)}, {type(grids)}) not parsable as equation."
)
# Get the indices associated with this equation.
img_info = self._equation_image_space_composition[name]
# Check if the user requests a properly defined subsets of the grids
# associated with this equation.
unknown_grids = set(grids).difference(set(img_info.keys()))
if len(unknown_grids) > 0:
# Getting an error here means the user has requested a grid that is
# not associated with this equation. This is not a meaningful
# operation.
raise ValueError(
f"Equation {name} not defined on grids {unknown_grids}"
)
# The indices (row indices in the global system) associated with this
# equation and the grids requested by the user.
block_idx: list[np.ndarray] = list()
# Loop over image space information to ensure correct order.
# Note that looping over the grids risks that the order does not
# correspond to the order in which the equation was defined. This will surely
# lead to trouble down the line.
for grid in img_info:
if grid in grids:
block_idx.append(img_info[grid])
# If indices not empty, concatenate and return
if block_idx:
block.update({name: np.concatenate(block_idx, dtype=int)})
# By the logic above, indices should always be found; if not, alert the user.
else:
# TODO: Should this not be permissible, say, due to a filtering of
# the grids? However, it may lead to errors downstream.
raise TypeError(
f"Equation-like item ({type(equ)}, {type(grids)}) yielded no rows."
)
return block
else:
# Getting an error here means the user has passed a type that is not
# an equation.
raise TypeError(f"Type {type(equation)} not parsable as an equation.")
def _gridbased_equation_complement(
self, equations: dict[str, None | np.ndarray]
) -> dict[str, None | np.ndarray]:
"""Takes the information from equation parsing and finds for each equation
(identified by its name string) the indices which were excluded in the
grid-sense.
Parameters:
equations: Dictionary with equation names as keys and indices as values.
The indices are the indices of the rows in the global system that
were included in the last parsing of the equations.
Returns:
A dictionary with the name of the equation as key and the grid-complement
as values. If the complement is empty, the value is None.
"""
complement: dict[str, None | np.ndarray] = dict()
for name, idx in equations.items():
# If indices were filtered based on grids, we find the complementing
# indices.
# If idx is None, this means no filtering was done.
if idx is not None:
# Get the indices associated with this equation.
img_info = self._equation_image_space_composition[name]
# Ensure ordering and uniqueness of equation indexation.
img_values: list[np.ndarray] = list(img_info.values())
all_idx = np.unique(np.hstack(img_values))
# Complementing indices are found by deleting the filtered indices.
complement_idx = np.delete(all_idx, idx)
complement.update({name: complement_idx})
# If there was no grid-based row filtering, the complement is empty.
else:
complement.update({name: None})
return complement
def discretize(
self, equations: Optional[EquationList | EquationRestriction] = None
) -> None:
"""Find and loop over all discretizations in the equation operators, extract
unique references and discretize.
This is more efficient than discretizing on the Operator level, since
discretizations which occur more than once in a set of equations will be
identified and only discretized once.
Parameters:
equations (optional): A subset of equations. If not provided (None), all
known equations will be discretized.
"""
equation_names = list(self._parse_equations(equations).keys())
# List containing all discretizations
discr: list = []
# TODO: the search can be done once (in some kind of initialization). Revisit
# this during update of the Ad machinery.
for name in equation_names:
# this raises a key error if a given equation name is unknown
eqn = self._equations[name]
# This will expand the list discr with new discretizations.
# The list may contain duplicates.
discr += self._recursive_discretization_search(eqn, list())
# Uniquify to save computational time, then discretize.
unique_discr = _ad_utils.uniquify_discretization_list(discr)
_ad_utils.discretize_from_list(unique_discr, self.mdg)
def assemble(
self,
state: Optional[np.ndarray] = None,
) -> tuple[sps.spmatrix, np.ndarray]:
"""Assemble Jacobian matrix and residual vector of the whole system.
This is a shallow wrapper of :meth:`assemble_subsystem`. Here, the subsystem is
the complete set of equations, variables and grids.
Parameters:
state (optional): see :meth:`assemble_subsystem`. Defaults to None.
Returns:
Tuple containing
sps.spmatrix: Jacobian matrix corresponding to the targeted state.
The ordering of the equations (rows) is determined by the order the
equations were added. The DOFs (columns) are ordered according to the
global order.
np.ndarray: Residual vector corresponding to the targeted state, scaled
by -1 (moved to rhs).
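Example:
    A minimal sketch of one Newton update (``spsolve`` from scipy is one possible
    linear solver; the system is assumed to be set up and discretized):

    .. code:: Python

        from scipy.sparse.linalg import spsolve

        A, b = equation_system.assemble()
        dx = spsolve(A, b)
        equation_system.set_variable_values(dx, additive=True, to_iterate=True)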
"""
return self.assemble_subsystem(state=state)
def assemble_subsystem(
self,
equations: Optional[EquationList | EquationRestriction] = None,
variables: Optional[VariableList] = None,
state: Optional[np.ndarray] = None,
) -> tuple[sps.spmatrix, np.ndarray]:
"""Assemble Jacobian matrix and residual vector using a specified subset of
equations, variables and grids.
The method is intended for use in splitting algorithms. Matrix blocks not
included will simply be sliced out.
Note:
The ordering of columns in the returned system are defined by the global
DOF order. The row blocks are in the same order as equations were added to
this system. If an equation is defined on multiple grids, the respective
row-block is internally ordered as given by the mixed-dimensional grid
(for sd in subdomains, for intf in interfaces).
The columns of the subsystem are assumed to be properly defined by
``variables``, otherwise a matrix of shape ``(M, 0)`` is returned. This
happens if grid variables are passed which are unknown to this
:class:`EquationSystem`.
Parameters:
equations (optional): a subset of equations to which the subsystem should be
restricted. If not provided (None), all equations known to this
:class:`EquationSystem` will be included.
The user can specify grids per equation (name) to which the subsystem
should be restricted in the row-sense. Grids not belonging to the domain
of an equation will raise an error.
variables (optional): VariableType input specifying the subspace in
column-sense. If not provided (None), all variables will be included.
state (optional): State vector to assemble from. By default, the stored
``pp.ITERATE`` or ``pp.STATE`` are used, in that order.
Returns:
Tuple with two elements
spmatrix: (Part of the) Jacobian matrix corresponding to the targeted
variable state, for the specified equations and variables.
ndarray: Residual vector corresponding to the targeted variable state,
for the specified equations. Scaled with -1 (moved to rhs).
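Example:
    A hedged sketch restricting assembly to one equation and one variable (both
    names are hypothetical):

    .. code:: Python

        A_sub, b_sub = equation_system.assemble_subsystem(
            equations=["mass_balance"], variables=["pressure"]
        )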
"""
if variables is None:
variables = self._variables
# equ_blocks is a dictionary with equation names as keys and the corresponding
# row indices of the equations. If the user has requested that equations are
# restricted to a subset of grids, the row indices are restricted accordingly.
# If no such request has been made, the value is None.
equ_blocks: dict[str, np.ndarray | None] = self._parse_equations(equations)
# Data structures for building matrix and residual vector
mat: list[sps.spmatrix] = []
rhs: list[np.ndarray] = []
# Keep track of DOFs for each equation/block
ind_start = 0
self.assembled_equation_indices = dict()
# Iterate over equations, assemble.
# Also keep track of the row indices of each equation, and store it in
# assembled_equation_indices.
for equ_name, rows in equ_blocks.items():
# This will raise a key error if the equation name is unknown.
eq = self._equations[equ_name]
ad = eq.evaluate(self, state)
# If restriction to grid-related row blocks was made,
# perform row slicing based on information we have obtained from parsing.
if rows is not None:
mat.append(ad.jac.tocsr()[rows])
rhs.append(ad.val[rows])
block_length = len(rhs[-1])
# If no grid-related row restriction was made, append the whole thing.
else:
mat.append(ad.jac)
rhs.append(ad.val)
block_length = len(ad.val)
# Create indices range and shift to correct position.
block_indices = np.arange(block_length) + ind_start
# Extract last index and add 1 to get the starting point for next block of
# indices.
self.assembled_equation_indices.update({equ_name: block_indices})
if block_length > 0:
ind_start = block_indices[-1] + 1
# Concatenate results equation-wise.
if len(mat) > 0:
A = sps.vstack(mat, format="csr")
rhs_cat = np.concatenate(rhs)
else:
# Special case if the restriction produced an empty system.
A = sps.csr_matrix((0, self.num_dofs()))
rhs_cat = np.empty(0)
# Slice out the columns belonging to the requested subsets of variables and
# grid-related column blocks by using the transposed projection to respective
# subspace.
# Multiply rhs by -1 to move to the rhs.
column_projection = self.projection_to(variables).transpose()
return A * column_projection, -rhs_cat
def assemble_schur_complement_system(
self,
primary_equations: EquationList | EquationRestriction,
primary_variables: VariableList,
inverter: Optional[Callable[[sps.spmatrix], sps.spmatrix]] = None,
state: Optional[np.ndarray] = None,
) -> tuple[sps.spmatrix, np.ndarray]:
r"""Assemble Jacobian matrix and residual vector using a Schur complement
elimination of the variables and equations not to be included.
The specified equations and variables will define blocks of the linearized
system as
.. math::
\left [ \begin{matrix} A_{pp} & A_{ps} \\ A_{sp} & A_{ss} \end{matrix} \right]
\left [ \begin{matrix} x_p \\ x_s \end{matrix}\right]
= \left [ \begin{matrix} b_p \\ b_s \end{matrix}\right]
where subscripts p and s define primary and secondary blocks.
The Schur complement system is then given by
.. math::
\left( A_{pp} - A_{ps} * A_{ss}^{-1} * A_{sp}\right) * x_p
= b_p - A_{ps} * A_{ss}^{-1} * b_s
The Schur complement is well-defined only if the inverse of :math:`A_{ss}`
exists, and the efficiency of the approach assumes that an efficient inverter
for :math:`A_{ss}` can be found.
**The user must ensure both requirements are fulfilled.**
Note:
The secondary block is given implicitly as the complement of the primary
equations and variables. Advanced usage, e.g. nested Schur complements and
splitting solvers, requires the user to be careful, since the resulting
blocks :math:`A_{pp}` and :math:`A_{ss}` might end up not being square.
This will result in errors.
Examples:
The default inverter can be defined by
.. code-block:: python
import scipy.sparse as sps
inverter = lambda A: sps.csr_matrix(sps.linalg.inv(A.A))
It is costly in terms of computational time and memory, though.
TODO: We should rather use the block inverter in pp.matrix_operations. This
will require some work on ensuring the system is block-diagonal.
Parameters:
primary_equations: a subset of equations specifying the primary subspace in
row-sense.
primary_variables: VariableType input specifying the primary subspace in
column-sense.
inverter (optional): callable object to compute the inverse of the matrix
:math:`A_{ss}`. By default, the scipy direct sparse inverter is used.
state (optional): see :meth:`assemble_subsystem`. Defaults to None.
Returns:
Tuple containing
sps.spmatrix: Jacobian matrix representing the Schur complement with
respect to the targeted state.
np.ndarray: Residual vector for the Schur complement with respect to the
targeted state. Scaled with -1 (moved to rhs).
Raises:
AssertionError:
- If the primary block would have 0 rows or columns.
- If the secondary block would have 0 rows or columns.
- If the secondary block is not square.
ValueError: If primary and secondary columns overlap.
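Example:
    A hedged sketch of a Schur-complement solve (equation and variable names are
    hypothetical; ``spsolve`` from scipy is one possible solver for the reduced
    system):

    .. code:: Python

        from scipy.sparse.linalg import spsolve

        S, rhs = equation_system.assemble_schur_complement_system(
            primary_equations=["mass_balance"], primary_variables=["pressure"]
        )
        x_p = spsolve(S, rhs)
        x_full = equation_system.expand_schur_complement_solution(x_p)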
"""
if inverter is None:
inverter = lambda A: sps.csr_matrix(sps.linalg.inv(A.A))
# Find the rows of the primary block. This can include both equations defined
# on their full image, and equations specified on a subset of grids.
# The variable primary_rows will contain the indices in the global system
# corresponding to the primary block.
primary_rows = self._parse_equations(primary_equations)
# Find indices of equations involved in the primary block, but on grids that
# were filtered out. These will be added to the secondary block.
excluded_primary_rows = self._gridbased_equation_complement(primary_rows)
# Names of equations that form the primary block.
primary_equation_names = list(primary_rows.keys())
# Get the primary variables, represented as Ad variables.
active_variables = self._parse_variable_type(primary_variables)
# Projection of variables to the set of primary blocks.
primary_projection = self.projection_to(active_variables)
# Assert non-emptiness of primary block.
assert len(primary_rows) > 0
assert primary_projection.shape[0] > 0
# Equations that are not part of the primary block. These will form parts of the
# secondary block, as will the equations that are defined on grids that were
# excluded.
secondary_equation_names: list[str] = list(
set(self._equations.keys()).difference(set(primary_rows.keys()))
)
secondary_variables = list(set(self.variables).difference(active_variables))
secondary_projection = self.projection_to(secondary_variables)
# Assert non-emptiness of secondary block. We do not check the length of
# secondary_equation_names, since this can be empty if the secondary block is
# defined by a subset of grids.
assert secondary_projection.shape[0] > 0
# Storage of primary and secondary row blocks.
A_sec: list[sps.csr_matrix] = list()
b_sec: list[np.ndarray] = list()
A_prim: list[sps.csr_matrix] = list()
b_prim: list[np.ndarray] = list()
# Keep track of indices of the primary block.
ind_start = 0
assembled_equation_indices = dict()
# We loop over stored equations to ensure the correct order but process only
# primary equations.
# Excluded local primary blocks are stored as top rows in the secondary block.
for name in self._equations:
if name in primary_equation_names:
A_temp, b_temp = self.assemble_subsystem(equations=[name], state=state)
idx_p = primary_rows[name]
# Check if a grid filter was applied for that equation
if idx_p is not None:
# Append the respective rows.
A_prim.append(A_temp[idx_p])
b_prim.append(b_temp[idx_p])
# If requested, the excluded primary rows are appended as secondary.
idx_excl_p = excluded_primary_rows[name]
A_sec.append(A_temp[idx_excl_p])
b_sec.append(b_temp[idx_excl_p])
else:
# If no filter was applied, the whole row block is appended.
A_prim.append(A_temp)
b_prim.append(b_temp)
# Track indices of block rows. Only primary equations are included.
row_idx = np.arange(b_prim[-1].size, dtype=int)
indices = row_idx + ind_start
ind_start += row_idx.size
assembled_equation_indices.update({name: indices})
# store the assembled row indices for the primary block only (Schur)
self.assembled_equation_indices = assembled_equation_indices
# We loop again over stored equation to ensure a correct order
# but process only secondary equations.
for name in self._equations:
# Secondary equations (those not explicitly given as being primary) are
# assembled wholesale to the secondary block.
if name in secondary_equation_names:
A_temp, b_temp = self.assemble_subsystem(equations=[name], state=state)
A_sec.append(A_temp)
b_sec.append(b_temp)
# stack the results
A_p = sps.vstack(A_prim, format="csr")
b_p = np.concatenate(b_prim)
A_s = sps.vstack(A_sec, format="csr")
b_s = np.concatenate(b_sec)
# turn the projections into prolongations
primary_projection = primary_projection.transpose()
secondary_projection = secondary_projection.transpose()
# Matrices involved in the Schur complements
A_pp = A_p * primary_projection
A_ps = A_p * secondary_projection
A_sp = A_s * primary_projection
A_ss = A_s * secondary_projection
# Last sanity check, if A_ss is square.
assert A_ss.shape[0] == A_ss.shape[1]
# Compute the inverse of A_ss using the passed inverter.
inv_A_ss = inverter(A_ss)
S = A_pp - A_ps * inv_A_ss * A_sp
rhs_S = b_p - A_ps * inv_A_ss * b_s
# Store information necessary for expanding the Schur complement later.
self._Schur_complement = (
inv_A_ss,
b_s,
A_sp,
primary_projection,
secondary_projection,
)
return S, rhs_S
def expand_schur_complement_solution(
self, reduced_solution: np.ndarray
) -> np.ndarray:
r"""Expands the solution of the *last assembled* Schur complement system to the
whole solution.
With ``reduced_solution`` as :math:`x_p` from
.. math::
\left [ \begin{matrix} A_{pp} & A_{ps} \\ A_{sp} & A_{ss} \end{matrix} \right]
\left [ \begin{matrix} x_p \\ x_s \end{matrix}\right]
= \left [ \begin{matrix} b_p \\ b_s \end{matrix}\right],
the method returns the whole vector :math:`[x_p, x_s]`, where
.. math::
x_s = A_{ss}^{-1} * (b_s - A_{sp} * x_p).
Note:
Independent of how the primary and secondary blocks were chosen, this method
always returns a vector of size ``num_dofs``.
Especially when the primary and secondary variables did not constitute the
whole vector of unknowns, the result is still of size ``num_dofs``.
The entries corresponding to the excluded grid variables are zero.
Parameters:
reduced_solution: Solution to the linear system returned by
:meth:`assemble_schur_complement_system`.
Returns:
The expanded Schur solution in global size.
Raises:
ValueError: If the Schur complement system was not assembled before.
"""
if self._Schur_complement is None:
raise ValueError("Schur complement system was not assembled before.")
# Get data stored from last constructed Schur complement.
inv_A_ss, b_s, A_sp, prolong_p, prolong_s = self._Schur_complement
# Calculate the complement solution.
x_s = inv_A_ss * (b_s - A_sp * reduced_solution)
# Prolong primary and secondary block to global-sized arrays
X = prolong_p * reduced_solution + prolong_s * x_s
return X
### Special methods ----------------------------------------------------------------
def __repr__(self) -> str:
s = (
"EquationSystem for mixed-dimensional grid with "
f"{self.mdg.num_subdomains()} subdomains "
f"and {self.mdg.num_interfaces()}"
" interfaces.\n"
)
# Sort variables alphabetically, not case-sensitive.
all_variables = sorted(set([var.name for var in self.variables]), key=str.casefold)
s += "Variables present on at least one grid or interface:\n\t"
s += ", ".join(all_variables) + "\n"
if self._equations:
eq_names = [name for name in self._equations]
s += f"In total {len(self._equations)} equations, with names: \n\t"
s += ", ".join(eq_names)
return s
def __str__(self) -> str:
s = (
"EquationSystem for mixed-dimensional grid with "
f"{self.mdg.num_subdomains()} subdomains "
f"and {self.mdg.num_interfaces()}"
" interfaces.\n"
)
all_variables: set[str] = set([var.name for var in self.variables])
variable_grid: dict[str, list[pp.GridLike]] = {var: [] for var in all_variables}
for var in self.variables:
variable_grid[var.name].append(var.domain)
s += (
f"There are in total {len(all_variables)} variables,"
+ " distributed as follows:\n"
)
# Sort variables alphabetically, not case-sensitive.
for var_name, grids in sorted(variable_grid.items(), key=lambda it: it[0].casefold()):
s += "\t" + f"{var_name} is present on"
if isinstance(grids[0], pp.Grid):
assert all([isinstance(g, pp.Grid) for g in grids])
sorted_grids = self.mdg.sort_subdomains(grids) # type: ignore
s += " subdomains with id: " + ", ".join(
[str(g.id) for g in sorted_grids]
)
else:
assert all([isinstance(g, pp.MortarGrid) for g in grids])
sorted_grids = self.mdg.sort_interfaces(grids) # type: ignore
s += " interfaces with id: " + ", ".join(
[str(g.id) for g in sorted_grids]
)
s += "\n"
if self._equations:
eq_names = [name for name in self._equations]
s += f"In total {len(self._equations)} equations, with names: \n\t"
s += "\n\t".join(eq_names) + "\n"
return s