From b8aff862edb774991303408fd7c358eb1dc44c7a Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 May 2026 22:44:51 +0000 Subject: [PATCH] =?UTF-8?q?feat(pdf):=20add=20pure=20PDF=E2=86=92DataFrame?= =?UTF-8?q?=20extraction=20module?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1/6 of the PDF Extractor tool. Pure module — no Streamlit, no user-config I/O — that turns a PDF blob plus a template dict into a ``pandas.DataFrame`` of transaction rows. Primary use case is accountant-style extraction of bank-statement transactions, where each bank's format is encoded as a reusable template. Pipeline: 1. ``extract_pages(pdf_bytes)`` reads with pdfplumber and surfaces words with bounding boxes. 2. ``cluster_rows(words)`` groups words into rows by ``top`` tolerance — no reliance on PDF table-line detection (most bank statements have no visible cell borders). 3. ``assign_columns(row_words, boundaries)`` buckets each word by its horizontal midpoint into N+1 columns defined by N interior x-boundaries. 4. ``_within_table_window`` slices to the band between the header line and the end-marker (e.g. "Closing balance"). 5. ``apply_template`` orchestrates the above, handling: - parens-style negative amounts, currency stripping, custom decimal/thousands separators - separate debit + credit columns combined into a single signed ``amount`` (credit positive, debit negative — accounting register convention; matches QuickBooks/Xero imports) - multi-line description wrapping (rows with empty date column attach to the previous row's description) - row-level regex skip filters (e.g., "Total", "Subtotal") - page-range filters ("all", "2-", "1,3-5") Optional OCR fallback for scanned statements: - ``page_has_extractable_text`` heuristic flags pages with <5 words as likely-scanned. - ``ocr_available()`` checks both the ``pytesseract`` Python binding and the Tesseract binary; surfaces a clear reason string when either is missing. 
- ``extract_pages_auto`` does text-first, OCR-the-blanks, and returns warnings the UI can surface. 29 unit tests cover the parsing pipeline against synthetic WordBox/Page data — no fixture PDFs required, runs in 0.1s. Real PDF extraction is exercised by hand on the user's statements. Dependencies added: - ``pdfplumber>=0.10,<1`` — text + position extraction - ``pypdfium2>=4,<6`` — page rasterization for OCR + visual picker - ``streamlit-drawable-canvas>=0.9,<1`` — visual region picker (used in commit 5) - ``pytesseract>=0.3,<1`` — OCR (used in commit 6; system Tesseract binary required separately) - ``cryptography>=41,<49`` — bumped upper bound; pdfminer.six transitively requires a recent release. Internal ed25519 license-signing usage is API-stable across the bump. Co-Authored-By: Claude Opus 4.7 (1M context) --- requirements.txt | 6 +- src/pdf_extract.py | 616 ++++++++++++++++++++++++++++++++++++++ tests/test_pdf_extract.py | 286 ++++++++++++++++++ 3 files changed, 907 insertions(+), 1 deletion(-) create mode 100644 src/pdf_extract.py create mode 100644 tests/test_pdf_extract.py diff --git a/requirements.txt b/requirements.txt index 2cf200d..59557d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,8 @@ tqdm>=4.66,<5 typer>=0.12,<1 phonenumbers>=8.13,<9 streamlit>=1.35,<2 -cryptography>=41,<46 +streamlit-drawable-canvas>=0.9,<1 +cryptography>=41,<49 +pdfplumber>=0.10,<1 +pypdfium2>=4,<6 +pytesseract>=0.3,<1 diff --git a/src/pdf_extract.py b/src/pdf_extract.py new file mode 100644 index 0000000..43ba1d5 --- /dev/null +++ b/src/pdf_extract.py @@ -0,0 +1,616 @@ +"""PDF transaction extraction. + +Pure module — no Streamlit, no user-config I/O. Reads PDF bytes, +produces a ``pandas.DataFrame`` of rows according to a template +dict. The accountant-facing use case is extracting transaction +tables from bank statements (different banks = different +templates, reused across statements that share a format). 
+ +Strategy: + +- ``pdfplumber`` for text + word positions. Bank-statement tables + rarely have visible cell borders, so we don't rely on table-line + detection — instead the template carries explicit column + x-position boundaries (set by the visual picker UI). +- Rows are detected by clustering word ``top`` (y-position) values + within a small tolerance — words on the same baseline. +- Multi-line descriptions: rows whose first column (date) is empty + are merged into the previous row's description column. +- Signed amounts: parenthesized values (``(123.45)``) parse negative. + Single signed amount column passes through. Separate debit/credit + columns are combined into one signed amount column with credits + positive and debits negative (accounting register convention — + matches QuickBooks/Xero import expectations). +- Optional OCR: pages with no extractable text fall through to + ``pytesseract`` IF the binding + Tesseract binary are both + available. Otherwise the page is kept as extracted (usually empty) + and a warning string is returned to the caller. + +The template is a plain dict matching the schema documented in +``src/pdf_templates.py``. This module reads it; ``pdf_templates`` +manages its persistence and validation. +""" + +from __future__ import annotations + +import io +import re +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +import pandas as pd +import pdfplumber + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class WordBox: + """A single word with its bounding box on a page. 
# Defaults for ``parse_amount``; options passed by the caller override these.
_AMOUNT_DEFAULTS = {
    "decimal_separator": ".",
    "thousands_separator": ",",
    "currency_strip": "$",
    "negative_in_parens": True,
}

