Files
datatools-dev/src/pdf_extract.py
Michael 17faf84aed feat(pdf): probe bundled Tesseract first when running frozen
Adds runtime support for the bundled Tesseract that ships inside the
DataTools installer / portable / AppImage artifacts. When DataTools
is launched from a PyInstaller frozen bundle the OCR engine now
resolves automatically — no end-user install required.

New helpers in src/pdf_extract.py:
- _bundled_tesseract_path() → Path | None — returns
  <sys._MEIPASS>/tesseract/tesseract[.exe] when getattr(sys,
  "frozen", False) AND sys._MEIPASS are present; None in dev.
- _bundled_tessdata_dir() → Path | None — same gating, returns
  <sys._MEIPASS>/tesseract/tessdata.
- _apply_bundled_tessdata_prefix() — sets TESSDATA_PREFIX to the
  bundled tessdata dir before any pytesseract call; only if frozen,
  dir exists, and the user hasn't already overridden the env var.

Discovery order in ocr_available() / _autodetect_tesseract_path():
1. DATATOOLS_TESSERACT_PATH env override (existing)
2. Bundled binary (NEW — frozen-only)
3. System PATH (existing)
4. Windows well-known install dirs (existing legacy fallback)

In dev (not frozen) every new probe is a no-op so the developer
experience is unchanged.

12 new tests cover frozen vs. non-frozen detection on each platform,
the user-override respect for TESSDATA_PREFIX, autodetect priority
ordering, and the no-bundled-dir graceful path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 18:19:52 +00:00

1200 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Heuristic PDF transaction scanner.
Single public entry point: ``scan_pdf_for_transactions(pdf_bytes)``
returns a list of dicts shaped like ``[date] [description] [amount]``,
plus a list of warning strings. The GUI renders those rows in an
editable table and lets the user pick which to keep before
exporting to CSV.
There are no templates, no per-bank configuration files, and no
coordinate dependencies. A transaction row is "any extracted text
line containing a date pattern AND at least one amount pattern."
Multi-amount rows surface every detected amount as ``amount_1``,
``amount_2``, ... — the user labels and reshapes in their CSV
editor of choice.
Optional OCR fallback for scanned PDFs via ``pytesseract`` +
``pypdfium2``. Robust to missing system Tesseract — returns a
clear reason string instead of raising.
"""
from __future__ import annotations
import io
import os
import platform
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# Dependency guards
# ---------------------------------------------------------------------------
class PdfDependencyMissing(ImportError):
    """Raised when a package required for PDF processing cannot be imported.

    Attributes:
        missing: Name of the package that failed to import.
        hint: Install instruction attached to the error. A generic
            ``pip install`` line is used when the caller supplies none.
    """

    def __init__(self, missing: str, hint: str = ""):
        default_hint = (
            "Install the PDF dependencies: ``pip install "
            "pdfplumber pypdfium2 pytesseract``"
        )
        self.missing = missing
        self.hint = hint if hint else default_hint
        message = f"{missing} is not installed. {self.hint}"
        super().__init__(message)
def _require_pdfplumber():
    """Import ``pdfplumber`` on demand and return the module.

    Raises:
        PdfDependencyMissing: if the import fails; the original
            ``ImportError`` is chained as the cause.
    """
    try:
        import pdfplumber as module  # noqa: PLC0415
    except ImportError as exc:
        raise PdfDependencyMissing("pdfplumber") from exc
    return module
def _require_pdfium():
    """Import ``pypdfium2`` on demand and return the module.

    Raises:
        PdfDependencyMissing: if the import fails; the original
            ``ImportError`` is chained as the cause.
    """
    try:
        import pypdfium2 as module  # noqa: PLC0415
    except ImportError as exc:
        raise PdfDependencyMissing("pypdfium2") from exc
    return module
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class WordBox:
    """A single word with its bounding box on a page.

    Coordinates are in PDF points (1/72 inch), origin top-left.
    Instances are immutable (``frozen=True``).
    """

    # Left edge of the box.
    x0: float
    # Upper edge of the box; ``cluster_rows`` groups words on this value.
    top: float
    # Right edge of the box.
    x1: float
    # Lower edge of the box.
    bottom: float
    # The word's text content.
    text: str
@dataclass
class Page:
    """One PDF page's text + word positions."""

    # 1-based page number (both producers in this module count from 1).
    page_no: int
    # Page width.
    width: float
    # Page height.
    height: float
    # Text of the whole page as a single string.
    text: str
    # Word boxes found on the page; defaults to a fresh empty list.
    words: list[WordBox] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Value parsing
