#!/usr/bin/env python3
"""Batch process Excel/CSV files with replacement, filtering, sorting, and deduplication."""
from __future__ import annotations
import argparse
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
# Guard the pandas import so a missing dependency yields an actionable
# install hint on stderr instead of a raw traceback.
try:
    import pandas as pd
except ImportError as exc:  # pragma: no cover - import guard
    print(
        "Error: missing dependency 'pandas'. Install it with: pip install pandas openpyxl",
        file=sys.stderr,
    )
    raise SystemExit(1) from exc

# File extensions this tool knows how to read and write.
SUPPORTED_SUFFIXES = {".xlsx", ".csv"}
class BatchProcessError(Exception):
    """User-facing processing error.

    Raised for expected failure modes (bad arguments, unreadable files,
    missing columns). Callers convert it into a stderr message and a
    non-zero exit code rather than letting a traceback escape.
    """
@dataclass
class FileReport:
    """Processing results for a single file."""

    source: Path            # original input file
    output: Path            # destination path under the output directory
    backup: Path            # '<name>.bak' copy placed next to the source
    status: str             # "processed", "dry-run", or "failed"
    rows_before: int = 0    # row count as read from disk
    rows_after: int = 0     # row count after filter/dedup
    replacements: int = 0   # number of cell replacements applied
    message: str = ""       # human-readable outcome note (or error text)
