feat(reconcile): two-source reconciliation tool

Bank-feed-vs-ledger style matcher: 4-pass greedy assignment (key →
exact → tolerance → fuzzy) with ambiguous candidates routed to a
review bucket instead of arbitrary picks. CLI mirrors the
cli_text_clean preview/--apply pattern; Streamlit page registered
in the automations section.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 22:33:14 +00:00
parent 450d4fc9a8
commit e44af3a45e
5 changed files with 1449 additions and 0 deletions

198
src/cli_reconcile.py Normal file
View File

@@ -0,0 +1,198 @@
"""CLI for the DataTools reconciliation tool.
Usage:
python -m src.cli_reconcile bank.csv ledger.csv \\
--left-amount amount --right-amount amt \\
--left-date date --right-date posted # dry-run preview
python -m src.cli_reconcile bank.csv ledger.csv \\
--left-amount amount --right-amount amt \\
--left-date date --right-date posted --apply # write matched/unmatched CSVs
python -m src.cli_reconcile --help # full help
Outputs (with --apply) sit beside the LEFT input file:
{stem}_matched.csv one row per accepted pair
{stem}_unmatched_left.csv left rows with no counterpart
{stem}_unmatched_right.csv right rows with no counterpart
{stem}_review.csv ambiguous pairs flagged for review
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import typer
from loguru import logger
# Typer application object for the reconcile CLI. The ``help`` text doubles as
# the usage guide shown by ``--help``. ``no_args_is_help=True`` asks Typer to
# print that help when the command is invoked with no arguments, and
# ``add_completion=False`` leaves out Typer's shell-completion options.
app = typer.Typer(
    name="reconcile",
    help=(
        "Reconcile two data sources (e.g. bank feed vs. ledger export).\n\n"
        "By default, runs in preview mode — shows the match stats without "
        "writing anything. Add --apply to write the four output CSVs.\n\n"
        "Examples:\n\n"
        " # Bank feed vs ledger, exact match\n"
        " python -m src.cli_reconcile bank.csv ledger.csv \\\n"
        " --left-amount amount --right-amount amt \\\n"
        " --left-date date --right-date posted\n\n"
        " # Allow 2-day posting drift and a cent of rounding tolerance\n"
        " python -m src.cli_reconcile bank.csv ledger.csv \\\n"
        " --left-amount amount --right-amount amt \\\n"
        " --left-date date --right-date posted \\\n"
        " --date-tolerance 2 --amount-tolerance 0.01 --apply\n\n"
        " # Bank shows debits as positive; ledger as negative\n"
        " python -m src.cli_reconcile bank.csv ledger.csv \\\n"
        " --left-amount amount --right-amount amt --invert-right-sign --apply\n"
    ),
    add_completion=False,
    no_args_is_help=True,
)
def _setup_logging(log_dir: Path) -> Path:
    """Route loguru output to stderr and to a fresh timestamped log file.

    Creates *log_dir* (and parents) when missing, drops any previously
    registered loguru sinks, then registers two sinks: stderr at WARNING
    and a ``reconcile_<timestamp>.log`` file at DEBUG.

    Returns:
        The path of the log file created for this run.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    target = log_dir / f"reconcile_{stamp}.log"

    # Start from a clean slate so repeated calls don't stack sinks.
    logger.remove()
    logger.add(sys.stderr, format="{message}", level="WARNING")
    logger.add(
        str(target),
        format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}",
        level="DEBUG",
    )
    return target
def _split_csv_arg(raw: Optional[str]) -> list[str]:
    """Turn a comma-separated option value into a list of column names.

    Each piece is stripped of surrounding whitespace and empty pieces are
    discarded. ``None`` (option not supplied) yields an empty list.
    """
    if raw is None:
        return []
    pieces = (piece.strip() for piece in raw.split(","))
    return [piece for piece in pieces if piece]
@app.command()
def run(
    left_file: str = typer.Argument(..., help="Path to the LEFT input (e.g. bank feed)."),
    right_file: str = typer.Argument(..., help="Path to the RIGHT input (e.g. ledger)."),
    left_amount: str = typer.Option(..., "--left-amount", help="Amount column on the LEFT."),
    right_amount: str = typer.Option(..., "--right-amount", help="Amount column on the RIGHT."),
    left_date: Optional[str] = typer.Option(None, "--left-date", help="Date column on the LEFT."),
    right_date: Optional[str] = typer.Option(None, "--right-date", help="Date column on the RIGHT."),
    left_keys: Optional[str] = typer.Option(
        None, "--left-keys",
        help="Comma-separated reference/key columns on the LEFT (paired with --right-keys).",
    ),
    right_keys: Optional[str] = typer.Option(
        None, "--right-keys",
        help="Comma-separated reference/key columns on the RIGHT (paired with --left-keys).",
    ),
    left_desc: Optional[str] = typer.Option(None, "--left-desc", help="Description column on the LEFT (fuzzy)."),
    right_desc: Optional[str] = typer.Option(None, "--right-desc", help="Description column on the RIGHT (fuzzy)."),
    desc_min_score: int = typer.Option(
        0, "--desc-min-score",
        help="Min description similarity (0-100) to accept a fuzzy match. 0 disables.",
    ),
    amount_tolerance: float = typer.Option(
        0.0, "--amount-tolerance",
        help="Absolute amount tolerance (e.g. 0.01 to absorb cent-rounding).",
    ),
    date_tolerance: int = typer.Option(
        0, "--date-tolerance",
        help="Date tolerance in calendar days (± N).",
    ),
    invert_right_sign: bool = typer.Option(
        False, "--invert-right-sign",
        help="Negate the RIGHT amount before matching (use when sign conventions differ).",
    ),
    apply: bool = typer.Option(
        False, "--apply",
        help="Write the four output CSV files. Without this flag, only stats are shown.",
    ),
):
    """Reconcile two CSV/Excel files."""
    # NOTE: the docstring above is left untouched on purpose; Typer surfaces
    # command docstrings as help text, so it is user-facing output.
    #
    # Flow: check both paths exist -> set up logging -> read both inputs ->
    # reconcile -> print stats -> (with --apply) write four CSVs next to the
    # LEFT input. Any user-facing failure exits with status 1.

    # Imported lazily so that ``--help`` does not pay for the heavy imports.
    from src.core.io import read_file, write_file
    from src.core.reconcile import ReconcileOptions, reconcile

    left_path, right_path = Path(left_file), Path(right_file)
    missing = next((p for p in (left_path, right_path) if not p.exists()), None)
    if missing is not None:
        typer.echo(f"Error: File not found: {missing}", err=True)
        raise typer.Exit(1)

    log_path = _setup_logging(Path("logs"))

    def _load_side(path: Path):
        # Read one input, reporting its shape; any reader failure ends the run.
        typer.echo(f"Reading {path.name}...")
        try:
            frame = read_file(path)
        except Exception as e:
            typer.echo(f"Error reading {path.name}: {e}", err=True)
            raise typer.Exit(1)
        typer.echo(f" {len(frame)} rows, {len(frame.columns)} columns")
        return frame

    left_df = _load_side(left_path)
    right_df = _load_side(right_path)

    options = ReconcileOptions(
        left_amount=left_amount,
        right_amount=right_amount,
        left_date=left_date,
        right_date=right_date,
        left_keys=_split_csv_arg(left_keys),
        right_keys=_split_csv_arg(right_keys),
        left_desc=left_desc,
        right_desc=right_desc,
        desc_min_score=desc_min_score,
        amount_tolerance=amount_tolerance,
        date_tolerance_days=date_tolerance,
        invert_right_sign=invert_right_sign,
    )

    typer.echo("Reconciling...")
    try:
        result = reconcile(left_df, right_df, options)
    except ValueError as e:
        # Option/column validation problems are reported without a traceback.
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1)
    _print_stats(result.stats)

    if not apply:
        typer.echo("\nThis was a preview. Add --apply to write the output files.")
    else:
        stem, out_dir = left_path.stem, left_path.parent
        # Insertion order fixes both the write order and the listing order.
        outputs = {
            "matched": result.matched,
            "unmatched_left": result.unmatched_left,
            "unmatched_right": result.unmatched_right,
            "review": result.review,
        }
        for suffix, frame in outputs.items():
            write_file(frame, out_dir / f"{stem}_{suffix}.csv")
        typer.echo(f"\nWrote 4 files to {out_dir}:")
        for suffix in outputs:
            typer.echo(f" {stem}_{suffix}.csv")
    typer.echo(f"Log: {log_path}")