# ---------------------------------------------------------------------------
# Date patterns that include a year. Each pattern captures the whole
# date in group 1.
_DATE_RES_FULL = [
    # Slash-separated numeric date, e.g. ``01/15/2026`` or ``1/5/26``.
    re.compile(r"\b(\d{1,2}/\d{1,2}/\d{2,4})\b"),
    # Dash-separated numeric date, e.g. ``01-15-2026``.
    re.compile(r"\b(\d{1,2}-\d{1,2}-\d{2,4})\b"),
    # ISO-style date, e.g. ``2026-01-15``.
    re.compile(r"\b(\d{4}-\d{2}-\d{2})\b"),
    # Month abbreviation first, e.g. ``Jan 13, 2026`` / ``Jan 13 2026``.
    re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2},?\s+\d{2,4})\b"),
    # Day first with month abbreviation, e.g. ``13 Jan 2026``.
    re.compile(r"\b(\d{1,2}\s+[A-Z][a-z]{2}\s+\d{2,4})\b"),
]
# Short-date patterns (no year). Many bank statements show dates as
# ``MM/DD`` or ``Jan 13`` because the year is implied by the
# statement period. Tried only after the full-year patterns fail
# so a string like "1/2 cup" in a memo can't claim to be a date
# when a real dated transaction was already matched on the same row.
# The ``(?!\d)`` lookahead stops a match from ending in the middle of
# a longer digit run.
_DATE_RES_SHORT = [
    # ``01/13``
    re.compile(r"\b(\d{1,2}/\d{1,2})(?!\d)"),
    # ``01-13``
    re.compile(r"\b(\d{1,2}-\d{1,2})(?!\d)"),
    # ``Jan 13``
    re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2})(?!\d)"),
]
# Every date pattern, full-year patterns first.
_DATE_RES = _DATE_RES_FULL + _DATE_RES_SHORT
# ``strptime`` formats that ``parse_date`` tries, in this order, after
# any caller-supplied formats. Month-first numeric formats come before
# day-first ones, so an ambiguous ``01/02/2026`` parses as January 2.
_DATE_FORMATS_FALLBACK = [
    "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d/%m/%Y", "%d/%m/%y",
    "%b %d %Y", "%b %d, %Y", "%d %b %Y", "%d-%b-%Y",
    "%m-%d-%Y", "%m-%d-%y",
]
# Amount tokens: optional $/€/£, optional leading -, optional parens,
# 1-3 digits before grouping with comma-thousand groups, optional
# decimal portion. Trailing minus also captured.
_AMOUNT_RE = re.compile(
r"(?<![\w.])"
r"(\(?-?[\$€£]?-?\d{1,3}(?:,\d{3})*(?:\.\d{1,4})?\)?-?)"
r"(?![\w.])"
)
def parse_amount(
    text: str,
    *,
    negative_in_parens: bool = True,
    decimal: str = ".",
    thousands: str = ",",
    currency_strip: str = "$€£",
) -> float | None:
    """Parse a money string to a signed float, or ``None`` if it
    doesn't parse.

    Handles currency symbols, thousands separators, parenthesized
    negatives, trailing minus signs (``"123.45-"``), a leading minus
    on either side of the currency symbol (``"-$5.00"`` and
    ``"$-5.00"``), and blank input.

    Args:
        text: The raw token. ``None`` and blank strings give ``None``.
        negative_in_parens: When true, ``(1.00)`` is read as ``-1.00``.
            When false, the parentheses are left in place and the
            token fails to parse.
        decimal: Decimal separator used in *text*.
        thousands: Grouping separator to discard; pass ``""`` to
            disable.
        currency_strip: Every character in this string is removed
            from the token.

    Returns:
        The signed value, or ``None`` when what remains after cleanup
        is not a plain ``digits[.digits]`` number.
    """
    if text is None:
        return None
    s = str(text).strip()
    if not s:
        return None
    negative = False
    if negative_in_parens and s.startswith("(") and s.endswith(")"):
        negative = True
        s = s[1:-1].strip()
    if s.endswith("-"):
        negative = True
        s = s[:-1].strip()
    if s.startswith("-"):
        negative = True
        s = s[1:].strip()
    for ch in currency_strip:
        s = s.replace(ch, "")
    s = s.replace(" ", "")
    # The minus may sit between the currency symbol and the digits
    # ("$-5.00"), which only becomes a leading minus once the symbol
    # has been stripped. Without this second check such tokens were
    # rejected even though the amount regex accepts them.
    if s.startswith("-"):
        negative = True
        s = s[1:]
    if thousands:
        s = s.replace(thousands, "")
    if decimal != ".":
        s = s.replace(decimal, ".")
    if not s or not re.match(r"^\d+(\.\d+)?$", s):
        return None
    val = float(s)
    return -val if negative else val
def parse_date(
    text: str,
    formats: list[str] | None = None,
) -> str | None:
    """Convert a date string to ISO ``YYYY-MM-DD``.

    Caller-supplied *formats* are attempted first, followed by the
    module's fallback formats; the first ``strptime`` format that
    accepts the stripped text wins.

    Returns:
        The ISO date, or ``None`` when *text* is ``None``, blank, or
        matches none of the formats.
    """
    if text is None:
        return None
    candidate = str(text).strip()
    if not candidate:
        return None
    for fmt in [*(formats or []), *_DATE_FORMATS_FALLBACK]:
        try:
            iso = datetime.strptime(candidate, fmt).strftime("%Y-%m-%d")
        except ValueError:
            # This format doesn't fit; move on to the next one.
            continue
        return iso
    return None
# ---------------------------------------------------------------------------
# PDF reading
# ---------------------------------------------------------------------------
def extract_pages(pdf_bytes: bytes) -> list[Page]:
    """Read a PDF blob with pdfplumber and return one ``Page`` per page.

    Each page carries its extracted text (empty string when pdfplumber
    returns nothing) and a ``WordBox`` for every extracted word. Pages
    are numbered from 1.

    Raises:
        PdfDependencyMissing: if pdfplumber isn't installed.
    """
    pdfplumber = _require_pdfplumber()

    def _to_box(raw) -> WordBox:
        # pdfplumber hands back a dict per word; coerce to our types.
        return WordBox(
            x0=float(raw["x0"]),
            top=float(raw["top"]),
            x1=float(raw["x1"]),
            bottom=float(raw["bottom"]),
            text=str(raw["text"]),
        )

    pages: list[Page] = []
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as document:
        for number, pdf_page in enumerate(document.pages, start=1):
            raw_words = pdf_page.extract_words(
                use_text_flow=True,
                keep_blank_chars=False,
                extra_attrs=[],
            )
            boxes = [_to_box(raw) for raw in raw_words]
            pages.append(
                Page(
                    page_no=number,
                    width=float(pdf_page.width),
                    height=float(pdf_page.height),
                    text=pdf_page.extract_text() or "",
                    words=boxes,
                )
            )
    return pages
def cluster_rows(
    words: list[WordBox],
    y_tolerance: float = 3.0,
) -> list[list[WordBox]]:
    """Bucket word boxes into visual rows using their ``top`` value.

    Words are visited top-to-bottom. A word joins the open row when
    its ``top`` is within *y_tolerance* of the row's FIRST word (the
    anchor does not drift as the row grows); otherwise it opens a new
    row. Every returned row is ordered left-to-right by ``x0``.

    Returns:
        Rows in top-to-bottom order; ``[]`` for empty input.
    """
    rows: list[list[WordBox]] = []
    bucket: list[WordBox] = []
    anchor_top = 0.0
    for box in sorted(words, key=lambda b: b.top):
        if bucket and not abs(box.top - anchor_top) <= y_tolerance:
            # Too far from the row's anchor: close the row out.
            rows.append(sorted(bucket, key=lambda b: b.x0))
            bucket = []
        if not bucket:
            anchor_top = box.top
        bucket.append(box)
    if bucket:
        rows.append(sorted(bucket, key=lambda b: b.x0))
    return rows
