feat(pdf): row-heuristic extraction (mode dispatch, no coordinates)

User reported the column-visual approach is too brittle for real
bank statements: column-x-positions saved against a sample page
don't survive layout drift between months (statement A has
columns at x=300, statement B drifted to x=320), and a saved
template can only realistically work for one statement's
specific render. The fundamental fix is to stop depending on
saved coordinates at all.

**Row-heuristic mode** finds transaction rows by pattern: any
line with a date token + N amount tokens IS a transaction. Date
patterns (US slash / EU slash / ISO / "Jan 15, 2026" / etc.) and
amount patterns (currency, parens-negative, thousands grouping)
are matched against word text — no x-positions involved.

The full pipeline:

1. ``find_transaction_rows`` clusters words into rows and scans
   each line for date + amount tokens.
2. Multi-line descriptions still attach to the previous row via
   the no-date-no-amount continuation rule.
3. Amount shapes drive interpretation: ``single`` /
   ``txn_balance`` / ``debit_credit`` / ``debit_credit_balance``.
4. ``_infer_amount_column_centers`` clusters amount x-midpoints
   ACROSS ALL detected rows to find natural column groupings —
   so debit-vs-credit assignment for single-amount lines works
   without the user marking anything on screen.

``apply_template`` is now a dispatch over ``template["mode"]``:

- ``mode="row_heuristic"`` (default for new templates) — the new
  pipeline.
- ``mode="column_visual"`` — the existing pipeline, kept under
  ``_apply_template_column_visual`` for v1 templates and the
  Advanced fallback.

18 new tests cover: date detection (US slash, two-digit year,
ISO, month-name, missing); amount-token finding (currency,
parens, pure text, bare-year rejection); column-center inference
(clear two-column case, empty input); end-to-end on synthetic
Page objects with all four amount shapes; the critical
layout-drift test that proves the same template works on pages
of different sizes / different absolute x-positions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-19 23:45:55 +00:00
parent 10015c40e1
commit d80befd05a
2 changed files with 700 additions and 4 deletions

View File

@@ -417,16 +417,432 @@ def _coerce_amount_columns(
return out
# ---------------------------------------------------------------------------
# Row-heuristic extraction (mode = "row_heuristic", default for new templates)
# ---------------------------------------------------------------------------
_DATE_RES = [
re.compile(r"\b(\d{1,2}/\d{1,2}/\d{2,4})\b"),
re.compile(r"\b(\d{1,2}-\d{1,2}-\d{2,4})\b"),
re.compile(r"\b(\d{4}-\d{2}-\d{2})\b"),
re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2},?\s+\d{2,4})\b"),
re.compile(r"\b(\d{1,2}\s+[A-Z][a-z]{2}\s+\d{2,4})\b"),
# Short month-day (e.g. "Jan 15") — sometimes used when year is
# implied by the statement period. Lower-priority match.
re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2})\b"),
]
# Amount tokens: optional $/€/£, optional leading -, optional parens,
# 1-3 digits before grouping with comma-thousand groups, optional
# decimal portion. Trailing minus also captured.
_AMOUNT_RE = re.compile(
r"(?<![\w.])" # no preceding letter/dot
r"(\(?-?[\$€£]?-?\d{1,3}(?:,\d{3})*(?:\.\d{1,4})?\)?-?)"
r"(?![\w.])"
)
def _looks_like_amount(token: str) -> bool:
"""Reject tokens that match the amount regex but are obviously
not money — e.g. a bare year or a page number. Real amounts
have at least one of: currency symbol, decimal point, parens,
minus sign, or a thousand separator."""
if not token:
return False
return bool(re.search(r"[\$€£.,()\-]", token))
def _find_dates_in_words(
    row_words: list[WordBox],
) -> list[tuple[int, str]]:
    """Find the FIRST date-like substring on this row.

    For each starting word, windows of 3, 2 and 1 consecutive words are
    joined with single spaces and searched with every pattern in
    ``_DATE_RES`` (in list order). Multi-word windows are needed because
    formats like "Jan 15, 2026" span three word tokens.

    Returns ``[(word_index, date_text)]`` for the first hit, or an empty
    list when nothing matches. ``word_index`` is the index of the word in
    which the matched text actually STARTS — a match found in a stitched
    window may begin in the second or third word of that window, so the
    match offset is mapped back to the owning word rather than assuming the
    window's first word. For a date that spans several words only the index
    of its first word is reported.
    """
    for i in range(len(row_words)):
        for window in (3, 2, 1):
            window_words = row_words[i : i + window]
            chunk = " ".join(x.text for x in window_words)
            for rx in _DATE_RES:
                m = rx.search(chunk)
                if m is None:
                    continue
                # Walk the window to find which word contains the match
                # start; ``consumed`` tracks the end offset of each word
                # inside ``chunk``.
                start = m.start(1)
                consumed = 0
                for offset, word in enumerate(window_words):
                    consumed += len(word.text)
                    if start < consumed:
                        return [(i + offset, m.group(1))]
                    consumed += 1  # the single joining space
                # Defensive fallback; a non-empty match always starts
                # inside one of the window's words.
                return [(i, m.group(1))]
    return []
