diff --git a/src/cli_reconcile.py b/src/cli_reconcile.py new file mode 100644 index 0000000..6283069 --- /dev/null +++ b/src/cli_reconcile.py @@ -0,0 +1,198 @@ +"""CLI for the DataTools reconciliation tool. + +Usage: + python -m src.cli_reconcile bank.csv ledger.csv \\ + --left-amount amount --right-amount amt \\ + --left-date date --right-date posted # dry-run preview + python -m src.cli_reconcile bank.csv ledger.csv \\ + --left-amount amount --right-amount amt \\ + --left-date date --right-date posted --apply # write matched/unmatched CSVs + python -m src.cli_reconcile --help # full help + +Outputs (with --apply) sit beside the LEFT input file: + {stem}_matched.csv one row per accepted pair + {stem}_unmatched_left.csv left rows with no counterpart + {stem}_unmatched_right.csv right rows with no counterpart + {stem}_review.csv ambiguous pairs flagged for review +""" + +from __future__ import annotations + +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional + +import typer +from loguru import logger + +app = typer.Typer( + name="reconcile", + help=( + "Reconcile two data sources (e.g. bank feed vs. ledger export).\n\n" + "By default, runs in preview mode — shows the match stats without " + "writing anything. 
def _split_csv_arg(raw: Optional[str]) -> list[str]:
    """Turn a comma-separated option value into a list of column names.

    Args:
        raw: The raw option string, or ``None`` when the option was not given.

    Returns:
        The comma-separated pieces with surrounding whitespace removed.
        Pieces that are empty after trimming are skipped; ``None`` yields
        an empty list.
    """
    if raw is None:
        return []
    names: list[str] = []
    for piece in raw.split(","):
        trimmed = piece.strip()
        if trimmed:
            names.append(trimmed)
    return names
ledger)."), + left_amount: str = typer.Option(..., "--left-amount", help="Amount column on the LEFT."), + right_amount: str = typer.Option(..., "--right-amount", help="Amount column on the RIGHT."), + left_date: Optional[str] = typer.Option(None, "--left-date", help="Date column on the LEFT."), + right_date: Optional[str] = typer.Option(None, "--right-date", help="Date column on the RIGHT."), + left_keys: Optional[str] = typer.Option( + None, "--left-keys", + help="Comma-separated reference/key columns on the LEFT (paired with --right-keys).", + ), + right_keys: Optional[str] = typer.Option( + None, "--right-keys", + help="Comma-separated reference/key columns on the RIGHT (paired with --left-keys).", + ), + left_desc: Optional[str] = typer.Option(None, "--left-desc", help="Description column on the LEFT (fuzzy)."), + right_desc: Optional[str] = typer.Option(None, "--right-desc", help="Description column on the RIGHT (fuzzy)."), + desc_min_score: int = typer.Option( + 0, "--desc-min-score", + help="Min description similarity (0-100) to accept a fuzzy match. 0 disables.", + ), + amount_tolerance: float = typer.Option( + 0.0, "--amount-tolerance", + help="Absolute amount tolerance (e.g. 0.01 to absorb cent-rounding).", + ), + date_tolerance: int = typer.Option( + 0, "--date-tolerance", + help="Date tolerance in calendar days (± N).", + ), + invert_right_sign: bool = typer.Option( + False, "--invert-right-sign", + help="Negate the RIGHT amount before matching (use when sign conventions differ).", + ), + apply: bool = typer.Option( + False, "--apply", + help="Write the four output CSV files. 
Without this flag, only stats are shown.", + ), +): + """Reconcile two CSV/Excel files.""" + from src.core.io import read_file, write_file + from src.core.reconcile import ReconcileOptions, reconcile + + left_path = Path(left_file) + right_path = Path(right_file) + for p in (left_path, right_path): + if not p.exists(): + typer.echo(f"Error: File not found: {p}", err=True) + raise typer.Exit(1) + + log_path = _setup_logging(Path("logs")) + + typer.echo(f"Reading {left_path.name}...") + try: + left_df = read_file(left_path) + except Exception as e: + typer.echo(f"Error reading {left_path.name}: {e}", err=True) + raise typer.Exit(1) + typer.echo(f" {len(left_df)} rows, {len(left_df.columns)} columns") + + typer.echo(f"Reading {right_path.name}...") + try: + right_df = read_file(right_path) + except Exception as e: + typer.echo(f"Error reading {right_path.name}: {e}", err=True) + raise typer.Exit(1) + typer.echo(f" {len(right_df)} rows, {len(right_df.columns)} columns") + + options = ReconcileOptions( + left_amount=left_amount, + right_amount=right_amount, + left_date=left_date, + right_date=right_date, + left_keys=_split_csv_arg(left_keys), + right_keys=_split_csv_arg(right_keys), + left_desc=left_desc, + right_desc=right_desc, + desc_min_score=desc_min_score, + amount_tolerance=amount_tolerance, + date_tolerance_days=date_tolerance, + invert_right_sign=invert_right_sign, + ) + + typer.echo("Reconciling...") + try: + result = reconcile(left_df, right_df, options) + except ValueError as e: + typer.echo(f"Error: {e}", err=True) + raise typer.Exit(1) + + _print_stats(result.stats) + + if apply: + stem = left_path.stem + out_dir = left_path.parent + write_file(result.matched, out_dir / f"{stem}_matched.csv") + write_file(result.unmatched_left, out_dir / f"{stem}_unmatched_left.csv") + write_file(result.unmatched_right, out_dir / f"{stem}_unmatched_right.csv") + write_file(result.review, out_dir / f"{stem}_review.csv") + typer.echo(f"\nWrote 4 files to {out_dir}:") + for 
suffix in ("matched", "unmatched_left", "unmatched_right", "review"): + typer.echo(f" {stem}_{suffix}.csv") + else: + typer.echo("\nThis was a preview. Add --apply to write the output files.") + + typer.echo(f"Log: {log_path}") + + +def _print_stats(stats: dict) -> None: + typer.echo(f"\n{'─'*50}") + typer.echo(f" Left rows: {stats['left_rows']}") + typer.echo(f" Right rows: {stats['right_rows']}") + typer.echo(f" Matched: {stats['matched']}") + typer.echo(f" Review (ambiguous): {stats['review']}") + typer.echo(f" Unmatched left: {stats['unmatched_left']}") + typer.echo(f" Unmatched right: {stats['unmatched_right']}") + typer.echo(f"{'─'*50}") + + +def main(): + app() + + +if __name__ == "__main__": + main() diff --git a/src/core/reconcile.py b/src/core/reconcile.py new file mode 100644 index 0000000..c3ac1ac --- /dev/null +++ b/src/core/reconcile.py @@ -0,0 +1,598 @@ +"""Two-source data reconciliation. + +Given two DataFrames (typically a bank/credit-card feed and a ledger +export), find which rows on the left correspond to rows on the right +based on amount, date, and optional reference/description fields. + +Output buckets: + matched — one row per accepted pair, with both originals. + unmatched_left — left rows with no acceptable right counterpart. + unmatched_right — right rows with no acceptable left counterpart. + review — ambiguous cases (a left row had >1 equally good + right candidates, or vice versa) surfaced for the + user to disambiguate manually. + +Matching strategy is a multi-pass greedy one-to-one assignment: + Pass 1: exact key match (when ``key_columns`` is set on either side) + Pass 2: exact (amount, date) match + Pass 3: amount within tolerance AND date within window + Pass 4: + optional description fuzzy similarity boost + +Within each pass, candidate pairs are scored and assigned greedily by +descending score; ties for the same left row that span multiple right +rows (or vice versa) are sent to ``review`` instead of being matched +arbitrarily. 
@dataclass
class ReconcileOptions:
    """Configuration for :func:`reconcile`.

    ``left_amount`` / ``right_amount`` are required: every match needs
    an amount to anchor on. Everything else is optional.

    Options come in left/right pairs: the ``left_*`` field names a column
    in the left frame and the ``right_*`` field names its counterpart in
    the right frame.
    """

    # Amount columns (required). Values are coerced to numbers; rows whose
    # amount cannot be parsed are dropped from matching but still appear
    # in the unmatched buckets.
    left_amount: str = ""
    right_amount: str = ""

    # Date columns. When both are set, candidates must fall within
    # ``date_tolerance_days``. When unset, date is ignored entirely.
    # Setting only one of the two is rejected during validation.
    left_date: Optional[str] = None
    right_date: Optional[str] = None

    # Optional reference / key columns for exact-match Pass 1. List
    # forms must be the same length so the i-th left key pairs with the
    # i-th right key (e.g. ``["check_no"]`` ↔ ``["ref"]``).
    left_keys: list[str] = field(default_factory=list)
    right_keys: list[str] = field(default_factory=list)

    # Description columns for the fuzzy similarity pass (optional). Only
    # used when both are set, ``desc_min_score`` > 0, AND rapidfuzz is
    # installed.
    left_desc: Optional[str] = None
    right_desc: Optional[str] = None
    desc_min_score: int = 0  # 0–100; 0 disables fuzzy.

    # Tolerances. Defaults are exact match; negative values are rejected
    # during validation.
    amount_tolerance: float = 0.0  # absolute (e.g. 0.01 for cent rounding)
    date_tolerance_days: int = 0  # ± N calendar days

    # Some bank feeds use opposite sign convention from the ledger
    # (debits positive vs. negative). Flipping this multiplies the
    # right side's amount by -1 before matching.
    invert_right_sign: bool = False
+ left_open: set = set(left.index) + right_open: set = set(right.index) + + matched_pairs: list[dict] = [] + review_pairs: list[dict] = [] + + # Pass 1 — exact key match on user-supplied reference columns. + if options.left_keys and options.right_keys: + _run_pass( + L, R, left_open, right_open, matched_pairs, review_pairs, + options=options, pass_name="key", + candidate_fn=_candidates_by_key, + ) + + # Pass 2 — exact (amount, date) match. + _run_pass( + L, R, left_open, right_open, matched_pairs, review_pairs, + options=options, pass_name="exact", + candidate_fn=_candidates_exact, + ) + + # Pass 3 — tolerance-window match. + if options.amount_tolerance > 0 or options.date_tolerance_days > 0: + _run_pass( + L, R, left_open, right_open, matched_pairs, review_pairs, + options=options, pass_name="tolerance", + candidate_fn=_candidates_tolerance, + ) + + # Pass 4 — description fuzzy boost (only over what's left). + if ( + options.desc_min_score > 0 + and options.left_desc + and options.right_desc + and _HAS_RAPIDFUZZ + ): + _run_pass( + L, R, left_open, right_open, matched_pairs, review_pairs, + options=options, pass_name="fuzzy", + candidate_fn=_candidates_fuzzy, + ) + + # Build the four output frames from what remains. 
def _validate_options(
    left: pd.DataFrame, right: pd.DataFrame, options: ReconcileOptions
) -> None:
    """Check *options* against both frames before any matching starts.

    Every configured column must exist on its side, paired options must be
    set together, and tolerances must be in range. Both frames must also
    have unique index labels, because the matcher tracks rows by label.

    Args:
        left: The left input frame.
        right: The right input frame.
        options: The reconcile configuration to validate.

    Raises:
        ValueError: On the first problem found. The message names the
            offending option or column.
    """
    if not options.left_amount or not options.right_amount:
        raise ValueError(
            "Reconcile requires both left_amount and right_amount columns."
        )
    if options.left_amount not in left.columns:
        raise ValueError(
            f"left_amount column {options.left_amount!r} not in left DataFrame."
        )
    if options.right_amount not in right.columns:
        raise ValueError(
            f"right_amount column {options.right_amount!r} not in right DataFrame."
        )
    if bool(options.left_date) != bool(options.right_date):
        raise ValueError(
            "left_date and right_date must both be set or both be None."
        )
    if options.left_date and options.left_date not in left.columns:
        raise ValueError(f"left_date column {options.left_date!r} not in left.")
    if options.right_date and options.right_date not in right.columns:
        raise ValueError(f"right_date column {options.right_date!r} not in right.")
    if len(options.left_keys) != len(options.right_keys):
        raise ValueError(
            "left_keys and right_keys must be the same length "
            f"(got {len(options.left_keys)} vs {len(options.right_keys)})."
        )
    for c in options.left_keys:
        if c not in left.columns:
            raise ValueError(f"left key column {c!r} not in left DataFrame.")
    for c in options.right_keys:
        if c not in right.columns:
            raise ValueError(f"right key column {c!r} not in right DataFrame.")
    if options.amount_tolerance < 0:
        raise ValueError("amount_tolerance must be >= 0.")
    if options.date_tolerance_days < 0:
        raise ValueError("date_tolerance_days must be >= 0.")
    if not (0 <= options.desc_min_score <= 100):
        raise ValueError("desc_min_score must be between 0 and 100.")

    # A mistyped description column would otherwise surface as a KeyError
    # deep inside the fuzzy pass; report it like every other bad column.
    if options.left_desc and options.left_desc not in left.columns:
        raise ValueError(
            f"left_desc column {options.left_desc!r} not in left DataFrame."
        )
    if options.right_desc and options.right_desc not in right.columns:
        raise ValueError(
            f"right_desc column {options.right_desc!r} not in right DataFrame."
        )

    # Rows are tracked by index label (open sets, ``.at`` lookups), so a
    # repeated label would merge distinct rows or break scalar lookups.
    if not left.index.is_unique:
        raise ValueError(
            "left DataFrame index must be unique; "
            "call reset_index(drop=True) before reconciling."
        )
    if not right.index.is_unique:
        raise ValueError(
            "right DataFrame index must be unique; "
            "call reset_index(drop=True) before reconciling."
        )
def _run_pass(
    L: pd.DataFrame,
    R: pd.DataFrame,
    left_open: set,
    right_open: set,
    matched_pairs: list[dict],
    review_pairs: list[dict],
    *,
    options: ReconcileOptions,
    pass_name: str,
    candidate_fn,
) -> None:
    """Run one matching pass over the still-open indices.

    Candidates come from ``candidate_fn(L_open, R_open, options)`` as dicts
    carrying at least ``left_idx``, ``right_idx`` and ``score``. They are
    assigned one-to-one, best score first. A left row with several right
    candidates tied at its best score, or a right row that is the equally
    good best choice of several left rows, goes to review instead of being
    matched arbitrarily.

    Assignment repeats in rounds. A left row whose best candidate was taken
    by a stronger left row is reconsidered in the next round against the
    right rows that are still open, so it can fall back to its next-best
    candidate rather than staying unmatched.

    Args:
        L: Prepared left frame.
        R: Prepared right frame.
        left_open: Labels of left rows not yet consumed; mutated in place.
        right_open: Labels of right rows not yet consumed; mutated in place.
        matched_pairs: Receives accepted pairs, each tagged with ``"pass"``.
        review_pairs: Receives ambiguous pairs, each tagged with ``"pass"``.
        options: Passed through to ``candidate_fn`` unchanged.
        pass_name: Value stored under the ``"pass"`` key of emitted pairs.
        candidate_fn: Callable producing the candidate dicts for this pass.
    """
    # Boolean masks keep the frames' own row order, whatever order the
    # open sets happen to iterate in.
    L_open = L.loc[L.index.isin(left_open)]
    R_open = R.loc[R.index.isin(right_open)]
    if L_open.empty or R_open.empty:
        return

    candidates = candidate_fn(L_open, R_open, options)
    if not candidates:
        return

    # Group candidates per left row, best score first. The sort is stable,
    # so equally scored candidates keep the generator's order.
    by_left: dict = {}
    for cand in candidates:
        by_left.setdefault(cand["left_idx"], []).append(cand)
    for cands in by_left.values():
        cands.sort(key=lambda c: c["score"], reverse=True)

    # Each productive round consumes at least one left row, so this ends.
    while _assign_round(
        by_left, left_open, right_open, matched_pairs, review_pairs, pass_name
    ):
        pass


def _assign_round(
    by_left: dict,
    left_open: set,
    right_open: set,
    matched_pairs: list[dict],
    review_pairs: list[dict],
    pass_name: str,
) -> bool:
    """Do one greedy assignment sweep; return True if any row was consumed."""
    # For every open left row, find its best candidates among the right
    # rows that are open at the start of this round.
    leaders_by_left: dict = {}
    for li, cands in by_left.items():
        if li not in left_open:
            continue
        open_cands = [c for c in cands if c["right_idx"] in right_open]
        if not open_cands:
            continue
        top = open_cands[0]["score"]
        leaders_by_left[li] = [c for c in open_cands if c["score"] == top]
    if not leaders_by_left:
        return False

    # Reverse view: which left rows name each right row as a best choice.
    by_right_top: dict = {}
    for leaders in leaders_by_left.values():
        for c in leaders:
            by_right_top.setdefault(c["right_idx"], []).append(c)

    # Strongest left rows claim their right counterpart first.
    left_order = sorted(
        leaders_by_left,
        key=lambda li: -leaders_by_left[li][0]["score"],
    )

    progressed = False
    for li in left_order:
        if li not in left_open:
            continue
        leaders = [c for c in leaders_by_left[li] if c["right_idx"] in right_open]
        if not leaders:
            # Best choice was just taken; retry next round against
            # whatever remains open.
            continue

        if len(leaders) > 1:
            # Several equally good right rows remain: park them all.
            for c in leaders:
                review_pairs.append({**c, "pass": pass_name})
            left_open.discard(li)
            for c in leaders:
                right_open.discard(c["right_idx"])
            progressed = True
            continue

        pick = leaders[0]
        ri = pick["right_idx"]

        # Mirror check: the same right row is the equally scored best
        # choice of another open left row.
        contenders = [
            c for c in by_right_top.get(ri, [])
            if c["left_idx"] in left_open and c["score"] == pick["score"]
        ]
        if len(contenders) > 1:
            for c in contenders:
                review_pairs.append({**c, "pass": pass_name})
                left_open.discard(c["left_idx"])
            right_open.discard(ri)
            progressed = True
            continue

        matched_pairs.append({**pick, "pass": pass_name})
        left_open.discard(li)
        right_open.discard(ri)
        progressed = True
    return progressed
def _candidates_tolerance(
    L: pd.DataFrame, R: pd.DataFrame, options: ReconcileOptions
) -> list[dict]:
    """Amount within tolerance and (if configured) date within window.

    Reads the ``_amt`` and ``_date`` work columns of both frames. Each left
    row is compared against all right rows with array operations, so the
    work is still proportional to ``len(L) * len(R)``.

    Args:
        L: Open left rows.
        R: Open right rows.
        options: Supplies ``amount_tolerance``, ``date_tolerance_days`` and
            ``left_date``.

    Returns:
        One ``_score_pair`` record with ``base_score=500`` per pair whose
        amounts differ by at most the tolerance and, when a date column is
        configured, whose dates differ by at most the window.
    """
    out: list[dict] = []
    # Truthiness, not ``is not None``: an empty-string column name means
    # "no date column", the same way validation and prep treat it.
    has_date = bool(options.left_date)
    tol = options.amount_tolerance

    R_amts = R["_amt"].to_numpy()
    R_index = R.index.to_numpy()
    if has_date:
        # Loop-invariant conversions, done once.
        R_dates = R["_date"].to_numpy()
        win = pd.Timedelta(days=options.date_tolerance_days).to_timedelta64()

    for li in L.index:
        l_amt = L.at[li, "_amt"]
        mask = (R_amts >= l_amt - tol) & (R_amts <= l_amt + tol)
        if has_date:
            date_diff = R_dates - L.at[li, "_date"].to_datetime64()
            mask = mask & (date_diff >= -win) & (date_diff <= win)
        for ri in R_index[mask]:
            out.append(_score_pair(L, R, li, ri, base_score=500))
    return out
+ """ + if not (_HAS_RAPIDFUZZ and options.left_desc and options.right_desc): + return [] + out: list[dict] = [] + has_date = options.left_date is not None + tol = options.amount_tolerance + win = pd.Timedelta(days=options.date_tolerance_days) if has_date else None + min_score = options.desc_min_score + + L_desc = L[options.left_desc].astype(str) + R_desc = R[options.right_desc].astype(str) + + for li in L.index: + l_amt = L.at[li, "_amt"] + l_date = L.at[li, "_date"] if has_date else None + l_text = L_desc.at[li] + for ri in R.index: + if abs(R.at[ri, "_amt"] - l_amt) > tol: + continue + if has_date: + diff = R.at[ri, "_date"] - l_date + if abs(diff) > win: + continue + score = int(_rf_fuzz.token_set_ratio(l_text, R_desc.at[ri])) + if score < min_score: + continue + # Base 300 keeps fuzzy below exact/tolerance passes; the + # 0–100 description score breaks ties within the pass. + out.append( + _score_pair(L, R, li, ri, base_score=300 + score, desc_score=score) + ) + return out + + +# --------------------------------------------------------------------------- +# Scoring & output assembly +# --------------------------------------------------------------------------- + + +def _score_pair( + L: pd.DataFrame, + R: pd.DataFrame, + li, + ri, + *, + base_score: int, + desc_score: int = 0, +) -> dict: + """Build the candidate record used by the assignment phase.""" + amt_diff = float(L.at[li, "_amt"] - R.at[ri, "_amt"]) + l_date = L.at[li, "_date"] + r_date = R.at[ri, "_date"] + if pd.notna(l_date) and pd.notna(r_date): + date_diff_days = int((l_date - r_date).days) + else: + date_diff_days = None + # Penalize larger differences within the same pass so closer matches + # win ties. Cap penalty so it can't flip pass ordering. 
def _build_matched(
    left: pd.DataFrame,
    right: pd.DataFrame,
    pairs: list[dict],
    options: ReconcileOptions,
    *,
    review: bool = False,
) -> pd.DataFrame:
    """Assemble a matched/review frame: bookkeeping cols + originals.

    Args:
        left: Original left frame; values are looked up by ``left_idx``.
        right: Original right frame; values are looked up by ``right_idx``.
        pairs: Pair records carrying ``left_idx``, ``right_idx``, ``pass``,
            ``score``, ``amount_diff``, ``date_diff_days`` and ``desc_score``.
        options: Accepted for signature parity; not read here.
        review: Accepted for signature parity; both buckets are assembled
            and ordered the same way.

    Returns:
        One row per pair: the bookkeeping columns, then every left column
        prefixed ``left_``, then every right column prefixed ``right_``.
        Rows are ordered by descending score, with ties kept in input
        order, and the index is reset. With no pairs, an empty frame with
        the same columns is returned.
    """
    if not pairs:
        cols = ["match_pass", "score", "amount_diff", "date_diff_days", "desc_score"]
        cols += [f"left_{c}" for c in left.columns]
        cols += [f"right_{c}" for c in right.columns]
        return pd.DataFrame(columns=cols)

    rows = []
    for p in pairs:
        li, ri = p["left_idx"], p["right_idx"]
        row = {
            "match_pass": p["pass"],
            "score": p["score"],
            "amount_diff": p["amount_diff"],
            "date_diff_days": p["date_diff_days"],
            "desc_score": p["desc_score"],
        }
        for c in left.columns:
            row[f"left_{c}"] = left.at[li, c]
        for c in right.columns:
            row[f"right_{c}"] = right.at[ri, c]
        rows.append(row)
    out = pd.DataFrame(rows)
    # Strongest pairs first. The stable sort keeps equally scored rows in
    # input order, so pairs appended together with the same score stay
    # adjacent.
    out = out.sort_values("score", ascending=False, kind="stable")
    return out.reset_index(drop=True)
@st.cache_data(show_spinner=False)
def _read_uploaded(name: str, data: bytes) -> pd.DataFrame:
    """Read uploaded bytes into a DataFrame of strings.

    Every cell is read as ``str`` and empty cells stay as empty strings
    rather than NaN, so no type coercion happens at load time.

    Args:
        name: Uploaded file name; only its suffix is used, to pick the parser.
        data: Raw file content.

    Returns:
        The parsed frame. ``.xlsx`` / ``.xls`` go through ``read_excel``.
        Anything else is parsed as delimited text, tab-separated for
        ``.tsv`` and comma-separated otherwise.
    """
    suffix = Path(name).suffix.lower()
    bio = io.BytesIO(data)
    if suffix in (".xlsx", ".xls"):
        return pd.read_excel(bio, dtype=str, keep_default_na=False)

    sep = "\t" if suffix == ".tsv" else ","
    try:
        # utf-8-sig decodes plain UTF-8 too and strips a leading BOM.
        return pd.read_csv(
            bio, dtype=str, keep_default_na=False,
            encoding="utf-8-sig", sep=sep, on_bad_lines="warn",
        )
    except UnicodeDecodeError:
        bio.seek(0)
    # latin-1 maps every byte to a character, so this decode cannot fail.
    return pd.read_csv(
        bio, dtype=str, keep_default_na=False,
        encoding="latin-1", sep=sep, on_bad_lines="warn",
    )
def _col_pick(label: str, df: pd.DataFrame, key: str, *, allow_none: bool):
    """Render a selectbox for choosing one column of *df*.

    Args:
        label: Widget label.
        df: Frame whose column names are offered.
        key: Streamlit widget key.
        allow_none: When true, a leading ``"(none)"`` entry is offered so
            the user can leave an optional column unset.

    Returns:
        The chosen column name, or ``None`` when ``allow_none`` is true and
        the ``"(none)"`` entry was picked.
    """
    cols = list(df.columns)
    if allow_none:
        cols = ["(none)"] + cols
    pick = st.selectbox(label, cols, key=key)
    # Only read "(none)" as the sentinel when it was actually offered;
    # otherwise it is a real column that happens to have that name.
    if allow_none and pick == "(none)":
        return None
    return pick
with map_right:
    st.markdown("**Right columns**")
    right_amount = _col_pick(
        "Amount column", right_df, "right_amount_col", allow_none=False
    )
    right_date = _col_pick(
        "Date column (optional)", right_df, "right_date_col", allow_none=True
    )
    right_desc = _col_pick(
        "Description column (optional)", right_df, "right_desc_col", allow_none=True
    )
    right_keys = st.multiselect(
        "Reference columns (must match left count)",
        list(right_df.columns),
        key="right_keys_col",
    )

# ---------------------------------------------------------------------------
# Tolerances & options
# ---------------------------------------------------------------------------

# NOTE(review): the flattened source lost its indentation; the slider is
# assumed to live inside the expander, below the three columns — confirm.
with st.expander("Tolerances & options", expanded=True):
    tol_a, tol_b, tol_c = st.columns(3)
    amount_tolerance = tol_a.number_input(
        "Amount tolerance",
        min_value=0.0,
        value=0.0,
        step=0.01,
        format="%.4f",
        help="Absolute tolerance on amount (e.g. 0.01 to absorb cent rounding).",
    )
    date_tolerance = tol_b.number_input(
        "Date tolerance (days)",
        min_value=0,
        value=0,
        step=1,
        help="Allow N calendar days of drift between posting dates.",
    )
    invert_right_sign = tol_c.checkbox(
        "Invert right amount sign",
        value=False,
        help="Use when one side records debits as positive and the other as negative.",
    )
    desc_min_score = st.slider(
        "Description similarity boost (0 disables)",
        min_value=0,
        max_value=100,
        value=0,
        step=5,
        help=(
            "When both sides have a description column set, accept matches with "
            "this minimum fuzzy similarity even if amount/date are merely within "
            "tolerance. Lower = more permissive."
        ),
    )
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------

st.divider()

if st.button("Reconcile", type="primary", width="stretch"):
    n_left_keys, n_right_keys = len(left_keys), len(right_keys)
    if n_left_keys != n_right_keys:
        st.error(
            f"Reference columns must match in count: left has {n_left_keys}, "
            f"right has {n_right_keys}."
        )
        st.stop()
    # Widget values are coerced explicitly before being handed to the engine.
    options = ReconcileOptions(
        left_amount=left_amount,
        left_date=left_date,
        left_desc=left_desc,
        left_keys=list(left_keys),
        right_amount=right_amount,
        right_date=right_date,
        right_desc=right_desc,
        right_keys=list(right_keys),
        amount_tolerance=float(amount_tolerance),
        date_tolerance_days=int(date_tolerance),
        desc_min_score=int(desc_min_score),
        invert_right_sign=bool(invert_right_sign),
    )
    with st.spinner("Reconciling..."):
        try:
            result = reconcile(left_df, right_df, options)
        except ValueError as e:
            st.error(str(e))
            st.stop()
    # Persist the outcome so it is still shown on reruns that do not press
    # the button again.
    st.session_state["reconcile_result"] = result
    st.session_state["reconcile_left_name"] = left_name
    log_event("tool_run", "Reconcile run", page="11_Reconciler")

result = st.session_state.get("reconcile_result")
if result is None:
    st.stop()

# ---------------------------------------------------------------------------
# Results
# ---------------------------------------------------------------------------

st.subheader("Results")

stats = result.stats
# (display title, key in result.stats) — drives both the metric row and the
# tab labels so the two cannot drift apart.
summary_fields = (
    ("Matched", "matched"),
    ("Review", "review"),
    ("Unmatched left", "unmatched_left"),
    ("Unmatched right", "unmatched_right"),
)
for metric_slot, (title, field) in zip(st.columns(4), summary_fields):
    metric_slot.metric(title, stats[field])

# Coverage: matched / max(left, right); `or 1` avoids dividing by zero when
# both row counts are zero.
larger_side = max(stats["left_rows"], stats["right_rows"]) or 1
coverage = stats["matched"] / larger_side * 100
st.caption(f"Coverage: {coverage:.1f}% of the larger side")

tab_matched, tab_review, tab_left, tab_right = st.tabs(
    [f"{title} ({stats[field]})" for title, field in summary_fields]
)
review_note = (
    "Pairs flagged because the algorithm couldn't pick a single "
    "best match (e.g. multiple equally-good candidates). Use the "
    "left/right indices to disambiguate manually."
)

# (tab, frame to show, message when the frame is empty, optional caption)
tab_specs = (
    (tab_matched, result.matched, "No matches.", None),
    (
        tab_review,
        result.review,
        "Nothing to review — no ambiguous candidates.",
        review_note,
    ),
    (tab_left, result.unmatched_left, "Every left row was matched.", None),
    (tab_right, result.unmatched_right, "Every right row was matched.", None),
)
for pane, frame, empty_message, note in tab_specs:
    with pane:
        if frame.empty:
            st.info(empty_message)
            continue
        if note is not None:
            st.caption(note)
        st.dataframe(frame, width="stretch", hide_index=True)

# ---------------------------------------------------------------------------
# Downloads
# ---------------------------------------------------------------------------

st.divider()
# Output names are derived from the left upload's file name, falling back to
# "reconcile" when none was stored.
stem = Path(st.session_state.get("reconcile_left_name", "reconcile")).stem

# (button label, frame to export, file-name suffix)
export_specs = (
    ("Matched CSV", result.matched, "matched"),
    ("Review CSV", result.review, "review"),
    ("Unmatched left", result.unmatched_left, "unmatched_left"),
    ("Unmatched right", result.unmatched_right, "unmatched_right"),
)
for slot, (button_label, frame, suffix) in zip(st.columns(4), export_specs):
    with slot:
        html_download_button(
            button_label,
            frame.to_csv(index=False).encode("utf-8-sig"),
            file_name=f"{stem}_{suffix}.csv",
            mime="text/csv",
            disabled=frame.empty,
        )
def _bank(rows):
    """Build a left-side frame with ``date`` / ``amount`` / ``desc`` columns."""
    bank_columns = ["date", "amount", "desc"]
    return pd.DataFrame(rows, columns=bank_columns)


def _ledger(rows):
    """Build a right-side frame with ``posted`` / ``amt`` / ``memo`` columns."""
    ledger_columns = ["posted", "amt", "memo"]
    return pd.DataFrame(rows, columns=ledger_columns)


class TestExactMatch:
    """Matching with zero tolerances."""

    def test_one_to_one_exact(self):
        bank = _bank([
            ("2026-01-05", 100.0, "ACME"),
            ("2026-01-06", 250.0, "WIDGET CO"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.0, "Acme Inc"),
            ("2026-01-06", 250.0, "Widget"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        outcome = reconcile(bank, ledger, opts)
        stats = outcome.stats
        assert stats["matched"] == 2
        assert stats["unmatched_left"] == 0
        assert stats["unmatched_right"] == 0
        assert outcome.matched["match_pass"].eq("exact").all()

    def test_unmatched_left_and_right(self):
        bank = _bank([
            ("2026-01-05", 100.0, "ACME"),
            ("2026-01-07", 99.99, "ONLY ON LEFT"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.0, "Acme"),
            ("2026-01-08", 500.0, "Only on right"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        outcome = reconcile(bank, ledger, opts)
        stats = outcome.stats
        assert stats["matched"] == 1
        assert stats["unmatched_left"] == 1
        assert stats["unmatched_right"] == 1
        # Leftover rows keep their source columns.
        assert "ONLY ON LEFT" in list(outcome.unmatched_left["desc"])
        assert "Only on right" in list(outcome.unmatched_right["memo"])

    def test_amount_only_no_date(self):
        # With no date columns configured, distinct amounts should pair off
        # one-to-one regardless of how far apart the dates are.
        bank = _bank([
            ("2026-01-01", 42.5, "A"),
            ("2026-02-15", 99.0, "B"),
        ])
        ledger = _ledger([
            ("2099-12-31", 42.5, "X"),
            ("1970-01-01", 99.0, "Y"),
        ])
        opts = ReconcileOptions(left_amount="amount", right_amount="amt")
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 2

    def test_identical_amounts_with_no_date_are_ambiguous(self):
        # Two equal amounts on each side and no date to tell them apart:
        # the pairing is undecidable and must be sent to review.
        bank = _bank([
            ("2026-01-01", 42.5, "A"),
            ("2026-02-15", 42.5, "B"),
        ])
        ledger = _ledger([
            ("2099-12-31", 42.5, "X"),
            ("1970-01-01", 42.5, "Y"),
        ])
        opts = ReconcileOptions(left_amount="amount", right_amount="amt")
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 0
        assert outcome.stats["review"] >= 2
class TestAmountTolerance:
    """Behaviour of the absolute ``amount_tolerance`` option."""

    def test_amount_within_tolerance(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.02, "X")])
        # 100.00 != 100.02, so only a tolerance-based pass can pair these.
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.05,
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 1
        pair = outcome.matched.iloc[0]
        assert pair["match_pass"] == "tolerance"
        assert abs(pair["amount_diff"] + 0.02) < 1e-9

    def test_outside_tolerance_unmatched(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.5, "X")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.05,
        )
        stats = reconcile(bank, ledger, opts).stats
        assert stats["matched"] == 0
        assert stats["unmatched_left"] == 1
        assert stats["unmatched_right"] == 1
class TestDateWindow:
    """Behaviour of the ``date_tolerance_days`` option."""

    def test_date_within_window(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-07", 100.0, "X")])  # posted 2 days later
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            date_tolerance_days=3,
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 1
        assert outcome.matched.iloc[0]["date_diff_days"] == -2

    def test_date_outside_window(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-20", 100.0, "X")])  # posted 15 days later
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            date_tolerance_days=5,
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 0


class TestSignInversion:
    """Behaviour of the ``invert_right_sign`` option."""

    def test_invert_right_sign(self):
        # The bank shows the deposit as +100, the ledger as -100.
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", -100.0, "X")])

        # Signs taken at face value: nothing pairs.
        plain_opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        plain = reconcile(bank, ledger, plain_opts)
        assert plain.stats["matched"] == 0

        # Right side flipped: the pair is found.
        flipped_opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            invert_right_sign=True,
        )
        flipped = reconcile(bank, ledger, flipped_opts)
        assert flipped.stats["matched"] == 1
class TestAmbiguity:
    """Handling of rows with more than one candidate partner."""

    def test_two_equal_candidates_go_to_review(self):
        # A single left row faces two indistinguishable right rows.
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([
            ("2026-01-05", 100.0, "X"),
            ("2026-01-05", 100.0, "Y"),
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        stats = reconcile(bank, ledger, opts).stats
        assert stats["matched"] == 0
        assert stats["review"] == 2  # one entry per candidate pair
        # Rows involved in the ambiguity are not also reported as unmatched.
        assert stats["unmatched_left"] == 0
        assert stats["unmatched_right"] == 0

    def test_uniquely_better_match_wins(self):
        # Every row is within tolerance of both rows on the other side, but
        # each has exactly one partner with an identical amount.
        bank = _bank([
            ("2026-01-05", 100.0, "A"),
            ("2026-01-05", 100.05, "B"),
        ])
        ledger = _ledger([
            ("2026-01-05", 100.0, "X"),   # A's closest partner
            ("2026-01-05", 100.05, "Y"),  # B's closest partner
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            amount_tolerance=0.10,
        )
        outcome = reconcile(bank, ledger, opts)
        # Both rows are expected to pair uniquely rather than land in review.
        assert outcome.stats["matched"] == 2


class TestKeyMatch:
    """Matching driven by reference (key) columns."""

    def test_reference_number_authoritative(self):
        # Identical check number and amount, posting dates a week apart:
        # the key should pair the rows even with a zero-day date window.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": 100.0, "check_no": "1042"},
        ])
        ledger = pd.DataFrame([
            {"posted": "2026-01-12", "amt": 100.0, "ref": "1042"},
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            left_keys=["check_no"], right_keys=["ref"],
            date_tolerance_days=0,  # dates alone could not pair these
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 1
        assert outcome.matched.iloc[0]["match_pass"] == "key"

    def test_key_requires_amount_to_tie(self):
        # A shared reference is not enough when the amounts disagree.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": 100.0, "check_no": "1042"},
        ])
        ledger = pd.DataFrame([
            {"posted": "2026-01-05", "amt": 200.0, "ref": "1042"},
        ])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
            left_keys=["check_no"], right_keys=["ref"],
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 0
class TestInputValidation:
    """Invalid option combinations must raise ``ValueError``."""

    # The options object is built inside each ``pytest.raises`` block so the
    # test passes whether the error comes from construction or from the
    # reconcile() call.

    def test_missing_amount_columns(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.0, "X")])
        with pytest.raises(ValueError, match="left_amount"):
            opts = ReconcileOptions(right_amount="amt")
            reconcile(bank, ledger, opts)

    def test_left_date_without_right_date(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.0, "X")])
        with pytest.raises(ValueError, match="both be set or both be None"):
            opts = ReconcileOptions(
                left_amount="amount", right_amount="amt",
                left_date="date",  # right_date deliberately omitted
            )
            reconcile(bank, ledger, opts)

    def test_mismatched_key_lengths(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.0, "X")])
        with pytest.raises(ValueError, match="same length"):
            opts = ReconcileOptions(
                left_amount="amount", right_amount="amt",
                left_keys=["a", "b"], right_keys=["x"],
            )
            reconcile(bank, ledger, opts)

    def test_negative_tolerance_rejected(self):
        bank = _bank([("2026-01-05", 100.0, "A")])
        ledger = _ledger([("2026-01-05", 100.0, "X")])
        with pytest.raises(ValueError, match="amount_tolerance"):
            opts = ReconcileOptions(
                left_amount="amount", right_amount="amt",
                amount_tolerance=-0.01,
            )
            reconcile(bank, ledger, opts)
class TestUnparseableInputs:
    """Rows whose amount cannot be parsed."""

    def test_non_numeric_amount_falls_through(self):
        # The row with a garbage amount cannot take part in matching, but it
        # must still be visible in unmatched_left.
        bank = pd.DataFrame([
            {"date": "2026-01-05", "amount": "not a number", "desc": "BAD"},
            {"date": "2026-01-05", "amount": 100.0, "desc": "OK"},
        ])
        ledger = _ledger([("2026-01-05", 100.0, "X")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 1
        assert "BAD" in list(outcome.unmatched_left["desc"])


class TestResultShape:
    """Column layout and emptiness of the returned frames."""

    def test_matched_carries_both_sides(self):
        bank = _bank([("2026-01-05", 100.0, "ACME")])
        ledger = _ledger([("2026-01-05", 100.0, "Acme Inc")])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        pair = reconcile(bank, ledger, opts).matched.iloc[0]
        # Source columns are expected under "left_" / "right_" prefixes.
        assert pair["left_desc"] == "ACME"
        assert pair["right_memo"] == "Acme Inc"
        assert pair["left_amount"] == 100.0
        assert pair["right_amt"] == 100.0

    def test_empty_inputs_return_empty_result(self):
        bank = _bank([])
        ledger = _ledger([])
        opts = ReconcileOptions(
            left_amount="amount", right_amount="amt",
            left_date="date", right_date="posted",
        )
        outcome = reconcile(bank, ledger, opts)
        assert outcome.stats["matched"] == 0
        assert outcome.matched.empty
        assert outcome.unmatched_left.empty
        assert outcome.unmatched_right.empty