Sanitized file: src/graph/incidence.py

Exact copy of the incidence-matrix builder checked into the private repository (commit f7f0b0b). All comments and docstrings preserved.

Request additional modules

Module contents

"""Incidence matrix construction for double-entry validation."""

from __future__ import annotations

import numpy as np
import scipy.sparse as sp

from .postings import JournalEntry


class IncidenceMatrix:
    """
    Incidence matrix P for chart of accounts (signed posting matrix).

    Mathematical definition:
        P[i,j] = +1 if entry j debits account i
        P[i,j] = -1 if entry j credits account i
        P[i,j] = 0 otherwise

    Conservation laws for double-entry bookkeeping:

        THEOREM 1 (Balanced entry = left-nullspace condition):
            For any balanced journal entry, the column sum is zero:
                1ᵀ P_j = 0 for all entry columns j
            Equivalently the ones vector lies in the left nullspace: 1ᵀ P = 0.

        THEOREM 2 (Stock-flow evolution):
            Account balances evolve via:
                x_{t+1} = x_t + P a_t
            where:
                - x_t ∈ ℝⁿ is the balance vector at time t
                - P ∈ ℝⁿˣᵐ is the signed posting matrix (debits +, credits -)
                - a_t ∈ ℝᵐ is the vector of posted amounts

        COROLLARY (Global conservation):
            Total system "mass" is conserved across internal postings:
                1ᵀ (x_{t+1} - x_t) = 1ᵀ P a_t = (1ᵀ P) a_t = 0

            This is the discrete analogue of ∂ρ/∂t + ∇·J = 0.

    Note: We do NOT generally have P a_t = 0 (that would imply no balance changes).
    Conservation applies to the total (sum across accounts), not account-wise.
    """

    def __init__(self, account_ids: list[str]):
        """Initialize incidence matrix builder."""
        self.account_ids = account_ids
        self.account_index: dict[str, int] = {aid: i for i, aid in enumerate(account_ids)}
        self.n_accounts = len(account_ids)
        self.entries: list[JournalEntry] = []
        self._matrix: sp.csr_matrix | None = None

    def add_entry(self, entry: JournalEntry) -> None:
        """
        Add journal entry to matrix.

        Args:
            entry: Journal entry (must be balanced)

        Raises:
            ValueError: If entry is unbalanced
        """
        if not entry.is_balanced():
            raise ValueError(f"Journal entry not balanced: {entry.description}")

        self.entries.append(entry)
        self._matrix = None  # Invalidate cached matrix

    def build_matrix(self) -> sp.csr_matrix:
        """Construct sparse incidence matrix."""
        n_entries = len(self.entries)

        if n_entries == 0:
            # Return empty matrix
            self._matrix = sp.csr_matrix((self.n_accounts, 0))
            return self._matrix

        # Build sparse matrix from (row, col, data) triples
        rows: list[int] = []
        cols: list[int] = []
        data: list[int] = []

        for j, entry in enumerate(self.entries):
            for posting in entry.postings:
                if posting.account_id not in self.account_index:
                    raise ValueError(f"Account {posting.account_id} not in account list")

                i = self.account_index[posting.account_id]

                # Debit = +1, Credit = -1
                sign = +1 if posting.amount > 0 else -1

                rows.append(i)
                cols.append(j)
                data.append(sign)

        # Create sparse matrix
        B = sp.csr_matrix(
            (data, (rows, cols)),
            shape=(self.n_accounts, n_entries),
            dtype=np.int8,
        )

        self._matrix = B
        return B

    @property
    def matrix(self) -> sp.csr_matrix:
        """Get incidence matrix (build if needed)."""
        if self._matrix is None:
            return self.build_matrix()
        return self._matrix

    @staticmethod
    def _entry_scale_base(entry: JournalEntry) -> float:
        """Compute baseline magnitude for scaling entry postings."""
        positive_total = sum(max(posting.amount, 0.0) for posting in entry.postings)
        if positive_total > 0:
            return positive_total

        negative_total = sum(-posting.amount for posting in entry.postings if posting.amount < 0)
        if negative_total > 0:
            return negative_total

        return 0.0

    def validate_balanced_entries(self, tolerance: float = 1e-6) -> bool:
        """
        Check that every journal entry column satisfies Theorem 1 (1ᵀ P_j = 0).

        Args:
            tolerance: Numerical tolerance for floating-point comparisons.

        Returns:
            True if all column sums are zero (entries balanced).
        """
        B = self.matrix

        if B.shape[1] == 0:
            return True

        column_sums = np.array(
            [sum(posting.amount for posting in entry.postings) for entry in self.entries],
            dtype=float,
        )
        return np.allclose(column_sums, 0, atol=tolerance)

    def validate_global_conservation(self, amounts: np.ndarray, tolerance: float = 1e-6) -> bool:
        """
        Check global conservation law: 1ᵀ Δx = 0 given posting amounts a_t.

        Args:
            amounts: Vector of posted amounts (len = number of entries).
            tolerance: Tolerance for numerical comparison.

        Returns:
            True if total change in balances sums to ~0.
        """
        B = self.matrix
        amounts = np.asarray(amounts, dtype=float)

        if B.shape[1] != len(amounts):
            raise ValueError(
                f"Amount vector length {len(amounts)} doesn't match entries {B.shape[1]}"
            )

        delta_x = np.zeros(self.matrix.shape[0], dtype=float)

        for j, entry in enumerate(self.entries):
            base = self._entry_scale_base(entry)
            scale = 0.0 if base == 0 else amounts[j] / base
            if scale == 0.0:
                continue
            for posting in entry.postings:
                idx = self.account_index[posting.account_id]
                delta_x[idx] += posting.amount * scale

        total_change = float(np.sum(delta_x))
        return abs(total_change) < tolerance

    def compute_stock_flow_evolution(
        self,
        x_current: np.ndarray,
        amounts: np.ndarray,
    ) -> tuple[np.ndarray, dict]:
        """
        Implement Theorem 2: x_{t+1} = x_t + P a_t.

        Args:
            x_current: Current balance vector (len = number of accounts).
            amounts: Posted amounts (len = number of entries).

        Returns:
            Tuple of (x_next, diagnostics).
        """
        if len(x_current) != self.matrix.shape[0]:
            raise ValueError(
                f"Balance vector length {len(x_current)} doesn't match accounts {self.matrix.shape[0]}"
            )

        if len(amounts) != self.matrix.shape[1]:
            raise ValueError(
                f"Amount vector length {len(amounts)} doesn't match entries {self.matrix.shape[1]}"
            )

        amounts = np.asarray(amounts, dtype=float)

        delta_x = np.zeros(self.matrix.shape[0], dtype=float)
        for j, entry in enumerate(self.entries):
            base = self._entry_scale_base(entry)
            scale = 0.0 if base == 0 else amounts[j] / base
            if scale == 0.0:
                continue
            for posting in entry.postings:
                idx = self.account_index[posting.account_id]
                delta_x[idx] += posting.amount * scale

        x_next = x_current + delta_x

        diagnostics = {
            "delta_x": delta_x,
            "total_change": float(np.sum(delta_x)),
            "max_change": float(np.max(np.abs(delta_x))) if delta_x.size > 0 else 0.0,
            "accounts_changed": int(np.sum(np.abs(delta_x) > 1e-6)),
        }

        return x_next, diagnostics

    def get_divergence(self) -> np.ndarray:
        """Compute divergence at each account."""
        B = self.matrix

        if B.shape[1] == 0:
            return np.zeros(self.n_accounts)

        f = np.array([sum(p.amount for p in entry.postings) for entry in self.entries])

        return B @ f

    def get_divergence_per_account(self) -> dict[str, float]:
        """Get divergence for each account (useful for debugging)."""
        divergence = self.get_divergence()

        return {aid: float(divergence[i]) for i, aid in enumerate(self.account_ids)}

    def to_dense(self) -> np.ndarray:
        """Convert to dense numpy array (for inspection/debugging)."""
        return self.matrix.toarray()