def _find_amount_tokens(
    row_words: list[WordBox],
) -> list[tuple[int, WordBox, str]]:
    """Collect the amount-shaped token carried by each word of a row.

    Each word's text is searched with ``_AMOUNT_RE``. Only the first regex
    match inside a word is considered, and it is kept only if
    ``_looks_like_amount`` accepts it.

    Returns ``(word_index, wordbox, matched_text)`` tuples in row order.
    The index lets the caller leave these words out of the description; the
    wordbox keeps the token's position so amount columns can be clustered
    later without templated coordinates.
    """
    hits: list[tuple[int, WordBox, str]] = []
    for position, word in enumerate(row_words):
        found = _AMOUNT_RE.search(word.text)
        if found is None:
            continue
        token = found.group(1)
        if _looks_like_amount(token):
            hits.append((position, word, token))
    return hits
def _row_is_transaction(
    row_words: list[WordBox],
    *,
    min_amounts: int,
    max_amounts: int,
) -> bool:
    """Decide whether a row of words qualifies as a transaction line.

    The row must contain a date (per ``_find_dates_in_words``) and its
    number of amount tokens (per ``_find_amount_tokens``) must lie within
    ``min_amounts``..``max_amounts``, both bounds inclusive.
    """
    has_date = bool(_find_dates_in_words(row_words))
    if not has_date:
        return False
    amount_count = len(_find_amount_tokens(row_words))
    return (min_amounts <= amount_count) and (amount_count <= max_amounts)