# Tried by ``parse_date`` after any caller-supplied formats, in this order.
# ``%m/%d/...`` precedes ``%d/%m/...``, so an ambiguous date such as
# ``01/02/2026`` resolves month-first unless the caller supplies a format.
_DATE_FORMATS_FALLBACK = [
    "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%d/%m/%Y", "%d/%m/%y",
    "%b %d %Y", "%d %b %Y", "%d-%b-%Y", "%m-%d-%Y", "%m-%d-%y",
]

# Typographic dashes (MINUS SIGN, EN DASH) treated as an ASCII "-" sign.
_MINUS_LOOKALIKES = ("\u2212", "\u2013")

# An unsigned decimal number after all separators have been normalised.
_PLAIN_NUMBER_RE = re.compile(r"(?:\d+(?:\.\d+)?|\.\d+)")


def parse_amount(text: str, opts: dict[str, Any] | None = None) -> float | None:
    """Parse a money string to a signed float, or ``None`` if it doesn't parse.

    Handles currency symbols, thousands separators, parenthesized
    negatives, leading or trailing minus signs ("123.45-"), typographic
    minus signs, any Unicode whitespace (including non-breaking spaces),
    a leading "+", amounts without a leading zero (".50"), and blanks.

    Args:
        text: Raw cell text; ``None`` or blank yields ``None``.
        opts: Optional overrides for ``_AMOUNT_DEFAULTS``
            (``decimal_separator``, ``thousands_separator``,
            ``currency_strip``, ``negative_in_parens``). Every character
            of ``currency_strip`` is removed from the text.

    Returns:
        The signed value, or ``None`` when the text is not a number.
    """
    if text is None:
        return None
    o = {**_AMOUNT_DEFAULTS, **(opts or {})}
    decimal_sep = o.get("decimal_separator") or "."
    thousands_sep = o.get("thousands_separator") or ""
    # A template that only sets decimal_separator="," still carries the
    # default "," thousands separator. Removing it would delete the decimal
    # mark and silently inflate "12,50" to 1250.0, so the decimal mark wins.
    if thousands_sep == decimal_sep:
        thousands_sep = ""

    s = text
    for dash in _MINUS_LOOKALIKES:
        s = s.replace(dash, "-")
    # Strip currency characters and whitespace BEFORE looking for the sign,
    # so "$-4.50", "$(4.50)" and "- $4.50" are all recognised as negative.
    for ch in o.get("currency_strip") or "":
        s = s.replace(ch, "")
    s = "".join(s.split())
    if not s:
        return None

    negative = False
    if o.get("negative_in_parens") and s.startswith("(") and s.endswith(")"):
        negative = True
        s = s[1:-1]
    if s.endswith("-"):
        negative = True
        s = s[:-1]
    if s.startswith("-"):
        negative = True
        s = s[1:]
    elif s.startswith("+"):
        s = s[1:]

    if thousands_sep:
        s = s.replace(thousands_sep, "")
    if decimal_sep != ".":
        s = s.replace(decimal_sep, ".")

    if not _PLAIN_NUMBER_RE.fullmatch(s):
        return None
    val = float(s)
    return -val if negative else val