@dataclass
class Summary:
    """Aggregate run summary."""

    total_files: int = 0         # files matched by the glob pattern
    processed_files: int = 0     # files written successfully
    skipped_files: int = 0       # files only inspected (dry-run mode)
    failed_files: int = 0        # files that raised BatchProcessError
    total_replacements: int = 0  # replacements summed across all files
    reports: list[FileReport] = field(default_factory=list)  # per-file detail
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``."""
    cli = argparse.ArgumentParser(
        description="Batch process Excel/CSV files with replace, filter, sort, and dedup operations."
    )
    # Input/output locations.
    cli.add_argument(
        "--input-dir",
        default=".",
        help="Input directory. Defaults to the current directory.",
    )
    cli.add_argument(
        "--output-dir",
        help="Output directory. Defaults to '<input-dir>_processed'.",
    )
    cli.add_argument(
        "--pattern",
        default="*.xlsx",
        help="Glob pattern used to match files. Defaults to '*.xlsx'.",
    )
    # Transformations, applied in replace -> filter -> sort -> dedup order.
    cli.add_argument(
        "--replace",
        action="append",
        default=[],
        metavar="SEARCH|REPLACE",
        help="Replacement pair. Repeat the flag to provide multiple pairs.",
    )
    cli.add_argument(
        "--filter",
        dest="filter_expr",
        help="Filter expression, for example: 年龄 > 30",
    )
    cli.add_argument(
        "--sort",
        dest="sort_column",
        help="Column name used for sorting.",
    )
    cli.add_argument(
        "--sort-desc",
        action="store_true",
        help="Sort in descending order.",
    )
    cli.add_argument(
        "--dedup",
        dest="dedup_column",
        help="Column name used for deduplication.",
    )
    # Misc behavior switches.
    cli.add_argument(
        "--sheet",
        help="Sheet name for Excel files. Ignored for CSV files.",
    )
    cli.add_argument(
        "--recursive",
        action="store_true",
        help="Search subdirectories recursively.",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show planned actions without writing backups or output files.",
    )
    return cli.parse_args()
def resolve_directories(args: argparse.Namespace) -> tuple[Path, Path]:
    """Validate the input directory and derive the output directory.

    Returns (input_dir, output_dir), both absolute. When --output-dir is
    not given, the output goes to a '<input-dir>_processed' sibling.
    Raises BatchProcessError when the input path is missing or not a dir.
    """
    source_root = Path(args.input_dir).expanduser().resolve()
    if not source_root.exists():
        raise BatchProcessError(f"Input directory not found: {source_root}")
    if not source_root.is_dir():
        raise BatchProcessError(f"Input path is not a directory: {source_root}")
    if args.output_dir:
        target_root = Path(args.output_dir).expanduser().resolve()
    else:
        target_root = source_root.parent / f"{source_root.name}_processed"
    return source_root, target_root
def parse_replace_pairs(values: list[str]) -> list[tuple[str, str]]:
    """Turn 'SEARCH|REPLACE' strings into (search, replace) tuples.

    Only the first '|' separates the pair, so the replacement text may
    itself contain '|'. Raises BatchProcessError when no '|' is present.
    """
    pairs: list[tuple[str, str]] = []
    for raw in values:
        search, separator, replacement = raw.partition("|")
        if not separator:
            raise BatchProcessError(
                f"Invalid --replace value '{raw}'. Expected format: SEARCH|REPLACE"
            )
        pairs.append((search, replacement))
    return pairs
def find_files(input_dir: Path, pattern: str, recursive: bool) -> list[Path]:
    """Collect supported files under *input_dir* matching *pattern*.

    Only files whose suffix is in SUPPORTED_SUFFIXES are kept; the result
    is a sorted list of resolved (absolute) paths.
    """
    globber = input_dir.rglob if recursive else input_dir.glob
    matches = (
        candidate.resolve()
        for candidate in globber(pattern)
        if candidate.is_file() and candidate.suffix.lower() in SUPPORTED_SUFFIXES
    )
    return sorted(matches)
def read_table(path: Path, sheet_name: str | None) -> pd.DataFrame:
suffix = path.suffix.lower()
try:
if suffix == ".csv":
if sheet_name:
print(
f"Warning: --sheet ignored for CSV file {path}",
file=sys.stderr,
)
return pd.read_csv(path, encoding="utf-8-sig")
if suffix == ".xlsx":
return pd.read_excel(path, sheet_name=sheet_name, engine="openpyxl")
except ValueError as exc:
raise BatchProcessError(f"Failed to read {path}: {exc}") from exc
except Exception as exc: # pragma: no cover - pandas/openpyxl errors vary
if sheet_name:
raise BatchProcessError(
f"Failed to read sheet '{sheet_name}' from {path}: {exc}"
) from exc
raise BatchProcessError(f"Failed to read {path}: {exc}") from exc
raise BatchProcessError(f"Unsupported file type: {path}")
def write_table(dataframe: pd.DataFrame, path: Path, sheet_name: str | None) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
suffix = path.suffix.lower()
try:
if suffix == ".csv":
dataframe.to_csv(path, index=False, encoding="utf-8-sig")
return
target_sheet = (sheet_name or "Processed")[:31]
with pd.ExcelWriter(path, engine="openpyxl") as writer:
dataframe.to_excel(writer, index=False, sheet_name=target_sheet)
except Exception as exc: # pragma: no cover - filesystem/openpyxl errors vary
raise BatchProcessError(f"Failed to write {path}: {exc}") from exc
def ensure_column_exists(dataframe: pd.DataFrame, column: str, operation: str) -> None:
    """Raise BatchProcessError naming *operation* if *column* is absent."""
    if column in dataframe.columns:
        return
    available = ", ".join(str(item) for item in dataframe.columns)
    raise BatchProcessError(
        f"Cannot {operation}: column '{column}' not found. Available columns: {available}"
    )
def normalize_filter_expression(dataframe: pd.DataFrame, expression: str) -> str:
    """Wrap column names in backticks so ``DataFrame.query`` accepts them.

    BUG FIX: the previous sequential ``str.replace`` corrupted a column name
    that is a substring of an already-backticked longer one (e.g. 'age'
    inside '`page`' became '`p`age``'). We now swap each name for a unique
    placeholder first (longest names first), then expand the placeholders,
    so earlier substitutions are never re-matched.
    """
    columns = sorted((str(item) for item in dataframe.columns), key=len, reverse=True)
    normalized = expression
    placeholders: dict[str, str] = {}
    for index, column in enumerate(columns):
        if column not in normalized:
            continue
        # NUL-delimited token; assumes CLI input never contains NUL bytes.
        token = f"\x00{index}\x00"
        normalized = normalized.replace(column, token)
        placeholders[token] = f"`{column}`"
    for token, quoted in placeholders.items():
        normalized = normalized.replace(token, quoted)
    return normalized
def apply_replace(dataframe: pd.DataFrame, pairs: list[tuple[str, str]]) -> tuple[pd.DataFrame, int]:
    """Apply exact-value replacements and count cells actually changed.

    Returns (new_dataframe, replacement_count). The input frame is never
    mutated. BUG FIX: the old count compared ``astype(str)`` against the
    search text, which could overcount cells (e.g. the integer 5 when
    searching for "5") that ``DataFrame.replace`` never touches; we now
    count by diffing the frame before and after each replacement.
    """
    if not pairs:
        return dataframe, 0
    updated = dataframe.copy()
    replacements = 0
    for search, replace in pairs:
        previous = updated
        updated = updated.replace(search, replace)
        # A cell counts as replaced only if its value actually differs;
        # exclude NaN-vs-NaN positions, which compare unequal in pandas.
        changed = previous.ne(updated) & ~(previous.isna() & updated.isna())
        replacements += int(changed.sum().sum())
    return updated, replacements
def apply_filter(dataframe: pd.DataFrame, expression: str | None) -> pd.DataFrame:
    """Filter rows via ``DataFrame.query``; no-op when *expression* is falsy.

    Column names are backtick-normalized first so non-identifier names
    (e.g. containing spaces or CJK characters) work in the expression.
    Raises BatchProcessError when the expression cannot be evaluated.
    """
    if not expression:
        return dataframe
    query_text = normalize_filter_expression(dataframe, expression)
    try:
        return dataframe.query(query_text, engine="python")
    except Exception as exc:
        raise BatchProcessError(f"Invalid filter expression '{expression}': {exc}") from exc
def apply_sort(dataframe: pd.DataFrame, column: str | None, descending: bool) -> pd.DataFrame:
    """Stable-sort rows by *column*; no-op when *column* is falsy.

    Raises BatchProcessError if the column is missing or unsortable.
    """
    if not column:
        return dataframe
    ensure_column_exists(dataframe, column, "sort")
    try:
        ordered = dataframe.sort_values(by=column, ascending=not descending, kind="stable")
    except Exception as exc:
        raise BatchProcessError(f"Failed to sort by '{column}': {exc}") from exc
    return ordered
def apply_dedup(dataframe: pd.DataFrame, column: str | None) -> pd.DataFrame:
    """Drop rows duplicated in *column*, keeping the first occurrence.

    No-op when *column* is falsy; raises BatchProcessError on failure.
    """
    if not column:
        return dataframe
    ensure_column_exists(dataframe, column, "deduplicate")
    try:
        deduplicated = dataframe.drop_duplicates(subset=[column], keep="first")
    except Exception as exc:
        raise BatchProcessError(f"Failed to deduplicate by '{column}': {exc}") from exc
    return deduplicated
def build_output_path(source: Path, input_dir: Path, output_dir: Path) -> Path:
    """Mirror *source*'s position under *input_dir* into *output_dir*."""
    return output_dir / source.relative_to(input_dir)