def _print_stats(stats: dict) -> None:
    """Echo the reconcile summary counts framed by two horizontal rules.

    Args:
        stats: Mapping with the keys ``left_rows``, ``right_rows``,
            ``matched``, ``review``, ``unmatched_left`` and
            ``unmatched_right`` (as produced by ``reconcile``).

    Raises:
        KeyError: If any of those keys is missing from *stats*.
    """
    # The separator glyph had been lost, leaving ``'' * 50`` (an empty
    # string), so no rule was printed. ASCII is used so the rule renders on
    # any console encoding.
    rule = "-" * 50
    typer.echo(f"\n{rule}")
    typer.echo(f" Left rows: {stats['left_rows']}")
    typer.echo(f" Right rows: {stats['right_rows']}")
    typer.echo(f" Matched: {stats['matched']}")
    typer.echo(f" Review (ambiguous): {stats['review']}")
    typer.echo(f" Unmatched left: {stats['unmatched_left']}")
    typer.echo(f" Unmatched right: {stats['unmatched_right']}")
    typer.echo(f"{rule}")
def main() -> None:
    """Entry point: hand control to the Typer ``app`` defined in this module."""
    app()


# Run the CLI when the module is executed directly
# (e.g. ``python -m src.cli_reconcile``).
if __name__ == "__main__":
    main()

598
src/core/reconcile.py Normal file
View File