def parse_date(
    text: str,
    formats: list[str] | None = None,
) -> str | None:
    """Parse a date string and return it as ISO ``YYYY-MM-DD``.

    The caller's *formats* are tried first, in order, followed by
    ``_DATE_FORMATS_FALLBACK``; the first ``strptime`` format that
    matches wins.

    Args:
        text: Raw cell text; ``None`` or blank yields ``None``.
        formats: Optional ``strptime`` format strings to try first.

    Returns:
        The ISO date string, or ``None`` if no format matches.
    """
    if text is None:
        return None
    s = text.strip()
    if not s:
        return None
    for fmt in [*(formats or []), *_DATE_FORMATS_FALLBACK]:
        try:
            return datetime.strptime(s, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None
+ Output rows are sorted top-to-bottom; within a row, words are + sorted left-to-right. + """ + if not words: + return [] + by_top = sorted(words, key=lambda w: w.top) + rows: list[list[WordBox]] = [] + current: list[WordBox] = [by_top[0]] + current_top = by_top[0].top + for w in by_top[1:]: + if abs(w.top - current_top) <= y_tolerance: + current.append(w) + else: + rows.append(sorted(current, key=lambda w: w.x0)) + current = [w] + current_top = w.top + rows.append(sorted(current, key=lambda w: w.x0)) + return rows + + +def assign_columns( + row_words: list[WordBox], + boundaries: list[float], +) -> list[str]: + """Bucket the words of a single row into columns. + + ``boundaries`` are the *interior* x-positions between adjacent + columns. N boundaries → N+1 columns. A word's column is decided + by its horizontal midpoint; words within a column are joined + with single spaces in left-to-right order. + """ + n_cols = len(boundaries) + 1 + buckets: list[list[WordBox]] = [[] for _ in range(n_cols)] + sorted_bounds = sorted(boundaries) + for w in row_words: + mid = (w.x0 + w.x1) / 2 + col = 0 + for i, b in enumerate(sorted_bounds): + if mid >= b: + col = i + 1 + buckets[col].append(w) + return [ + " ".join(w.text for w in sorted(bucket, key=lambda w: w.x0)) + for bucket in buckets + ] + + +# --------------------------------------------------------------------------- +# Template application +# --------------------------------------------------------------------------- + + +def _pages_in_range(pages: list[Page], range_spec: str) -> list[Page]: + """Filter *pages* by a range spec like ``"all"``, ``"2-"``, ``"1,3-5"``. + + Empty / ``"all"`` returns all pages. 
Bad specs return all pages + (template author can fix at preview time).""" + s = (range_spec or "").strip().lower() + if not s or s == "all": + return pages + keep: set[int] = set() + for chunk in s.split(","): + chunk = chunk.strip() + if not chunk: + continue + if "-" in chunk: + a, b = chunk.split("-", 1) + a_i = int(a) if a.strip().isdigit() else 1 + b_i = int(b) if b.strip().isdigit() else len(pages) + for i in range(a_i, b_i + 1): + keep.add(i) + elif chunk.isdigit(): + keep.add(int(chunk)) + return [p for p in pages if p.page_no in keep] if keep else pages + + +def _within_table_window( + rows: list[list[WordBox]], + header_text: str, + end_markers: list[str], +) -> list[list[WordBox]]: + """Slice *rows* to the band between the header line and the end marker. + + Header match: the first row whose joined text contains every word + of ``header_text`` (case-insensitive). The header row itself is + excluded. End match: any row whose joined text matches one of the + ``end_markers`` regex patterns; that row and below are excluded. + + Empty ``header_text`` keeps from the first row; empty + ``end_markers`` keeps through the last row. 
+ """ + if not rows: + return [] + needle_words = [w.lower() for w in (header_text or "").split() if w] + end_res = [re.compile(p, re.IGNORECASE) for p in end_markers if p] + + start = 0 + if needle_words: + start = -1 + for i, row in enumerate(rows): + joined = " ".join(w.text for w in row).lower() + if all(nw in joined for nw in needle_words): + start = i + 1 + break + if start == -1: + return [] + + end = len(rows) + for i in range(start, len(rows)): + joined = " ".join(w.text for w in rows[i]) + if any(rx.search(joined) for rx in end_res): + end = i + break + return rows[start:end] + + +def _row_is_continuation(cells: list[str]) -> bool: + """A row whose first column is empty is treated as a continuation + of the previous row's description (multi-line wrap).""" + return bool(cells) and not cells[0].strip() + + +def _coerce_amount_columns( + record: dict[str, str], + column_map: list[dict[str, Any]], + parse_opts: dict[str, Any], +) -> dict[str, Any]: + """Convert source-column text into typed output fields. + + Supports three amount shapes: + + 1. A single column mapped to ``amount`` — passes through with sign. + 2. Two columns mapped to ``amount_debit`` + ``amount_credit`` — + combined into a signed ``amount`` (credit positive, debit + negative — accounting register convention). + 3. A column mapped to ``balance`` — parsed as signed number. + + The ``date`` target is parsed against the template's date format. + Other targets pass through as text. 
def apply_template(
    pages: list[Page],
    template: dict[str, Any],
) -> pd.DataFrame:
    """Run *template* over *pages* and return the extracted DataFrame.

    Missing or ``None`` template keys fall back to defaults so a
    half-built template still produces a preview:

    - With no column that has a ``target``, every cell is passed through
      as text under positional names ``col_0`` .. ``col_N``, and
      multi-line merging is disabled so no row is folded away.
    - Blank entries in ``table.skip_rows_matching`` are ignored; an empty
      pattern would otherwise match, and drop, every row.
    - ``table.y_tolerance`` of ``None`` means the default of 3.0.

    Args:
        pages: Pages to extract from, as produced by the page readers in
            this module.
        template: Template dict with optional ``pages``, ``table``,
            ``columns`` and ``parse`` sections.

    Returns:
        One row per transaction, with columns ordered ``date``,
        ``description``, ``amount``, ``type``, ``balance`` (those that
        exist), then any other targets, then ``_page``. An empty
        ``DataFrame`` is returned when nothing was extracted.

    Raises:
        re.error: If a non-empty regex in the template does not compile.
    """
    pages_cfg = template.get("pages") or {}
    table_cfg = template.get("table") or {}
    columns_cfg = template.get("columns") or []
    parse_cfg = template.get("parse") or {}

    pages_used = _pages_in_range(pages, pages_cfg.get("range", "all"))
    skip_pages_re = pages_cfg.get("skip_matching") or ""
    if skip_pages_re:
        skip_re = re.compile(skip_pages_re, re.IGNORECASE)
        pages_used = [p for p in pages_used if not skip_re.search(p.text)]

    boundaries = list(table_cfg.get("column_boundaries") or [])
    header_text = table_cfg.get("header_text") or ""
    end_markers = list(table_cfg.get("end_markers") or [])
    skip_rows_res = [
        re.compile(p, re.IGNORECASE)
        for p in (table_cfg.get("skip_rows_matching") or [])
        if p  # "" compiles to a match-everything pattern; ignore blanks
    ]
    raw_tolerance = table_cfg.get("y_tolerance")
    # An explicit None check keeps a deliberate tolerance of 0 intact.
    y_tolerance = 3.0 if raw_tolerance is None else float(raw_tolerance)

    has_mapping = any(c.get("target") for c in columns_cfg)
    if not has_mapping:
        # No usable mapping yet: expose every cell under a positional name.
        columns_cfg = [
            {"source": i, "target": f"col_{i}"}
            for i in range(len(boundaries) + 1)
        ]
    # Merging folds rows into a "description" field, which only makes sense
    # once the template actually maps its columns.
    merge_multiline = has_mapping and bool(
        parse_cfg.get("merge_multiline_description", True)
    )

    parse_opts = {
        "decimal_separator": parse_cfg.get("decimal_separator", "."),
        "thousands_separator": parse_cfg.get("thousands_separator", ","),
        "currency_strip": parse_cfg.get("currency_strip", "$"),
        "negative_in_parens": parse_cfg.get("amount_negative_in_parens", True),
        "date_formats": parse_cfg.get("date_formats")
        or ([parse_cfg["date_format"]] if parse_cfg.get("date_format") else []),
    }

    out_rows: list[dict[str, Any]] = []
    for page in pages_used:
        rows = _within_table_window(
            cluster_rows(page.words, y_tolerance=y_tolerance),
            header_text,
            end_markers,
        )

        # Reset per page: a wrapped line never attaches across a page break.
        prev_record: dict[str, Any] | None = None
        for row_words in rows:
            if boundaries:
                cells = assign_columns(row_words, boundaries)
            else:
                cells = [" ".join(w.text for w in row_words)]
            joined = " ".join(c.strip() for c in cells if c.strip())
            if not joined:
                continue
            if any(rx.search(joined) for rx in skip_rows_res):
                continue

            if (
                merge_multiline
                and prev_record is not None
                and _row_is_continuation(cells)
            ):
                # Glue the non-empty columns into the previous record's
                # description (the natural sink for wrapped text).
                previous_text = prev_record.get("description") or ""
                prev_record["description"] = f"{previous_text} {joined}".strip()
                continue

            record_src: dict[str, str] = {}
            for col_cfg in columns_cfg:
                src_idx = col_cfg.get("source")
                tgt = col_cfg.get("target")
                if (
                    isinstance(src_idx, int)
                    and 0 <= src_idx < len(cells)
                    and tgt
                ):
                    record_src[tgt] = cells[src_idx]

            record = _coerce_amount_columns(record_src, columns_cfg, parse_opts)
            record["_page"] = page.page_no
            out_rows.append(record)
            prev_record = record

    if not out_rows:
        return pd.DataFrame()
    df = pd.DataFrame(out_rows)

    preferred = ["date", "description", "amount", "type", "balance"]
    cols = [c for c in preferred if c in df.columns]
    extras = [c for c in df.columns if c not in cols and c != "_page"]
    df = df[cols + extras + (["_page"] if "_page" in df.columns else [])]
    return df
def ocr_pdf_to_pages(
    pdf_bytes: bytes,
    dpi: int = 200,
    page_numbers: set[int] | list[int] | None = None,
) -> list[Page]:
    """Run Tesseract over pages of *pdf_bytes* and return a
    word-position-rich ``Page`` list, parallel to ``extract_pages``.

    Caller is responsible for first checking ``ocr_available()``.
    Uses pypdfium2 to rasterize and pytesseract's ``image_to_data``
    to recover per-word bounding boxes so the same column-assignment
    pipeline keeps working.

    Args:
        pdf_bytes: The raw PDF document.
        dpi: Rasterization resolution handed to the renderer.
        page_numbers: Optional 1-indexed page numbers to OCR. ``None``
            (the default) processes every page; otherwise only the
            listed pages are rasterized and returned.

    Returns:
        One ``Page`` per processed page, in document order, with word
        coordinates expressed in PDF points.
    """
    import pypdfium2 as pdfium
    import pytesseract
    from PIL import Image  # noqa: F401 (transitively required)

    wanted = None if page_numbers is None else set(page_numbers)
    pages: list[Page] = []
    pdf = pdfium.PdfDocument(pdf_bytes)
    try:
        # PDF points-per-inch is 72; scale renders into pixels.
        scale = dpi / 72.0
        for i in range(len(pdf)):
            page_no = i + 1
            if wanted is not None and page_no not in wanted:
                # Rasterizing and OCR are the expensive steps; skip them
                # entirely for pages the caller did not ask for.
                continue
            pil_image = pdf[i].render(scale=scale).to_pil()
            data = pytesseract.image_to_data(
                pil_image,
                output_type=pytesseract.Output.DICT,
            )
            words: list[WordBox] = []
            for j, txt in enumerate(data.get("text", [])):
                t = (txt or "").strip()
                if not t:
                    continue
                left = float(data["left"][j])
                top = float(data["top"][j])
                width = float(data["width"][j])
                height = float(data["height"][j])
                # Convert pixel coords back to PDF points so column
                # boundaries from the template (in PDF points) keep
                # working when an OCR page is mixed with text pages.
                words.append(WordBox(
                    x0=left / scale,
                    top=top / scale,
                    x1=(left + width) / scale,
                    bottom=(top + height) / scale,
                    text=t,
                ))
            text_blob = " ".join(w.text for w in words)
            pages.append(Page(
                page_no=page_no,
                width=pil_image.width / scale,
                height=pil_image.height / scale,
                text=text_blob,
                words=words,
            ))
    finally:
        pdf.close()
    return pages


def extract_pages_auto(
    pdf_bytes: bytes,
    *,
    allow_ocr: bool = True,
) -> tuple[list[Page], list[str]]:
    """Try text extraction first; OCR only the pages that come back empty.

    A page counts as empty when ``page_has_extractable_text`` is false
    for it. Pages that cannot be OCR'd (OCR disabled or unavailable)
    are returned as extracted, not dropped.

    Args:
        pdf_bytes: The raw PDF document.
        allow_ocr: When false, OCR is never attempted.

    Returns:
        ``(pages, warnings)``. ``pages`` has one entry per page in
        document order. ``warnings`` is a list of human-readable strings
        for the caller to surface in the UI.
    """
    warnings: list[str] = []
    pages = extract_pages(pdf_bytes)
    blank_nos = {p.page_no for p in pages if not page_has_extractable_text(p)}
    if not blank_nos:
        return pages, warnings

    if not allow_ocr:
        warnings.append(
            f"{len(blank_nos)} page(s) appear scanned. OCR is disabled."
        )
        return pages, warnings

    ok, reason = ocr_available()
    if not ok:
        warnings.append(
            f"{len(blank_nos)} page(s) appear scanned but OCR isn't usable: "
            f"{reason}"
        )
        return pages, warnings

    # OCR only the blank pages; text pages keep their exact extraction.
    ocr_by_no = {
        p.page_no: p
        for p in ocr_pdf_to_pages(pdf_bytes, page_numbers=blank_nos)
    }
    merged = [
        ocr_by_no.get(p.page_no, p) if p.page_no in blank_nos else p
        for p in pages
    ]
    warnings.append(
        f"OCR was used for {len(blank_nos)} page(s) with no extractable text."
    )
    return merged, warnings
+""" + +from __future__ import annotations + +import pandas as pd + +from src.pdf_extract import ( + Page, + WordBox, + apply_template, + assign_columns, + cluster_rows, + parse_amount, + parse_date, + _pages_in_range, + _within_table_window, +) + + +def _w(text: str, x0: float, top: float, x1: float | None = None) -> WordBox: + """Convenience constructor — heights and exact x1 don't matter + for the tests we write.""" + return WordBox( + x0=x0, + top=top, + x1=x1 if x1 is not None else x0 + 10 * len(text), + bottom=top + 10, + text=text, + ) + + +class TestParseAmount: + def test_plain_positive(self): + assert parse_amount("1234.56") == 1234.56 + + def test_currency_and_thousands(self): + assert parse_amount("$1,234.56") == 1234.56 + + def test_parens_negative(self): + assert parse_amount("(1,234.56)") == -1234.56 + + def test_leading_minus(self): + assert parse_amount("-100.00") == -100.0 + + def test_trailing_minus(self): + assert parse_amount("100.00-") == -100.0 + + def test_blank_returns_none(self): + assert parse_amount("") is None + assert parse_amount(" ") is None + assert parse_amount(None) is None + + def test_garbage_returns_none(self): + assert parse_amount("not a number") is None + + def test_european_decimal(self): + opts = { + "decimal_separator": ",", + "thousands_separator": ".", + "currency_strip": "€", + "negative_in_parens": True, + } + assert parse_amount("€1.234,56", opts) == 1234.56 + + +class TestParseDate: + def test_us_slash(self): + assert parse_date("01/15/2026", ["%m/%d/%Y"]) == "2026-01-15" + + def test_iso(self): + assert parse_date("2026-01-15", ["%Y-%m-%d"]) == "2026-01-15" + + def test_fallback_format(self): + # Not in the supplied list — should still parse via fallback. 
+ assert parse_date("01/15/26") == "2026-01-15" + + def test_invalid(self): + assert parse_date("not-a-date") is None + + +class TestClusterRows: + def test_groups_close_y(self): + words = [ + _w("A", x0=0, top=100), + _w("B", x0=20, top=101), + _w("C", x0=40, top=102), + ] + rows = cluster_rows(words, y_tolerance=3.0) + assert len(rows) == 1 + assert [w.text for w in rows[0]] == ["A", "B", "C"] + + def test_separates_far_y(self): + words = [ + _w("A", x0=0, top=100), + _w("B", x0=0, top=120), + ] + rows = cluster_rows(words, y_tolerance=3.0) + assert [[w.text for w in r] for r in rows] == [["A"], ["B"]] + + def test_sorts_left_to_right_within_row(self): + words = [ + _w("C", x0=40, top=100), + _w("A", x0=0, top=100), + _w("B", x0=20, top=100), + ] + rows = cluster_rows(words) + assert [w.text for w in rows[0]] == ["A", "B", "C"] + + def test_empty(self): + assert cluster_rows([]) == [] + + +class TestAssignColumns: + def test_three_columns(self): + # boundaries at x=100, 200 → columns [0,100), [100,200), [200,∞) + row = [ + _w("Jan", x0=10, top=0, x1=40), # col 0 + _w("1", x0=45, top=0, x1=55), # col 0 + _w("Deposit", x0=110, top=0, x1=180), # col 1 + _w("250.00", x0=210, top=0, x1=260), # col 2 + ] + cells = assign_columns(row, [100, 200]) + assert cells[0] == "Jan 1" + assert cells[1] == "Deposit" + assert cells[2] == "250.00" + + def test_no_boundaries_one_column(self): + row = [_w("A", 0, 0), _w("B", 20, 0)] + cells = assign_columns(row, []) + assert cells == ["A B"] + + +class TestPagesInRange: + def _mk(self, n): + return [Page(page_no=i + 1, width=600, height=800, text="", words=[]) for i in range(n)] + + def test_all(self): + pages = self._mk(5) + assert len(_pages_in_range(pages, "all")) == 5 + assert len(_pages_in_range(pages, "")) == 5 + + def test_explicit_list(self): + pages = self._mk(5) + got = [p.page_no for p in _pages_in_range(pages, "1,3,5")] + assert got == [1, 3, 5] + + def test_range(self): + pages = self._mk(5) + got = [p.page_no for p in 
_pages_in_range(pages, "2-4")] + assert got == [2, 3, 4] + + def test_open_ended(self): + pages = self._mk(5) + got = [p.page_no for p in _pages_in_range(pages, "3-")] + assert got == [3, 4, 5] + + +class TestWithinTableWindow: + def test_header_skipped_end_excluded(self): + rows = [ + [_w("STATEMENT", 0, 0)], + [_w("Date", 0, 20), _w("Description", 50, 20), _w("Amount", 200, 20)], + [_w("01/15", 0, 40), _w("Coffee", 50, 40), _w("4.50", 200, 40)], + [_w("01/16", 0, 60), _w("Refund", 50, 60), _w("12.00", 200, 60)], + [_w("Closing", 0, 80), _w("balance", 50, 80)], + [_w("Page", 0, 100), _w("1", 50, 100)], + ] + out = _within_table_window(rows, "Date Description Amount", ["Closing balance"]) + # Should keep just the two transaction rows. + assert len(out) == 2 + assert out[0][0].text == "01/15" + assert out[1][0].text == "01/16" + + def test_no_header_returns_empty_when_required(self): + rows = [[_w("foo", 0, 0)]] + assert _within_table_window(rows, "Date Description Amount", []) == [] + + def test_blank_header_passes_through(self): + rows = [[_w("x", 0, 0)], [_w("y", 0, 20)]] + assert _within_table_window(rows, "", []) == rows + + +class TestApplyTemplate: + """End-to-end on synthetic ``Page`` objects.""" + + def _statement_page(self) -> Page: + # Mock layout: 3 columns at x=0/100/200, header at y=20, data at 40+. 
+ words = [ + _w("STATEMENT", 0, 0), + # Header + _w("Date", 5, 20), _w("Description", 105, 20), _w("Amount", 205, 20), + # Row 1 + _w("01/15/2026", 5, 40), _w("Coffee", 105, 40), + _w("Shop", 140, 40), _w("(4.50)", 205, 40), + # Row 2 + _w("01/16/2026", 5, 60), _w("Refund", 105, 60), _w("$12.00", 205, 60), + # Continuation row (no date) — should merge into row 2 + _w("from", 105, 80), _w("vendor", 140, 80), + # End marker + _w("Closing", 5, 100), _w("balance", 105, 100), _w("$1,000.00", 205, 100), + ] + return Page(page_no=1, width=300, height=120, text="", words=words) + + def _template(self) -> dict: + return { + "pages": {"range": "all"}, + "table": { + "header_text": "Date Description Amount", + "end_markers": ["Closing balance"], + "column_boundaries": [100, 200], + "y_tolerance": 3.0, + "skip_rows_matching": [], + }, + "columns": [ + {"source": 0, "target": "date"}, + {"source": 1, "target": "description"}, + {"source": 2, "target": "amount"}, + ], + "parse": { + "date_format": "%m/%d/%Y", + "amount_negative_in_parens": True, + "merge_multiline_description": True, + }, + } + + def test_basic_extraction(self): + df = apply_template([self._statement_page()], self._template()) + assert isinstance(df, pd.DataFrame) + assert len(df) == 2 + assert list(df["date"]) == ["2026-01-15", "2026-01-16"] + # Parens-negative + assert df.iloc[0]["amount"] == -4.50 + # Plain positive with currency strip + assert df.iloc[1]["amount"] == 12.00 + # Multi-line description merged + assert "from vendor" in df.iloc[1]["description"] + + def test_debit_credit_split_columns(self): + # Layout: date | description | debit | credit columns + page = Page( + page_no=1, width=400, height=80, text="", + words=[ + _w("Date", 5, 0), _w("Desc", 105, 0), + _w("Debit", 205, 0), _w("Credit", 305, 0), + _w("01/15/2026", 5, 20), _w("Coffee", 105, 20), _w("4.50", 205, 20), + _w("01/16/2026", 5, 40), _w("Refund", 105, 40), + _w("", 205, 40), # no debit + _w("12.00", 305, 40), + ], + ) + tpl = { + 
"table": { + "header_text": "Date Desc Debit Credit", + "column_boundaries": [100, 200, 300], + }, + "columns": [ + {"source": 0, "target": "date"}, + {"source": 1, "target": "description"}, + {"source": 2, "target": "amount_debit"}, + {"source": 3, "target": "amount_credit"}, + ], + "parse": {"date_format": "%m/%d/%Y"}, + } + df = apply_template([page], tpl) + assert list(df["amount"]) == [-4.50, 12.00] + assert list(df["type"]) == ["debit", "credit"] + + def test_skip_rows_matching(self): + page = self._statement_page() + tpl = self._template() + tpl["table"]["skip_rows_matching"] = ["Refund"] + df = apply_template([page], tpl) + # Refund row is dropped — only one transaction left + assert len(df) == 1 + assert df.iloc[0]["amount"] == -4.50 + + def test_empty_pages_returns_empty_df(self): + df = apply_template([], self._template()) + assert df.empty