def build_backup_path(source: Path) -> Path:
    """Sibling backup path: the full filename with '.bak' appended."""
    return source.parent / (source.name + ".bak")
def process_file(
    source: Path,
    input_dir: Path,
    output_dir: Path,
    replace_pairs: list[tuple[str, str]],
    filter_expr: str | None,
    sort_column: str | None,
    sort_desc: bool,
    dedup_column: str | None,
    sheet_name: str | None,
    dry_run: bool,
) -> FileReport:
    """Run the full pipeline on one file and report the outcome.

    Operations run in replace -> filter -> sort -> dedup order. In dry-run
    mode nothing touches disk; otherwise a '.bak' copy is made beside the
    source before the result is written under *output_dir*.
    """
    destination = build_output_path(source, input_dir, output_dir)
    backup = build_backup_path(source)

    table = read_table(source, sheet_name)
    initial_rows = len(table)
    table, replaced = apply_replace(table, replace_pairs)
    table = apply_filter(table, filter_expr)
    table = apply_sort(table, sort_column, sort_desc)
    table = apply_dedup(table, dedup_column)
    final_rows = len(table)

    # Fields shared by both report variants.
    common = dict(
        source=source,
        output=destination,
        backup=backup,
        rows_before=initial_rows,
        rows_after=final_rows,
        replacements=replaced,
    )
    if dry_run:
        return FileReport(status="dry-run", message="No files were written.", **common)

    # Back up the original before producing output.
    backup.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, backup)
    write_table(table, destination, sheet_name)
    return FileReport(status="processed", message="Processed successfully.", **common)