# ---------------------------------------------------------------------------
# OCR fallback (optional)
# ---------------------------------------------------------------------------
def page_has_extractable_text(page: Page, min_words: int = 5) -> bool:
    """Report whether *page* carries at least *min_words* words.

    Used as a "was this page scanned?" heuristic: a page with fewer
    words than the threshold is treated as having no usable text.
    """
    word_count = len(page.words)
    return word_count >= min_words
# ---------------------------------------------------------------------------
# Tesseract discovery
#
# Discovery order (shared with the PyInstaller build agent):
#
# 1. ``DATATOOLS_TESSERACT_PATH`` env var override (user escape hatch)
# 2. Bundled binary inside the PyInstaller frozen bundle
# (``sys._MEIPASS / "tesseract" / "tesseract[.exe]"``) — only
# present when running from a frozen DataTools installer/portable
# build. No-op in a dev checkout.
# 3. System PATH lookup (``pytesseract.get_tesseract_version()``)
# 4. Windows well-known install dirs (legacy fallback for users who
# installed UB Mannheim's Tesseract-OCR themselves)
#
# When a bundled tessdata directory exists, ``TESSDATA_PREFIX`` is set
# so Tesseract picks up the bundled ``eng.traineddata``. User-supplied
# ``TESSDATA_PREFIX`` is never clobbered.
# ---------------------------------------------------------------------------
def _bundled_tesseract_path() -> Path | None:
"""Return the path to the bundled Tesseract binary, or ``None``.
Only returns a non-None value when running from a PyInstaller
frozen bundle (``sys.frozen`` is truthy AND ``sys._MEIPASS`` is
set). The bundled binary lives at
``<_MEIPASS>/tesseract/tesseract`` (``.exe`` on Windows) per the
contract shared with the build agent.
The file is NOT required to exist for this helper to return a
path — callers ``stat`` / ``.exists()``-check it themselves so a
missing bundled binary is treated the same as "not bundled" and
discovery falls through to PATH lookup.
"""
if not getattr(sys, "frozen", False):
return None
meipass = getattr(sys, "_MEIPASS", None)
if not meipass:
return None
binary = "tesseract.exe" if platform.system() == "Windows" else "tesseract"
return Path(meipass) / "tesseract" / binary
def _bundled_tessdata_dir() -> Path | None:
"""Return the bundled ``tessdata`` directory or ``None``.
Same frozen-state gating as ``_bundled_tesseract_path``; the dir
lives at ``<_MEIPASS>/tesseract/tessdata``. Callers use this to
point Tesseract at the bundled language data via the
``TESSDATA_PREFIX`` env var.
"""
if not getattr(sys, "frozen", False):
return None
meipass = getattr(sys, "_MEIPASS", None)
if not meipass:
return None
return Path(meipass) / "tesseract" / "tessdata"
def _apply_bundled_tessdata_prefix() -> None:
"""Point Tesseract at the bundled ``tessdata`` directory.
Sets ``TESSDATA_PREFIX`` to the bundled path so the frozen
Tesseract binary picks up the bundled ``eng.traineddata``. A
user-supplied ``TESSDATA_PREFIX`` is preserved untouched — power
users who explicitly chose their own language data win.
No-op outside a frozen bundle, or if the bundled dir doesn't
exist (e.g. tessdata wasn't packaged for the current platform).
"""
if os.environ.get("TESSDATA_PREFIX"):
return
tessdata = _bundled_tessdata_dir()
if tessdata is not None and tessdata.exists():
os.environ["TESSDATA_PREFIX"] = str(tessdata)
def _autodetect_tesseract_path() -> str | None:
    """Find a Tesseract executable without consulting ``PATH``.

    Lookup order:

    1. The bundled binary, when one applies and exists on disk.
    2. On Windows only, a short list of conventional install
       locations, taking the first that exists.

    Returns:
        The executable's path as a string, or ``None`` if nothing
        was found.
    """
    shipped = _bundled_tesseract_path()
    if shipped is not None and shipped.exists():
        return str(shipped)
    if platform.system() == "Windows":
        well_known = (
            r"C:\Program Files\Tesseract-OCR\tesseract.exe",
            r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
            os.path.expandvars(
                r"%LOCALAPPDATA%\Programs\Tesseract-OCR\tesseract.exe"
            ),
        )
        for location in well_known:
            if location and Path(location).exists():
                return location
    return None
def ocr_available() -> tuple[bool, str]:
    """Check whether OCR can run and explain why not if it can't.

    Steps, in order:

    1. Import ``pytesseract``; bail out if it isn't installed.
    2. Apply the bundled ``TESSDATA_PREFIX`` (if applicable) before
       any Tesseract invocation.
    3. Choose the command: ``DATATOOLS_TESSERACT_PATH`` if set, else
       the bundled binary if it exists, else whatever pytesseract is
       already configured with.
    4. Ask Tesseract for its version. On failure, retry once with the
       auto-detected path, if there is one.

    Returns:
        ``(True, "")`` when Tesseract answered, otherwise
        ``(False, reason)`` with a human-readable reason.
    """
    try:
        import pytesseract  # noqa: PLC0415
    except ImportError:
        return False, "pytesseract is not installed."

    # Must happen before the first version probe so the bundled
    # language data is in effect for it.
    _apply_bundled_tessdata_prefix()

    env_cmd = os.environ.get("DATATOOLS_TESSERACT_PATH")
    if env_cmd:
        pytesseract.pytesseract.tesseract_cmd = env_cmd
    else:
        # Prefer the bundled binary over anything on PATH.
        shipped = _bundled_tesseract_path()
        if shipped is not None and shipped.exists():
            pytesseract.pytesseract.tesseract_cmd = str(shipped)

    try:
        pytesseract.get_tesseract_version()
    except Exception as e_path:
        first_failure = e_path
    else:
        return True, ""

    fallback = _autodetect_tesseract_path()
    if not fallback:
        return False, f"Tesseract binary not found on PATH: {first_failure}"

    pytesseract.pytesseract.tesseract_cmd = fallback
    try:
        pytesseract.get_tesseract_version()
    except Exception as e_candidate:
        return False, (
            f"Tesseract found at {fallback} but failed to "
            f"run: {e_candidate}"
        )
    return True, ""