def _description_from_row(
row_words: list[WordBox],
date_idx: int,
amount_idxs: set[int],
) -> str:
"""Stitch the row's description: everything between the date
word and the first amount token, plus anything after the last
amount that isn't itself an amount."""
keep: list[str] = []
seen_first_amount = False
last_amount_idx = max(amount_idxs) if amount_idxs else -1
for i, w in enumerate(row_words):
if i == date_idx:
continue
if i in amount_idxs:
seen_first_amount = True
continue
# After the last amount, trailing tokens are usually a
# check number or memo — keep them too.
if seen_first_amount and i < last_amount_idx:
continue
keep.append(w.text)
return " ".join(keep).strip()
def _assign_amounts_by_shape(
    amount_tokens: list[tuple[int, WordBox, str]],
    shape: str,
    parse_opts: dict[str, Any],
    column_centers: list[float] | None = None,
) -> dict[str, Any]:
    """Turn a row's raw amount tokens into typed output fields.

    ``amount_tokens`` are ``(word_index, wordbox, text)`` tuples in
    left-to-right order; every text is converted with
    ``parse_amount(text, parse_opts)``. Returns an empty dict when there
    are no tokens.

    Behaviour per ``shape``:

    ``single`` (and any unrecognised shape)
        The first token becomes ``amount``.
    ``txn_balance``
        The first token becomes ``amount``; with two or more tokens the
        last one becomes ``balance``.
    ``debit_credit``
        With exactly one token and at least two ``column_centers``, the
        token is a debit if its x-midpoint is at least as close to the
        first center as to the second, otherwise a credit. In every other
        case the first token is the debit and the second (if any) the
        credit.
    ``debit_credit_balance``
        With exactly two tokens and at least three ``column_centers``, each
        token is assigned to the nearest of the first three centers
        (debit / credit / balance); if both land on the same center the
        later token wins. Otherwise tokens are taken positionally as
        debit, credit, balance.

    For the two debit/credit shapes ``amount`` is ``credit - debit``
    (missing or zero parts are ignored) and ``type`` is ``"credit"``,
    ``"debit"`` or ``""`` according to the sign of ``amount``.
    """
    fields: dict[str, Any] = {}
    if not amount_tokens:
        return fields

    texts = [token[2] for token in amount_tokens]
    count = len(texts)

    def _money(position: int) -> Any:
        return parse_amount(texts[position], parse_opts)

    def _midpoint(position: int) -> float:
        box = amount_tokens[position][1]
        return (box.x0 + box.x1) / 2

    def _store_net(debit: Any, credit: Any) -> None:
        # Net signed amount: credits add, debits subtract.
        net = 0.0
        if credit:
            net += credit
        if debit:
            net -= debit
        fields["amount"] = net
        if net > 0:
            fields["type"] = "credit"
        elif net < 0:
            fields["type"] = "debit"
        else:
            fields["type"] = ""

    if shape == "txn_balance":
        fields["amount"] = _money(0)
        if count >= 2:
            fields["balance"] = _money(-1)
        return fields

    if shape == "debit_credit":
        debit = None
        credit = None
        if count == 1 and column_centers and len(column_centers) >= 2:
            # A lone token: pick the side whose inferred column center is
            # nearer to the token's midpoint (ties go to debit).
            mid = _midpoint(0)
            to_debit = abs(mid - column_centers[0])
            to_credit = abs(mid - column_centers[1])
            if to_debit <= to_credit:
                debit = _money(0)
            else:
                credit = _money(0)
        else:
            debit = _money(0)
            if count >= 2:
                credit = _money(1)
        _store_net(debit, credit)
        return fields

    if shape == "debit_credit_balance":
        debit = None
        credit = None
        if count == 2 and column_centers and len(column_centers) >= 3:
            # Two tokens where three columns are expected: place each token
            # in the nearest of the three inferred columns.
            mids = [_midpoint(position) for position in range(count)]
            token_for_column: dict[int, int] = {}
            for position, mid in enumerate(mids):
                nearest = min(
                    range(3),
                    key=lambda col: abs(mid - column_centers[col]),
                )
                token_for_column[nearest] = position
            if 0 in token_for_column:
                debit = _money(token_for_column[0])
            if 1 in token_for_column:
                credit = _money(token_for_column[1])
            if 2 in token_for_column:
                fields["balance"] = _money(token_for_column[2])
        else:
            debit = _money(0)
            if count >= 2:
                credit = _money(1)
            if count >= 3:
                fields["balance"] = _money(2)
        _store_net(debit, credit)
        return fields

    # "single" and any unknown shape: the simplest interpretation.
    fields["amount"] = _money(0)
    return fields
def _infer_amount_column_centers(
    rows: list[list[WordBox]],
    *,
    expected: int,
    min_amounts: int,
    max_amounts: int,
    gap_tolerance: float = 30.0,
) -> list[float]:
    """Infer amount-column centers from the transaction rows themselves.

    The x-midpoint of every amount token on every transaction-like row (a
    row with a date and between ``min_amounts`` and ``max_amounts`` amount
    tokens, inclusive) is collected. The sorted midpoints are then bucketed
    greedily: a midpoint joins the current bucket when it lies within
    ``gap_tolerance`` of the bucket's most recent member, otherwise it
    starts a new bucket. Each bucket's mean is a column center.

    This avoids user-drawn coordinates — columns come from the data — and
    needs no clustering library.

    Args:
        rows: Rows of words, e.g. every row of every page.
        expected: Maximum number of centers to return.
        min_amounts: Lower bound on amount tokens for a row to count.
        max_amounts: Upper bound on amount tokens for a row to count.
        gap_tolerance: Largest gap between neighbouring midpoints that
            still keeps them in one bucket. The default of 30.0 matches
            the previously hard-coded threshold.

    Returns:
        Up to ``expected`` centers sorted left-to-right. When more buckets
        than ``expected`` are found, the most populated ones are kept (ties
        favour the leftmost bucket). Empty when no amounts are found or
        ``expected`` is not positive.
    """
    if expected <= 0:
        return []

    midpoints: list[float] = []
    for row_words in rows:
        # Scan the row for amounts once and reuse the result for both the
        # transaction test and the midpoint collection.
        tokens = _find_amount_tokens(row_words)
        if not (min_amounts <= len(tokens) <= max_amounts):
            continue
        if not _find_dates_in_words(row_words):
            continue
        midpoints.extend((w.x0 + w.x1) / 2 for _, w, _ in tokens)
    if not midpoints:
        return []

    midpoints.sort()
    buckets: list[list[float]] = [[midpoints[0]]]
    for m in midpoints[1:]:
        if m - buckets[-1][-1] <= gap_tolerance:
            buckets[-1].append(m)
        else:
            buckets.append([m])

    if len(buckets) <= expected:
        return [sum(b) / len(b) for b in buckets]

    # More buckets than expected — keep the *expected* most-populated.
    # ``sorted`` is stable even with reverse=True, so equally populated
    # buckets keep their left-to-right order.
    most_populated = sorted(buckets, key=len, reverse=True)[:expected]
    return sorted(sum(b) / len(b) for b in most_populated)
