"""Heuristic PDF transaction scanner. Single public entry point: ``scan_pdf_for_transactions(pdf_bytes)`` returns a list of dicts shaped like ``[date] [description] [amount]``, plus a list of warning strings. The GUI renders those rows in an editable table and lets the user pick which to keep before exporting to CSV. There are no templates, no per-bank configuration files, and no coordinate dependencies. A transaction row is "any extracted text line containing a date pattern AND at least one amount pattern." Multi-amount rows surface every detected amount as ``amount_1``, ``amount_2``, ... — the user labels and reshapes in their CSV editor of choice. Optional OCR fallback for scanned PDFs via ``pytesseract`` + ``pypdfium2``. Robust to missing system Tesseract — returns a clear reason string instead of raising. """ from __future__ import annotations import io import os import platform import re import sys from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any # --------------------------------------------------------------------------- # Dependency guards # --------------------------------------------------------------------------- class PdfDependencyMissing(ImportError): """A runtime PDF dependency is missing. Carries an actionable install hint that the GUI surfaces. """ def __init__(self, missing: str, hint: str = ""): self.missing = missing self.hint = hint or ( "Install the PDF dependencies: ``pip install " "pdfplumber pypdfium2 pytesseract``" ) super().__init__(f"{missing} is not installed. {self.hint}") def _require_pdfplumber(): try: import pdfplumber # noqa: PLC0415 return pdfplumber except ImportError as e: raise PdfDependencyMissing("pdfplumber") from e def _require_pdfium(): try: import pypdfium2 # noqa: PLC0415 return pypdfium2 except ImportError as e: raise PdfDependencyMissing("pypdfium2") from e # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass(frozen=True) class WordBox: """A single word with its bounding box on a page. Coordinates are in PDF points (1/72 inch), origin top-left.""" x0: float top: float x1: float bottom: float text: str @dataclass class Page: """One PDF page's text + word positions.""" page_no: int width: float height: float text: str words: list[WordBox] = field(default_factory=list) # --------------------------------------------------------------------------- # Value parsing # --------------------------------------------------------------------------- _DATE_RES_FULL = [ re.compile(r"\b(\d{1,2}/\d{1,2}/\d{2,4})\b"), re.compile(r"\b(\d{1,2}-\d{1,2}-\d{2,4})\b"), re.compile(r"\b(\d{4}-\d{2}-\d{2})\b"), re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2},?\s+\d{2,4})\b"), re.compile(r"\b(\d{1,2}\s+[A-Z][a-z]{2}\s+\d{2,4})\b"), ] # Short-date patterns (no year). Many bank statements show dates as # ``MM/DD`` or ``Jan 13`` because the year is implied by the # statement period. Tried only after the full-year patterns fail # so a string like "1/2 cup" in a memo can't claim to be a date # when a real dated transaction was already matched on the same row. _DATE_RES_SHORT = [ re.compile(r"\b(\d{1,2}/\d{1,2})(?!\d)"), re.compile(r"\b(\d{1,2}-\d{1,2})(?!\d)"), re.compile(r"\b([A-Z][a-z]{2}\s+\d{1,2})(?!\d)"), ] _DATE_RES = _DATE_RES_FULL + _DATE_RES_SHORT _DATE_FORMATS_FALLBACK = [ "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d/%m/%Y", "%d/%m/%y", "%b %d %Y", "%b %d, %Y", "%d %b %Y", "%d-%b-%Y", "%m-%d-%Y", "%m-%d-%y", ] # Amount tokens: optional $/€/£, optional leading -, optional parens, # 1-3 digits before grouping with comma-thousand groups, optional # decimal portion. Trailing minus also captured. _AMOUNT_RE = re.compile( r"(? float | None: """Parse a money string to a signed float, or ``None`` if it doesn't parse. Handles: currency prefixes (configurable), thousands separators, parenthesized negatives, trailing minus signs ("123.45-"), leading minus, and bare blanks. """ if text is None: return None s = str(text).strip() if not s: return None negative = False if negative_in_parens and s.startswith("(") and s.endswith(")"): negative = True s = s[1:-1].strip() if s.endswith("-"): negative = True s = s[:-1].strip() if s.startswith("-"): negative = True s = s[1:].strip() for ch in currency_strip: s = s.replace(ch, "") s = s.replace(" ", "") if thousands: s = s.replace(thousands, "") if decimal != ".": s = s.replace(decimal, ".") if not s or not re.match(r"^\d+(\.\d+)?$", s): return None val = float(s) return -val if negative else val def parse_date( text: str, formats: list[str] | None = None, ) -> str | None: """Parse a date string and return ISO ``YYYY-MM-DD``. Tries *formats* first, then a list of common formats. Returns ``None`` if no format matches. Caller is responsible for preserving the raw text alongside the parsed value so the user can correct mis-detections in the editor. """ if text is None: return None s = str(text).strip() if not s: return None tries = list(formats or []) + _DATE_FORMATS_FALLBACK for fmt in tries: try: return datetime.strptime(s, fmt).strftime("%Y-%m-%d") except ValueError: continue return None # --------------------------------------------------------------------------- # PDF reading # --------------------------------------------------------------------------- def extract_pages(pdf_bytes: bytes) -> list[Page]: """Parse a PDF blob into ``Page`` records with word positions. Word positions are kept so the row clusterer can group by y-coordinate, but no x-position information is used downstream — the detector only looks at text content. """ pdfplumber = _require_pdfplumber() out: list[Page] = [] with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: for i, page in enumerate(pdf.pages, start=1): words_raw = page.extract_words( use_text_flow=True, keep_blank_chars=False, extra_attrs=[], ) words = [ WordBox( x0=float(w["x0"]), top=float(w["top"]), x1=float(w["x1"]), bottom=float(w["bottom"]), text=str(w["text"]), ) for w in words_raw ] out.append( Page( page_no=i, width=float(page.width), height=float(page.height), text=page.extract_text() or "", words=words, ) ) return out def cluster_rows( words: list[WordBox], y_tolerance: float = 3.0, ) -> list[list[WordBox]]: """Group word boxes into visual rows by ``top`` coordinate. Words whose ``top`` is within *y_tolerance* of the current cluster's first word join that cluster. Output rows are sorted top-to-bottom and words within a row are sorted left-to-right. """ if not words: return [] by_top = sorted(words, key=lambda w: w.top) rows: list[list[WordBox]] = [] current: list[WordBox] = [by_top[0]] current_top = by_top[0].top for w in by_top[1:]: if abs(w.top - current_top) <= y_tolerance: current.append(w) else: rows.append(sorted(current, key=lambda w: w.x0)) current = [w] current_top = w.top rows.append(sorted(current, key=lambda w: w.x0)) return rows # --------------------------------------------------------------------------- # OCR fallback (optional) # --------------------------------------------------------------------------- def page_has_extractable_text(page: Page, min_words: int = 5) -> bool: """Heuristic: a scanned page typically yields zero or near-zero words. ``min_words=5`` catches title/logo-only pages too.""" return len(page.words) >= min_words # --------------------------------------------------------------------------- # Tesseract discovery # # Discovery order (shared with the PyInstaller build agent): # # 1. ``DATATOOLS_TESSERACT_PATH`` env var override (user escape hatch) # 2. Bundled binary inside the PyInstaller frozen bundle # (``sys._MEIPASS / "tesseract" / "tesseract[.exe]"``) — only # present when running from a frozen DataTools installer/portable # build. No-op in a dev checkout. # 3. System PATH lookup (``pytesseract.get_tesseract_version()``) # 4. Windows well-known install dirs (legacy fallback for users who # installed UB Mannheim's Tesseract-OCR themselves) # # When a bundled tessdata directory exists, ``TESSDATA_PREFIX`` is set # so Tesseract picks up the bundled ``eng.traineddata``. User-supplied # ``TESSDATA_PREFIX`` is never clobbered. # --------------------------------------------------------------------------- def _bundled_tesseract_path() -> Path | None: """Return the path to the bundled Tesseract binary, or ``None``. Only returns a non-None value when running from a PyInstaller frozen bundle (``sys.frozen`` is truthy AND ``sys._MEIPASS`` is set). The bundled binary lives at ``<_MEIPASS>/tesseract/tesseract`` (``.exe`` on Windows) per the contract shared with the build agent. The file is NOT required to exist for this helper to return a path — callers ``stat`` / ``.exists()``-check it themselves so a missing bundled binary is treated the same as "not bundled" and discovery falls through to PATH lookup. """ if not getattr(sys, "frozen", False): return None meipass = getattr(sys, "_MEIPASS", None) if not meipass: return None binary = "tesseract.exe" if platform.system() == "Windows" else "tesseract" return Path(meipass) / "tesseract" / binary def _bundled_tessdata_dir() -> Path | None: """Return the bundled ``tessdata`` directory or ``None``. Same frozen-state gating as ``_bundled_tesseract_path``; the dir lives at ``<_MEIPASS>/tesseract/tessdata``. Callers use this to point Tesseract at the bundled language data via the ``TESSDATA_PREFIX`` env var. """ if not getattr(sys, "frozen", False): return None meipass = getattr(sys, "_MEIPASS", None) if not meipass: return None return Path(meipass) / "tesseract" / "tessdata" def _apply_bundled_tessdata_prefix() -> None: """Point Tesseract at the bundled ``tessdata`` directory. Sets ``TESSDATA_PREFIX`` to the bundled path so the frozen Tesseract binary picks up the bundled ``eng.traineddata``. A user-supplied ``TESSDATA_PREFIX`` is preserved untouched — power users who explicitly chose their own language data win. No-op outside a frozen bundle, or if the bundled dir doesn't exist (e.g. tessdata wasn't packaged for the current platform). """ if os.environ.get("TESSDATA_PREFIX"): return tessdata = _bundled_tessdata_dir() if tessdata is not None and tessdata.exists(): os.environ["TESSDATA_PREFIX"] = str(tessdata) def _autodetect_tesseract_path() -> str | None: """Locate a Tesseract binary outside the user's ``PATH``. Tries the bundled binary first (only present in PyInstaller frozen builds) so installer/portable users get a working OCR without touching their system. Falls back to the legacy Windows well-known install locations so users who installed UB Mannheim's Tesseract-OCR themselves keep working too. """ bundled = _bundled_tesseract_path() if bundled is not None and bundled.exists(): return str(bundled) if platform.system() != "Windows": return None candidates = [ r"C:\Program Files\Tesseract-OCR\tesseract.exe", r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe", os.path.expandvars( r"%LOCALAPPDATA%\Programs\Tesseract-OCR\tesseract.exe" ), ] for p in candidates: if p and Path(p).exists(): return p return None def ocr_available() -> tuple[bool, str]: """Return ``(available, reason)`` — is OCR usable right now? Discovery order: ``DATATOOLS_TESSERACT_PATH`` env var override, then the bundled binary (only present in a frozen build), then PATH-based lookup, then well-known Windows install locations. See the module-level discovery block for the full contract. """ try: import pytesseract # noqa: PLC0415 except ImportError: return False, "pytesseract is not installed." # Point Tesseract at the bundled tessdata (if any) BEFORE the # first ``get_tesseract_version`` call so the bundled language # data is loaded even when the user happens to also have a # system Tesseract that we'd otherwise fall through to. _apply_bundled_tessdata_prefix() override = os.environ.get("DATATOOLS_TESSERACT_PATH") if override: pytesseract.pytesseract.tesseract_cmd = override else: # Probe the bundled binary BEFORE PATH so frozen builds use # their own Tesseract instead of any incidental system one. bundled = _bundled_tesseract_path() if bundled is not None and bundled.exists(): pytesseract.pytesseract.tesseract_cmd = str(bundled) try: pytesseract.get_tesseract_version() return True, "" except Exception as e_path: candidate = _autodetect_tesseract_path() if candidate: pytesseract.pytesseract.tesseract_cmd = candidate try: pytesseract.get_tesseract_version() return True, "" except Exception as e_candidate: return False, ( f"Tesseract found at {candidate} but failed to " f"run: {e_candidate}" ) return False, f"Tesseract binary not found on PATH: {e_path}" def ocr_pdf_to_pages(pdf_bytes: bytes, dpi: int = 200) -> list[Page]: """OCR every page of *pdf_bytes* and return word-position-rich ``Page`` records, parallel to ``extract_pages``. Caller must check ``ocr_available()`` first. """ pdfium = _require_pdfium() import pytesseract # noqa: PLC0415 pages: list[Page] = [] pdf = pdfium.PdfDocument(pdf_bytes) try: scale = dpi / 72.0 for i in range(len(pdf)): pil_image = pdf[i].render(scale=scale).to_pil() data = pytesseract.image_to_data( pil_image, output_type=pytesseract.Output.DICT, ) words: list[WordBox] = [] for j, txt in enumerate(data.get("text", [])): t = (txt or "").strip() if not t: continue left = float(data["left"][j]) top = float(data["top"][j]) width = float(data["width"][j]) height = float(data["height"][j]) words.append(WordBox( x0=left / scale, top=top / scale, x1=(left + width) / scale, bottom=(top + height) / scale, text=t, )) text_blob = " ".join(w.text for w in words) pages.append(Page( page_no=i + 1, width=pil_image.width / scale, height=pil_image.height / scale, text=text_blob, words=words, )) finally: pdf.close() return pages def extract_pages_auto( pdf_bytes: bytes, *, allow_ocr: bool = True, ) -> tuple[list[Page], list[str]]: """Text extraction first; OCR the pages that come back empty. Returns ``(pages, warnings)`` — human-readable warning strings the caller surfaces in the UI. """ warnings: list[str] = [] pages = extract_pages(pdf_bytes) blank = [p for p in pages if not page_has_extractable_text(p)] if not blank: return pages, warnings if not allow_ocr: warnings.append( f"{len(blank)} page(s) appear scanned. OCR is disabled." ) return pages, warnings ok, reason = ocr_available() if not ok: warnings.append( f"{len(blank)} page(s) appear scanned but OCR isn't usable: " f"{reason}" ) return pages, warnings ocr_pages = ocr_pdf_to_pages(pdf_bytes) by_no = {p.page_no: p for p in ocr_pages} merged: list[Page] = [] for p in pages: if page_has_extractable_text(p): merged.append(p) elif p.page_no in by_no: merged.append(by_no[p.page_no]) else: merged.append(p) warnings.append( f"OCR was used for {len(blank)} page(s) with no extractable text." ) return merged, warnings # --------------------------------------------------------------------------- # Row detection (the only thing the GUI actually calls) # --------------------------------------------------------------------------- def _find_dates_in_words( row_words: list[WordBox], ) -> list[tuple[int, int, str]]: """Return every date-like substring on this row, sorted by position. Each entry is ``(start_idx, end_idx_exclusive, text)``. Two-pass search: - **Pass 1** — full-year patterns (``01/15/2026``, ``Jan 13, 2026``). Longest window first so multi-word dates aren't truncated to a partial short match. - **Pass 2** — short patterns (``01/13``, ``Jan 13``). Only claims word ranges that pass 1 didn't already take, so a real ``01/13/2026`` always wins over an adjacent ``Page 1/2``. Some statements show both a transaction date and a posting date per row (Chase, BofA, …). The scanner uses the first match as the canonical date for the CSV column, and excludes EVERY date from the description so the second / third dates don't leak into the description text. """ def _scan(patterns, window_order): local_found: list[tuple[int, int, str]] = [] local_claimed: set[int] = set() for i in range(len(row_words)): if i in local_claimed: continue matched = False for window in window_order: end = i + window if end > len(row_words): continue if any(j in local_claimed for j in range(i, end)): continue chunk = " ".join(x.text for x in row_words[i:end]) for rx in patterns: m = rx.search(chunk) if m: consumed = max(1, len(m.group(1).split())) actual_end = i + consumed local_found.append((i, actual_end, m.group(1))) local_claimed.update(range(i, actual_end)) matched = True break if matched: break return local_found full = _scan(_DATE_RES_FULL, (3, 2, 1)) if full: # A real full-year date on the row anchors interpretation. # Don't ALSO collect short patterns — they're almost always # page numbers ("Page 1/2") or fractions in memos when a # real date is present. return sorted(full, key=lambda t: t[0]) short = _scan(_DATE_RES_SHORT, (2, 1)) return sorted(short, key=lambda t: t[0]) def _find_amount_tokens( row_words: list[WordBox], ) -> list[tuple[int, WordBox, str]]: """Return ``[(word_index, wordbox, normalized_text)]`` for each amount-shaped token on this row, left-to-right. Filters out tokens that match the regex but lack real money markers (currency symbol, decimal point, parens, sign, thousand separator) — keeps bare years and page numbers out. """ out: list[tuple[int, WordBox, str]] = [] for i, w in enumerate(row_words): m = _AMOUNT_RE.search(w.text) if not m: continue token = m.group(1) if not re.search(r"[\$€£.,()\-]", token): continue out.append((i, w, token)) return out DEFAULT_DATE_FORMAT = "%Y-%m-%d" """ISO-8601-style ``YYYY-MM-DD``. Default for output date columns because it sorts lexicographically, parses in every spreadsheet tool the user might import the CSV into, and is unambiguous across US/EU readers.""" def format_amount(value, places: int = 2) -> str: """Render an amount value as a fixed-precision string. Floats lose trailing zeros in their native repr (``4.5`` is not ``4.50``), and pandas / Streamlit happily show that inconsistency cell-by-cell — confusing on a statement where every number is currency. This formatter forces *places* decimals so 4.5, 12.0 and 1000 all render with the same precision. Numeric → ``{value:.{places}f}``. None / empty / non-finite → empty string. Strings (typically the raw token preserved when ``parse_amount`` couldn't decode the original) pass through untouched so the user sees the source text in the editor. Booleans pass through as ``str(value)`` — guards against ``True`` rendering as ``"1.00"`` because Python treats ``bool`` as ``int``. """ if value is None or value == "": return "" if isinstance(value, bool): return str(value) if isinstance(value, (int, float)): import math if isinstance(value, float) and not math.isfinite(value): return "" return f"{value:.{places}f}" return str(value) def format_date(iso_str: str | None, fmt: str = DEFAULT_DATE_FORMAT) -> str: """Convert an ISO ``YYYY-MM-DD`` date string to *fmt*. Returns the input unchanged if it's not parseable as ISO, empty string if input is None/empty. The scanner uses this on every date column (transaction date + statement period start/end) so the output CSV is consistent. """ if not iso_str: return "" try: return datetime.strptime(iso_str, "%Y-%m-%d").strftime(fmt) except (ValueError, TypeError): return iso_str # --------------------------------------------------------------------------- # Statement-level metadata (account number + period) # --------------------------------------------------------------------------- # Account number regexes. Bank statements label these in a small # handful of conventional ways. The capture group is a permissive # run of digits / X / * / dashes / spaces — accounts are often # masked like ``****1234`` or printed with grouping like # ``1234-5678-9012``. _ACCOUNT_RES = [ re.compile( r"Account\s*(?:Number|No\.?|#)\s*[:.]?\s*" r"([X\*\d][X\*\d\-\s]{3,30}[X\*\d])", re.IGNORECASE, ), re.compile( r"Account\s*[:.]\s*([X\*\d][X\*\d\-\s]{3,30}[X\*\d])", re.IGNORECASE, ), re.compile( r"A/?[Cc]\s*(?:#|No\.?)?\s*[:.]?\s*" r"([X\*\d][X\*\d\-\s]{3,30}[X\*\d])", re.IGNORECASE, ), ] def _extract_account_number(text: str) -> str | None: """Find the first plausible account number in *text*. Plausible = at least 4 digit characters and matched near an 'Account' label. Whitespace is collapsed; the literal mask characters (``X``, ``*``) and dashes are preserved so the user sees ``****1234`` rather than ``1234`` (which would lose information). """ for rx in _ACCOUNT_RES: for m in rx.finditer(text): value = re.sub(r"\s+", " ", m.group(1).strip()) digit_count = sum(1 for c in value if c.isdigit()) if digit_count >= 4: return value return None _PERIOD_LABEL_RE = re.compile( r"(?:Statement\s*(?:Period|Date)|" r"For\s+the\s+(?:period|statement\s+period)|" r"Period\s+(?:Covered|Beginning|of\s+Statement)|" r"From)", re.IGNORECASE, ) def _extract_statement_period( text: str, ) -> tuple[str | None, str | None]: """Locate the statement period dates and return them as ISO ``(start, end)`` or ``(None, None)``. Strategy: find every "Statement Period" / "From" / etc. label, then look for full-year dates in the ~150 chars following the label. The first two dates become start/end. If only one date appears, both fields get the same value (single-statement-date case — common on monthly cycles where only the closing date is shown). """ for label_m in _PERIOD_LABEL_RE.finditer(text): snippet = text[label_m.end() : label_m.end() + 150] dates: list[tuple[int, str]] = [] for rx in _DATE_RES_FULL: for m in rx.finditer(snippet): iso = parse_date(m.group(1)) if iso: dates.append((m.start(), iso)) if dates: dates.sort(key=lambda x: x[0]) if len(dates) >= 2: return dates[0][1], dates[1][1] return dates[0][1], dates[0][1] return None, None def extract_statement_metadata( pages: list[Page], ) -> dict[str, str | None]: """Pull account number + statement period out of the header region of *pages*. Searches page 1's text, falling back to page 1 + 2 combined if page 1's account/period detection comes up empty (some statements put header info on page 2 — Wells Fargo business accounts do this). Returns ``{"account_number", "period_start", "period_end"}`` with ``None`` for any field that couldn't be detected. ISO format for the dates. """ if not pages: return { "account_number": None, "period_start": None, "period_end": None, } text = pages[0].text account = _extract_account_number(text) start, end = _extract_statement_period(text) # Fallback to pages 1+2 if anything was missed. if (account is None or start is None) and len(pages) > 1: extended = pages[0].text + "\n" + pages[1].text if account is None: account = _extract_account_number(extended) if start is None: start, end = _extract_statement_period(extended) return { "account_number": account, "period_start": start, "period_end": end, } def _try_short_date_with_year(raw_date: str, year: int) -> str | None: """Append *year* to a short date string and try to parse it. Returns ISO or None if no format matches.""" candidates = [ ("%m/%d/%Y", f"{raw_date}/{year}"), ("%m-%d-%Y", f"{raw_date}-{year}"), ("%b %d %Y", f"{raw_date} {year}"), ("%d %b %Y", f"{raw_date} {year}"), ] for fmt, candidate in candidates: try: return datetime.strptime(candidate, fmt).strftime("%Y-%m-%d") except ValueError: continue return None _YEAR_FROM_FILENAME_RE = re.compile(r"(? int | None: """Extract a 4-digit year from a filename like ``eStmt_2025-01-13.pdf`` → ``2025``. Returns the first match, or ``None`` if no 20XX pattern is present. Used as a fallback signal when the statement period can't be detected from the PDF's text — many bank-statement filenames follow the convention ``eStmt_YYYY-MM-DD.pdf`` so the year is right there. """ if not filename: return None m = _YEAR_FROM_FILENAME_RE.search(filename) return int(m.group(1)) if m else None def _infer_year_for_short_date( raw_date: str, period_start_iso: str | None, period_end_iso: str | None, *, filename_year_hint: int | None = None, override_year: int | None = None, ) -> str | None: """Bind a short date like ``01/13`` to a full ISO date using the best available year evidence. Priority order: 1. ``override_year`` — user-supplied, beats all heuristics. 2. ``period_start_iso`` + ``period_end_iso`` — generate candidates for BOTH years (they differ only on Dec/Jan-boundary statements) and pick the one that falls inside the period, or closest if neither is inside. Handles the Dec/Jan case: a ``12/30`` row in a 2024-12-16 to 2025-01-15 statement resolves to 2024-12-30 because that's the only candidate inside the period. 3. ``filename_year_hint`` — when the statement-period regex missed but the filename carries a year (common in bank e-statement naming). Returns ISO ``YYYY-MM-DD`` or None when no signal is available — caller falls back to the raw text so the user can correct in the editor. """ if not raw_date: return None if override_year: return _try_short_date_with_year(raw_date, override_year) if period_start_iso and period_end_iso: try: start_dt = datetime.strptime(period_start_iso, "%Y-%m-%d") end_dt = datetime.strptime(period_end_iso, "%Y-%m-%d") except (ValueError, TypeError): start_dt = end_dt = None if start_dt and end_dt: years_to_try = {start_dt.year, end_dt.year} candidates: list[str] = [] for year in years_to_try: iso = _try_short_date_with_year(raw_date, year) if iso: candidates.append(iso) if candidates: def distance(iso_str: str) -> int: dt = datetime.strptime(iso_str, "%Y-%m-%d") if start_dt <= dt <= end_dt: return 0 # Outside the period — measure shortest gap # to either edge so a 12/15 transaction in a # 12/16-01/15 statement still leans toward the # period's start year. return min( abs((dt - start_dt).days), abs((dt - end_dt).days), ) candidates.sort(key=distance) return candidates[0] if filename_year_hint: return _try_short_date_with_year(raw_date, filename_year_hint) return None def _description_from_row( row_words: list[WordBox], date_ranges: list[tuple[int, int]], amount_idxs: set[int], ) -> str: """Stitch the description from the row's non-date, non-amount tokens. ``date_ranges`` is a list of ``(start, end)`` (end exclusive) — every word in any range is excluded. Why a list: some bank statements show two dates per row (transaction + posting). Without excluding all of them, the extra date(s) leak into the description and look like trash. Keeps tokens before the first amount and after the last amount (trailing check numbers, memos); drops words between amount tokens (usually whitespace artifacts in column gaps). """ excluded: set[int] = set() for start, end in date_ranges: excluded.update(range(start, end)) keep: list[str] = [] seen_first_amount = False last_amount_idx = max(amount_idxs) if amount_idxs else -1 for i, w in enumerate(row_words): if i in excluded: continue if i in amount_idxs: seen_first_amount = True continue if seen_first_amount and i < last_amount_idx: continue keep.append(w.text) return " ".join(keep).strip() def scan_pdf_for_transactions( pdf_bytes: bytes, *, negative_in_parens: bool = True, allow_ocr: bool = True, date_formats: list[str] | None = None, y_tolerance: float = 3.0, merge_multiline_descriptions: bool = True, output_date_format: str = DEFAULT_DATE_FORMAT, filename_year_hint: int | None = None, year_override: int | None = None, ) -> tuple[list[dict[str, Any]], list[str]]: """Scan *pdf_bytes* for transaction-like rows. A row qualifies if it contains a date pattern AND at least one amount pattern. Each returned record looks like:: { "date": "2026-01-15", # output_date_format applied "description": "...", "amount_1": 4.50, "amount_2": 1000.00, # if a second amount was found "page": 1, "raw": "01/15/2026 Coffee $4.50", "account_number": "****1234", # from header } Account number is extracted from the statement header once per PDF and stamped onto every detected row so the CSV is self-attributing when statements are combined. The statement period IS detected (used internally for year inference on short dates like "01/13") but isn't surfaced as a per-row column — the inferred year already lives in the ``date`` field. Short dates without a year (``01/13``, ``Jan 13``) are bound to the year of the statement period's end before formatting. If period detection fails, the raw short text is preserved. Multi-line descriptions (rows with no date and no amount) attach to the most recent transaction row when ``merge_multiline_descriptions=True`` (default). Returns ``(rows, warnings)``. Warnings are human-readable strings the GUI surfaces in an expander. """ pages, warnings = extract_pages_auto(pdf_bytes, allow_ocr=allow_ocr) metadata = extract_statement_metadata(pages) out_rows: list[dict[str, Any]] = [] # Maximum y-gap (in PDF points) between a transaction and a # following no-date-no-amount line for that line to count as a # continuation of the description. Typical line baselines sit # ~10–14 pts apart; 25 pts allows for one blank line but # rejects section headers that are several rows away. _MULTILINE_MERGE_MAX_GAP = 25.0 for page in pages: # ``prev`` and ``prev_y_bottom`` reset per page so a section # header at the top of page 2 can't attach to the last # transaction on page 1 — PDF y-coordinates restart at the # top of each page so the y-distance check is meaningless # across page boundaries. prev: dict[str, Any] | None = None prev_y_bottom: float | None = None rows = cluster_rows(page.words, y_tolerance=y_tolerance) for row_words in rows: line = " ".join(w.text for w in row_words).strip() if not line: continue dates = _find_dates_in_words(row_words) amount_tokens = _find_amount_tokens(row_words) if not dates or not amount_tokens: # Continuation candidate — a line with no date AND # no amount of its own. Only attach to the previous # transaction if (a) we have one, (b) it's on this # same page, and (c) the y-gap to it is small enough # that a human would read this as a wrapped line # rather than a separate paragraph or section header. if ( merge_multiline_descriptions and prev is not None and not dates and not amount_tokens and row_words ): current_top = min(w.top for w in row_words) if ( prev_y_bottom is not None and (current_top - prev_y_bottom) <= _MULTILINE_MERGE_MAX_GAP ): prev["description"] = ( (prev["description"] + " " + line).strip() ) prev_y_bottom = max(w.bottom for w in row_words) continue # First date wins for the "date" column; ALL dates are # excluded from the description so a row carrying both # a transaction date and a posting date doesn't leak # the second one into description text. _, _, first_date_text = dates[0] date_ranges = [(s, e) for s, e, _ in dates] amount_idxs = {idx for idx, _, _ in amount_tokens} desc = _description_from_row( row_words, date_ranges, amount_idxs, ) # Every real transaction must have a description. Rows # like "01/13/2025 $1,000.00" (Daily Ledger Balances # section, page totals, period summaries) carry a date # and an amount but no text in between — they're # statement furniture, not transactions. Drop them. if not desc.strip(): continue iso = parse_date(first_date_text, date_formats) if iso is None: # Short date — try to bind a year using the cascade: # override → statement period (Dec/Jan-aware) → # filename year hint. Each signal is a separate # argument so the caller can mix-and-match. iso = _infer_year_for_short_date( first_date_text, metadata["period_start"], metadata["period_end"], filename_year_hint=filename_year_hint, override_year=year_override, ) formatted_date = ( format_date(iso, output_date_format) if iso else first_date_text ) record: dict[str, Any] = { "date": formatted_date, "description": desc, "page": page.page_no, "raw": line, } for k, (_, _, txt) in enumerate(amount_tokens, start=1): parsed = parse_amount( txt, negative_in_parens=negative_in_parens, ) # Fall back to the raw text if the parser fails so # the user sees something to fix in the editor # rather than a silent NaN. record[f"amount_{k}"] = ( parsed if parsed is not None else txt ) # Drop rows where the transaction amount is exactly 0. # Bank statements include noise like "INTEREST EARNED # 0.00" or "PAGE TOTAL 0.00" that pass the date+amount # heuristic but aren't real transactions. We key off # ``amount_1`` (leftmost amount = usually the txn # amount); a non-zero balance in ``amount_2`` doesn't # rescue a zero ``amount_1``. if not _has_real_transaction_amount(record): continue # Stamp the account number onto every kept row so the # CSV is self-attributing when statements are combined. # The period start/end aren't surfaced per row — they're # used only for the year-inference fallback above # (binding short dates like "01/13" to the statement's # year) but downstream the date column already carries # the inferred full date. record["account_number"] = metadata["account_number"] or "" out_rows.append(record) prev = record prev_y_bottom = ( max(w.bottom for w in row_words) if row_words else None ) return out_rows, warnings def _has_real_transaction_amount(record: dict[str, Any]) -> bool: """``amount_1`` is the row's primary amount. Drop rows whose amount_1 parsed to exactly 0; keep everything else (positive, negative, or unparsed-but-non-empty).""" amount_1 = record.get("amount_1") if amount_1 is None: return False if isinstance(amount_1, (int, float)): return amount_1 != 0 # Unparsed string — keep so the user can verify in the editor. return bool(str(amount_1).strip()) def diagnose_pdf_lines( pdf_bytes: bytes, *, allow_ocr: bool = True, max_lines: int = 200, ) -> tuple[list[dict[str, Any]], list[str]]: """Dump every clustered text line from a PDF for diagnosis. Surfaces what the scanner actually saw — including lines the detector dropped because they lacked a date or amount. Use when ``scan_pdf_for_transactions`` returns 0 rows so the user can spot what's wrong (no extractable text → scanned PDF / weird date format / amounts in a column the regex misses). Returns ``(lines, warnings)`` where each line is:: {"page": int, "text": str, "has_date": bool, "has_amount": bool} Capped at *max_lines* across all pages so a 100-page statement doesn't dump 10,000 rows into the UI. """ pages, warnings = extract_pages_auto(pdf_bytes, allow_ocr=allow_ocr) out: list[dict[str, Any]] = [] for page in pages: rows = cluster_rows(page.words) for row_words in rows: text = " ".join(w.text for w in row_words).strip() if not text: continue out.append({ "page": page.page_no, "text": text, "has_date": bool(_find_dates_in_words(row_words)), "has_amount": bool(_find_amount_tokens(row_words)), }) if len(out) >= max_lines: warnings.append( f"Diagnostic capped at {max_lines} lines. " "Larger PDFs aren't fully shown here — the full " "scan still runs in Scan mode." ) return out, warnings return out, warnings __all__ = [ "PdfDependencyMissing", "Page", "WordBox", "cluster_rows", "diagnose_pdf_lines", "extract_pages", "extract_pages_auto", "extract_statement_metadata", "format_amount", "format_date", "ocr_available", "parse_amount", "parse_date", "scan_pdf_for_transactions", "year_from_filename", ]