def ocr_pdf_to_pages(pdf_bytes: bytes, dpi: int = 200) -> list[Page]:
    """Render each page of *pdf_bytes* and OCR it into a ``Page``.

    Pages are rendered at ``dpi / 72`` scale, and the pixel
    coordinates Tesseract reports are divided by that same scale
    before being stored. Blank OCR tokens are dropped. The page text
    is the surviving words joined by single spaces.

    This function does not verify that Tesseract works; call
    ``ocr_available()`` beforehand.

    Raises:
        PdfDependencyMissing: if pypdfium2 isn't installed.
    """
    pdfium = _require_pdfium()
    import pytesseract  # noqa: PLC0415

    def _boxes(data, scale: float) -> list[WordBox]:
        # Turn Tesseract's column-oriented dict into WordBox records.
        boxes: list[WordBox] = []
        for j, raw in enumerate(data.get("text", [])):
            token = (raw or "").strip()
            if not token:
                continue
            x = float(data["left"][j])
            y = float(data["top"][j])
            w = float(data["width"][j])
            h = float(data["height"][j])
            boxes.append(WordBox(
                x0=x / scale,
                top=y / scale,
                x1=(x + w) / scale,
                bottom=(y + h) / scale,
                text=token,
            ))
        return boxes

    result: list[Page] = []
    document = pdfium.PdfDocument(pdf_bytes)
    try:
        scale = dpi / 72.0
        for index in range(len(document)):
            image = document[index].render(scale=scale).to_pil()
            data = pytesseract.image_to_data(
                image,
                output_type=pytesseract.Output.DICT,
            )
            boxes = _boxes(data, scale)
            result.append(Page(
                page_no=index + 1,
                width=image.width / scale,
                height=image.height / scale,
                text=" ".join(b.text for b in boxes),
                words=boxes,
            ))
    finally:
        # Always release the document, even if rendering or OCR fails.
        document.close()
    return result
def extract_pages_auto(
    pdf_bytes: bytes,
    *,
    allow_ocr: bool = True,
) -> tuple[list[Page], list[str]]:
    """Extract text normally, then OCR whatever pages came back empty.

    A page is "empty" when ``page_has_extractable_text`` rejects it.
    If there are none, the plain extraction is returned as-is. If OCR
    is disabled or unusable, the plain extraction is returned along
    with a warning saying so. Otherwise the whole document is OCR'd
    and each empty page is replaced by its OCR counterpart (matched
    on ``page_no``) when one exists.

    Returns:
        ``(pages, warnings)`` where *warnings* are human-readable
        strings.
    """
    warnings: list[str] = []
    pages = extract_pages(pdf_bytes)
    blank_count = sum(1 for p in pages if not page_has_extractable_text(p))
    if blank_count == 0:
        return pages, warnings

    if not allow_ocr:
        warnings.append(
            f"{blank_count} page(s) appear scanned. OCR is disabled."
        )
        return pages, warnings

    usable, why_not = ocr_available()
    if not usable:
        warnings.append(
            f"{blank_count} page(s) appear scanned but OCR isn't usable: "
            f"{why_not}"
        )
        return pages, warnings

    ocr_by_number = {p.page_no: p for p in ocr_pdf_to_pages(pdf_bytes)}
    merged: list[Page] = [
        p if page_has_extractable_text(p) else ocr_by_number.get(p.page_no, p)
        for p in pages
    ]
    warnings.append(
        f"OCR was used for {blank_count} page(s) with no extractable text."
    )
    return merged, warnings
# ---------------------------------------------------------------------------
# Row detection (the only thing the GUI actually calls)
# ---------------------------------------------------------------------------
def _find_dates_in_words(
    row_words: list[WordBox],
) -> list[tuple[int, int, str]]:
    """Return every date-like substring on this row, sorted by
    position. Each entry is ``(start_idx, end_idx_exclusive, text)``
    where the indices refer to positions in *row_words*.

    Two-pass search:

    - **Pass 1** — full-year patterns (``01/15/2026``,
      ``Jan 13, 2026``). Longest window first so multi-word dates
      aren't truncated to a partial short match.
    - **Pass 2** — short patterns (``01/13``, ``Jan 13``). Runs only
      when pass 1 found nothing: if a real full-year date is on the
      row, short matches are almost always page numbers
      (``Page 1/2``) or fractions in memos.

    A hit is recorded only when the match begins inside the word the
    window starts on. Windows span up to three words, so an unanchored
    search could otherwise find a date that starts in a LATER word
    and attribute it to the window's first word. That claimed an
    unrelated leading token (e.g. ``Check`` in
    ``Check 1234 01/15/2026``) as a date and reported the same date
    once per preceding word.

    Some rows carry more than one date (e.g. transaction date plus
    posting date); all of them are returned so callers can exclude
    each from the description.
    """

    def _scan(patterns, window_order):
        local_found: list[tuple[int, int, str]] = []
        local_claimed: set[int] = set()
        for i, first_word in enumerate(row_words):
            if i in local_claimed:
                continue
            # Offsets below this value lie inside word ``i`` itself.
            first_len = len(first_word.text)
            hit: str | None = None
            for window in window_order:
                end = i + window
                if end > len(row_words):
                    continue
                if any(j in local_claimed for j in range(i, end)):
                    continue
                chunk = " ".join(x.text for x in row_words[i:end])
                for rx in patterns:
                    m = rx.search(chunk)
                    # ``search`` returns the leftmost match; if even
                    # that one starts past word ``i``, this pattern
                    # has nothing anchored here. The later word gets
                    # its own turn as ``i`` advances.
                    if m and m.start(1) < first_len:
                        hit = m.group(1)
                        break
                if hit is not None:
                    break
            if hit is None:
                continue
            # Number of row words the matched text spans.
            consumed = max(1, len(hit.split()))
            actual_end = i + consumed
            local_found.append((i, actual_end, hit))
            local_claimed.update(range(i, actual_end))
        return local_found

    full = _scan(_DATE_RES_FULL, (3, 2, 1))
    if full:
        # A real full-year date on the row anchors interpretation.
        # Don't ALSO collect short patterns.
        return sorted(full, key=lambda t: t[0])
    short = _scan(_DATE_RES_SHORT, (2, 1))
    return sorted(short, key=lambda t: t[0])