def find_transaction_rows(
    pages: list[Page],
    template: dict[str, Any],
) -> list[dict[str, Any]]:
    """Heuristic row detector returning preview records.

    Pages are selected via ``template["pages"]`` (``range``, plus an
    optional case-insensitive ``skip_matching`` regex tested against each
    page's text). Each remaining page's words are clustered into rows, and
    a row is treated as a transaction when it contains a date and its
    amount-token count is within ``row_detection.min_amounts_per_row`` ..
    ``row_detection.max_amounts_per_row`` (defaults 1 and 3). Rows matching
    any ``row_detection.skip_rows_matching`` regex are ignored.

    When ``row_detection.merge_multiline_description`` is true (the
    default), a row with no date and no amount token is appended to the
    description of the transaction it directly follows. "Directly" is
    enforced: the link to the previous transaction is dropped at every
    page boundary and whenever some other non-transaction row (for example
    a subtotal line carrying an amount but no date) comes in between, so
    page headers, footers and closing disclosures do not accumulate on the
    last transaction's description.

    Each record carries ``date`` (parsed with the configured formats,
    falling back to the raw text), ``description``, ``_page``,
    ``_raw_line`` and the amount fields produced by
    ``_assign_amounts_by_shape`` for ``amounts.shape``. The GUI surfaces
    these so the user can confirm or tune the template before extraction
    commits to disk.
    """
    rd = template.get("row_detection", {}) or {}
    amt_cfg = template.get("amounts", {}) or {}
    date_cfg = template.get("date", {}) or {}
    pages_cfg = template.get("pages", {}) or {}

    pages_used = _pages_in_range(pages, pages_cfg.get("range", "all"))
    skip_pages_re = pages_cfg.get("skip_matching") or ""
    if skip_pages_re:
        skip_re = re.compile(skip_pages_re, re.IGNORECASE)
        pages_used = [p for p in pages_used if not skip_re.search(p.text)]

    min_amounts = int(rd.get("min_amounts_per_row", 1))
    max_amounts = int(rd.get("max_amounts_per_row", 3))
    y_tolerance = float(rd.get("y_tolerance", 3.0))
    merge_multi = bool(rd.get("merge_multiline_description", True))
    skip_row_res = [
        re.compile(p, re.IGNORECASE)
        for p in (rd.get("skip_rows_matching") or [])
    ]

    shape = amt_cfg.get("shape", "single")
    # Number of amount columns each shape is expected to occupy.
    expected_amount_cols = {
        "single": 1,
        "txn_balance": 2,
        "debit_credit": 2,
        "debit_credit_balance": 3,
    }.get(shape, 1)
    parse_opts = {
        "decimal_separator": amt_cfg.get("decimal_separator", "."),
        "thousands_separator": amt_cfg.get("thousands_separator", ","),
        "currency_strip": amt_cfg.get("currency_strip", "$"),
        "negative_in_parens": amt_cfg.get("negative_in_parens", True),
    }

    # The primary format (if any) is tried before the fallbacks.
    date_formats: list[str] = list(date_cfg.get("formats_fallback") or [])
    if date_cfg.get("format"):
        date_formats = [date_cfg["format"]] + date_formats

    # First pass: cluster every page into rows so amount column centers
    # can be inferred across the whole document.
    all_rows: list[tuple[Page, list[list[WordBox]]]] = [
        (page, cluster_rows(page.words, y_tolerance=y_tolerance))
        for page in pages_used
    ]
    flat_rows = [r for _, rows in all_rows for r in rows]
    column_centers = _infer_amount_column_centers(
        flat_rows,
        expected=expected_amount_cols,
        min_amounts=min_amounts,
        max_amounts=max_amounts,
    )

    out: list[dict[str, Any]] = []
    for page, rows in all_rows:
        # A description continuation never crosses a page boundary;
        # otherwise the next page's header text would be glued onto the
        # last transaction of this page.
        prev: dict[str, Any] | None = None
        for row_words in rows:
            line = " ".join(w.text for w in row_words)
            if not line.strip():
                continue
            if any(rx.search(line) for rx in skip_row_res):
                continue

            dates = _find_dates_in_words(row_words)
            amount_tokens = _find_amount_tokens(row_words)
            is_txn = bool(dates) and (
                min_amounts <= len(amount_tokens) <= max_amounts
            )
            if not is_txn:
                if (
                    merge_multi
                    and prev is not None
                    and not amount_tokens
                    and not dates
                ):
                    # Multi-line description continuation: a no-date,
                    # no-amount line directly following a transaction.
                    prev["description"] = (
                        (prev.get("description") or "") + " " + line
                    ).strip()
                else:
                    # Some other kind of row intervened; later text lines
                    # no longer "directly follow" the transaction.
                    prev = None
                continue

            date_idx, date_text = dates[0]
            amount_idxs = {idx for idx, _, _ in amount_tokens}
            desc = _description_from_row(row_words, date_idx, amount_idxs)
            record: dict[str, Any] = {
                "date": parse_date(date_text, date_formats) or date_text,
                "description": desc,
                "_page": page.page_no,
                "_raw_line": line,
            }
            record.update(_assign_amounts_by_shape(
                amount_tokens, shape, parse_opts, column_centers,
            ))
            out.append(record)
            prev = record
    return out
