Sanitized file: src/graph/incidence.py
Exact copy of the incidence-matrix builder checked into the private repository (commit f7f0b0b). All comments and docstrings preserved.
Request additional modulesModule contents
"""Incidence matrix construction for double-entry validation."""
from __future__ import annotations
import numpy as np
import scipy.sparse as sp
from .postings import JournalEntry
class IncidenceMatrix:
"""
Incidence matrix P for chart of accounts (signed posting matrix).
Mathematical definition:
P[i,j] = +1 if entry j debits account i
P[i,j] = -1 if entry j credits account i
P[i,j] = 0 otherwise
Conservation laws for double-entry bookkeeping:
THEOREM 1 (Balanced entry = left-nullspace condition):
For any balanced journal entry, the column sum is zero:
1ᵀ P_j = 0 for all entry columns j
Equivalently the ones vector lies in the left nullspace: 1ᵀ P = 0.
THEOREM 2 (Stock-flow evolution):
Account balances evolve via:
x_{t+1} = x_t + P a_t
where:
- x_t ∈ ℝⁿ is the balance vector at time t
- P ∈ ℝⁿˣᵐ is the signed posting matrix (debits +, credits -)
- a_t ∈ ℝᵐ is the vector of posted amounts
COROLLARY (Global conservation):
Total system "mass" is conserved across internal postings:
1ᵀ (x_{t+1} - x_t) = 1ᵀ P a_t = (1ᵀ P) a_t = 0
This is the discrete analogue of ∂ρ/∂t + ∇·J = 0.
Note: We do NOT generally have P a_t = 0 (that would imply no balance changes).
Conservation applies to the total (sum across accounts), not account-wise.
"""
def __init__(self, account_ids: list[str]):
"""Initialize incidence matrix builder."""
self.account_ids = account_ids
self.account_index: dict[str, int] = {aid: i for i, aid in enumerate(account_ids)}
self.n_accounts = len(account_ids)
self.entries: list[JournalEntry] = []
self._matrix: sp.csr_matrix | None = None
def add_entry(self, entry: JournalEntry) -> None:
"""
Add journal entry to matrix.
Args:
entry: Journal entry (must be balanced)
Raises:
ValueError: If entry is unbalanced
"""
if not entry.is_balanced():
raise ValueError(f"Journal entry not balanced: {entry.description}")
self.entries.append(entry)
self._matrix = None # Invalidate cached matrix
def build_matrix(self) -> sp.csr_matrix:
"""Construct sparse incidence matrix."""
n_entries = len(self.entries)
if n_entries == 0:
# Return empty matrix
self._matrix = sp.csr_matrix((self.n_accounts, 0))
return self._matrix
# Build sparse matrix from (row, col, data) triples
rows: list[int] = []
cols: list[int] = []
data: list[int] = []
for j, entry in enumerate(self.entries):
for posting in entry.postings:
if posting.account_id not in self.account_index:
raise ValueError(f"Account {posting.account_id} not in account list")
i = self.account_index[posting.account_id]
# Debit = +1, Credit = -1
sign = +1 if posting.amount > 0 else -1
rows.append(i)
cols.append(j)
data.append(sign)
# Create sparse matrix
B = sp.csr_matrix(
(data, (rows, cols)),
shape=(self.n_accounts, n_entries),
dtype=np.int8,
)
self._matrix = B
return B
@property
def matrix(self) -> sp.csr_matrix:
"""Get incidence matrix (build if needed)."""
if self._matrix is None:
return self.build_matrix()
return self._matrix
@staticmethod
def _entry_scale_base(entry: JournalEntry) -> float:
"""Compute baseline magnitude for scaling entry postings."""
positive_total = sum(max(posting.amount, 0.0) for posting in entry.postings)
if positive_total > 0:
return positive_total
negative_total = sum(-posting.amount for posting in entry.postings if posting.amount < 0)
if negative_total > 0:
return negative_total
return 0.0
def validate_balanced_entries(self, tolerance: float = 1e-6) -> bool:
"""
Check that every journal entry column satisfies Theorem 1 (1ᵀ P_j = 0).
Args:
tolerance: Numerical tolerance for floating-point comparisons.
Returns:
True if all column sums are zero (entries balanced).
"""
B = self.matrix
if B.shape[1] == 0:
return True
column_sums = np.array(
[sum(posting.amount for posting in entry.postings) for entry in self.entries],
dtype=float,
)
return np.allclose(column_sums, 0, atol=tolerance)
def validate_global_conservation(self, amounts: np.ndarray, tolerance: float = 1e-6) -> bool:
"""
Check global conservation law: 1ᵀ Δx = 0 given posting amounts a_t.
Args:
amounts: Vector of posted amounts (len = number of entries).
tolerance: Tolerance for numerical comparison.
Returns:
True if total change in balances sums to ~0.
"""
B = self.matrix
amounts = np.asarray(amounts, dtype=float)
if B.shape[1] != len(amounts):
raise ValueError(
f"Amount vector length {len(amounts)} doesn't match entries {B.shape[1]}"
)
delta_x = np.zeros(self.matrix.shape[0], dtype=float)
for j, entry in enumerate(self.entries):
base = self._entry_scale_base(entry)
scale = 0.0 if base == 0 else amounts[j] / base
if scale == 0.0:
continue
for posting in entry.postings:
idx = self.account_index[posting.account_id]
delta_x[idx] += posting.amount * scale
total_change = float(np.sum(delta_x))
return abs(total_change) < tolerance
def compute_stock_flow_evolution(
self,
x_current: np.ndarray,
amounts: np.ndarray,
) -> tuple[np.ndarray, dict]:
"""
Implement Theorem 2: x_{t+1} = x_t + P a_t.
Args:
x_current: Current balance vector (len = number of accounts).
amounts: Posted amounts (len = number of entries).
Returns:
Tuple of (x_next, diagnostics).
"""
if len(x_current) != self.matrix.shape[0]:
raise ValueError(
f"Balance vector length {len(x_current)} doesn't match accounts {self.matrix.shape[0]}"
)
if len(amounts) != self.matrix.shape[1]:
raise ValueError(
f"Amount vector length {len(amounts)} doesn't match entries {self.matrix.shape[1]}"
)
amounts = np.asarray(amounts, dtype=float)
delta_x = np.zeros(self.matrix.shape[0], dtype=float)
for j, entry in enumerate(self.entries):
base = self._entry_scale_base(entry)
scale = 0.0 if base == 0 else amounts[j] / base
if scale == 0.0:
continue
for posting in entry.postings:
idx = self.account_index[posting.account_id]
delta_x[idx] += posting.amount * scale
x_next = x_current + delta_x
diagnostics = {
"delta_x": delta_x,
"total_change": float(np.sum(delta_x)),
"max_change": float(np.max(np.abs(delta_x))) if delta_x.size > 0 else 0.0,
"accounts_changed": int(np.sum(np.abs(delta_x) > 1e-6)),
}
return x_next, diagnostics
def get_divergence(self) -> np.ndarray:
"""Compute divergence at each account."""
B = self.matrix
if B.shape[1] == 0:
return np.zeros(self.n_accounts)
f = np.array([sum(p.amount for p in entry.postings) for entry in self.entries])
return B @ f
def get_divergence_per_account(self) -> dict[str, float]:
"""Get divergence for each account (useful for debugging)."""
divergence = self.get_divergence()
return {aid: float(divergence[i]) for i, aid in enumerate(self.account_ids)}
def to_dense(self) -> np.ndarray:
"""Convert to dense numpy array (for inspection/debugging)."""
return self.matrix.toarray()