def _find_amount_tokens(
    row_words: list[WordBox],
) -> list[tuple[int, WordBox, str]]:
    """Collect the amount-looking tokens on a row, in word order.

    For each word, the first amount-pattern match inside its text is
    considered. The match is kept only if it contains at least one
    money marker (currency symbol, ``.``, ``,``, parenthesis or
    ``-``), so bare integers such as years or page numbers are
    ignored.

    Returns:
        ``(word_index, wordbox, matched_text)`` tuples.
    """
    hits: list[tuple[int, WordBox, str]] = []
    for idx, box in enumerate(row_words):
        found = _AMOUNT_RE.search(box.text)
        if found is None:
            continue
        candidate = found.group(1)
        if re.search(r"[\$€£.,()\-]", candidate):
            hits.append((idx, box, candidate))
    return hits
# ``strftime`` format used for output dates unless the caller picks
# another; also the default ``fmt`` of ``format_date``.
DEFAULT_DATE_FORMAT = "%Y-%m-%d"
"""ISO-8601-style ``YYYY-MM-DD``. Default for output date columns
because it sorts lexicographically, parses in every spreadsheet
tool the user might import the CSV into, and is unambiguous
across US/EU readers."""
def format_amount(value, places: int = 2) -> str:
    """Render an amount with a fixed number of decimals.

    Rules, in order:

    - ``None`` and ``""`` give ``""``.
    - ``bool`` gives ``str(value)``; it is checked before the numeric
      case because ``bool`` is an ``int`` subclass and ``True`` would
      otherwise come out as ``"1.00"``.
    - ``int`` / ``float`` give ``f"{value:.{places}f}"``, except
      non-finite floats (NaN, ±inf), which give ``""``.
    - Anything else (typically a raw, unparsed token) gives
      ``str(value)`` unchanged.
    """
    import math

    if value is None or value == "":
        return ""
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not is_number:
        return str(value)
    if isinstance(value, float) and not math.isfinite(value):
        return ""
    return f"{value:.{places}f}"
def format_date(iso_str: str | None, fmt: str = DEFAULT_DATE_FORMAT) -> str:
    """Re-render an ISO ``YYYY-MM-DD`` date using *fmt*.

    Returns:
        ``""`` for ``None`` / empty input, the reformatted date when
        the input parses as ISO, and the input itself (untouched)
        when it does not.
    """
    if not iso_str:
        return ""
    try:
        parsed = datetime.strptime(iso_str, "%Y-%m-%d")
        rendered = parsed.strftime(fmt)
    except (ValueError, TypeError):
        # Not an ISO date: hand the original back rather than lose it.
        return iso_str
    return rendered
# ---------------------------------------------------------------------------
# Statement-level metadata (account number + period)
# ---------------------------------------------------------------------------
# Account number regexes. Bank statements label these in a small
# handful of conventional ways. The capture group is a permissive
# run of digits / X / * / dashes / spaces — accounts are often
# masked like ``****1234`` or printed with grouping like
# ``1234-5678-9012``.
# Tried in order by ``_extract_account_number``. All are
# case-insensitive, and group 1 is the account text: it starts and
# ends on a digit / ``X`` / ``*`` and has 3-30 digit, ``X``, ``*``,
# dash or whitespace characters in between.
_ACCOUNT_RES = [
    # "Account Number", "Account No", "Account No.", "Account #",
    # optionally followed by ":" or ".".
    re.compile(
        r"Account\s*(?:Number|No\.?|#)\s*[:.]?\s*"
        r"([X\*\d][X\*\d\-\s]{3,30}[X\*\d])",
        re.IGNORECASE,
    ),
    # Bare "Account:" / "Account." label.
    re.compile(
        r"Account\s*[:.]\s*([X\*\d][X\*\d\-\s]{3,30}[X\*\d])",
        re.IGNORECASE,
    ),
    # Abbreviated "A/C" or "AC", optionally with "#" / "No" / "No.".
    # NOTE(review): there is no word boundary before the "A", so this
    # can also fire on the tail of an ordinary word ending in "ac".
    re.compile(
        r"A/?[Cc]\s*(?:#|No\.?)?\s*[:.]?\s*"
        r"([X\*\d][X\*\d\-\s]{3,30}[X\*\d])",
        re.IGNORECASE,
    ),
]
def _extract_account_number(text: str) -> str | None:
    """Return the first labelled account number found in *text*.

    The account patterns are tried in their declared order, each
    across the whole text. A capture qualifies when it contains at
    least four digits. Runs of whitespace inside the capture are
    collapsed to one space; mask characters (``X``, ``*``) and dashes
    are kept as printed.

    Returns:
        The cleaned account text, or ``None`` if nothing qualifies.
    """
    for pattern in _ACCOUNT_RES:
        for hit in pattern.finditer(text):
            candidate = re.sub(r"\s+", " ", hit.group(1).strip())
            if sum(ch.isdigit() for ch in candidate) >= 4:
                return candidate
    return None