def apply_template_row_heuristic(
    pages: list[Page],
    template: dict[str, Any],
) -> pd.DataFrame:
    """Extract transactions in row-heuristic mode and return a DataFrame.

    Runs ``find_transaction_rows`` and tabulates its records. The internal
    ``_raw_line`` column is removed. Columns are ordered as: whichever of
    ``date``, ``description``, ``amount``, ``type``, ``balance`` exist, then
    any other columns in their existing order, then ``_page`` last. An
    empty DataFrame is returned when no rows are detected.
    """
    records = find_transaction_rows(pages, template)
    if not records:
        return pd.DataFrame()

    # Drop the internal helper column from the user-facing output.
    frame = pd.DataFrame(records).drop(columns=["_raw_line"], errors="ignore")

    leading = [
        name
        for name in ("date", "description", "amount", "type", "balance")
        if name in frame.columns
    ]
    middle = [
        name
        for name in frame.columns
        if name not in leading and name != "_page"
    ]
    trailing = ["_page"] if "_page" in frame.columns else []
    return frame[leading + middle + trailing]
def apply_template(
    pages: list[Page],
    template: dict[str, Any],
) -> pd.DataFrame:
    """Dispatch by template mode and return the extracted DataFrame.

    Template schema is defined in ``src/pdf_templates.py``. Missing
    keys fall through to sensible defaults so a half-built template
    in the GUI still produces a preview.

    ``mode="row_heuristic"`` (default for new templates): no
    coordinates needed — finds transaction lines by date+amount
    pattern matching. Robust to layout drift between statements.

    ``mode="column_visual"`` (legacy): uses x-position boundaries
    from the visual picker. Kept for templates saved before the
    row-heuristic shift.

    Templates without a mode key default to ``column_visual`` for
    backward compatibility with schema_version=1 templates. Any mode
    value other than ``"row_heuristic"`` is likewise handled by the
    column-visual pipeline.
    """
    mode = template.get("mode", "column_visual")
    if mode == "row_heuristic":
        return apply_template_row_heuristic(pages, template)
    return _apply_template_column_visual(pages, template)
def _apply_template_column_visual(
pages: list[Page],
template: dict[str, Any],
) -> pd.DataFrame:
"""Original column-x-position pipeline. Now the legacy code
path; kept for any v1 templates and as the manual-override
advanced mode in the build UI."""
pages_cfg = template.get("pages", {}) or {}
table_cfg = template.get("table", {}) or {}
columns_cfg = template.get("columns", []) or []