@@ -0,0 +1,598 @@
"""Two-source data reconciliation.
Given two DataFrames (typically a bank/credit-card feed and a ledger
export), find which rows on the left correspond to rows on the right
based on amount, date, and optional reference/description fields.
Output buckets:
matched — one row per accepted pair, with both originals.
unmatched_left — left rows with no acceptable right counterpart.
unmatched_right — right rows with no acceptable left counterpart.
review — ambiguous cases (a left row had >1 equally good
right candidates, or vice versa) surfaced for the
user to disambiguate manually.
Matching strategy is a multi-pass greedy one-to-one assignment:
Pass 1: exact key match (when ``left_keys`` and ``right_keys`` are both set)
Pass 2: exact (amount, date) match
Pass 3: amount within tolerance AND date within window
Pass 4: + optional description fuzzy similarity boost
Within each pass, candidate pairs are scored and assigned greedily by
descending score; ties for the same left row that span multiple right
rows (or vice versa) are sent to ``review`` instead of being matched
arbitrarily.
The module is pure: no I/O, no Streamlit, no logging side effects beyond
loguru. Caller drives file reading and result rendering.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
import pandas as pd
from loguru import logger
try:
from rapidfuzz import fuzz as _rf_fuzz
_HAS_RAPIDFUZZ = True
except ImportError: # pragma: no cover — rapidfuzz is in requirements.txt
_HAS_RAPIDFUZZ = False
# ---------------------------------------------------------------------------
# Options & result
# ---------------------------------------------------------------------------
@dataclass
class ReconcileOptions:
    """Configuration for :func:`reconcile`.

    ``left_amount`` / ``right_amount`` are required: every match needs
    an amount to anchor on. Everything else is optional. The empty-string
    defaults on the amount fields exist only so the dataclass can be built
    incrementally; option validation rejects them.
    """

    # Amount columns (required). Values are coerced to float; non-numeric
    # rows are dropped from matching but appear in the unmatched buckets.
    left_amount: str = ""
    right_amount: str = ""
    # Date columns. When both are set, candidates must fall within
    # ``date_tolerance_days``. When unset, date is ignored entirely.
    # Setting only one of the two is rejected by validation.
    left_date: Optional[str] = None
    right_date: Optional[str] = None
    # Optional reference / key columns for exact-match Pass 1. List
    # forms must be the same length so the i-th left key pairs with the
    # i-th right key (e.g. ``["check_no"]`` ↔ ``["ref"]``).
    left_keys: list[str] = field(default_factory=list)
    right_keys: list[str] = field(default_factory=list)
    # Description columns for fuzzy similarity boost (optional). Only
    # used when ``desc_min_score`` > 0 AND rapidfuzz is installed.
    left_desc: Optional[str] = None
    right_desc: Optional[str] = None
    desc_min_score: int = 0  # 0–100; 0 disables fuzzy.
    # Tolerances. Defaults are exact match.
    amount_tolerance: float = 0.0  # absolute (e.g. 0.01 for cent rounding)
    date_tolerance_days: int = 0  # ± N calendar days
    # Some bank feeds use opposite sign convention from the ledger
    # (debits positive vs. negative). Flipping this multiplies the
    # right side's amount by -1 before matching.
    invert_right_sign: bool = False
@dataclass
class ReconcileResult:
    """Outcome of a reconcile run.

    ``matched`` and ``review`` hold one row per pair: the original columns
    of each side prefixed with ``left_`` / ``right_``, plus the bookkeeping
    columns ``match_pass``, ``score``, ``amount_diff``, ``date_diff_days``
    and ``desc_score``. ``unmatched_left`` / ``unmatched_right`` are copies
    of the leftover input rows with their original (unprefixed) columns.
    """

    matched: pd.DataFrame
    unmatched_left: pd.DataFrame
    unmatched_right: pd.DataFrame
    review: pd.DataFrame
    # Summary counts: left_rows, right_rows, matched, review,
    # unmatched_left, unmatched_right.
    stats: dict[str, int] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def reconcile(
    left: pd.DataFrame,
    right: pd.DataFrame,
    options: ReconcileOptions,
) -> ReconcileResult:
    """Reconcile *left* against *right* using *options*.

    Runs up to four greedy passes (key, exact, tolerance, fuzzy) over the
    rows that are still unclaimed, then assembles the matched, review and
    unmatched frames plus summary counts. Neither input is mutated; the
    result frames hold copies of the relevant original rows.

    Raises:
        ValueError: If *options* fails validation against the two frames.
    """
    _validate_options(left, right, options)

    # Typed working copies (``_amt`` / ``_date`` columns) so no pass has to
    # re-parse values; the caller's frames stay untouched.
    L = _prep_side(left, options, side="left")
    R = _prep_side(right, options, side="right")

    # Open sets are seeded from the FULL inputs rather than the prepped
    # frames: rows dropped during prep (unparseable amount/date) can never be
    # claimed by a pass, so they surface in the unmatched buckets.
    left_open: set = set(left.index)
    right_open: set = set(right.index)
    matched_pairs: list[dict] = []
    review_pairs: list[dict] = []

    # Pass schedule as (name, candidate generator, enabled?), in priority
    # order. Each pass only sees what earlier passes left open.
    schedule = (
        # Exact key match on user-supplied reference columns.
        ("key", _candidates_by_key,
         bool(options.left_keys and options.right_keys)),
        # Exact (amount, date) match; always runs.
        ("exact", _candidates_exact, True),
        # Tolerance-window match; pointless when both tolerances are zero.
        ("tolerance", _candidates_tolerance,
         options.amount_tolerance > 0 or options.date_tolerance_days > 0),
        # Description-similarity pass over whatever is still open.
        ("fuzzy", _candidates_fuzzy,
         bool(
             options.desc_min_score > 0
             and options.left_desc
             and options.right_desc
             and _HAS_RAPIDFUZZ
         )),
    )
    for pass_name, candidate_fn, enabled in schedule:
        if not enabled:
            continue
        _run_pass(
            L, R, left_open, right_open, matched_pairs, review_pairs,
            options=options, pass_name=pass_name,
            candidate_fn=candidate_fn,
        )

    # Assemble the four output frames from the accumulated state.
    leftover_left = left.loc[sorted(left_open)].copy()
    leftover_right = right.loc[sorted(right_open)].copy()
    matched_frame = _build_matched(left, right, matched_pairs, options)
    review_frame = _build_matched(left, right, review_pairs, options, review=True)

    stats = {
        "left_rows": len(left),
        "right_rows": len(right),
        "matched": len(matched_pairs),
        "review": len(review_pairs),
        "unmatched_left": len(leftover_left),
        "unmatched_right": len(leftover_right),
    }
    logger.debug("reconcile stats: {}", stats)
    return ReconcileResult(
        matched=matched_frame,
        unmatched_left=leftover_left,
        unmatched_right=leftover_right,
        review=review_frame,
        stats=stats,
    )
# ---------------------------------------------------------------------------
# Input validation & prep
# ---------------------------------------------------------------------------
def _validate_options(
    left: pd.DataFrame, right: pd.DataFrame, options: ReconcileOptions
) -> None:
    """Check *options* against the two input frames before any matching.

    Verifies that the amount columns are named and present, that the date
    columns are set on both sides or neither and exist, that the key-column
    lists have equal length and exist, that tolerances are non-negative,
    that ``desc_min_score`` lies in 0..100, and, when the fuzzy pass is
    configured, that both description columns exist.

    Raises:
        ValueError: On the first violated rule, with a message naming it.
    """
    if not options.left_amount or not options.right_amount:
        raise ValueError(
            "Reconcile requires both left_amount and right_amount columns."
        )
    if options.left_amount not in left.columns:
        raise ValueError(
            f"left_amount column {options.left_amount!r} not in left DataFrame."
        )
    if options.right_amount not in right.columns:
        raise ValueError(
            f"right_amount column {options.right_amount!r} not in right DataFrame."
        )

    # Dates are all-or-nothing: truthiness (not ``is None``) decides "set".
    if bool(options.left_date) != bool(options.right_date):
        raise ValueError(
            "left_date and right_date must both be set or both be None."
        )
    if options.left_date and options.left_date not in left.columns:
        raise ValueError(f"left_date column {options.left_date!r} not in left.")
    if options.right_date and options.right_date not in right.columns:
        raise ValueError(f"right_date column {options.right_date!r} not in right.")

    # Key lists pair up positionally, so their lengths must agree.
    if len(options.left_keys) != len(options.right_keys):
        raise ValueError(
            "left_keys and right_keys must be the same length "
            f"(got {len(options.left_keys)} vs {len(options.right_keys)})."
        )
    for c in options.left_keys:
        if c not in left.columns:
            raise ValueError(f"left key column {c!r} not in left DataFrame.")
    for c in options.right_keys:
        if c not in right.columns:
            raise ValueError(f"right key column {c!r} not in right DataFrame.")

    if options.amount_tolerance < 0:
        raise ValueError("amount_tolerance must be >= 0.")
    if options.date_tolerance_days < 0:
        raise ValueError("date_tolerance_days must be >= 0.")
    if not (0 <= options.desc_min_score <= 100):
        raise ValueError("desc_min_score must be between 0 and 100.")

    # Description columns are only read by the fuzzy pass, which needs a
    # positive threshold and both columns. Validate them under exactly that
    # condition so a mistyped name is reported here as a ValueError instead
    # of surfacing later as a KeyError; configurations that never reach the
    # fuzzy pass are accepted as before.
    if options.desc_min_score > 0 and options.left_desc and options.right_desc:
        if options.left_desc not in left.columns:
            raise ValueError(
                f"left_desc column {options.left_desc!r} not in left DataFrame."
            )
        if options.right_desc not in right.columns:
            raise ValueError(
                f"right_desc column {options.right_desc!r} not in right DataFrame."
            )
def _prep_side(
    df: pd.DataFrame, options: ReconcileOptions, side: str
) -> pd.DataFrame:
    """Build the matching frame for one side.

    Returns a copy of *df* with two work columns: ``_amt`` (amount coerced
    to numeric, negated for the right side when ``invert_right_sign`` is
    set) and ``_date`` (parsed datetime, or all-NaT when no date column is
    configured). Rows whose amount does not parse, or whose date does not
    parse when a date column is configured, are left out of the returned
    frame; original index labels are kept so the caller can still report
    those rows as unmatched.
    """
    use_left = side == "left"
    amount_name = options.left_amount if use_left else options.right_amount
    date_name = options.left_date if use_left else options.right_date

    prepared = df.copy()

    amounts = pd.to_numeric(prepared[amount_name], errors="coerce")
    if side == "right" and options.invert_right_sign:
        amounts = -amounts
    prepared["_amt"] = amounts

    if date_name:
        prepared["_date"] = pd.to_datetime(prepared[date_name], errors="coerce")
    else:
        prepared["_date"] = pd.NaT

    # A row is unusable when its amount is missing, or its date is missing
    # while date matching is configured.
    unusable = prepared["_amt"].isna()
    if date_name:
        unusable = unusable | prepared["_date"].isna()

    if unusable.any():
        logger.debug(
            "{} side: dropping {} row(s) with unparseable amount/date",
            side, unusable.sum(),
        )
    return prepared.loc[~unusable].copy()
# ---------------------------------------------------------------------------
# Per-pass orchestration
# ---------------------------------------------------------------------------
def _run_pass(
    L: pd.DataFrame,
    R: pd.DataFrame,
    left_open: set,
    right_open: set,
    matched_pairs: list[dict],
    review_pairs: list[dict],
    *,
    options: ReconcileOptions,
    pass_name: str,
    candidate_fn,
) -> None:
    """Run one matching pass over the still-open indices.

    ``candidate_fn(L_open, R_open, options)`` must return a list of dicts
    carrying at least ``left_idx``, ``right_idx`` and ``score``. Left rows
    are then processed greedily in descending order of their best score.

    For each left row only its top-scoring candidates ("leaders") are
    considered:

    * several leaders still open -> every (left, right) leader pair goes to
      ``review_pairs`` and all rows involved are closed;
    * one leader, but another open left row has the same right row as a
      leader at the same score -> all those pairs go to ``review_pairs``;
    * otherwise the pair is appended to ``matched_pairs``.

    Mutates ``left_open``, ``right_open``, ``matched_pairs`` and
    ``review_pairs`` in place; every appended record gains a ``"pass"`` key
    set to *pass_name*. A left row whose leaders were all claimed earlier
    in the pass is skipped (there is no fallback to its lower-scored
    candidates) and stays open for later passes.
    """
    # Restrict both sides to rows that are prepped AND still unclaimed.
    L_open = L.loc[L.index.intersection(left_open)]
    R_open = R.loc[R.index.intersection(right_open)]
    if L_open.empty or R_open.empty:
        return
    candidates = candidate_fn(L_open, R_open, options)
    if not candidates:
        return
    # Group candidates by left index. For each left row, partition into
    # confident-best (single top score) vs. ambiguous (top score tied).
    by_left: dict = {}
    for cand in candidates:
        by_left.setdefault(cand["left_idx"], []).append(cand)
    # Reverse index: right row -> the leader candidates pointing at it. It
    # is built once, before any assignment, and powers the mirror check
    # below so a right row claimed by two equally-good lefts also routes to
    # review.
    by_right_top: dict = {}
    for li, cands in by_left.items():
        # In-place sort: afterwards cands[0] is this left row's best.
        cands.sort(key=lambda c: c["score"], reverse=True)
        top = cands[0]["score"]
        leaders = [c for c in cands if c["score"] == top]
        for c in leaders:
            by_right_top.setdefault(c["right_idx"], []).append(c)
    # Sort left rows by their leader's score so high-confidence matches
    # claim their right counterpart first; low-confidence rows lose
    # contention if the right row was already taken.
    left_order = sorted(
        by_left.keys(),
        key=lambda li: -by_left[li][0]["score"],
    )
    for li in left_order:
        # May already have been closed as a contender of an earlier row.
        if li not in left_open:
            continue
        cands = by_left[li]
        top_score = cands[0]["score"]
        leaders = [c for c in cands if c["score"] == top_score]
        # Filter to still-open right indices.
        leaders = [c for c in leaders if c["right_idx"] in right_open]
        if not leaders:
            # Every top candidate was taken; leave this row open.
            continue
        if len(leaders) > 1:
            # Left row is ambiguous on its own side — multiple equally
            # good right candidates remain. Park them all in review.
            for c in leaders:
                review_pairs.append({**c, "pass": pass_name})
            left_open.discard(li)
            for c in leaders:
                right_open.discard(c["right_idx"])
            continue
        pick = leaders[0]
        ri = pick["right_idx"]
        # Mirror check: is the right row contested by another left at
        # the same top score? If so, both lefts go to review and the
        # right row is consumed. ``pick`` itself is among the contenders,
        # hence the ``> 1`` test.
        contenders = [
            c for c in by_right_top.get(ri, [])
            if c["left_idx"] in left_open and c["score"] == pick["score"]
        ]
        if len(contenders) > 1:
            for c in contenders:
                review_pairs.append({**c, "pass": pass_name})
                left_open.discard(c["left_idx"])
            right_open.discard(ri)
            continue
        # Uncontested best pair: accept it and close both rows.
        matched_pairs.append({**pick, "pass": pass_name})
        left_open.discard(li)
        right_open.discard(ri)
# ---------------------------------------------------------------------------
# Candidate generators (one per pass)
# ---------------------------------------------------------------------------
def _candidates_by_key(
    L: pd.DataFrame, R: pd.DataFrame, options: ReconcileOptions
) -> list[dict]:
    """Exact match on the user-supplied key columns + amount within tolerance.

    Amount must still tie out (within ``options.amount_tolerance``);
    otherwise a shared reference number (e.g. a check number reused across
    years) would over-match. We do NOT require date in this pass — the
    assumption is that a confirmed reference like an invoice number is
    authoritative even when the posting date drifts.

    Keys are compared as tuples of stringified values, so a separator
    character inside a value cannot make two different composite keys
    collide. Missing values (NaN/None/NaT) count as empty, and rows whose
    key parts are all empty are skipped so that blank references never
    match each other.

    Returns:
        Candidate records from ``_score_pair`` with ``base_score=1000``.
    """
    if not options.left_keys:
        return []

    def _key_tuples(frame: pd.DataFrame, cols: list[str]) -> list[tuple]:
        # Stringify exactly as before, then blank out cells that were
        # missing in the source so "nan"/"None" text never acts as a key.
        parts = frame[cols]
        text = parts.astype(str).where(parts.notna(), "")
        return list(text.itertuples(index=False, name=None))

    # Index the right side by composite key for O(1) lookup per left row.
    R_by_key: dict = {}
    for ri, key in zip(R.index, _key_tuples(R, options.right_keys)):
        R_by_key.setdefault(key, []).append(ri)

    tol = options.amount_tolerance
    out: list[dict] = []
    for li, key in zip(L.index, _key_tuples(L, options.left_keys)):
        if not any(key):
            # Every key part is empty/missing: nothing to anchor on.
            continue
        for ri in R_by_key.get(key, ()):
            if abs(L.at[li, "_amt"] - R.at[ri, "_amt"]) <= tol:
                out.append(_score_pair(L, R, li, ri, base_score=1000))
    return out
def _candidates_exact(
    L: pd.DataFrame, R: pd.DataFrame, options: ReconcileOptions
) -> list[dict]:
    """Exact match on amount (and date if configured).

    Returns:
        Candidate records from ``_score_pair`` with ``base_score=900`` for
        every left/right pair whose ``_amt`` values are equal and, when
        date matching is configured, whose ``_date`` values are equal too.
    """
    # Truthiness, not ``is not None``: validation and prep both treat an
    # empty-string date column as "no date". With ``is not None`` such a
    # configuration compared NaT to NaT and silently rejected every pair.
    has_date = bool(options.left_date)

    # Bucket right side by amount for cheap lookup.
    R_by_amt: dict = {}
    for ri, amt in R["_amt"].items():
        R_by_amt.setdefault(amt, []).append(ri)

    out: list[dict] = []
    for li, amt in L["_amt"].items():
        for ri in R_by_amt.get(amt, ()):
            if has_date and L.at[li, "_date"] != R.at[ri, "_date"]:
                continue
            out.append(_score_pair(L, R, li, ri, base_score=900))
    return out
def _candidates_tolerance(
    L: pd.DataFrame, R: pd.DataFrame, options: ReconcileOptions
) -> list[dict]:
    """Amount within tolerance and (if configured) date within window.

    Quadratic in the open set size. For typical reconciliation sizes
    (a month of statements: low thousands of rows) this is fine; if a
    user hands us 100k×100k we'll need a smarter blocking strategy.

    Returns:
        Candidate records from ``_score_pair`` with ``base_score=500``.
    """
    out: list[dict] = []
    # Truthiness keeps this in step with validation/prep, which treat an
    # empty-string date column as "no date configured".
    has_date = bool(options.left_date)
    tol = options.amount_tolerance

    R_amts = R["_amt"].to_numpy()
    R_index = R.index.to_numpy()
    if has_date:
        R_dates = R["_date"].to_numpy()
        # Loop-invariant: convert the window to numpy once, not per row.
        win64 = pd.Timedelta(days=options.date_tolerance_days).to_timedelta64()

    for li in L.index:
        l_amt = L.at[li, "_amt"]
        mask = (R_amts >= l_amt - tol) & (R_amts <= l_amt + tol)
        if has_date:
            date_diff = R_dates - L.at[li, "_date"].to_datetime64()
            mask = mask & (date_diff >= -win64) & (date_diff <= win64)
        for ri in R_index[mask]:
            out.append(_score_pair(L, R, li, ri, base_score=500))
    return out
def _candidates_fuzzy(
    L: pd.DataFrame, R: pd.DataFrame, options: ReconcileOptions
) -> list[dict]:
    """Tolerance-pass candidates re-scored by description similarity.

    A pair is kept only when the amount is within tolerance, the date (if
    configured) is within the window, AND the rapidfuzz
    ``token_set_ratio`` of the two descriptions meets
    ``options.desc_min_score``. The similarity is folded into the score so
    a stronger description match outranks a weaker one within this pass.

    Returns:
        Candidate records from ``_score_pair`` with
        ``base_score=300 + similarity`` and ``desc_score=similarity``; an
        empty list when rapidfuzz or either description column is
        unavailable.
    """
    if not (_HAS_RAPIDFUZZ and options.left_desc and options.right_desc):
        return []
    out: list[dict] = []
    # Truthiness keeps this in step with validation/prep, which treat an
    # empty-string date column as "no date configured".
    has_date = bool(options.left_date)
    tol = options.amount_tolerance
    win = pd.Timedelta(days=options.date_tolerance_days) if has_date else None
    min_score = options.desc_min_score
    L_desc = L[options.left_desc].astype(str)
    R_desc = R[options.right_desc].astype(str)
    for li in L.index:
        l_amt = L.at[li, "_amt"]
        l_date = L.at[li, "_date"] if has_date else None
        l_text = L_desc.at[li]
        for ri in R.index:
            # Cheap numeric filters first; the string comparison is the
            # expensive step.
            if abs(R.at[ri, "_amt"] - l_amt) > tol:
                continue
            if has_date and abs(R.at[ri, "_date"] - l_date) > win:
                continue
            score = int(_rf_fuzz.token_set_ratio(l_text, R_desc.at[ri]))
            if score < min_score:
                continue
            # Base 300 keeps fuzzy below exact/tolerance passes; the
            # 0–100 description score breaks ties within the pass.
            out.append(
                _score_pair(L, R, li, ri, base_score=300 + score, desc_score=score)
            )
    return out
# ---------------------------------------------------------------------------
# Scoring & output assembly
# ---------------------------------------------------------------------------
def _score_pair(
    L: pd.DataFrame,
    R: pd.DataFrame,
    li,
    ri,
    *,
    base_score: int,
    desc_score: int = 0,
) -> dict:
    """Build the candidate record used by the assignment phase.

    The score is *base_score* minus a closeness penalty: ten points per
    unit of absolute amount difference (capped at 50) plus one point per
    day of date difference (capped at 50, and only when both dates are
    present). ``date_diff_days`` is ``None`` when either date is missing.
    """
    left_date = L.at[li, "_date"]
    right_date = R.at[ri, "_date"]
    delta_amount = float(L.at[li, "_amt"] - R.at[ri, "_amt"])

    both_dated = pd.notna(left_date) and pd.notna(right_date)
    delta_days = int((left_date - right_date).days) if both_dated else None

    # Closer pairs lose fewer points, so they win ties inside a pass; each
    # component is capped so the total deduction stays bounded.
    deduction = min(abs(delta_amount) * 10, 50)
    if delta_days is not None:
        deduction += min(abs(delta_days), 50)

    return {
        "left_idx": li,
        "right_idx": ri,
        "score": base_score - deduction,
        "amount_diff": delta_amount,
        "date_diff_days": delta_days,
        "desc_score": desc_score,
    }
def _build_matched(
    left: pd.DataFrame,
    right: pd.DataFrame,
    pairs: list[dict],
    options: ReconcileOptions,
    *,
    review: bool = False,
) -> pd.DataFrame:
    """Assemble a matched/review frame: bookkeeping cols + originals.

    Each pair becomes one row holding ``match_pass``, ``score``,
    ``amount_diff``, ``date_diff_days`` and ``desc_score``, followed by
    every left column prefixed ``left_`` and every right column prefixed
    ``right_``. With no pairs, an empty frame with those columns is
    returned.

    Args:
        left: Original left input (values are read by index label).
        right: Original right input.
        pairs: Candidate records that also carry a ``"pass"`` key.
        options: Unused here; kept for a uniform helper signature.
        review: When true, rows are ordered by the left row's position in
            *left* so all candidates of one left row sit together; when
            false, by descending score so the strongest pairs come first.
    """
    if not pairs:
        cols = ["match_pass", "score", "amount_diff", "date_diff_days", "desc_score"]
        cols += [f"left_{c}" for c in left.columns]
        cols += [f"right_{c}" for c in right.columns]
        return pd.DataFrame(columns=cols)

    if review:
        # Both branches used to sort by score, so the documented grouping of
        # review rows by left row never happened. Order by position in
        # ``left`` (not by label) so unsortable or mixed-type index labels
        # cannot break the sort; ``sorted`` is stable, so pairs of the same
        # left row keep their insertion order.
        left_pos = {label: pos for pos, label in enumerate(left.index)}
        fallback = len(left_pos)
        pairs = sorted(pairs, key=lambda p: left_pos.get(p["left_idx"], fallback))

    rows = []
    for p in pairs:
        li, ri = p["left_idx"], p["right_idx"]
        row = {
            "match_pass": p["pass"],
            "score": p["score"],
            "amount_diff": p["amount_diff"],
            "date_diff_days": p["date_diff_days"],
            "desc_score": p["desc_score"],
        }
        # Cell-by-cell ``.at`` reads keep each value's own dtype.
        for c in left.columns:
            row[f"left_{c}"] = left.at[li, c]
        for c in right.columns:
            row[f"right_{c}"] = right.at[ri, c]
        rows.append(row)

    out = pd.DataFrame(rows)
    if not review:
        # Matched: strongest pairs first; stable so ties keep pass order.
        out = out.sort_values("score", ascending=False, kind="stable")
    return out.reset_index(drop=True)

View File

@@ -0,0 +1,324 @@
"""DataTools Reconcile — Streamlit page.
Two-source reconciliation (e.g. bank feed vs. ledger): upload both
files, pick the amount/date columns on each side, choose tolerance
settings, then download four output CSVs (matched, unmatched-left,
unmatched-right, review).
"""
from __future__ import annotations
import io
import sys
from pathlib import Path
import pandas as pd
import streamlit as st
_project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from src.audit import log_event, log_page_open
from src.gui.components import (
back_to_home_link,
hide_streamlit_chrome,
html_download_button,
render_sticky_footer,
)
from src.core.reconcile import ReconcileOptions, reconcile
# Shared page scaffolding from ``src.gui.components`` / ``src.audit``. Their
# implementations are not visible here; judging by the names they hide
# Streamlit's default chrome, draw the sticky footer and a back-to-home link,
# and record that this page was opened — TODO confirm against those modules.
hide_streamlit_chrome()
render_sticky_footer()
back_to_home_link()
log_page_open("11_Reconciler")
# ---------------------------------------------------------------------------
# Header
# ---------------------------------------------------------------------------
# Page title plus a one-paragraph summary of what the tool produces.
st.title("Reconcile Two Files")
st.caption(
    "Match transactions between two sources (e.g. bank feed vs. ledger). "
    "Outputs four buckets: matched, unmatched-left, unmatched-right, and "
    "ambiguous-for-review."
)
# ---------------------------------------------------------------------------
# File readers
# ---------------------------------------------------------------------------
@st.cache_data(show_spinner=False)
def _read_uploaded(name: str, data: bytes) -> pd.DataFrame:
    """Read uploaded bytes into an all-string DataFrame.

    Everything is kept as strings (``dtype=str``, no NA inference) so the
    user controls coercion through the column selectors on the page.

    Args:
        name: Original file name; only its suffix is used, to choose the
            Excel reader vs. CSV/TSV parsing and the delimiter.
        data: Raw file content.

    Returns:
        The parsed frame. Delimited text is tried as UTF-8, then
        UTF-8-with-BOM, and finally Latin-1, which accepts any byte
        sequence.
    """
    suffix = Path(name).suffix.lower()
    if suffix in (".xlsx", ".xls"):
        return pd.read_excel(io.BytesIO(data), dtype=str, keep_default_na=False)

    # Delimiter depends only on the suffix, so compute it once.
    sep = "\t" if suffix == ".tsv" else ","

    def _parse(encoding: str) -> pd.DataFrame:
        # A fresh buffer per attempt: never reuse a stream a failed parse
        # may have left half-consumed or closed.
        return pd.read_csv(
            io.BytesIO(data), dtype=str, keep_default_na=False,
            encoding=encoding, sep=sep, on_bad_lines="warn",
        )

    for enc in ("utf-8", "utf-8-sig"):
        try:
            return _parse(enc)
        except UnicodeDecodeError:
            continue
    # Latin-1 maps every byte to a character, so decoding cannot fail here.
    # The old post-loop fallback was unreachable for that reason, and it
    # would also have dropped the TSV delimiter and ``on_bad_lines``.
    return _parse("latin-1")
def _side_panel(side_label: str, key_prefix: str):
    """Render one side's uploader, status caption and collapsed preview.

    Returns:
        ``(DataFrame, file_name)`` once a file has been uploaded and
        parsed; ``(None, None)`` when nothing is uploaded yet or the file
        could not be read (the error is shown on the page).
    """
    label_lower = side_label.lower()
    st.markdown(f"**{side_label}**")

    uploaded = st.file_uploader(
        f"Upload {label_lower} file (CSV / Excel)",
        type=["csv", "tsv", "xlsx", "xls"],
        key=f"{key_prefix}_upload",
        label_visibility="collapsed",
    )
    if uploaded is None:
        st.caption(f"_No {label_lower} file yet._")
        return None, None

    file_name = uploaded.name
    try:
        frame = _read_uploaded(file_name, uploaded.getvalue())
    except Exception as e:
        # Report parse failures in the UI instead of crashing the page.
        st.error(f"Could not read `{file_name}`: {e}")
        return None, None

    st.caption(f"`{file_name}` — {len(frame)} rows, {len(frame.columns)} columns")
    with st.expander(f"Preview {label_lower}", expanded=False):
        st.dataframe(frame.head(10), width="stretch")
    return frame, file_name
# ---------------------------------------------------------------------------
# Side-by-side upload
# ---------------------------------------------------------------------------
# Two equal columns, one upload panel per side. Each panel yields
# ``(DataFrame, file_name)`` or ``(None, None)``.
col_left, col_right = st.columns(2)
with col_left:
    left_df, left_name = _side_panel("Left (e.g. bank feed)", "left")
with col_right:
    right_df, right_name = _side_panel("Right (e.g. ledger)", "right")
# Nothing below makes sense without both inputs: show a hint and halt this
# script run here.
if left_df is None or right_df is None:
    st.info("Upload both files to continue.")
    st.stop()
# ---------------------------------------------------------------------------
# Column mapping
# ---------------------------------------------------------------------------
st.divider()
st.subheader("Match settings")
# One column of mapping widgets per input side.
map_left, map_right = st.columns(2)
def _col_pick(label: str, df: pd.DataFrame, key: str, *, allow_none: bool):
    """Show a selectbox listing ``df``'s columns and return the choice.

    Args:
        label: Selectbox label.
        df: Frame whose column labels are offered as options.
        key: Streamlit widget key.
        allow_none: When true, a leading ``"(none)"`` entry is offered so
            the column can be left unset.

    Returns:
        The selected column label, or ``None`` when the ``"(none)"`` entry
        is selected.
    """
    placeholder = "(none)"
    if allow_none:
        choices = [placeholder, *df.columns]
    else:
        choices = list(df.columns)

    selected = st.selectbox(label, choices, key=key)
    if selected == placeholder:
        return None
    return selected
with map_left:
    st.markdown("**Left columns**")
    # Amount is mandatory; date and description may be left as "(none)".
    left_amount = _col_pick(
        label="Amount column", df=left_df,
        key="left_amount_col", allow_none=False,
    )
    left_date = _col_pick(
        label="Date column (optional)", df=left_df,
        key="left_date_col", allow_none=True,
    )
    left_desc = _col_pick(
        label="Description column (optional)", df=left_df,
        key="left_desc_col", allow_none=True,
    )
    left_keys = st.multiselect(
        label="Reference columns (optional, e.g. check / invoice no.)",
        options=left_df.columns.tolist(),
        key="left_keys_col",
    )

with map_right:
    st.markdown("**Right columns**")
    # Same selectors as the left side, against the right frame.
    right_amount = _col_pick(
        label="Amount column", df=right_df,
        key="right_amount_col", allow_none=False,
    )
    right_date = _col_pick(
        label="Date column (optional)", df=right_df,
        key="right_date_col", allow_none=True,
    )
    right_desc = _col_pick(
        label="Description column (optional)", df=right_df,
        key="right_desc_col", allow_none=True,
    )
    right_keys = st.multiselect(
        label="Reference columns (must match left count)",
        options=right_df.columns.tolist(),
        key="right_keys_col",
    )
# ---------------------------------------------------------------------------
# Tolerances & options
# ---------------------------------------------------------------------------
# Matching knobs. The values read here (amount_tolerance, date_tolerance,
# invert_right_sign, desc_min_score) are passed to ReconcileOptions when the
# "Reconcile" button is pressed further down.
# NOTE(review): indentation is not visible in this view, so whether the
# similarity slider sits inside the expander could not be confirmed here.
with st.expander("Tolerances & options", expanded=True):
# Three side-by-side slots, one per option widget below.
tol_a, tol_b, tol_c = st.columns(3)
with tol_a:
# Non-negative float, default 0.0, stepped by 0.01 and shown with 4 decimals.
amount_tolerance = st.number_input(
"Amount tolerance",
min_value=0.0, value=0.0, step=0.01, format="%.4f",
help="Absolute tolerance on amount (e.g. 0.01 to absorb cent rounding).",
)
with tol_b:
# Non-negative whole number of days, default 0.
date_tolerance = st.number_input(
"Date tolerance (days)",
min_value=0, value=0, step=1,
help="Allow N calendar days of drift between posting dates.",
)
with tol_c:
# Off by default.
invert_right_sign = st.checkbox(
"Invert right amount sign",
value=False,
help="Use when one side records debits as positive and the other as negative.",
)
# 0-100 in steps of 5; the default 0 is labelled as "disables" in the UI.
desc_min_score = st.slider(
"Description similarity boost (0 disables)",
min_value=0, max_value=100, value=0, step=5,
help=(
"When both sides have a description column set, accept matches with "
"this minimum fuzzy similarity even if amount/date are merely within "
"tolerance. Lower = more permissive."
),
)
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
st.divider()
# Everything up to the session-state read below reacts to the button press.
# NOTE(review): indentation is not visible in this view; the exact nesting of
# the session-state writes and log_event call could not be confirmed here.
if st.button("Reconcile", type="primary", width="stretch"):
# The left and right reference selections must contain the same number of
# columns; otherwise report the counts and halt this script run.
if len(left_keys) != len(right_keys):
st.error(
"Reference columns must match in count: "
f"left has {len(left_keys)}, right has {len(right_keys)}."
)
st.stop()
# Collect the widget values into one options object. Widget values are cast
# explicitly to list/int/float/bool before being handed over.
options = ReconcileOptions(
left_amount=left_amount,
right_amount=right_amount,
left_date=left_date,
right_date=right_date,
left_keys=list(left_keys),
right_keys=list(right_keys),
left_desc=left_desc,
right_desc=right_desc,
desc_min_score=int(desc_min_score),
amount_tolerance=float(amount_tolerance),
date_tolerance_days=int(date_tolerance),
invert_right_sign=bool(invert_right_sign),
)
with st.spinner("Reconciling..."):
# A ValueError from reconcile() is shown via st.error and the run stops;
# other exception types are not caught here.
try:
result = reconcile(left_df, right_df, options)
except ValueError as e:
st.error(str(e))
st.stop()
# Store the result and the left file name in session state; the results and
# downloads sections read them back from there.
st.session_state["reconcile_result"] = result
st.session_state["reconcile_left_name"] = left_name
log_event("tool_run", "Reconcile run", page="11_Reconciler")
# Read the stored result (None when nothing has been stored under this key)
# and stop before the results section if there is none.
result = st.session_state.get("reconcile_result")
if result is None:
st.stop()
# ---------------------------------------------------------------------------
# Results
# ---------------------------------------------------------------------------
st.subheader("Results")
stats = result.stats

# Headline counters: one metric per result bucket, left to right.
m1, m2, m3, m4 = st.columns(4)
for metric_col, metric_label, stat_key in (
    (m1, "Matched", "matched"),
    (m2, "Review", "review"),
    (m3, "Unmatched left", "unmatched_left"),
    (m4, "Unmatched right", "unmatched_right"),
):
    metric_col.metric(metric_label, stats[stat_key])

# Health bar: matched / max(left, right)
# The "or 1" keeps the division defined when both sides have zero rows.
denom = max(stats["left_rows"], stats["right_rows"]) or 1
pct = stats["matched"] / denom * 100
st.caption(f"Coverage: {pct:.1f}% of the larger side")

tab_matched, tab_review, tab_left, tab_right = st.tabs([
    f"Matched ({stats['matched']})",
    f"Review ({stats['review']})",
    f"Unmatched left ({stats['unmatched_left']})",
    f"Unmatched right ({stats['unmatched_right']})",
])

# Only the review tab carries an explanatory caption above its table.
review_note = (
    "Pairs flagged because the algorithm couldn't pick a single "
    "best match (e.g. multiple equally-good candidates). Use the "
    "left/right indices to disambiguate manually."
)

# Each tab shows either an "empty" notice or the bucket's table.
for tab, frame, empty_message, note in (
    (tab_matched, result.matched, "No matches.", None),
    (
        tab_review,
        result.review,
        "Nothing to review — no ambiguous candidates.",
        review_note,
    ),
    (tab_left, result.unmatched_left, "Every left row was matched.", None),
    (tab_right, result.unmatched_right, "Every right row was matched.", None),
):
    with tab:
        if frame.empty:
            st.info(empty_message)
            continue
        if note is not None:
            st.caption(note)
        st.dataframe(frame, width="stretch", hide_index=True)
# ---------------------------------------------------------------------------
# Downloads
# ---------------------------------------------------------------------------
st.divider()

# Output names are derived from the stem of the left file name stored with
# the result; "reconcile" is the fallback when that key is absent.
stem = Path(st.session_state.get("reconcile_left_name", "reconcile")).stem

dl_a, dl_b, dl_c, dl_d = st.columns(4)

# One download button per result bucket; a button is disabled when its
# bucket is empty. CSV bytes are encoded as utf-8-sig.
for dl_col, dl_label, dl_frame, dl_suffix in (
    (dl_a, "Matched CSV", result.matched, "matched"),
    (dl_b, "Review CSV", result.review, "review"),
    (dl_c, "Unmatched left", result.unmatched_left, "unmatched_left"),
    (dl_d, "Unmatched right", result.unmatched_right, "unmatched_right"),
):
    with dl_col:
        html_download_button(
            dl_label,
            dl_frame.to_csv(index=False).encode("utf-8-sig"),
            file_name=f"{stem}_{dl_suffix}.csv",
            mime="text/csv",
            disabled=dl_frame.empty,
        )

View File

@@ -157,6 +157,18 @@ TOOLS: list[Tool] = [
status="Ready",
section="transformations",
),
Tool(
tool_id="11_reconciler",
icon=":material/compare_arrows:",
name="Reconcile Two Files",
description=(
"Match transactions between two sources (e.g. bank feed vs. "
"ledger) with amount and date tolerance."
),
page_slug="11_Reconciler",
status="Ready",
section="automations",
),
]

317
tests/test_reconcile.py Normal file
View File

@@ -0,0 +1,317 @@
"""Tests for src.core.reconcile — two-source matching engine."""
import pandas as pd
import pytest
from src.core.reconcile import (
ReconcileOptions,
ReconcileResult,
reconcile,
)
def _bank(rows):
return pd.DataFrame(rows, columns=["date", "amount", "desc"])
def _ledger(rows):
return pd.DataFrame(rows, columns=["posted", "amt", "memo"])
class TestExactMatch:
    """Matching with no tolerances configured."""

    def test_one_to_one_exact(self):
        """Two identical amount/date pairs both match on the exact pass."""
        bank = _bank([
            ("2026-01-05", 100.00, "ACME"),
            ("2026-01-06", 250.00, "WIDGET CO"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.00, "Acme Inc"),
            ("2026-01-06", 250.00, "Widget"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 2
        assert outcome.stats["unmatched_left"] == 0
        assert outcome.stats["unmatched_right"] == 0
        assert outcome.matched["match_pass"].eq("exact").all()

    def test_unmatched_left_and_right(self):
        """Rows without a counterpart land in the unmatched frames."""
        bank = _bank([
            ("2026-01-05", 100.00, "ACME"),
            ("2026-01-07", 99.99, "ONLY ON LEFT"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.00, "Acme"),
            ("2026-01-08", 500.00, "Only on right"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 1
        assert outcome.stats["unmatched_left"] == 1
        assert outcome.stats["unmatched_right"] == 1
        # The unmatched rows preserve their original columns.
        assert "ONLY ON LEFT" in outcome.unmatched_left["desc"].tolist()
        assert "Only on right" in outcome.unmatched_right["memo"].tolist()

    def test_amount_only_no_date(self):
        """With no date columns set, distinct amounts pair off one-to-one."""
        # The dates differ wildly on purpose: they must be ignored.
        bank = _bank([
            ("2026-01-01", 42.50, "A"),
            ("2026-02-15", 99.00, "B"),
        ])
        ledger = _ledger([
            ("2099-12-31", 42.50, "X"),
            ("1970-01-01", 99.00, "Y"),
        ])
        opts = ReconcileOptions(left_amount="amount", right_amount="amt")

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 2

    def test_identical_amounts_with_no_date_are_ambiguous(self):
        """Equal amounts on both sides with no date go to review."""
        # Without a date column to disambiguate, two left rows and two
        # right rows sharing one amount are genuinely undecidable.
        bank = _bank([
            ("2026-01-01", 42.50, "A"),
            ("2026-02-15", 42.50, "B"),
        ])
        ledger = _ledger([
            ("2099-12-31", 42.50, "X"),
            ("1970-01-01", 42.50, "Y"),
        ])
        opts = ReconcileOptions(left_amount="amount", right_amount="amt")

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
        assert outcome.stats["review"] >= 2
class TestAmountTolerance:
    """Behaviour of the ``amount_tolerance`` option."""

    def test_amount_within_tolerance(self):
        """A 0.02 difference is accepted under a 0.05 tolerance."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.02, "X")])
        # Exact pass misses (100.00 != 100.02). Tolerance pass catches it.
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.05,
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 1
        first_pair = outcome.matched.iloc[0]
        assert first_pair["match_pass"] == "tolerance"
        # Expected difference is -0.02, compared with a small float margin.
        assert abs(first_pair["amount_diff"] - (-0.02)) < 1e-9

    def test_outside_tolerance_unmatched(self):
        """A 0.50 difference exceeds a 0.05 tolerance; both rows stay unmatched."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.50, "X")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.05,
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
        assert outcome.stats["unmatched_left"] == 1
        assert outcome.stats["unmatched_right"] == 1
class TestDateWindow:
    """Behaviour of the ``date_tolerance_days`` option."""

    def test_date_within_window(self):
        """A right row posted 2 days later matches under a 3-day window."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-07", 100.00, "X")])  # 2 days later
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            date_tolerance_days=3,
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 1
        assert outcome.matched.iloc[0]["date_diff_days"] == -2

    def test_date_outside_window(self):
        """A right row posted 15 days later does not match under a 5-day window."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-20", 100.00, "X")])  # 15 days later
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            date_tolerance_days=5,
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
class TestSignInversion:
    """Behaviour of the ``invert_right_sign`` option."""

    def test_invert_right_sign(self):
        """Opposite-signed amounts only match when the right sign is inverted."""
        # Bank: deposit = +100 ; Ledger: deposit recorded as -100.
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", -100.00, "X")])
        shared = {
            "left_amount": "amount", "right_amount": "amt",
            "left_date": "date", "right_date": "posted",
        }

        # Without inversion: no match.
        plain = reconcile(bank, ledger, ReconcileOptions(**shared))
        assert plain.stats["matched"] == 0

        # With inversion: match.
        inverted = reconcile(
            bank, ledger, ReconcileOptions(**shared, invert_right_sign=True),
        )
        assert inverted.stats["matched"] == 1
class TestAmbiguity:
    """Routing of tied candidates versus uniquely best candidates."""

    def test_two_equal_candidates_go_to_review(self):
        """One left row with two identical right rows is flagged for review."""
        # One left row, two identical right rows → ambiguous.
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([
            ("2026-01-05", 100.00, "X"),
            ("2026-01-05", 100.00, "Y"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
        assert outcome.stats["review"] == 2  # both candidate pairs flagged
        # Left was consumed by the ambiguity, both rights too.
        assert outcome.stats["unmatched_left"] == 0
        assert outcome.stats["unmatched_right"] == 0

    def test_uniquely_better_match_wins(self):
        """Each left row pairs with its closer right row despite a wide tolerance."""
        # Two left rows, two right rows; one pair is a closer match.
        bank = _bank([
            ("2026-01-05", 100.00, "A"),
            ("2026-01-05", 100.05, "B"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.00, "X"),  # closer to A
            ("2026-01-05", 100.05, "Y"),  # closer to B
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.10,
        )

        outcome = reconcile(bank, ledger, opts)

        # Both should pair uniquely on the exact pass (penalty inside
        # exact pass breaks the symmetric near-ties).
        assert outcome.stats["matched"] == 2
class TestKeyMatch:
    """Matching driven by reference (key) columns."""

    def test_reference_number_authoritative(self):
        """Equal reference and amount pair up even when posting dates differ."""
        # Same check number, same amount, different posting dates.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": 100.00, "check_no": "1042"},
        ])
        ledger = pd.DataFrame([
            {"posted": "2026-01-12", "amt": 100.00, "ref": "1042"},
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            left_keys=["check_no"], right_keys=["ref"],
            date_tolerance_days=0,  # exact-pass would miss
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 1
        assert outcome.matched.iloc[0]["match_pass"] == "key"

    def test_key_requires_amount_to_tie(self):
        """Equal reference with different amounts is not a key match."""
        # Same ref but mismatched amounts → not a key match.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": 100.00, "check_no": "1042"},
        ])
        ledger = pd.DataFrame([
            {"posted": "2026-01-05", "amt": 200.00, "ref": "1042"},
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            left_keys=["check_no"], right_keys=["ref"],
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
class TestInputValidation:
    """Invalid option combinations must raise ``ValueError``.

    The options object is built inside each ``pytest.raises`` block so an
    error raised at construction time is covered as well as one raised by
    ``reconcile`` itself.
    """

    def test_missing_amount_columns(self):
        """Omitting ``left_amount`` raises an error that names it."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.00, "X")])

        with pytest.raises(ValueError, match="left_amount"):
            reconcile(bank, ledger, ReconcileOptions(right_amount="amt"))

    def test_left_date_without_right_date(self):
        """Setting only one of the two date columns is rejected."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.00, "X")])

        with pytest.raises(ValueError, match="both be set or both be None"):
            reconcile(
                bank,
                ledger,
                ReconcileOptions(
                    left_amount="amount", right_amount="amt",
                    left_date="date",  # right_date missing
                ),
            )

    def test_mismatched_key_lengths(self):
        """Key lists of different lengths are rejected."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.00, "X")])

        with pytest.raises(ValueError, match="same length"):
            reconcile(
                bank,
                ledger,
                ReconcileOptions(
                    left_amount="amount", right_amount="amt",
                    left_keys=["a", "b"], right_keys=["x"],
                ),
            )

    def test_negative_tolerance_rejected(self):
        """A negative ``amount_tolerance`` is rejected."""
        bank = _bank([("2026-01-05", 100.00, "A")])
        ledger = _ledger([("2026-01-05", 100.00, "X")])

        with pytest.raises(ValueError, match="amount_tolerance"):
            reconcile(
                bank,
                ledger,
                ReconcileOptions(
                    left_amount="amount", right_amount="amt",
                    amount_tolerance=-0.01,
                ),
            )