# Labels that introduce a statement period, matched
# case-insensitively: "Statement Period", "Statement Date",
# "For the period", "For the statement period", "Period Covered",
# "Period Beginning", "Period of Statement", and a bare "From".
# NOTE(review): "From" has no word boundaries, so it also matches
# inside ordinary sentences; ``_extract_statement_period`` only acts
# on a label when a full-year date follows it within its look-ahead
# window.
_PERIOD_LABEL_RE = re.compile(
    r"(?:Statement\s*(?:Period|Date)|"
    r"For\s+the\s+(?:period|statement\s+period)|"
    r"Period\s+(?:Covered|Beginning|of\s+Statement)|"
    r"From)",
    re.IGNORECASE,
)
def _extract_statement_period(
    text: str,
) -> tuple[str | None, str | None]:
    """Find the statement period in *text* as ISO ``(start, end)``.

    Each period label is examined in order of appearance. The 150
    characters after the label are searched for full-year dates that
    ``parse_date`` can parse. The first label with at least one such
    date decides the result: the two earliest dates (by position)
    become start and end, or a lone date is used for both.

    Returns:
        ``(start, end)``, or ``(None, None)`` if no label is followed
        by a parseable date.
    """
    for label in _PERIOD_LABEL_RE.finditer(text):
        window_start = label.end()
        snippet = text[window_start : window_start + 150]

        located: list[tuple[int, str]] = []
        for pattern in _DATE_RES_FULL:
            for hit in pattern.finditer(snippet):
                iso = parse_date(hit.group(1))
                if iso:
                    located.append((hit.start(), iso))
        if not located:
            continue

        # Order by position only; the stable sort keeps pattern order
        # for any hits that start at the same offset.
        located.sort(key=lambda pair: pair[0])
        first = located[0][1]
        second = located[1][1] if len(located) >= 2 else first
        return first, second
    return None, None
def extract_statement_metadata(
    pages: list[Page],
) -> dict[str, str | None]:
    """Detect the account number and statement period for a document.

    Page 1's text is searched first. If the account number or the
    period start is still missing and a second page exists, the
    missing item(s) are searched for again in page 1 and page 2
    joined by a newline.

    Returns:
        A dict with keys ``account_number``, ``period_start`` and
        ``period_end``. Undetected fields are ``None``; dates are ISO
        strings. All three are ``None`` when *pages* is empty.
    """
    if not pages:
        return {
            "account_number": None,
            "period_start": None,
            "period_end": None,
        }

    first_text = pages[0].text
    account = _extract_account_number(first_text)
    start, end = _extract_statement_period(first_text)

    incomplete = account is None or start is None
    if incomplete and len(pages) > 1:
        combined = "\n".join((pages[0].text, pages[1].text))
        if account is None:
            account = _extract_account_number(combined)
        if start is None:
            start, end = _extract_statement_period(combined)

    return {
        "account_number": account,
        "period_start": start,
        "period_end": end,
    }