def print_progress(index: int, total: int, source: Path) -> None:
    """Emit a one-line '[i/total]' progress marker for the current file."""
    marker = f"[{index}/{total}] Processing: {source}"
    print(marker)
def print_summary(summary: Summary, dry_run: bool) -> None:
    """Print the aggregate counters, then one line per file report."""
    header = [
        "\nSummary Report",
        f"Total matched files: {summary.total_files}",
        f"Processed files: {summary.processed_files}",
        f"Skipped files: {summary.skipped_files}",
        f"Failed files: {summary.failed_files}",
        f"Total replacements: {summary.total_replacements}",
    ]
    for line in header:
        print(line)
    if dry_run:
        print("Mode: dry-run")
    for entry in summary.reports:
        print(
            f"- {entry.status}: {entry.source} -> {entry.output} "
            f"(rows: {entry.rows_before} -> {entry.rows_after}, replacements: {entry.replacements})"
        )
        if entry.message:
            print(f" {entry.message}")
def main() -> None:
    """CLI entry point: parse args, process each matched file, print a summary.

    Exit codes: 0 on full success, 1 on any per-file failure or setup error,
    130 on Ctrl-C.
    """
    args = parse_args()
    try:
        input_dir, output_dir = resolve_directories(args)
        replace_pairs = parse_replace_pairs(args.replace)
        files = find_files(input_dir, args.pattern, args.recursive)
        if not files:
            raise BatchProcessError(
                f"No matching Excel/CSV files found in {input_dir} with pattern '{args.pattern}'."
            )
        summary = Summary(total_files=len(files))
        for index, source in enumerate(files, start=1):
            print_progress(index, len(files), source)
            try:
                report = process_file(
                    source=source,
                    input_dir=input_dir,
                    output_dir=output_dir,
                    replace_pairs=replace_pairs,
                    filter_expr=args.filter_expr,
                    sort_column=args.sort_column,
                    sort_desc=args.sort_desc,
                    dedup_column=args.dedup_column,
                    sheet_name=args.sheet,
                    dry_run=args.dry_run,
                )
                # Dry-run files count as skipped; their replacement counts
                # still feed the total so the summary previews real work.
                if report.status == "dry-run":
                    summary.skipped_files += 1
                else:
                    summary.processed_files += 1
                summary.total_replacements += report.replacements
                summary.reports.append(report)
            except BatchProcessError as exc:
                # Per-file failures are recorded but do not stop the batch;
                # the run exits non-zero after the summary is printed.
                summary.failed_files += 1
                summary.reports.append(
                    FileReport(
                        source=source,
                        output=build_output_path(source, input_dir, output_dir),
                        backup=build_backup_path(source),
                        status="failed",
                        message=str(exc),
                    )
                )
                print(f"Error: {exc}", file=sys.stderr)
        print_summary(summary, args.dry_run)
        if summary.failed_files:
            raise SystemExit(1)
    except BatchProcessError as exc:
        # Setup-level errors (bad directory, bad --replace, no matches).
        print(f"Error: {exc}", file=sys.stderr)
        raise SystemExit(1) from exc
    except KeyboardInterrupt:
        print("Error: operation cancelled by user.", file=sys.stderr)
        raise SystemExit(130)
# Allow use both as an importable module and as a standalone script.
if __name__ == "__main__":
    main()