excel-toolkit/scripts/filter_data.py

247 lines
7.7 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Filter, sort, and deduplicate Excel/CSV data."""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
import pandas as pd
class DataProcessingError(Exception):
    """Signal bad input arguments or a data operation that could not be completed."""
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Filter, sort, and deduplicate Excel/CSV data."
)
parser.add_argument("--input", required=True, help="Input Excel/CSV file path.")
parser.add_argument("--output", required=True, help="Output Excel/CSV file path.")
parser.add_argument(
"--sheet",
help="Sheet name for Excel files. Defaults to the first sheet.",
)
parser.add_argument(
"--filter",
dest="filter_expr",
help='Filter expression, for example: 年龄 > 30 or 部门 == "技术部".',
)
parser.add_argument(
"--sort",
action="append",
default=[],
help="Sort column name. Can be specified multiple times.",
)
order_group = parser.add_mutually_exclusive_group()
order_group.add_argument(
"--sort-asc",
action="store_true",
default=True,
help="Sort ascending (default).",
)
order_group.add_argument(
"--sort-desc",
action="store_true",
help="Sort descending.",
)
parser.add_argument("--dedup", help="Deduplicate by the specified column name.")
parser.add_argument(
"--keep",
choices=("first", "last"),
default="first",
help="Which row to keep when deduplicating. Default: first.",
)
return parser.parse_args()
def file_kind(path: Path) -> str:
    """Classify *path* as ``"csv"`` or ``"excel"`` from its extension.

    The extension is compared case-insensitively.

    Raises:
        DataProcessingError: If the extension is neither ``.csv`` nor one of
            the recognised spreadsheet extensions.
    """
    extension = path.suffix.lower()
    excel_extensions = {".xlsx", ".xls", ".xlsm", ".xlsb", ".ods"}
    if extension == ".csv":
        kind = "csv"
    elif extension in excel_extensions:
        kind = "excel"
    else:
        raise DataProcessingError(
            f"Unsupported file type for '{path}'. Please use CSV or Excel files."
        )
    return kind
def get_pandas() -> Any:
    """Import pandas on demand and return the module object.

    Raises:
        DataProcessingError: If pandas cannot be imported; the message
            contains an installation hint.
    """
    try:
        import pandas
    except ImportError as exc:
        hint = (
            "This script requires pandas. Please install it first, for example: "
            "pip install pandas openpyxl"
        )
        raise DataProcessingError(hint) from exc
    else:
        return pandas
def load_data(path: Path, sheet_name: str | None) -> "pd.DataFrame":
    """Read a CSV or Excel file into a DataFrame.

    Args:
        path: Input file; its extension selects the CSV or Excel reader.
        sheet_name: Excel sheet to read. A falsy value selects the first
            sheet (index 0). Not used for CSV input.

    Returns:
        The loaded DataFrame.

    Raises:
        DataProcessingError: If the file does not exist, has an unsupported
            extension, or cannot be read.
    """
    if not path.exists():
        raise DataProcessingError(f"Input file does not exist: {path}")
    kind = file_kind(path)
    pd = get_pandas()
    try:
        if kind == "csv":
            return pd.read_csv(path)
        return pd.read_excel(path, sheet_name=sheet_name if sheet_name else 0)
    except ValueError as exc:
        # Blame the sheet only when one was actually requested for an Excel
        # file. CSV parse/decode failures are ValueError subclasses as well and
        # must not be reported as "Unable to read sheet 'None'".
        if kind == "excel" and sheet_name:
            raise DataProcessingError(
                f"Unable to read sheet '{sheet_name}': {exc}"
            ) from exc
        raise DataProcessingError(f"Failed to read input file '{path}': {exc}") from exc
    except ImportError as exc:
        raise DataProcessingError(
            "Reading Excel files requires the appropriate engine. "
            "Please install openpyxl or the engine required by your file format."
        ) from exc
    except Exception as exc:  # pragma: no cover - defensive
        raise DataProcessingError(f"Failed to read input file '{path}': {exc}") from exc
def save_data(dataframe: "pd.DataFrame", path: Path) -> None:
    """Write *dataframe* to *path* as CSV or Excel, chosen by the extension.

    Missing parent directories are created. CSV output is written without the
    index using the ``utf-8-sig`` encoding; Excel output is written without
    the index.

    Args:
        dataframe: Data to write.
        path: Destination file.

    Raises:
        DataProcessingError: If the extension is unsupported, the output
            directory cannot be created, or writing fails.
    """
    kind = file_kind(path)
    try:
        # Directory creation is inside the try so that an OSError (for example
        # a permission problem) is reported as DataProcessingError like every
        # other write failure instead of escaping as a raw traceback.
        path.parent.mkdir(parents=True, exist_ok=True)
        if kind == "csv":
            dataframe.to_csv(path, index=False, encoding="utf-8-sig")
        else:
            dataframe.to_excel(path, index=False)
    except ImportError as exc:
        raise DataProcessingError(
            "Writing Excel files requires the appropriate engine. "
            "Please install openpyxl or the engine required by your file format."
        ) from exc
    except Exception as exc:  # pragma: no cover - defensive
        raise DataProcessingError(f"Failed to write output file '{path}': {exc}") from exc
def validate_columns(dataframe: "pd.DataFrame", columns: list[str], action: str) -> None:
    """Ensure every name in *columns* is a column of *dataframe*.

    Args:
        dataframe: Frame whose ``columns`` are checked.
        columns: Column names that must be present.
        action: Verb inserted into the error message (e.g. ``"sort"``).

    Raises:
        DataProcessingError: If any name is missing; the message lists the
            missing names and the available columns.
    """
    present = dataframe.columns
    absent = [name for name in columns if name not in present]
    if not absent:
        return
    absent_text = ", ".join(absent)
    known = ", ".join(str(label) for label in dataframe.columns)
    raise DataProcessingError(
        f"Cannot {action}. Missing column(s): {absent_text}. "
        f"Available columns: {known}"
    )
def _is_identifier_char(char: str) -> bool:
    """Return True if *char* is alphanumeric or an underscore ("" gives False)."""
    return char.isalnum() or char == "_"


def normalize_filter_expression(expression: str, columns: list[str]) -> str:
    """Wrap column names in backticks unless already inside quotes/backticks.

    The expression is scanned left to right. Text inside single or double
    quotes (with backslash escapes) and text already inside backticks is
    copied unchanged. Elsewhere, the longest column name that starts at the
    current position and is not directly adjacent to an alphanumeric character
    or underscore is wrapped in backticks.

    Args:
        expression: Filter expression as typed by the user.
        columns: Column labels; each one is converted with ``str``.

    Returns:
        The expression with matching column names backtick-quoted. The input
        is returned unchanged when the expression is empty or there are no
        usable column names.
    """
    if not expression or not columns:
        return expression
    # Longest names first so a name that contains a shorter one wins.
    # Empty names are dropped: "" matches at every position without consuming
    # any input, which would keep the scan loop from ever advancing.
    ordered_columns = sorted(
        (name for name in map(str, columns) if name), key=len, reverse=True
    )
    if not ordered_columns:
        return expression

    parts: list[str] = []
    length = len(expression)
    i = 0
    quote_char: str | None = None
    in_backticks = False
    while i < length:
        char = expression[i]
        if quote_char:
            # Inside a string literal: copy verbatim, keeping an escaped
            # character attached to its backslash so it cannot end the literal.
            parts.append(char)
            if char == "\\" and i + 1 < length:
                i += 1
                parts.append(expression[i])
            elif char == quote_char:
                quote_char = None
            i += 1
            continue
        if in_backticks:
            # Already quoted by the user: copy until the closing backtick.
            parts.append(char)
            if char == "`":
                in_backticks = False
            i += 1
            continue
        if char in {"'", '"'}:
            quote_char = char
            parts.append(char)
            i += 1
            continue
        if char == "`":
            in_backticks = True
            parts.append(char)
            i += 1
            continue

        prev_char = expression[i - 1] if i > 0 else ""
        matched = None
        for column in ordered_columns:
            if not expression.startswith(column, i):
                continue
            next_index = i + len(column)
            next_char = expression[next_index] if next_index < length else ""
            # Reject matches that are only a fragment of a longer word.
            if _is_identifier_char(prev_char) or _is_identifier_char(next_char):
                continue
            matched = column
            break
        if matched is not None:
            parts.append(f"`{matched}`")
            i += len(matched)
            continue
        parts.append(char)
        i += 1
    return "".join(parts)
def apply_filter(dataframe: "pd.DataFrame", expression: str) -> "pd.DataFrame":
    """Return the rows of *dataframe* selected by *expression*.

    Column names in the expression are backtick-quoted first, then the result
    is evaluated with ``DataFrame.query`` using the ``python`` engine.

    Raises:
        DataProcessingError: If evaluating the expression fails; the message
            quotes the expression as originally supplied.
    """
    column_names = list(dataframe.columns)
    query_text = normalize_filter_expression(expression, column_names)
    try:
        filtered = dataframe.query(query_text, engine="python")
    except Exception as exc:
        raise DataProcessingError(
            f"Invalid filter expression '{expression}': {exc}"
        ) from exc
    return filtered
def process_data(args: argparse.Namespace) -> "pd.DataFrame":
    """Load the input file and apply the requested filter, sort and dedup steps.

    The steps run in a fixed order: filter, then sort, then deduplicate. Row
    counts before and after processing are printed to standard output.

    Args:
        args: Parsed options; reads ``input``, ``sheet``, ``filter_expr``,
            ``sort``, ``sort_desc``, ``dedup`` and ``keep``.

    Returns:
        The processed DataFrame.
    """
    frame = load_data(Path(args.input), args.sheet)
    rows_before = len(frame)

    filter_expr = args.filter_expr
    if filter_expr:
        frame = apply_filter(frame, filter_expr)

    sort_columns = args.sort
    if sort_columns:
        validate_columns(frame, sort_columns, "sort")
        ascending = not args.sort_desc
        frame = frame.sort_values(by=sort_columns, ascending=ascending)

    dedup_column = args.dedup
    if dedup_column:
        validate_columns(frame, [dedup_column], "deduplicate")
        frame = frame.drop_duplicates(subset=[dedup_column], keep=args.keep)

    rows_after = len(frame)
    print(f"Rows before processing: {rows_before}")
    print(f"Rows after processing: {rows_after}")
    return frame
def main() -> int:
    """Run the command-line workflow and return the process exit status.

    Returns:
        ``0`` on success, ``1`` when a ``DataProcessingError`` is reported,
        ``130`` when the run is interrupted from the keyboard.
    """
    args = parse_args()
    exit_code = 0
    try:
        save_data(process_data(args), Path(args.output))
        print(f"Output written to: {args.output}")
    except DataProcessingError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        exit_code = 1
    except KeyboardInterrupt:
        print("Error: Operation cancelled by user.", file=sys.stderr)
        exit_code = 130
    return exit_code
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())