feat(pdf): add pure PDF→DataFrame extraction module
Phase 1/6 of the PDF Extractor tool. Pure module — no Streamlit,
no user-config I/O — that turns a PDF blob plus a template dict
into a ``pandas.DataFrame`` of transaction rows. Primary use case
is accountant-style extraction of bank-statement transactions,
where each bank's format is encoded as a reusable template.
Pipeline:
1. ``extract_pages(pdf_bytes)`` reads with pdfplumber and surfaces
words with bounding boxes.
2. ``cluster_rows(words)`` groups words into rows by ``top``
tolerance — no reliance on PDF table-line detection (most bank
statements have no visible cell borders).
3. ``assign_columns(row_words, boundaries)`` buckets each word by
its horizontal midpoint into N+1 columns defined by N interior
x-boundaries.
4. ``_within_table_window`` slices to the band between the header
line and the end-marker (e.g. "Closing balance").
5. ``apply_template`` orchestrates the above, handling:
- parens-style negative amounts, currency stripping, custom
decimal/thousands separators
- separate debit + credit columns combined into a single signed
``amount`` (credit positive, debit negative — accounting
register convention; matches QuickBooks/Xero imports)
- multi-line description wrapping (rows with empty date column
attach to the previous row's description)
- row-level regex skip filters (e.g., "Total", "Subtotal")
- page-range filters ("all", "2-", "1,3-5")
Optional OCR fallback for scanned statements:
- ``page_has_extractable_text`` heuristic flags pages with <5
words as likely-scanned.
- ``ocr_available()`` checks both the ``pytesseract`` Python
binding and the Tesseract binary; surfaces a clear reason
string when either is missing.
- ``extract_pages_auto`` does text-first, OCR-the-blanks, and
returns warnings the UI can surface.
29 unit tests cover the parsing pipeline against synthetic
WordBox/Page data — no fixture PDFs required, runs in 0.1s. Real
PDF extraction is exercised by hand on the user's statements.
Dependencies added:
- ``pdfplumber>=0.10,<1`` — text + position extraction
- ``pypdfium2>=4,<6`` — page rasterization for OCR + visual picker
- ``streamlit-drawable-canvas>=0.9,<1`` — visual region picker
(used in commit 5)
- ``pytesseract>=0.3,<1`` — OCR (used in commit 6; system
Tesseract binary required separately)
- ``cryptography>=41,<49`` — bumped upper bound; pdfminer.six
transitively requires a recent release. Internal ed25519
license-signing usage is API-stable across the bump.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -8,4 +8,8 @@ tqdm>=4.66,<5
|
||||
typer>=0.12,<1
|
||||
phonenumbers>=8.13,<9
|
||||
streamlit>=1.35,<2
|
||||
cryptography>=41,<46
|
||||
streamlit-drawable-canvas>=0.9,<1
|
||||
cryptography>=41,<49
|
||||
pdfplumber>=0.10,<1
|
||||
pypdfium2>=4,<6
|
||||
pytesseract>=0.3,<1
|
||||
|
||||
616
src/pdf_extract.py
Normal file
616
src/pdf_extract.py
Normal file
@@ -0,0 +1,616 @@
|
||||
"""PDF transaction extraction.
|
||||
|
||||
Pure module — no Streamlit, no user-config I/O. Reads PDF bytes,
|
||||
produces a ``pandas.DataFrame`` of rows according to a template
|
||||
dict. The accountant-facing use case is extracting transaction
|
||||
tables from bank statements (different banks = different
|
||||
templates, reused across statements that share a format).
|
||||
|
||||
Strategy:
|
||||
|
||||
- ``pdfplumber`` for text + word positions. Bank-statement tables
|
||||
rarely have visible cell borders, so we don't rely on table-line
|
||||
detection — instead the template carries explicit column
|
||||
x-position boundaries (set by the visual picker UI).
|
||||
- Rows are detected by clustering word ``top`` (y-position) values
|
||||
within a small tolerance — words on the same baseline.
|
||||
- Multi-line descriptions: rows whose first column (date) is empty
|
||||
are merged into the previous row's description column.
|
||||
- Signed amounts: parenthesized values (``(123.45)``) parse negative.
|
||||
Single signed amount column passes through. Separate debit/credit
|
||||
columns are combined into one signed amount column with credits
|
||||
positive and debits negative (accounting register convention —
|
||||
matches QuickBooks/Xero import expectations).
|
||||
- Optional OCR: pages with no extractable text fall through to
|
||||
``pytesseract`` IF the binding + Tesseract binary are both
|
||||
available. Otherwise the page is skipped with a warning row.
|
||||
|
||||
The template is a plain dict matching the schema documented in
|
||||
``src/pdf_templates.py``. This module reads it; ``pdf_templates``
|
||||
manages its persistence and validation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
import pdfplumber
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WordBox:
|
||||
"""A single word with its bounding box on a page.
|
||||
|
||||
Coordinates are in PDF points (1/72 inch), origin top-left."""
|
||||
x0: float
|
||||
top: float
|
||||
x1: float
|
||||
bottom: float
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Page:
|
||||
"""One PDF page's text + word positions."""
|
||||
page_no: int # 1-indexed
|
||||
width: float
|
||||
height: float
|
||||
text: str
|
||||
words: list[WordBox] = field(default_factory=list)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF reading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def extract_pages(pdf_bytes: bytes) -> list[Page]:
|
||||
"""Parse a PDF blob into our internal ``Page`` representation.
|
||||
|
||||
Each page carries every word's bounding box; downstream code
|
||||
groups them into rows by ``top`` clustering and into columns
|
||||
by template-defined x-boundaries.
|
||||
"""
|
||||
out: list[Page] = []
|
||||
with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
|
||||
for i, page in enumerate(pdf.pages, start=1):
|
||||
words_raw = page.extract_words(
|
||||
use_text_flow=True,
|
||||
keep_blank_chars=False,
|
||||
extra_attrs=[],
|
||||
)
|
||||
words = [
|
||||
WordBox(
|
||||
x0=float(w["x0"]),
|
||||
top=float(w["top"]),
|
||||
x1=float(w["x1"]),
|
||||
bottom=float(w["bottom"]),
|
||||
text=str(w["text"]),
|
||||
)
|
||||
for w in words_raw
|
||||
]
|
||||
out.append(
|
||||
Page(
|
||||
page_no=i,
|
||||
width=float(page.width),
|
||||
height=float(page.height),
|
||||
text=page.extract_text() or "",
|
||||
words=words,
|
||||
)
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Value parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_AMOUNT_DEFAULTS = {
|
||||
"decimal_separator": ".",
|
||||
"thousands_separator": ",",
|
||||
"currency_strip": "$",
|
||||
"negative_in_parens": True,
|
||||
}
|
||||
|
||||
_DATE_FORMATS_FALLBACK = [
|
||||
"%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d/%m/%Y", "%d/%m/%y",
|
||||
"%b %d %Y", "%d %b %Y", "%d-%b-%Y", "%m-%d-%Y", "%m-%d-%y",
|
||||
]
|
||||
|
||||
|
||||
def parse_amount(text: str, opts: dict[str, Any] | None = None) -> float | None:
|
||||
"""Parse a money string to a signed float, or ``None`` if it doesn't parse.
|
||||
|
||||
Handles: currency prefixes, thousands separators, parenthesized
|
||||
negatives, trailing minus signs ("123.45-"), and bare blanks.
|
||||
"""
|
||||
if text is None:
|
||||
return None
|
||||
s = text.strip()
|
||||
if not s:
|
||||
return None
|
||||
o = {**_AMOUNT_DEFAULTS, **(opts or {})}
|
||||
|
||||
negative = False
|
||||
if o["negative_in_parens"] and s.startswith("(") and s.endswith(")"):
|
||||
negative = True
|
||||
s = s[1:-1].strip()
|
||||
if s.endswith("-"):
|
||||
negative = True
|
||||
s = s[:-1].strip()
|
||||
if s.startswith("-"):
|
||||
negative = True
|
||||
s = s[1:].strip()
|
||||
currency = o.get("currency_strip") or ""
|
||||
if currency:
|
||||
for ch in currency:
|
||||
s = s.replace(ch, "")
|
||||
s = s.replace(" ", "")
|
||||
if o["thousands_separator"]:
|
||||
s = s.replace(o["thousands_separator"], "")
|
||||
if o["decimal_separator"] != ".":
|
||||
s = s.replace(o["decimal_separator"], ".")
|
||||
|
||||
if not s or not re.match(r"^\d+(\.\d+)?$", s):
|
||||
return None
|
||||
val = float(s)
|
||||
return -val if negative else val
|
||||
|
||||
|
||||
def parse_date(
|
||||
text: str,
|
||||
formats: list[str] | None = None,
|
||||
) -> str | None:
|
||||
"""Parse a date string against the provided formats and return ISO ``YYYY-MM-DD``.
|
||||
|
||||
Falls back to a list of common formats if *formats* is empty.
|
||||
Returns ``None`` if no format matches.
|
||||
"""
|
||||
if text is None:
|
||||
return None
|
||||
s = text.strip()
|
||||
if not s:
|
||||
return None
|
||||
tries = list(formats or []) + _DATE_FORMATS_FALLBACK
|
||||
for fmt in tries:
|
||||
try:
|
||||
dt = datetime.strptime(s, fmt)
|
||||
return dt.strftime("%Y-%m-%d")
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Row + column structure
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def cluster_rows(
|
||||
words: list[WordBox],
|
||||
y_tolerance: float = 3.0,
|
||||
) -> list[list[WordBox]]:
|
||||
"""Group word boxes into rows by ``top`` coordinate.
|
||||
|
||||
Words whose ``top`` is within *y_tolerance* of an existing row's
|
||||
median are added to that row. Otherwise a new row is started.
|
||||
Output rows are sorted top-to-bottom; within a row, words are
|
||||
sorted left-to-right.
|
||||
"""
|
||||
if not words:
|
||||
return []
|
||||
by_top = sorted(words, key=lambda w: w.top)
|
||||
rows: list[list[WordBox]] = []
|
||||
current: list[WordBox] = [by_top[0]]
|
||||
current_top = by_top[0].top
|
||||
for w in by_top[1:]:
|
||||
if abs(w.top - current_top) <= y_tolerance:
|
||||
current.append(w)
|
||||
else:
|
||||
rows.append(sorted(current, key=lambda w: w.x0))
|
||||
current = [w]
|
||||
current_top = w.top
|
||||
rows.append(sorted(current, key=lambda w: w.x0))
|
||||
return rows
|
||||
|
||||
|
||||
def assign_columns(
|
||||
row_words: list[WordBox],
|
||||
boundaries: list[float],
|
||||
) -> list[str]:
|
||||
"""Bucket the words of a single row into columns.
|
||||
|
||||
``boundaries`` are the *interior* x-positions between adjacent
|
||||
columns. N boundaries → N+1 columns. A word's column is decided
|
||||
by its horizontal midpoint; words within a column are joined
|
||||
with single spaces in left-to-right order.
|
||||
"""
|
||||
n_cols = len(boundaries) + 1
|
||||
buckets: list[list[WordBox]] = [[] for _ in range(n_cols)]
|
||||
sorted_bounds = sorted(boundaries)
|
||||
for w in row_words:
|
||||
mid = (w.x0 + w.x1) / 2
|
||||
col = 0
|
||||
for i, b in enumerate(sorted_bounds):
|
||||
if mid >= b:
|
||||
col = i + 1
|
||||
buckets[col].append(w)
|
||||
return [
|
||||
" ".join(w.text for w in sorted(bucket, key=lambda w: w.x0))
|
||||
for bucket in buckets
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Template application
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _pages_in_range(pages: list[Page], range_spec: str) -> list[Page]:
|
||||
"""Filter *pages* by a range spec like ``"all"``, ``"2-"``, ``"1,3-5"``.
|
||||
|
||||
Empty / ``"all"`` returns all pages. Bad specs return all pages
|
||||
(template author can fix at preview time)."""
|
||||
s = (range_spec or "").strip().lower()
|
||||
if not s or s == "all":
|
||||
return pages
|
||||
keep: set[int] = set()
|
||||
for chunk in s.split(","):
|
||||
chunk = chunk.strip()
|
||||
if not chunk:
|
||||
continue
|
||||
if "-" in chunk:
|
||||
a, b = chunk.split("-", 1)
|
||||
a_i = int(a) if a.strip().isdigit() else 1
|
||||
b_i = int(b) if b.strip().isdigit() else len(pages)
|
||||
for i in range(a_i, b_i + 1):
|
||||
keep.add(i)
|
||||
elif chunk.isdigit():
|
||||
keep.add(int(chunk))
|
||||
return [p for p in pages if p.page_no in keep] if keep else pages
|
||||
|
||||
|
||||
def _within_table_window(
|
||||
rows: list[list[WordBox]],
|
||||
header_text: str,
|
||||
end_markers: list[str],
|
||||
) -> list[list[WordBox]]:
|
||||
"""Slice *rows* to the band between the header line and the end marker.
|
||||
|
||||
Header match: the first row whose joined text contains every word
|
||||
of ``header_text`` (case-insensitive). The header row itself is
|
||||
excluded. End match: any row whose joined text matches one of the
|
||||
``end_markers`` regex patterns; that row and below are excluded.
|
||||
|
||||
Empty ``header_text`` keeps from the first row; empty
|
||||
``end_markers`` keeps through the last row.
|
||||
"""
|
||||
if not rows:
|
||||
return []
|
||||
needle_words = [w.lower() for w in (header_text or "").split() if w]
|
||||
end_res = [re.compile(p, re.IGNORECASE) for p in end_markers if p]
|
||||
|
||||
start = 0
|
||||
if needle_words:
|
||||
start = -1
|
||||
for i, row in enumerate(rows):
|
||||
joined = " ".join(w.text for w in row).lower()
|
||||
if all(nw in joined for nw in needle_words):
|
||||
start = i + 1
|
||||
break
|
||||
if start == -1:
|
||||
return []
|
||||
|
||||
end = len(rows)
|
||||
for i in range(start, len(rows)):
|
||||
joined = " ".join(w.text for w in rows[i])
|
||||
if any(rx.search(joined) for rx in end_res):
|
||||
end = i
|
||||
break
|
||||
return rows[start:end]
|
||||
|
||||
|
||||
def _row_is_continuation(cells: list[str]) -> bool:
|
||||
"""A row whose first column is empty is treated as a continuation
|
||||
of the previous row's description (multi-line wrap)."""
|
||||
return bool(cells) and not cells[0].strip()
|
||||
|
||||
|
||||
def _coerce_amount_columns(
|
||||
record: dict[str, str],
|
||||
column_map: list[dict[str, Any]],
|
||||
parse_opts: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
"""Convert source-column text into typed output fields.
|
||||
|
||||
Supports three amount shapes:
|
||||
|
||||
1. A single column mapped to ``amount`` — passes through with sign.
|
||||
2. Two columns mapped to ``amount_debit`` + ``amount_credit`` —
|
||||
combined into a signed ``amount`` (credit positive, debit
|
||||
negative — accounting register convention).
|
||||
3. A column mapped to ``balance`` — parsed as signed number.
|
||||
|
||||
The ``date`` target is parsed against the template's date format.
|
||||
Other targets pass through as text.
|
||||
"""
|
||||
out: dict[str, Any] = {}
|
||||
debit_val: float | None = None
|
||||
credit_val: float | None = None
|
||||
|
||||
for col in column_map:
|
||||
target = col.get("target", "")
|
||||
source_text = record.get(target, "") if target else ""
|
||||
if target == "date":
|
||||
iso = parse_date(source_text, parse_opts.get("date_formats") or [])
|
||||
out["date"] = iso or source_text
|
||||
elif target == "description":
|
||||
out["description"] = source_text
|
||||
elif target == "amount":
|
||||
out["amount"] = parse_amount(source_text, parse_opts)
|
||||
elif target == "amount_debit":
|
||||
debit_val = parse_amount(source_text, parse_opts)
|
||||
elif target == "amount_credit":
|
||||
credit_val = parse_amount(source_text, parse_opts)
|
||||
elif target == "balance":
|
||||
out["balance"] = parse_amount(source_text, parse_opts)
|
||||
elif target:
|
||||
out[target] = source_text
|
||||
|
||||
if "amount" not in out and (debit_val is not None or credit_val is not None):
|
||||
amt = 0.0
|
||||
if credit_val:
|
||||
amt += credit_val
|
||||
if debit_val:
|
||||
amt -= debit_val
|
||||
out["amount"] = amt
|
||||
out["type"] = "credit" if amt > 0 else ("debit" if amt < 0 else "")
|
||||
return out
|
||||
|
||||
|
||||
def apply_template(
|
||||
pages: list[Page],
|
||||
template: dict[str, Any],
|
||||
) -> pd.DataFrame:
|
||||
"""Run *template* over *pages* and return the extracted DataFrame.
|
||||
|
||||
Template schema is defined in ``src/pdf_templates.py``. Missing
|
||||
keys fall through to sensible defaults so a half-built template
|
||||
in the GUI still produces a preview.
|
||||
"""
|
||||
pages_cfg = template.get("pages", {}) or {}
|
||||
table_cfg = template.get("table", {}) or {}
|
||||
columns_cfg = template.get("columns", []) or []
|
||||
parse_cfg = template.get("parse", {}) or {}
|
||||
|
||||
pages_used = _pages_in_range(pages, pages_cfg.get("range", "all"))
|
||||
skip_pages_re = pages_cfg.get("skip_matching") or ""
|
||||
if skip_pages_re:
|
||||
skip_re = re.compile(skip_pages_re, re.IGNORECASE)
|
||||
pages_used = [p for p in pages_used if not skip_re.search(p.text)]
|
||||
|
||||
boundaries = list(table_cfg.get("column_boundaries", []) or [])
|
||||
header_text = table_cfg.get("header_text", "") or ""
|
||||
end_markers = list(table_cfg.get("end_markers", []) or [])
|
||||
skip_rows_res = [
|
||||
re.compile(p, re.IGNORECASE)
|
||||
for p in (table_cfg.get("skip_rows_matching") or [])
|
||||
]
|
||||
merge_multiline = bool(parse_cfg.get("merge_multiline_description", True))
|
||||
|
||||
target_names = [c.get("target") for c in columns_cfg if c.get("target")]
|
||||
if not target_names:
|
||||
target_names = [f"col_{i}" for i in range(len(boundaries) + 1)]
|
||||
|
||||
parse_opts = {
|
||||
"decimal_separator": parse_cfg.get("decimal_separator", "."),
|
||||
"thousands_separator": parse_cfg.get("thousands_separator", ","),
|
||||
"currency_strip": parse_cfg.get("currency_strip", "$"),
|
||||
"negative_in_parens": parse_cfg.get("amount_negative_in_parens", True),
|
||||
"date_formats": parse_cfg.get("date_formats")
|
||||
or ([parse_cfg["date_format"]] if parse_cfg.get("date_format") else []),
|
||||
}
|
||||
|
||||
out_rows: list[dict[str, Any]] = []
|
||||
for page in pages_used:
|
||||
rows = cluster_rows(
|
||||
page.words,
|
||||
y_tolerance=float(table_cfg.get("y_tolerance", 3.0)),
|
||||
)
|
||||
rows = _within_table_window(rows, header_text, end_markers)
|
||||
|
||||
prev_record: dict[str, Any] | None = None
|
||||
for row_words in rows:
|
||||
if not boundaries:
|
||||
cells = [" ".join(w.text for w in row_words)]
|
||||
else:
|
||||
cells = assign_columns(row_words, boundaries)
|
||||
joined = " ".join(c.strip() for c in cells if c.strip())
|
||||
if not joined:
|
||||
continue
|
||||
if any(rx.search(joined) for rx in skip_rows_res):
|
||||
continue
|
||||
|
||||
if merge_multiline and _row_is_continuation(cells) and prev_record:
|
||||
# Glue the non-empty columns into the previous record's
|
||||
# description (the natural sink for wrapped text).
|
||||
extra = " ".join(c.strip() for c in cells if c.strip())
|
||||
if extra:
|
||||
prev_record["description"] = (
|
||||
(prev_record.get("description") or "")
|
||||
+ " "
|
||||
+ extra
|
||||
).strip()
|
||||
continue
|
||||
|
||||
record_src: dict[str, str] = {}
|
||||
for col_cfg in columns_cfg:
|
||||
src_idx = col_cfg.get("source")
|
||||
tgt = col_cfg.get("target")
|
||||
if (
|
||||
isinstance(src_idx, int)
|
||||
and 0 <= src_idx < len(cells)
|
||||
and tgt
|
||||
):
|
||||
record_src[tgt] = cells[src_idx]
|
||||
|
||||
record_src.setdefault("_page", str(page.page_no))
|
||||
record = _coerce_amount_columns(record_src, columns_cfg, parse_opts)
|
||||
record["_page"] = page.page_no
|
||||
out_rows.append(record)
|
||||
prev_record = record
|
||||
|
||||
if not out_rows:
|
||||
return pd.DataFrame()
|
||||
df = pd.DataFrame(out_rows)
|
||||
|
||||
preferred = ["date", "description", "amount", "type", "balance"]
|
||||
cols = [c for c in preferred if c in df.columns]
|
||||
extras = [c for c in df.columns if c not in cols and c != "_page"]
|
||||
df = df[cols + extras + (["_page"] if "_page" in df.columns else [])]
|
||||
return df
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OCR fallback (optional)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def page_has_extractable_text(page: Page, min_words: int = 5) -> bool:
|
||||
"""Heuristic: a scanned page typically yields zero or near-zero
|
||||
words. ``min_words`` of 5 catches title/logo-only pages too."""
|
||||
return len(page.words) >= min_words
|
||||
|
||||
|
||||
def ocr_available() -> tuple[bool, str]:
|
||||
"""Return ``(available, reason)`` — is OCR usable right now?
|
||||
|
||||
Checks both the Python binding (``pytesseract``) and the
|
||||
Tesseract binary. The reason string is suitable for surfacing to
|
||||
the user when OCR is unavailable.
|
||||
"""
|
||||
try:
|
||||
import pytesseract # noqa: F401
|
||||
except ImportError:
|
||||
return False, "pytesseract is not installed."
|
||||
try:
|
||||
import pytesseract as pt
|
||||
pt.get_tesseract_version()
|
||||
except Exception as e:
|
||||
return False, f"Tesseract binary not found: {e}"
|
||||
return True, ""
|
||||
|
||||
|
||||
def ocr_pdf_to_pages(pdf_bytes: bytes, dpi: int = 200) -> list[Page]:
|
||||
"""Run Tesseract over each page of *pdf_bytes* and return a
|
||||
word-position-rich ``Page`` list, parallel to ``extract_pages``.
|
||||
|
||||
Caller is responsible for first checking ``ocr_available()``.
|
||||
Uses pypdfium2 to rasterize and pytesseract's ``image_to_data``
|
||||
to recover per-word bounding boxes so the same column-assignment
|
||||
pipeline keeps working.
|
||||
"""
|
||||
import pypdfium2 as pdfium
|
||||
import pytesseract
|
||||
from PIL import Image # noqa: F401 (transitively required)
|
||||
|
||||
pages: list[Page] = []
|
||||
pdf = pdfium.PdfDocument(pdf_bytes)
|
||||
try:
|
||||
# PDF points-per-inch is 72; scale renders into pixels.
|
||||
scale = dpi / 72.0
|
||||
for i in range(len(pdf)):
|
||||
pil_image = pdf[i].render(scale=scale).to_pil()
|
||||
data = pytesseract.image_to_data(
|
||||
pil_image,
|
||||
output_type=pytesseract.Output.DICT,
|
||||
)
|
||||
words: list[WordBox] = []
|
||||
for j, txt in enumerate(data.get("text", [])):
|
||||
t = (txt or "").strip()
|
||||
if not t:
|
||||
continue
|
||||
left = float(data["left"][j])
|
||||
top = float(data["top"][j])
|
||||
width = float(data["width"][j])
|
||||
height = float(data["height"][j])
|
||||
# Convert pixel coords back to PDF points so column
|
||||
# boundaries from the template (in PDF points) keep
|
||||
# working when an OCR page is mixed with text pages.
|
||||
words.append(WordBox(
|
||||
x0=left / scale,
|
||||
top=top / scale,
|
||||
x1=(left + width) / scale,
|
||||
bottom=(top + height) / scale,
|
||||
text=t,
|
||||
))
|
||||
text_blob = " ".join(w.text for w in words)
|
||||
pages.append(Page(
|
||||
page_no=i + 1,
|
||||
width=pil_image.width / scale,
|
||||
height=pil_image.height / scale,
|
||||
text=text_blob,
|
||||
words=words,
|
||||
))
|
||||
finally:
|
||||
pdf.close()
|
||||
return pages
|
||||
|
||||
|
||||
def extract_pages_auto(
|
||||
pdf_bytes: bytes,
|
||||
*,
|
||||
allow_ocr: bool = True,
|
||||
) -> tuple[list[Page], list[str]]:
|
||||
"""Try text extraction first; OCR the pages that come back empty.
|
||||
|
||||
Returns ``(pages, warnings)``. ``warnings`` is a list of human-
|
||||
readable strings — e.g. "Pages 3, 4 had no text and OCR is
|
||||
unavailable; they were skipped." Caller surfaces these in the UI.
|
||||
"""
|
||||
warnings: list[str] = []
|
||||
pages = extract_pages(pdf_bytes)
|
||||
blank = [p for p in pages if not page_has_extractable_text(p)]
|
||||
if not blank:
|
||||
return pages, warnings
|
||||
|
||||
if not allow_ocr:
|
||||
warnings.append(
|
||||
f"{len(blank)} page(s) appear scanned. OCR is disabled."
|
||||
)
|
||||
return pages, warnings
|
||||
|
||||
ok, reason = ocr_available()
|
||||
if not ok:
|
||||
warnings.append(
|
||||
f"{len(blank)} page(s) appear scanned but OCR isn't usable: "
|
||||
f"{reason}"
|
||||
)
|
||||
return pages, warnings
|
||||
|
||||
ocr_pages = ocr_pdf_to_pages(pdf_bytes)
|
||||
# Splice OCR results into the original list for the blank pages.
|
||||
by_no = {p.page_no: p for p in ocr_pages}
|
||||
merged: list[Page] = []
|
||||
for p in pages:
|
||||
if page_has_extractable_text(p):
|
||||
merged.append(p)
|
||||
elif p.page_no in by_no:
|
||||
merged.append(by_no[p.page_no])
|
||||
else:
|
||||
merged.append(p)
|
||||
warnings.append(
|
||||
f"OCR was used for {len(blank)} page(s) with no extractable text."
|
||||
)
|
||||
return merged, warnings
|
||||
286
tests/test_pdf_extract.py
Normal file
286
tests/test_pdf_extract.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""Tests for the pure PDF-extraction pipeline.
|
||||
|
||||
Real PDF parsing (``extract_pages``) is a thin wrapper around
|
||||
``pdfplumber`` and is exercised by hand on real bank statements.
|
||||
These tests pin the meaty bits — value parsing, row clustering,
|
||||
column assignment, template-driven extraction — against synthetic
|
||||
``WordBox`` data so they run fast and have no PDF dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from src.pdf_extract import (
|
||||
Page,
|
||||
WordBox,
|
||||
apply_template,
|
||||
assign_columns,
|
||||
cluster_rows,
|
||||
parse_amount,
|
||||
parse_date,
|
||||
_pages_in_range,
|
||||
_within_table_window,
|
||||
)
|
||||
|
||||
|
||||
def _w(text: str, x0: float, top: float, x1: float | None = None) -> WordBox:
|
||||
"""Convenience constructor — heights and exact x1 don't matter
|
||||
for the tests we write."""
|
||||
return WordBox(
|
||||
x0=x0,
|
||||
top=top,
|
||||
x1=x1 if x1 is not None else x0 + 10 * len(text),
|
||||
bottom=top + 10,
|
||||
text=text,
|
||||
)
|
||||
|
||||
|
||||
class TestParseAmount:
|
||||
def test_plain_positive(self):
|
||||
assert parse_amount("1234.56") == 1234.56
|
||||
|
||||
def test_currency_and_thousands(self):
|
||||
assert parse_amount("$1,234.56") == 1234.56
|
||||
|
||||
def test_parens_negative(self):
|
||||
assert parse_amount("(1,234.56)") == -1234.56
|
||||
|
||||
def test_leading_minus(self):
|
||||
assert parse_amount("-100.00") == -100.0
|
||||
|
||||
def test_trailing_minus(self):
|
||||
assert parse_amount("100.00-") == -100.0
|
||||
|
||||
def test_blank_returns_none(self):
|
||||
assert parse_amount("") is None
|
||||
assert parse_amount(" ") is None
|
||||
assert parse_amount(None) is None
|
||||
|
||||
def test_garbage_returns_none(self):
|
||||
assert parse_amount("not a number") is None
|
||||
|
||||
def test_european_decimal(self):
|
||||
opts = {
|
||||
"decimal_separator": ",",
|
||||
"thousands_separator": ".",
|
||||
"currency_strip": "€",
|
||||
"negative_in_parens": True,
|
||||
}
|
||||
assert parse_amount("€1.234,56", opts) == 1234.56
|
||||
|
||||
|
||||
class TestParseDate:
|
||||
def test_us_slash(self):
|
||||
assert parse_date("01/15/2026", ["%m/%d/%Y"]) == "2026-01-15"
|
||||
|
||||
def test_iso(self):
|
||||
assert parse_date("2026-01-15", ["%Y-%m-%d"]) == "2026-01-15"
|
||||
|
||||
def test_fallback_format(self):
|
||||
# Not in the supplied list — should still parse via fallback.
|
||||
assert parse_date("01/15/26") == "2026-01-15"
|
||||
|
||||
def test_invalid(self):
|
||||
assert parse_date("not-a-date") is None
|
||||
|
||||
|
||||
class TestClusterRows:
|
||||
def test_groups_close_y(self):
|
||||
words = [
|
||||
_w("A", x0=0, top=100),
|
||||
_w("B", x0=20, top=101),
|
||||
_w("C", x0=40, top=102),
|
||||
]
|
||||
rows = cluster_rows(words, y_tolerance=3.0)
|
||||
assert len(rows) == 1
|
||||
assert [w.text for w in rows[0]] == ["A", "B", "C"]
|
||||
|
||||
def test_separates_far_y(self):
|
||||
words = [
|
||||
_w("A", x0=0, top=100),
|
||||
_w("B", x0=0, top=120),
|
||||
]
|
||||
rows = cluster_rows(words, y_tolerance=3.0)
|
||||
assert [[w.text for w in r] for r in rows] == [["A"], ["B"]]
|
||||
|
||||
def test_sorts_left_to_right_within_row(self):
|
||||
words = [
|
||||
_w("C", x0=40, top=100),
|
||||
_w("A", x0=0, top=100),
|
||||
_w("B", x0=20, top=100),
|
||||
]
|
||||
rows = cluster_rows(words)
|
||||
assert [w.text for w in rows[0]] == ["A", "B", "C"]
|
||||
|
||||
def test_empty(self):
|
||||
assert cluster_rows([]) == []
|
||||
|
||||
|
||||
class TestAssignColumns:
|
||||
def test_three_columns(self):
|
||||
# boundaries at x=100, 200 → columns [0,100), [100,200), [200,∞)
|
||||
row = [
|
||||
_w("Jan", x0=10, top=0, x1=40), # col 0
|
||||
_w("1", x0=45, top=0, x1=55), # col 0
|
||||
_w("Deposit", x0=110, top=0, x1=180), # col 1
|
||||
_w("250.00", x0=210, top=0, x1=260), # col 2
|
||||
]
|
||||
cells = assign_columns(row, [100, 200])
|
||||
assert cells[0] == "Jan 1"
|
||||
assert cells[1] == "Deposit"
|
||||
assert cells[2] == "250.00"
|
||||
|
||||
def test_no_boundaries_one_column(self):
|
||||
row = [_w("A", 0, 0), _w("B", 20, 0)]
|
||||
cells = assign_columns(row, [])
|
||||
assert cells == ["A B"]
|
||||
|
||||
|
||||
class TestPagesInRange:
|
||||
def _mk(self, n):
|
||||
return [Page(page_no=i + 1, width=600, height=800, text="", words=[]) for i in range(n)]
|
||||
|
||||
def test_all(self):
|
||||
pages = self._mk(5)
|
||||
assert len(_pages_in_range(pages, "all")) == 5
|
||||
assert len(_pages_in_range(pages, "")) == 5
|
||||
|
||||
def test_explicit_list(self):
|
||||
pages = self._mk(5)
|
||||
got = [p.page_no for p in _pages_in_range(pages, "1,3,5")]
|
||||
assert got == [1, 3, 5]
|
||||
|
||||
def test_range(self):
|
||||
pages = self._mk(5)
|
||||
got = [p.page_no for p in _pages_in_range(pages, "2-4")]
|
||||
assert got == [2, 3, 4]
|
||||
|
||||
def test_open_ended(self):
|
||||
pages = self._mk(5)
|
||||
got = [p.page_no for p in _pages_in_range(pages, "3-")]
|
||||
assert got == [3, 4, 5]
|
||||
|
||||
|
||||
class TestWithinTableWindow:
|
||||
def test_header_skipped_end_excluded(self):
|
||||
rows = [
|
||||
[_w("STATEMENT", 0, 0)],
|
||||
[_w("Date", 0, 20), _w("Description", 50, 20), _w("Amount", 200, 20)],
|
||||
[_w("01/15", 0, 40), _w("Coffee", 50, 40), _w("4.50", 200, 40)],
|
||||
[_w("01/16", 0, 60), _w("Refund", 50, 60), _w("12.00", 200, 60)],
|
||||
[_w("Closing", 0, 80), _w("balance", 50, 80)],
|
||||
[_w("Page", 0, 100), _w("1", 50, 100)],
|
||||
]
|
||||
out = _within_table_window(rows, "Date Description Amount", ["Closing balance"])
|
||||
# Should keep just the two transaction rows.
|
||||
assert len(out) == 2
|
||||
assert out[0][0].text == "01/15"
|
||||
assert out[1][0].text == "01/16"
|
||||
|
||||
def test_no_header_returns_empty_when_required(self):
|
||||
rows = [[_w("foo", 0, 0)]]
|
||||
assert _within_table_window(rows, "Date Description Amount", []) == []
|
||||
|
||||
def test_blank_header_passes_through(self):
|
||||
rows = [[_w("x", 0, 0)], [_w("y", 0, 20)]]
|
||||
assert _within_table_window(rows, "", []) == rows
|
||||
|
||||
|
||||
class TestApplyTemplate:
|
||||
"""End-to-end on synthetic ``Page`` objects."""
|
||||
|
||||
def _statement_page(self) -> Page:
|
||||
# Mock layout: 3 columns at x=0/100/200, header at y=20, data at 40+.
|
||||
words = [
|
||||
_w("STATEMENT", 0, 0),
|
||||
# Header
|
||||
_w("Date", 5, 20), _w("Description", 105, 20), _w("Amount", 205, 20),
|
||||
# Row 1
|
||||
_w("01/15/2026", 5, 40), _w("Coffee", 105, 40),
|
||||
_w("Shop", 140, 40), _w("(4.50)", 205, 40),
|
||||
# Row 2
|
||||
_w("01/16/2026", 5, 60), _w("Refund", 105, 60), _w("$12.00", 205, 60),
|
||||
# Continuation row (no date) — should merge into row 2
|
||||
_w("from", 105, 80), _w("vendor", 140, 80),
|
||||
# End marker
|
||||
_w("Closing", 5, 100), _w("balance", 105, 100), _w("$1,000.00", 205, 100),
|
||||
]
|
||||
return Page(page_no=1, width=300, height=120, text="", words=words)
|
||||
|
||||
def _template(self) -> dict:
|
||||
return {
|
||||
"pages": {"range": "all"},
|
||||
"table": {
|
||||
"header_text": "Date Description Amount",
|
||||
"end_markers": ["Closing balance"],
|
||||
"column_boundaries": [100, 200],
|
||||
"y_tolerance": 3.0,
|
||||
"skip_rows_matching": [],
|
||||
},
|
||||
"columns": [
|
||||
{"source": 0, "target": "date"},
|
||||
{"source": 1, "target": "description"},
|
||||
{"source": 2, "target": "amount"},
|
||||
],
|
||||
"parse": {
|
||||
"date_format": "%m/%d/%Y",
|
||||
"amount_negative_in_parens": True,
|
||||
"merge_multiline_description": True,
|
||||
},
|
||||
}
|
||||
|
||||
def test_basic_extraction(self):
|
||||
df = apply_template([self._statement_page()], self._template())
|
||||
assert isinstance(df, pd.DataFrame)
|
||||
assert len(df) == 2
|
||||
assert list(df["date"]) == ["2026-01-15", "2026-01-16"]
|
||||
# Parens-negative
|
||||
assert df.iloc[0]["amount"] == -4.50
|
||||
# Plain positive with currency strip
|
||||
assert df.iloc[1]["amount"] == 12.00
|
||||
# Multi-line description merged
|
||||
assert "from vendor" in df.iloc[1]["description"]
|
||||
|
||||
def test_debit_credit_split_columns(self):
|
||||
# Layout: date | description | debit | credit columns
|
||||
page = Page(
|
||||
page_no=1, width=400, height=80, text="",
|
||||
words=[
|
||||
_w("Date", 5, 0), _w("Desc", 105, 0),
|
||||
_w("Debit", 205, 0), _w("Credit", 305, 0),
|
||||
_w("01/15/2026", 5, 20), _w("Coffee", 105, 20), _w("4.50", 205, 20),
|
||||
_w("01/16/2026", 5, 40), _w("Refund", 105, 40),
|
||||
_w("", 205, 40), # no debit
|
||||
_w("12.00", 305, 40),
|
||||
],
|
||||
)
|
||||
tpl = {
|
||||
"table": {
|
||||
"header_text": "Date Desc Debit Credit",
|
||||
"column_boundaries": [100, 200, 300],
|
||||
},
|
||||
"columns": [
|
||||
{"source": 0, "target": "date"},
|
||||
{"source": 1, "target": "description"},
|
||||
{"source": 2, "target": "amount_debit"},
|
||||
{"source": 3, "target": "amount_credit"},
|
||||
],
|
||||
"parse": {"date_format": "%m/%d/%Y"},
|
||||
}
|
||||
df = apply_template([page], tpl)
|
||||
assert list(df["amount"]) == [-4.50, 12.00]
|
||||
assert list(df["type"]) == ["debit", "credit"]
|
||||
|
||||
def test_skip_rows_matching(self):
|
||||
page = self._statement_page()
|
||||
tpl = self._template()
|
||||
tpl["table"]["skip_rows_matching"] = ["Refund"]
|
||||
df = apply_template([page], tpl)
|
||||
# Refund row is dropped — only one transaction left
|
||||
assert len(df) == 1
|
||||
assert df.iloc[0]["amount"] == -4.50
|
||||
|
||||
def test_empty_pages_returns_empty_df(self):
|
||||
df = apply_template([], self._template())
|
||||
assert df.empty
|
||||
Reference in New Issue
Block a user