def _try_short_date_with_year(raw_date: str, year: int) -> str | None:
"""Append *year* to a short date string and try to parse it.
Returns ISO or None if no format matches."""
candidates = [
("%m/%d/%Y", f"{raw_date}/{year}"),
("%m-%d-%Y", f"{raw_date}-{year}"),
("%b %d %Y", f"{raw_date} {year}"),
("%d %b %Y", f"{raw_date} {year}"),
]
for fmt, candidate in candidates:
try:
return datetime.strptime(candidate, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
return None
# A standalone ``20XX`` year: four digits starting with "20" that are
# not part of a longer digit run.
_YEAR_FROM_FILENAME_RE = re.compile(r"(?<!\d)(20\d{2})(?!\d)")


def year_from_filename(filename: str) -> int | None:
    """Pull the first ``20XX`` year out of *filename*.

    Example: ``eStmt_2025-01-13.pdf`` gives ``2025``.

    Returns:
        The year as an ``int``, or ``None`` when *filename* is empty
        or contains no standalone ``20XX`` run.
    """
    found = _YEAR_FROM_FILENAME_RE.search(filename) if filename else None
    if found is None:
        return None
    return int(found.group(1))
def _infer_year_for_short_date(
raw_date: str,
period_start_iso: str | None,
period_end_iso: str | None,
*,
filename_year_hint: int | None = None,
override_year: int | None = None,
) -> str | None:
"""Bind a short date like ``01/13`` to a full ISO date using
the best available year evidence.
Priority order:
1. ``override_year`` — user-supplied, beats all heuristics.
2. ``period_start_iso`` + ``period_end_iso`` — generate
candidates for BOTH years (they differ only on
Dec/Jan-boundary statements) and pick the one that falls
inside the period, or closest if neither is inside.
Handles the Dec/Jan case: a ``12/30`` row in a 2024-12-16
to 2025-01-15 statement resolves to 2024-12-30 because
that's the only candidate inside the period.
3. ``filename_year_hint`` — when the statement-period regex
missed but the filename carries a year (common in bank
e-statement naming).
Returns ISO ``YYYY-MM-DD`` or None when no signal is
available — caller falls back to the raw text so the user
can correct in the editor.
"""
if not raw_date:
return None
if override_year:
return _try_short_date_with_year(raw_date, override_year)
if period_start_iso and period_end_iso:
try:
start_dt = datetime.strptime(period_start_iso, "%Y-%m-%d")
end_dt = datetime.strptime(period_end_iso, "%Y-%m-%d")
except (ValueError, TypeError):
start_dt = end_dt = None
if start_dt and end_dt:
years_to_try = {start_dt.year, end_dt.year}
candidates: list[str] = []
for year in years_to_try:
iso = _try_short_date_with_year(raw_date, year)
if iso:
candidates.append(iso)
if candidates:
def distance(iso_str: str) -> int:
dt = datetime.strptime(iso_str, "%Y-%m-%d")
if start_dt <= dt <= end_dt:
return 0
# Outside the period — measure shortest gap
# to either edge so a 12/15 transaction in a
# 12/16-01/15 statement still leans toward the
# period's start year.
return min(
abs((dt - start_dt).days),
abs((dt - end_dt).days),
)
candidates.sort(key=distance)
return candidates[0]
if filename_year_hint:
return _try_short_date_with_year(raw_date, filename_year_hint)
return None
def _description_from_row(
row_words: list[WordBox],
date_ranges: list[tuple[int, int]],
amount_idxs: set[int],
) -> str:
"""Stitch the description from the row's non-date, non-amount
tokens. ``date_ranges`` is a list of ``(start, end)`` (end
exclusive) — every word in any range is excluded.
Why a list: some bank statements show two dates per row
(transaction + posting). Without excluding all of them, the
extra date(s) leak into the description and look like trash.
Keeps tokens before the first amount and after the last
amount (trailing check numbers, memos); drops words between
amount tokens (usually whitespace artifacts in column gaps).
"""
excluded: set[int] = set()
for start, end in date_ranges:
excluded.update(range(start, end))
keep: list[str] = []
seen_first_amount = False
last_amount_idx = max(amount_idxs) if amount_idxs else -1
for i, w in enumerate(row_words):
if i in excluded:
continue
if i in amount_idxs:
seen_first_amount = True
continue
if seen_first_amount and i < last_amount_idx:
continue
keep.append(w.text)
return " ".join(keep).strip()
def scan_pdf_for_transactions(
    pdf_bytes: bytes,
    *,
    negative_in_parens: bool = True,
    allow_ocr: bool = True,
    date_formats: list[str] | None = None,
    y_tolerance: float = 3.0,
    merge_multiline_descriptions: bool = True,
    output_date_format: str = DEFAULT_DATE_FORMAT,
    filename_year_hint: int | None = None,
    year_override: int | None = None,
) -> tuple[list[dict[str, Any]], list[str]]:
    """Scan *pdf_bytes* for transaction-like rows.

    A row qualifies if it contains a date pattern AND at least one
    amount pattern, still has description text once dates and
    amounts are removed, and its first amount is not exactly zero.
    Each returned record looks like::

        {
            "date": "2026-01-15",         # output_date_format applied
            "description": "...",
            "page": 1,
            "raw": "01/15/2026 Coffee $4.50",
            "amount_1": 4.50,
            "amount_2": 1000.00,          # if a second amount was found
            "account_number": "****1234", # from header
        }

    Account number is extracted from the statement header once
    per PDF and stamped onto every detected row so the CSV is
    self-attributing when statements are combined (empty string
    when none was detected). The statement period IS detected
    (used internally for year inference on short dates like
    "01/13") but isn't surfaced as a per-row column — the inferred
    year already lives in the ``date`` field.

    Short dates without a year (``01/13``, ``Jan 13``) are resolved
    through a cascade: ``year_override`` first, then the detected
    statement period, then ``filename_year_hint``. If none of those
    yields a date, the raw short text is preserved in ``date``.

    Multi-line descriptions (rows with no date and no amount)
    attach to the most recent kept transaction row on the same
    page when ``merge_multiline_descriptions=True`` (default) and
    the vertical gap is small. A date+amount row that gets rejected
    ends that chain, so its own wrapped lines are not glued onto
    an earlier transaction.

    Args:
        pdf_bytes: Raw PDF content.
        negative_in_parens: Forwarded to ``parse_amount``.
        allow_ocr: Forwarded to ``extract_pages_auto``.
        date_formats: Forwarded to ``parse_date``.
        y_tolerance: Forwarded to ``cluster_rows`` when grouping
            words into rows.
        merge_multiline_descriptions: Append continuation lines to
            the preceding transaction's description.
        output_date_format: Forwarded to ``format_date`` for every
            date that could be resolved.
        filename_year_hint: Last-resort year for short dates.
        year_override: Year that takes precedence over every other
            signal for short dates.

    Returns ``(rows, warnings)``. Warnings are human-readable
    strings the GUI surfaces in an expander.
    """
    pages, warnings = extract_pages_auto(pdf_bytes, allow_ocr=allow_ocr)
    metadata = extract_statement_metadata(pages)
    out_rows: list[dict[str, Any]] = []

    # Maximum y-gap (in PDF points) between a transaction and a
    # following no-date-no-amount line for that line to count as a
    # continuation of the description. Typical line baselines sit
    # ~10-14 pts apart; 25 pts allows for one blank line but
    # rejects section headers that are several rows away.
    _MULTILINE_MERGE_MAX_GAP = 25.0

    for page in pages:
        # ``prev`` and ``prev_y_bottom`` reset per page so a section
        # header at the top of page 2 can't attach to the last
        # transaction on page 1 — PDF y-coordinates restart at the
        # top of each page so the y-distance check is meaningless
        # across page boundaries.
        prev: dict[str, Any] | None = None
        prev_y_bottom: float | None = None
        rows = cluster_rows(page.words, y_tolerance=y_tolerance)
        for row_words in rows:
            line = " ".join(w.text for w in row_words).strip()
            if not line:
                continue
            dates = _find_dates_in_words(row_words)
            amount_tokens = _find_amount_tokens(row_words)
            if not dates or not amount_tokens:
                # Continuation candidate — a line with no date AND
                # no amount of its own. Only attach to the previous
                # transaction if (a) we have one, (b) it's on this
                # same page, and (c) the y-gap to it is small enough
                # that a human would read this as a wrapped line
                # rather than a separate paragraph or section header.
                if (
                    merge_multiline_descriptions
                    and prev is not None
                    and not dates
                    and not amount_tokens
                    and row_words
                ):
                    current_top = min(w.top for w in row_words)
                    if (
                        prev_y_bottom is not None
                        and (current_top - prev_y_bottom)
                        <= _MULTILINE_MERGE_MAX_GAP
                    ):
                        prev["description"] = (
                            (prev["description"] + " " + line).strip()
                        )
                        # Extend the anchor so a third wrapped line
                        # is measured from this one.
                        prev_y_bottom = max(w.bottom for w in row_words)
                continue

            # First date wins for the "date" column; ALL dates are
            # excluded from the description so a row carrying both
            # a transaction date and a posting date doesn't leak
            # the second one into description text.
            _, _, first_date_text = dates[0]
            date_ranges = [(s, e) for s, e, _ in dates]
            amount_idxs = {idx for idx, _, _ in amount_tokens}
            desc = _description_from_row(
                row_words, date_ranges, amount_idxs,
            )
            # Every real transaction must have a description. Rows
            # like "01/13/2025 $1,000.00" (Daily Ledger Balances
            # section, page totals, period summaries) carry a date
            # and an amount but no text in between — they're
            # statement furniture, not transactions. Drop them.
            if not desc.strip():
                # The rejected row ends the continuation chain:
                # text wrapped beneath it belongs to it, not to the
                # transaction kept above it.
                prev = None
                prev_y_bottom = None
                continue

            iso = parse_date(first_date_text, date_formats)
            if iso is None:
                # Short date — try to bind a year using the cascade:
                # override → statement period (Dec/Jan-aware) →
                # filename year hint. Each signal is a separate
                # argument so the caller can mix-and-match.
                iso = _infer_year_for_short_date(
                    first_date_text,
                    metadata["period_start"],
                    metadata["period_end"],
                    filename_year_hint=filename_year_hint,
                    override_year=year_override,
                )
            formatted_date = (
                format_date(iso, output_date_format)
                if iso else first_date_text
            )
            record: dict[str, Any] = {
                "date": formatted_date,
                "description": desc,
                "page": page.page_no,
                "raw": line,
            }
            for k, (_, _, txt) in enumerate(amount_tokens, start=1):
                parsed = parse_amount(
                    txt, negative_in_parens=negative_in_parens,
                )
                # Fall back to the raw text if the parser fails so
                # the user sees something to fix in the editor
                # rather than a silent NaN.
                record[f"amount_{k}"] = (
                    parsed if parsed is not None else txt
                )
            # Drop rows where the transaction amount is exactly 0.
            # Bank statements include noise like "INTEREST EARNED
            # 0.00" or "PAGE TOTAL 0.00" that pass the date+amount
            # heuristic but aren't real transactions. We key off
            # ``amount_1`` (leftmost amount = usually the txn
            # amount); a non-zero balance in ``amount_2`` doesn't
            # rescue a zero ``amount_1``.
            if not _has_real_transaction_amount(record):
                # Same reasoning as the empty-description case: do
                # not let this row's wrapped lines attach to the
                # previously kept transaction.
                prev = None
                prev_y_bottom = None
                continue
            # Stamp the account number onto every kept row so the
            # CSV is self-attributing when statements are combined.
            # The period start/end aren't surfaced per row — they're
            # used only for the year-inference fallback above
            # (binding short dates like "01/13" to the statement's
            # year) but downstream the date column already carries
            # the inferred full date.
            record["account_number"] = metadata["account_number"] or ""
            out_rows.append(record)
            prev = record
            prev_y_bottom = (
                max(w.bottom for w in row_words) if row_words else None
            )
    return out_rows, warnings
def _has_real_transaction_amount(record: dict[str, Any]) -> bool:
"""``amount_1`` is the row's primary amount. Drop rows whose
amount_1 parsed to exactly 0; keep everything else (positive,
negative, or unparsed-but-non-empty)."""
amount_1 = record.get("amount_1")
if amount_1 is None:
return False
if isinstance(amount_1, (int, float)):
return amount_1 != 0
# Unparsed string — keep so the user can verify in the editor.
return bool(str(amount_1).strip())
def diagnose_pdf_lines(
    pdf_bytes: bytes,
    *,
    allow_ocr: bool = True,
    max_lines: int = 200,
) -> tuple[list[dict[str, Any]], list[str]]:
    """Dump every clustered text line from a PDF for diagnosis.

    Surfaces what the scanner actually saw — including lines the
    detector dropped because they lacked a date or amount. Use
    when ``scan_pdf_for_transactions`` returns 0 rows so the user
    can spot what's wrong (no extractable text → scanned PDF /
    weird date format / amounts in a column the regex misses).

    Args:
        pdf_bytes: Raw PDF content.
        allow_ocr: Forwarded to ``extract_pages_auto``.
        max_lines: Upper bound on the number of lines returned
            across all pages.

    Returns ``(lines, warnings)`` where each line is::

        {"page": int, "text": str,
         "has_date": bool, "has_amount": bool}

    Capped at *max_lines* across all pages so a 100-page statement
    doesn't dump 10,000 rows into the UI. The "capped" warning is
    appended only when at least one further non-empty line was
    actually left out.
    """
    pages, warnings = extract_pages_auto(pdf_bytes, allow_ocr=allow_ocr)
    out: list[dict[str, Any]] = []
    for page in pages:
        for row_words in cluster_rows(page.words):
            text = " ".join(w.text for w in row_words).strip()
            if not text:
                continue
            # Check the cap BEFORE adding the line: reaching this
            # point with a full list proves there is more content
            # than we show, so the warning is never a false alarm
            # and the list never exceeds ``max_lines``.
            if len(out) >= max_lines:
                warnings.append(
                    f"Diagnostic capped at {max_lines} lines. "
                    "Larger PDFs aren't fully shown here — the full "
                    "scan still runs in Scan mode."
                )
                return out, warnings
            out.append({
                "page": page.page_no,
                "text": text,
                "has_date": bool(_find_dates_in_words(row_words)),
                "has_amount": bool(_find_amount_tokens(row_words)),
            })
    return out, warnings
# Names exported by a star-import of this module. Only the public
# surface is listed; underscore-prefixed helpers such as
# ``_description_from_row`` are deliberately left out.
__all__ = [
    "PdfDependencyMissing",
    "Page",
    "WordBox",
    "cluster_rows",
    "diagnose_pdf_lines",
    "extract_pages",
    "extract_pages_auto",
    "extract_statement_metadata",
    "format_amount",
    "format_date",
    "ocr_available",
    "parse_amount",
    "parse_date",
    "scan_pdf_for_transactions",
    "year_from_filename",
]