feat(pdf): add pure PDF→DataFrame extraction module

Phase 1/6 of the PDF Extractor tool. Pure module — no Streamlit,
no user-config I/O — that turns a PDF blob plus a template dict
into a ``pandas.DataFrame`` of transaction rows. Primary use case
is accountant-style extraction of bank-statement transactions,
where each bank's format is encoded as a reusable template.

Pipeline:

1. ``extract_pages(pdf_bytes)`` reads with pdfplumber and surfaces
   words with bounding boxes.
2. ``cluster_rows(words)`` groups words into rows by ``top``
   tolerance — no reliance on PDF table-line detection (most bank
   statements have no visible cell borders).
3. ``assign_columns(row_words, boundaries)`` buckets each word by
   its horizontal midpoint into N+1 columns defined by N interior
   x-boundaries.
4. ``_within_table_window`` slices to the band between the header
   line and the end-marker (e.g. "Closing balance").
5. ``apply_template`` orchestrates the above, handling:
   - parens-style negative amounts, currency stripping, custom
     decimal/thousands separators
   - separate debit + credit columns combined into a single signed
     ``amount`` (credit positive, debit negative — accounting
     register convention; matches QuickBooks/Xero imports)
   - multi-line description wrapping (rows with empty date column
     attach to the previous row's description)
   - row-level regex skip filters (e.g., "Total", "Subtotal")
   - page-range filters ("all", "2-", "1,3-5")

Optional OCR fallback for scanned statements:

- ``page_has_extractable_text`` heuristic flags pages with <5
  words as likely-scanned.
- ``ocr_available()`` checks both the ``pytesseract`` Python
  binding and the Tesseract binary; surfaces a clear reason
  string when either is missing.
- ``extract_pages_auto`` does text-first, OCR-the-blanks, and
  returns warnings the UI can surface.

29 unit tests cover the parsing pipeline against synthetic
WordBox/Page data — no fixture PDFs required, runs in 0.1s. Real
PDF extraction is exercised by hand on the user's statements.

Dependencies added:
- ``pdfplumber>=0.10,<1`` — text + position extraction
- ``pypdfium2>=4,<6`` — page rasterization for OCR + visual picker
- ``streamlit-drawable-canvas>=0.9,<1`` — visual region picker
  (used in commit 5)
- ``pytesseract>=0.3,<1`` — OCR (used in commit 6; system
  Tesseract binary required separately)
- ``cryptography>=41,<49`` — bumped upper bound; pdfminer.six
  transitively requires a recent release. Internal ed25519
  license-signing usage is API-stable across the bump.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-19 22:44:51 +00:00
parent c16e2a5e29
commit b8aff862ed
3 changed files with 907 additions and 1 deletions

616
src/pdf_extract.py Normal file
View File

@@ -0,0 +1,616 @@
"""PDF transaction extraction.
Pure module — no Streamlit, no user-config I/O. Reads PDF bytes,
produces a ``pandas.DataFrame`` of rows according to a template
dict. The accountant-facing use case is extracting transaction
tables from bank statements (different banks = different
templates, reused across statements that share a format).
Strategy:
- ``pdfplumber`` for text + word positions. Bank-statement tables
rarely have visible cell borders, so we don't rely on table-line
detection — instead the template carries explicit column
x-position boundaries (set by the visual picker UI).
- Rows are detected by clustering word ``top`` (y-position) values
within a small tolerance — words on the same baseline.
- Multi-line descriptions: rows whose first column (date) is empty
are merged into the previous row's description column.
- Signed amounts: parenthesized values (``(123.45)``) parse negative.
Single signed amount column passes through. Separate debit/credit
columns are combined into one signed amount column with credits
positive and debits negative (accounting register convention —
matches QuickBooks/Xero import expectations).
- Optional OCR: pages with no extractable text fall through to
``pytesseract`` IF the binding + Tesseract binary are both
available. Otherwise the page is kept as extracted and a warning
message is returned to the caller.
The template is a plain dict matching the schema documented in
``src/pdf_templates.py``. This module reads it; ``pdf_templates``
manages its persistence and validation.
"""
from __future__ import annotations
import io
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import pandas as pd
import pdfplumber
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class WordBox:
    """Immutable record of one word and the rectangle it occupies on a page.

    All coordinates are PDF points (1/72 inch) measured from the page's
    top-left corner.
    """

    x0: float  # left edge of the word
    top: float  # upper edge of the word
    x1: float  # right edge of the word
    bottom: float  # lower edge of the word
    text: str  # the word's characters
@dataclass
class Page:
    """Text content plus positioned words for a single PDF page."""

    page_no: int  # 1-indexed position of the page in the document
    width: float  # page width
    height: float  # page height
    text: str  # full page text as one string
    # Each Page instance gets its own fresh list when none is supplied.
    words: list[WordBox] = field(default_factory=list)
# ---------------------------------------------------------------------------
# PDF reading
# ---------------------------------------------------------------------------
def extract_pages(pdf_bytes: bytes) -> list[Page]:
    """Read *pdf_bytes* with pdfplumber and return one ``Page`` per PDF page.

    Each returned page keeps the bounding box of every extracted word so
    later stages can cluster words into rows (by ``top``) and bucket them
    into columns (by template-defined x-boundaries). Page numbers start
    at 1.
    """
    result: list[Page] = []
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as document:
        for number, pdf_page in enumerate(document.pages, start=1):
            boxes: list[WordBox] = []
            for raw in pdf_page.extract_words(
                use_text_flow=True,
                keep_blank_chars=False,
                extra_attrs=[],
            ):
                # Normalise pdfplumber's word dict into our typed record.
                boxes.append(
                    WordBox(
                        x0=float(raw["x0"]),
                        top=float(raw["top"]),
                        x1=float(raw["x1"]),
                        bottom=float(raw["bottom"]),
                        text=str(raw["text"]),
                    )
                )
            result.append(
                Page(
                    page_no=number,
                    width=float(pdf_page.width),
                    height=float(pdf_page.height),
                    # extract_text() may yield None; store "" instead.
                    text=pdf_page.extract_text() or "",
                    words=boxes,
                )
            )
    return result
# ---------------------------------------------------------------------------
# Value parsing
# ---------------------------------------------------------------------------
# Default parsing options; callers override individual keys via ``opts``.
_AMOUNT_DEFAULTS = {
    "decimal_separator": ".",
    "thousands_separator": ",",
    "currency_strip": "$",
    "negative_in_parens": True,
}

# Formats tried by ``parse_date`` after any caller-supplied formats.
_DATE_FORMATS_FALLBACK = [
    "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d/%m/%Y", "%d/%m/%y",
    "%b %d %Y", "%d %b %Y", "%d-%b-%Y", "%m-%d-%Y", "%m-%d-%y",
]


def parse_amount(text: str, opts: dict[str, Any] | None = None) -> float | None:
    """Parse a money string to a signed float, or ``None`` if it doesn't parse.

    Handles currency symbols, thousands separators, parenthesized
    negatives, leading or trailing minus signs (ASCII ``-`` or Unicode
    ``\u2212``), a leading ``+``, and blank input.

    Args:
        text: The raw cell text. ``None`` and blank strings yield ``None``.
        opts: Optional overrides for ``_AMOUNT_DEFAULTS`` (keys
            ``decimal_separator``, ``thousands_separator``,
            ``currency_strip``, ``negative_in_parens``).

    Returns:
        The signed value, or ``None`` when the cleaned text is not a
        plain ``digits[.digits]`` number.
    """
    if text is None:
        return None
    s = text.strip()
    if not s:
        return None
    o = {**_AMOUNT_DEFAULTS, **(opts or {})}

    # Remove currency characters and all whitespace BEFORE looking for sign
    # markers, so "$-5.00" and "$(5.00)" are recognised the same way as
    # "-$5.00" and "($5.00)".
    for ch in o.get("currency_strip") or "":
        s = s.replace(ch, "")
    s = "".join(s.split())

    minus_signs = ("-", "\u2212")
    negative = False
    if o["negative_in_parens"] and s.startswith("(") and s.endswith(")"):
        negative = True
        s = s[1:-1]
    if s.endswith(minus_signs):
        negative = True
        s = s[:-1]
    if s.startswith(minus_signs):
        negative = True
        s = s[1:]
    elif s.startswith("+"):
        s = s[1:]

    thousands = o["thousands_separator"]
    decimal = o["decimal_separator"]
    # If both separators are the same character (e.g. only the decimal
    # separator was overridden to ","), stripping "thousands" would delete
    # the decimal mark and silently inflate the value by 100x.
    if thousands and thousands != decimal:
        s = s.replace(thousands, "")
    if decimal and decimal != ".":
        s = s.replace(decimal, ".")

    if not re.fullmatch(r"\d+(\.\d+)?", s):
        return None
    val = float(s)
    return -val if negative else val
def parse_date(
    text: str,
    formats: list[str] | None = None,
) -> str | None:
    """Convert a date string to ISO ``YYYY-MM-DD``.

    Caller-supplied *formats* are tried first, in order, followed by the
    module's built-in list of common formats. The first format that
    parses wins. ``None`` is returned for ``None``/blank input or when
    nothing matches.
    """
    if text is None:
        return None
    candidate = text.strip()
    if not candidate:
        return None
    for pattern in [*(formats or []), *_DATE_FORMATS_FALLBACK]:
        try:
            return datetime.strptime(candidate, pattern).strftime("%Y-%m-%d")
        except ValueError:
            # This format does not fit; move on to the next one.
            pass
    return None
# ---------------------------------------------------------------------------
# Row + column structure
# ---------------------------------------------------------------------------
def cluster_rows(
    words: list[WordBox],
    y_tolerance: float = 3.0,
) -> list[list[WordBox]]:
    """Partition word boxes into visual rows using their ``top`` value.

    Words are visited from the top of the page downward. A word joins the
    current row when its ``top`` lies within *y_tolerance* of the ``top``
    of the word that opened that row; otherwise it opens a new row.

    Returns rows ordered top-to-bottom, each row ordered left-to-right
    by ``x0``. An empty input gives an empty list.
    """
    if not words:
        return []
    ordered = sorted(words, key=lambda box: box.top)
    groups: list[list[WordBox]] = [[ordered[0]]]
    # The anchor is the `top` of the row's first (topmost) word; it is not
    # updated as further words join the row.
    anchor = ordered[0].top
    for box in ordered[1:]:
        if abs(box.top - anchor) <= y_tolerance:
            groups[-1].append(box)
        else:
            groups.append([box])
            anchor = box.top
    return [sorted(group, key=lambda box: box.x0) for group in groups]
def assign_columns(
    row_words: list[WordBox],
    boundaries: list[float],
) -> list[str]:
    """Split one row's words into column strings.

    *boundaries* holds the interior x-positions separating neighbouring
    columns, so N boundaries produce N+1 columns. Each word goes to the
    column containing its horizontal midpoint (a midpoint exactly on a
    boundary belongs to the column on the right). Words sharing a column
    are joined with single spaces in left-to-right order; columns with no
    words come back as empty strings.
    """
    cuts = sorted(boundaries)
    columns: list[list[WordBox]] = [[] for _ in range(len(cuts) + 1)]
    for word in row_words:
        centre = (word.x0 + word.x1) / 2
        index = 0
        # Scan from the right-most boundary; the first one at or left of
        # the midpoint fixes the column.
        for position in range(len(cuts), 0, -1):
            if centre >= cuts[position - 1]:
                index = position
                break
        columns[index].append(word)
    return [
        " ".join(w.text for w in sorted(members, key=lambda w: w.x0))
        for members in columns
    ]
# ---------------------------------------------------------------------------
# Template application
# ---------------------------------------------------------------------------
def _pages_in_range(pages: list[Page], range_spec: str) -> list[Page]:
    """Filter *pages* by a range spec like ``"all"``, ``"2-"``, ``"1,3-5"``.

    The spec is a comma-separated list of single page numbers and
    ``a-b`` ranges (inclusive). A missing range start defaults to 1 and a
    missing end defaults to ``len(pages)``.

    Empty / ``"all"`` returns *pages* unchanged. A spec that yields no
    usable number or range (garbage text, reversed range) also returns
    all pages, so the template author can fix it at preview time.
    """
    s = (range_spec or "").strip().lower()
    if not s or s == "all":
        return pages

    singles: set[int] = set()
    spans: list[tuple[int, int]] = []
    for chunk in s.split(","):
        chunk = chunk.strip()
        if not chunk:
            continue
        if "-" in chunk:
            lo_txt, hi_txt = (part.strip() for part in chunk.split("-", 1))
            # isdecimal() (not isdigit()) so characters such as "²", which
            # int() rejects, are treated as "not a number" instead of
            # raising ValueError.
            lo = int(lo_txt) if lo_txt.isdecimal() else 1
            hi = int(hi_txt) if hi_txt.isdecimal() else len(pages)
            if lo <= hi:
                # Keep the bounds rather than enumerating every page number:
                # a spec like "1-999999999" must not allocate a huge set.
                spans.append((lo, hi))
        elif chunk.isdecimal():
            singles.add(int(chunk))

    if not singles and not spans:
        return pages
    return [
        p
        for p in pages
        if p.page_no in singles or any(lo <= p.page_no <= hi for lo, hi in spans)
    ]
def _within_table_window(
    rows: list[list[WordBox]],
    header_text: str,
    end_markers: list[str],
) -> list[list[WordBox]]:
    """Return the rows lying between the table header and the end marker.

    Header: the first row whose space-joined, lower-cased text contains
    every whitespace-separated word of ``header_text`` (compared
    lower-case, as substrings). The header row itself is dropped. If a
    header is requested but never found, nothing is returned.

    End: the first row at or after the start whose joined text matches
    any ``end_markers`` regex (case-insensitive search); that row and all
    rows below it are dropped.

    An empty ``header_text`` starts at the first row; empty
    ``end_markers`` (or only empty patterns) runs through the last row.
    """
    if not rows:
        return []
    header_tokens = [tok.lower() for tok in (header_text or "").split() if tok]
    stop_patterns = [re.compile(marker, re.IGNORECASE) for marker in end_markers if marker]

    def row_text(row: list[WordBox]) -> str:
        return " ".join(word.text for word in row)

    first = 0
    if header_tokens:
        found = None
        for idx, row in enumerate(rows):
            lowered = row_text(row).lower()
            if all(tok in lowered for tok in header_tokens):
                found = idx + 1
                break
        if found is None:
            return []
        first = found

    last = len(rows)
    for idx in range(first, len(rows)):
        line = row_text(rows[idx])
        if any(pattern.search(line) for pattern in stop_patterns):
            last = idx
            break
    return rows[first:last]
def _row_is_continuation(cells: list[str]) -> bool:
    """Tell whether *cells* is a wrapped continuation line.

    A row counts as a continuation of the previous row's description
    when it has at least one cell and its first cell is blank (empty or
    whitespace only).
    """
    if not cells:
        return False
    return not cells[0].strip()
def _coerce_amount_columns(
    record: dict[str, str],
    column_map: list[dict[str, Any]],
    parse_opts: dict[str, Any],
) -> dict[str, Any]:
    """Turn a record of raw cell text into typed output fields.

    *record* maps target names to raw text; *column_map* entries supply
    the ``target`` names to emit. Handling per target:

    - ``date``: parsed with ``parse_opts["date_formats"]``; the raw text
      is kept when parsing fails.
    - ``amount`` / ``balance``: parsed as signed numbers.
    - ``amount_debit`` + ``amount_credit``: combined into one signed
      ``amount`` (credit positive, debit negative) plus a ``type`` of
      ``"credit"``, ``"debit"`` or ``""``. Skipped if an ``amount``
      column is already present.
    - anything else (including ``description``): passed through as text.

    Entries without a target are ignored.
    """
    typed: dict[str, Any] = {}
    debit: float | None = None
    credit: float | None = None

    for spec in column_map:
        target = spec.get("target", "")
        if not target:
            continue
        raw = record.get(target, "")
        if target == "date":
            typed["date"] = parse_date(raw, parse_opts.get("date_formats") or []) or raw
        elif target in ("amount", "balance"):
            typed[target] = parse_amount(raw, parse_opts)
        elif target == "amount_debit":
            debit = parse_amount(raw, parse_opts)
        elif target == "amount_credit":
            credit = parse_amount(raw, parse_opts)
        else:
            typed[target] = raw

    has_split_amount = not (debit is None and credit is None)
    if "amount" not in typed and has_split_amount:
        # A missing (or zero) side contributes nothing to the total.
        signed = (credit or 0.0) - (debit or 0.0)
        typed["amount"] = signed
        if signed > 0:
            typed["type"] = "credit"
        elif signed < 0:
            typed["type"] = "debit"
        else:
            typed["type"] = ""
    return typed
def apply_template(
    pages: list[Page],
    template: dict[str, Any],
) -> pd.DataFrame:
    """Run *template* over *pages* and return the extracted DataFrame.

    Template keys read here (all optional):

    - ``pages.range`` / ``pages.skip_matching``: which pages to process.
    - ``table.column_boundaries``, ``table.header_text``,
      ``table.end_markers``, ``table.skip_rows_matching``,
      ``table.y_tolerance``: how to find and slice the table.
    - ``columns``: list of ``{"source": <cell index>, "target": <name>}``.
    - ``parse``: separators, currency characters, date format(s) and the
      ``merge_multiline_description`` switch.

    Missing keys fall through to defaults so a half-built template still
    produces a preview; in particular, when no column has a target yet,
    every cell is emitted as generic ``col_0``, ``col_1``, ... text.

    Returns:
        One row per extracted record, ordered ``date``, ``description``,
        ``amount``, ``type``, ``balance`` (those present), then any other
        targets, then ``_page`` (the source page number). An empty
        DataFrame when nothing was extracted.
    """
    pages_cfg = template.get("pages", {}) or {}
    table_cfg = template.get("table", {}) or {}
    columns_cfg = template.get("columns", []) or []
    parse_cfg = template.get("parse", {}) or {}

    pages_used = _pages_in_range(pages, pages_cfg.get("range", "all"))
    skip_pages_re = pages_cfg.get("skip_matching") or ""
    if skip_pages_re:
        skip_re = re.compile(skip_pages_re, re.IGNORECASE)
        pages_used = [p for p in pages_used if not skip_re.search(p.text)]

    boundaries = list(table_cfg.get("column_boundaries", []) or [])
    header_text = table_cfg.get("header_text", "") or ""
    end_markers = list(table_cfg.get("end_markers", []) or [])
    # Ignore empty patterns: "" compiles to a regex that matches every row,
    # which would silently drop the whole table.
    skip_rows_res = [
        re.compile(p, re.IGNORECASE)
        for p in (table_cfg.get("skip_rows_matching") or [])
        if p
    ]
    merge_multiline = bool(parse_cfg.get("merge_multiline_description", True))

    # Read once instead of per page; an explicit None means "use default".
    raw_tolerance = table_cfg.get("y_tolerance")
    y_tolerance = 3.0 if raw_tolerance is None else float(raw_tolerance)

    # No column has a target yet (half-built template): expose each cell
    # under a generic name so the preview still shows the table content.
    if not any(c.get("target") for c in columns_cfg):
        columns_cfg = [
            {"source": i, "target": f"col_{i}"}
            for i in range(len(boundaries) + 1)
        ]

    parse_opts = {
        "decimal_separator": parse_cfg.get("decimal_separator", "."),
        "thousands_separator": parse_cfg.get("thousands_separator", ","),
        "currency_strip": parse_cfg.get("currency_strip", "$"),
        "negative_in_parens": parse_cfg.get("amount_negative_in_parens", True),
        "date_formats": parse_cfg.get("date_formats")
        or ([parse_cfg["date_format"]] if parse_cfg.get("date_format") else []),
    }

    out_rows: list[dict[str, Any]] = []
    for page in pages_used:
        rows = cluster_rows(page.words, y_tolerance=y_tolerance)
        rows = _within_table_window(rows, header_text, end_markers)
        # Continuation lines only attach to a record from the same page.
        prev_record: dict[str, Any] | None = None
        for row_words in rows:
            if not boundaries:
                cells = [" ".join(w.text for w in row_words)]
            else:
                cells = assign_columns(row_words, boundaries)
            joined = " ".join(c.strip() for c in cells if c.strip())
            if not joined:
                continue
            if any(rx.search(joined) for rx in skip_rows_res):
                continue
            if merge_multiline and _row_is_continuation(cells) and prev_record:
                # Glue the non-empty columns into the previous record's
                # description (the natural sink for wrapped text).
                prev_record["description"] = (
                    (prev_record.get("description") or "") + " " + joined
                ).strip()
                continue

            record_src: dict[str, str] = {}
            for col_cfg in columns_cfg:
                src_idx = col_cfg.get("source")
                tgt = col_cfg.get("target")
                if isinstance(src_idx, int) and 0 <= src_idx < len(cells) and tgt:
                    record_src[tgt] = cells[src_idx]

            record = _coerce_amount_columns(record_src, columns_cfg, parse_opts)
            record["_page"] = page.page_no
            out_rows.append(record)
            prev_record = record

    if not out_rows:
        return pd.DataFrame()

    df = pd.DataFrame(out_rows)
    preferred = ["date", "description", "amount", "type", "balance"]
    cols = [c for c in preferred if c in df.columns]
    extras = [c for c in df.columns if c not in cols and c != "_page"]
    df = df[cols + extras + (["_page"] if "_page" in df.columns else [])]
    return df
# ---------------------------------------------------------------------------
# OCR fallback (optional)
# ---------------------------------------------------------------------------
def page_has_extractable_text(page: Page, min_words: int = 5) -> bool:
    """Report whether *page* carries at least *min_words* words.

    Heuristic for spotting scanned pages, which typically yield zero or
    near-zero words; the default threshold of 5 also catches pages
    holding only a title or logo text.
    """
    word_count = len(page.words)
    return word_count >= min_words
def ocr_available() -> tuple[bool, str]:
    """Check whether OCR can be used right now.

    Two things must work: importing the ``pytesseract`` binding, and
    asking it for the Tesseract version (which requires the binary).

    Returns:
        ``(True, "")`` when both succeed, otherwise ``(False, reason)``
        where *reason* is a message suitable for showing to the user.
    """
    try:
        import pytesseract
    except ImportError:
        return False, "pytesseract is not installed."

    try:
        pytesseract.get_tesseract_version()
    except Exception as exc:
        # Any failure here is reported as a reason rather than raised.
        return False, f"Tesseract binary not found: {exc}"
    return True, ""
def ocr_pdf_to_pages(pdf_bytes: bytes, dpi: int = 200) -> list[Page]:
    """OCR every page of *pdf_bytes* and return ``Page`` objects with words.

    The output parallels ``extract_pages``: pypdfium2 rasterizes each
    page at *dpi*, and pytesseract's ``image_to_data`` provides per-word
    boxes, so the same row/column pipeline can run on the result.

    The caller is expected to have checked ``ocr_available()`` first.
    """
    import pypdfium2 as pdfium
    import pytesseract
    from PIL import Image  # noqa: F401 (transitively required)

    result: list[Page] = []
    document = pdfium.PdfDocument(pdf_bytes)
    try:
        # PDF space has 72 points per inch, so this is pixels-per-point.
        px_per_pt = dpi / 72.0
        for index in range(len(document)):
            bitmap = document[index].render(scale=px_per_pt).to_pil()
            ocr = pytesseract.image_to_data(
                bitmap,
                output_type=pytesseract.Output.DICT,
            )
            boxes: list[WordBox] = []
            for k, raw in enumerate(ocr.get("text", [])):
                token = (raw or "").strip()
                if not token:
                    continue
                x_px = float(ocr["left"][k])
                y_px = float(ocr["top"][k])
                w_px = float(ocr["width"][k])
                h_px = float(ocr["height"][k])
                # Divide by the render scale to get back to PDF points, so
                # template column boundaries (in points) apply equally to
                # OCR pages and text pages.
                boxes.append(
                    WordBox(
                        x0=x_px / px_per_pt,
                        top=y_px / px_per_pt,
                        x1=(x_px + w_px) / px_per_pt,
                        bottom=(y_px + h_px) / px_per_pt,
                        text=token,
                    )
                )
            result.append(
                Page(
                    page_no=index + 1,
                    width=bitmap.width / px_per_pt,
                    height=bitmap.height / px_per_pt,
                    text=" ".join(box.text for box in boxes),
                    words=boxes,
                )
            )
    finally:
        document.close()
    return result
def extract_pages_auto(
    pdf_bytes: bytes,
    *,
    allow_ocr: bool = True,
) -> tuple[list[Page], list[str]]:
    """Extract text pages, then OCR the ones that look scanned.

    Pages failing ``page_has_extractable_text`` are candidates for OCR.
    When OCR is disabled or unavailable, the text-extracted pages are
    returned unchanged together with a warning. When OCR runs, each
    candidate page is replaced by its OCR counterpart if one exists for
    that page number; otherwise the original page is kept.

    Returns:
        ``(pages, warnings)`` where ``warnings`` is a list of
        human-readable strings for the caller to surface in the UI.
    """
    notes: list[str] = []
    text_pages = extract_pages(pdf_bytes)
    scanned_count = sum(
        1 for page in text_pages if not page_has_extractable_text(page)
    )
    if scanned_count == 0:
        return text_pages, notes

    if not allow_ocr:
        notes.append(f"{scanned_count} page(s) appear scanned. OCR is disabled.")
        return text_pages, notes

    usable, why_not = ocr_available()
    if not usable:
        notes.append(
            f"{scanned_count} page(s) appear scanned but OCR isn't usable: "
            f"{why_not}"
        )
        return text_pages, notes

    # The whole document is OCR'd; only results for scanned pages are used.
    ocr_by_number = {page.page_no: page for page in ocr_pdf_to_pages(pdf_bytes)}
    combined: list[Page] = [
        page
        if page_has_extractable_text(page)
        else ocr_by_number.get(page.page_no, page)
        for page in text_pages
    ]
    notes.append(
        f"OCR was used for {scanned_count} page(s) with no extractable text."
    )
    return combined, notes