feat: add documentation, Streamlit GUI, and full source tree
- Rewrite README.md with project overview, quick-start, and CLI summary
- Add docs/CLI-REFERENCE.md with full flag reference and 8 recipe sections
- Add docs/DEVELOPER.md with architecture, data flow, and extension guides
- Rewrite src/core/__init__.py with public API exports and module docstring
- Add Streamlit GUI (src/gui/) with file upload, advanced options, interactive match group review with side-by-side diff, and download buttons
- Add .gitignore, requirements.txt, all source code, tests, and sample data
- Add streamlit to requirements.txt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
247
src/core/io.py
Normal file
247
src/core/io.py
Normal file
@@ -0,0 +1,247 @@
|
||||
"""File I/O: encoding/delimiter detection, CSV/Excel reading, output writing."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import io
|
||||
from pathlib import Path
|
||||
from typing import Generator, Optional
|
||||
|
||||
import pandas as pd
|
||||
from charset_normalizer import from_bytes
|
||||
from loguru import logger
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Encoding detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_encoding(path: Path, sample_bytes: int = 65_536) -> str:
    """Detect file encoding by inspecting the first *sample_bytes* of the file.

    Parameters
    ----------
    path : file to inspect
    sample_bytes : maximum number of leading bytes to read

    Returns the best-guess encoding name in lower case (e.g. ``utf-8``).
    Falls back to ``utf-8`` when the sample is empty or detection is
    inconclusive.
    """
    # Read only the sample; the rest of the file is never loaded into memory.
    with Path(path).open("rb") as fh:
        raw = fh.read(sample_bytes)
    if not raw:
        return "utf-8"

    # Check byte-order marks first.
    if raw[:3] == b"\xef\xbb\xbf":
        return "utf-8-sig"
    # UTF-32 must be tested before UTF-16: the UTF-32-LE mark begins with
    # the same two bytes as the UTF-16-LE mark.
    if raw[:4] in (b"\xff\xfe\x00\x00", b"\x00\x00\xfe\xff"):
        return "utf-32"
    if raw[:2] in (b"\xff\xfe", b"\xfe\xff"):
        return "utf-16"

    result = from_bytes(raw).best()
    if result is None:
        return "utf-8"

    enc = result.encoding.lower()
    # Normalise common aliases: ASCII content is reported as utf-8.
    if enc in ("ascii", "us-ascii"):
        enc = "utf-8"
    return enc
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Delimiter detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_COMMON_DELIMITERS = [",", "\t", ";", "|"]
|
||||
|
||||
|
||||
def detect_delimiter(path: Path, encoding: str = "utf-8") -> str:
|
||||
"""Sniff the delimiter from the first 20 lines of a text file.
|
||||
|
||||
Falls back to comma if csv.Sniffer cannot decide.
|
||||
"""
|
||||
raw_path = Path(path)
|
||||
lines: list[str] = []
|
||||
with raw_path.open("r", encoding=encoding, errors="replace") as fh:
|
||||
for _ in range(20):
|
||||
line = fh.readline()
|
||||
if not line:
|
||||
break
|
||||
lines.append(line)
|
||||
|
||||
if not lines:
|
||||
return ","
|
||||
|
||||
sample = "".join(lines)
|
||||
try:
|
||||
dialect = csv.Sniffer().sniff(sample, delimiters="".join(_COMMON_DELIMITERS))
|
||||
return dialect.delimiter
|
||||
except csv.Error:
|
||||
return ","
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Header-row detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_header_row(path: Path, encoding: str = "utf-8", delimiter: str = ",",
|
||||
max_scan: int = 20) -> int:
|
||||
"""Return the 0-based index of the likely header row.
|
||||
|
||||
Heuristic: the first row where *every* cell looks like a column name
|
||||
(non-numeric, non-empty string). Falls back to 0.
|
||||
"""
|
||||
raw_path = Path(path)
|
||||
with raw_path.open("r", encoding=encoding, errors="replace") as fh:
|
||||
reader = csv.reader(fh, delimiter=delimiter)
|
||||
for idx, row in enumerate(reader):
|
||||
if idx >= max_scan:
|
||||
break
|
||||
if not row:
|
||||
continue
|
||||
# All cells must be non-empty, non-numeric strings
|
||||
if all(_looks_like_header(cell) for cell in row if cell.strip()):
|
||||
return idx
|
||||
return 0
|
||||
|
||||
|
||||
def _looks_like_header(value: str) -> bool:
|
||||
"""True if *value* looks like a column header, not a data value."""
|
||||
v = value.strip()
|
||||
if not v:
|
||||
return False
|
||||
# Pure numbers are not headers
|
||||
try:
|
||||
float(v.replace(",", ""))
|
||||
return False
|
||||
except ValueError:
|
||||
pass
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Excel helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def list_sheets(path: Path) -> list[str]:
    """Return the sheet names of an Excel workbook, opened with openpyxl."""
    # ExcelFile holds the workbook open; the context manager closes it once
    # the names have been copied out.
    with pd.ExcelFile(path, engine="openpyxl") as xl:
        return list(xl.sheet_names)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def read_file(
|
||||
path: str | Path,
|
||||
*,
|
||||
encoding: Optional[str] = None,
|
||||
delimiter: Optional[str] = None,
|
||||
header_row: Optional[int] = None,
|
||||
sheet_name: Optional[str | int] = 0,
|
||||
chunk_size: Optional[int] = None,
|
||||
) -> pd.DataFrame | Generator[pd.DataFrame, None, None]:
|
||||
"""Read a CSV, TSV, or Excel file into a DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : file path
|
||||
encoding : override detected encoding (CSV only)
|
||||
delimiter : override detected delimiter (CSV only)
|
||||
header_row : 0-based row index for the header; auto-detected if *None*
|
||||
sheet_name : Excel sheet (name or 0-based index). Ignored for CSV.
|
||||
chunk_size : if set, return a generator of DataFrames (CSV only).
|
||||
|
||||
Returns a DataFrame (or generator when *chunk_size* is set).
|
||||
"""
|
||||
filepath = Path(path)
|
||||
if not filepath.exists():
|
||||
raise FileNotFoundError(f"File not found: {filepath}")
|
||||
|
||||
suffix = filepath.suffix.lower()
|
||||
if suffix in (".xlsx", ".xls"):
|
||||
return _read_excel(filepath, header_row=header_row, sheet_name=sheet_name)
|
||||
else:
|
||||
return _read_csv(
|
||||
filepath,
|
||||
encoding=encoding,
|
||||
delimiter=delimiter,
|
||||
header_row=header_row,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
|
||||
def _read_csv(
    path: Path,
    *,
    encoding: Optional[str] = None,
    delimiter: Optional[str] = None,
    header_row: Optional[int] = None,
    chunk_size: Optional[int] = None,
) -> pd.DataFrame | Generator[pd.DataFrame, None, None]:
    """Read a delimited text file, detecting any setting that was not supplied.

    Parameters
    ----------
    path : file to read
    encoding : text encoding; detected with ``detect_encoding`` when falsy
    delimiter : field separator; detected with ``detect_delimiter`` when falsy
    header_row : header index handed to pandas as ``header``; detected with
        ``detect_header_row`` when *None*
    chunk_size : when truthy, passed to pandas as ``chunksize``

    Returns the result of ``pd.read_csv``: a DataFrame, or the chunked reader
    when *chunk_size* is truthy. Columns are read with ``dtype=str`` and
    ``keep_default_na=False``; malformed lines are reported as warnings.
    """
    enc = encoding or detect_encoding(path)
    delim = delimiter or detect_delimiter(path, enc)
    hdr = header_row if header_row is not None else detect_header_row(path, enc, delim)

    logger.debug("Reading CSV {} (encoding={}, delimiter={!r}, header_row={})",
                 path.name, enc, delim, hdr)

    kwargs: dict = dict(
        filepath_or_buffer=path,
        encoding=enc,
        delimiter=delim,
        header=hdr,
        dtype=str,
        keep_default_na=False,
        on_bad_lines="warn",
    )

    if header_row is None and hdr:
        # The detected index counts every row of the file, blank lines
        # included, whereas pandas' ``header`` ignores blank lines. Skipping
        # the leading rows and taking the next non-blank row as the header
        # keeps both numbering schemes in agreement.
        kwargs.update(skiprows=hdr, header=0)

    if chunk_size:
        return pd.read_csv(**kwargs, chunksize=chunk_size)

    return pd.read_csv(**kwargs)
|
||||
|
||||
|
||||
def _read_excel(
    path: Path,
    *,
    header_row: Optional[int] = None,
    sheet_name: Optional[str | int] = 0,
) -> pd.DataFrame:
    """Read one sheet of an Excel workbook with every column as ``str``.

    Parameters
    ----------
    path : workbook to read
    header_row : 0-based header row; defaults to 0 when *None*
    sheet_name : sheet name or 0-based index, passed through to pandas

    Returns the result of ``pd.read_excel`` (read with
    ``keep_default_na=False``).
    """
    hdr = header_row if header_row is not None else 0
    # openpyxl cannot open legacy ``.xls`` workbooks; for those, let pandas
    # choose the engine itself instead of forcing openpyxl.
    engine = None if path.suffix.lower() == ".xls" else "openpyxl"
    logger.debug("Reading Excel {} (sheet={}, header_row={})", path.name, sheet_name, hdr)
    return pd.read_excel(
        path,
        sheet_name=sheet_name,
        header=hdr,
        dtype=str,
        keep_default_na=False,
        engine=engine,
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Writing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def write_file(
    df: pd.DataFrame,
    path: str | Path,
    *,
    file_format: Optional[str] = None,
    encoding: str = "utf-8-sig",
) -> Path:
    """Write a DataFrame to CSV or Excel, without the index column.

    Parameters
    ----------
    df : DataFrame to write
    path : output file path
    file_format : ``"csv"`` or ``"xlsx"``; taken from the *path* suffix when
        falsy. Case, surrounding whitespace and a leading dot are ignored,
        so ``"XLSX"`` and ``".xlsx"`` are accepted too.
    encoding : output encoding, used for CSV output only (default
        ``utf-8-sig``)

    Any format other than ``xlsx``/``xls`` is written as CSV.

    Returns *path* as a ``Path``.
    """
    out = Path(path)
    # Normalise an explicit format the same way as a suffix, so that
    # "XLSX" or ".xlsx" does not silently fall through to the CSV branch.
    fmt = (file_format or out.suffix).strip().lstrip(".").lower()
    if fmt in ("xlsx", "xls"):
        df.to_excel(out, index=False, engine="openpyxl")
    else:
        df.to_csv(out, index=False, encoding=encoding)
    logger.info("Wrote {} rows to {}", len(df), out)
    return out
|
||||
Reference in New Issue
Block a user