class TestUnparseableInputs:
    """Rows whose amount cannot be parsed must stay visible."""

    def test_non_numeric_amount_falls_through(self):
        """A garbage-amount row ends up in ``unmatched_left``; the good row matches."""
        # The bad row can't participate in matching but must not vanish.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": "not a number", "desc": "BAD"},
            {"date": "2026-01-05", "amount": 100.00, "desc": "OK"},
        ])
        ledger = _ledger([("2026-01-05", 100.00, "X")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 1
        # The garbage row appears in unmatched_left.
        assert "BAD" in outcome.unmatched_left["desc"].tolist()
class TestResultShape:
    """Shape of the frames carried by the reconcile result."""

    def test_matched_carries_both_sides(self):
        """Matched rows expose source columns under ``left_``/``right_`` prefixes."""
        bank = _bank([("2026-01-05", 100.00, "ACME")])
        ledger = _ledger([("2026-01-05", 100.00, "Acme Inc")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        pair = outcome.matched.iloc[0]
        assert pair["left_desc"] == "ACME"
        assert pair["right_memo"] == "Acme Inc"
        assert pair["left_amount"] == 100.00
        assert pair["right_amt"] == 100.00

    def test_empty_inputs_return_empty_result(self):
        """Two empty inputs give zero matches and empty result frames."""
        bank = _bank([])
        ledger = _ledger([])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )

        outcome = reconcile(bank, ledger, opts)

        assert outcome.stats["matched"] == 0
        assert outcome.matched.empty
        assert outcome.unmatched_left.empty
        assert outcome.unmatched_right.empty