190 lines
6.2 KiB
Python
Executable File
190 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Merge multiple Excel or CSV files."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Iterable, List, Tuple
|
|
|
|
try:
|
|
import pandas as pd
|
|
except ImportError as exc: # pragma: no cover - import guard
|
|
print(
|
|
"Error: missing dependency 'pandas'. Install it with: pip install pandas openpyxl",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(1) from exc
|
|
|
|
|
|
# File extensions accepted for input and output paths. The validation
# helpers compare the lower-cased ``Path.suffix`` against this set.
SUPPORTED_SUFFIXES = {".xlsx", ".csv"}
|
|
|
|
|
|
class MergeExcelError(Exception):
    """Error whose message is meant to be shown directly to the user.

    ``main`` catches this type, prints ``Error: <message>`` to stderr and
    exits with status 1.
    """
|
|
|
|
|
|
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse the command-line arguments of the merge tool.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse falls back to ``sys.argv[1:]``,
            which is what the script entry point relies on. Passing a list
            lets callers and tests drive the parser without touching
            process-global state.

    Returns:
        A namespace with the attributes ``mode`` (``"row"``, ``"col"`` or
        ``"sheet"``), ``output`` (str or None), ``sheet`` (str or None) and
        ``files`` (non-empty list of str).
    """
    parser = argparse.ArgumentParser(
        description="Merge multiple Excel/CSV files by rows, columns, or a specific sheet."
    )
    parser.add_argument(
        "--mode",
        required=True,
        choices=("row", "col", "sheet"),
        help="Merge mode: row, col, or sheet.",
    )
    parser.add_argument(
        "--output",
        help="Output file path. If omitted, the last positional file is treated as the output path.",
    )
    parser.add_argument(
        "--sheet",
        help="Sheet name to merge when --mode sheet is used.",
    )
    parser.add_argument(
        "files",
        nargs="+",
        help="Input files followed by output file when --output is not provided.",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def resolve_paths(args: argparse.Namespace) -> Tuple[List[Path], Path]:
    """Split the parsed arguments into validated input paths and an output path.

    When ``args.output`` is set, every positional file is an input. Otherwise
    the last positional file is taken as the output and the rest are inputs.

    Args:
        args: Namespace providing ``files`` (list of str) and ``output``
            (str or None).

    Returns:
        ``(input_files, output_file)`` exactly as given on the command line
        (the paths are not rewritten to their resolved form).

    Raises:
        MergeExcelError: If fewer than two inputs are supplied, an input or
            output path fails validation, or the output refers to the same
            file as one of the inputs.
    """
    raw_files = [Path(item) for item in args.files]

    if args.output:
        input_files = raw_files
        output_file = Path(args.output)
    else:
        if len(raw_files) < 3:
            raise MergeExcelError(
                "At least 2 input files and 1 output path are required. "
                "Use --output or provide the output as the last positional argument."
            )
        input_files = raw_files[:-1]
        output_file = raw_files[-1]

    if len(input_files) < 2:
        raise MergeExcelError("At least 2 input files are required.")

    for path in input_files:
        validate_input_file(path)
    validate_output_file(output_file)

    # Compare resolved paths rather than the raw spellings: "a.csv",
    # "sub/../a.csv", an absolute path and a symlink can all name the same
    # file, and plain Path equality would not notice the collision.
    resolved_output = output_file.resolve()
    if any(path.resolve() == resolved_output for path in input_files):
        raise MergeExcelError("Output path must be different from input files.")

    return input_files, output_file
|
|
|
|
|
|
def validate_input_file(path: Path) -> None:
    """Check that *path* can be used as a merge input.

    The checks run in order: the path must exist, must be a regular file,
    and must carry one of the supported extensions (case-insensitive).

    Raises:
        MergeExcelError: Describing the first check that failed.
    """
    if not path.exists():
        message = f"Input file not found: {path}"
    elif not path.is_file():
        message = f"Input path is not a file: {path}"
    elif path.suffix.lower() not in SUPPORTED_SUFFIXES:
        message = f"Unsupported input format: {path}. Supported formats: .xlsx, .csv"
    else:
        # Every check passed; nothing to report.
        return
    raise MergeExcelError(message)
|
|
|
|
|
|
def validate_output_file(path: Path) -> None:
    """Check that *path* has an extension the tool knows how to write.

    Only the (case-insensitive) suffix is inspected; the file itself does
    not need to exist.

    Raises:
        MergeExcelError: If the suffix is not in ``SUPPORTED_SUFFIXES``.
    """
    extension = path.suffix.lower()
    if extension in SUPPORTED_SUFFIXES:
        return
    raise MergeExcelError(
        f"Unsupported output format: {path}. Supported formats: .xlsx, .csv"
    )
|
|
|
|
|
|
def read_table(path: Path, sheet_name: str | None = None) -> pd.DataFrame:
    """Load one input file into a DataFrame.

    CSV files are decoded as ``utf-8-sig`` so a leading byte-order mark is
    tolerated. For ``.xlsx`` files the requested sheet is read with the
    openpyxl engine; when no sheet is requested the first sheet is used.

    Args:
        path: File to read; the format is chosen from its lower-cased suffix.
        sheet_name: Worksheet to read from an ``.xlsx`` file. Must be left
            unset (or empty) for CSV files.

    Returns:
        The file contents as a single DataFrame.

    Raises:
        MergeExcelError: If the suffix is unsupported, a sheet is requested
            for a CSV file, or pandas/openpyxl fail to read the file.
    """
    suffix = path.suffix.lower()

    # Usage errors are raised before the ``try`` so the generic handler
    # below cannot re-wrap them into a misleading "Failed to read" message.
    if suffix not in (".csv", ".xlsx"):
        raise MergeExcelError(f"Unsupported input format: {path}")
    if suffix == ".csv" and sheet_name:
        raise MergeExcelError(f"CSV file does not support sheets: {path}")

    try:
        if suffix == ".csv":
            return pd.read_csv(path, encoding="utf-8-sig")
        # pandas interprets sheet_name=None as "load every sheet" and returns
        # a dict of DataFrames, which pd.concat cannot consume. Fall back to
        # index 0 (the first sheet) when the caller did not name one.
        target_sheet = 0 if sheet_name is None else sheet_name
        return pd.read_excel(path, sheet_name=target_sheet, engine="openpyxl")
    except ValueError as exc:
        raise MergeExcelError(f"Failed to read {path}: {exc}") from exc
    except FileNotFoundError as exc:
        raise MergeExcelError(f"Input file not found: {path}") from exc
    except Exception as exc:  # pragma: no cover - pandas/openpyxl errors vary
        if sheet_name:
            raise MergeExcelError(
                f"Failed to read sheet '{sheet_name}' from {path}: {exc}"
            ) from exc
        raise MergeExcelError(f"Failed to read {path}: {exc}") from exc
|
|
|
|
|
|
def merge_by_rows(input_files: Iterable[Path]) -> pd.DataFrame:
    """Stack the tables read from *input_files* on top of each other.

    Each file is loaded with ``read_table`` and the results are concatenated
    along the row axis with a fresh 0..n-1 index; column order is not sorted.
    """
    tables = list(map(read_table, input_files))
    return pd.concat(tables, axis=0, ignore_index=True, sort=False)
|
|
|
|
|
|
def merge_by_columns(input_files: Iterable[Path]) -> pd.DataFrame:
    """Place the tables read from *input_files* side by side.

    Each table's index is reset before concatenating along the column axis,
    so rows are aligned by position.
    """
    blocks = []
    for source in input_files:
        table = read_table(source)
        blocks.append(table.reset_index(drop=True))
    return pd.concat(blocks, axis=1)
|
|
|
|
|
|
def merge_specific_sheet(input_files: Iterable[Path], sheet_name: str) -> pd.DataFrame:
    """Stack the sheet called *sheet_name* from every input file.

    Each file is loaded through ``read_table`` with the given sheet and the
    results are concatenated along the row axis with a fresh index.
    """
    sheets = []
    for source in input_files:
        sheets.append(read_table(source, sheet_name=sheet_name))
    return pd.concat(sheets, axis=0, ignore_index=True, sort=False)
|
|
|
|
|
|
def write_output(dataframe: pd.DataFrame, output_file: Path, sheet_name: str | None) -> None:
    """Write *dataframe* to *output_file* as CSV or XLSX, chosen by suffix.

    Missing parent directories are created. CSV output is encoded as
    ``utf-8-sig``; any other suffix is written as an Excel workbook with the
    openpyxl engine. The index is never written.

    Args:
        dataframe: Table to persist.
        output_file: Destination path.
        sheet_name: Worksheet name for Excel output; ``"Merged"`` is used when
            it is empty or None. The name is cut to 31 characters, Excel's
            limit for sheet names.

    Raises:
        MergeExcelError: If the directory cannot be created or the file
            cannot be written.
    """
    suffix = output_file.suffix.lower()

    try:
        # Directory creation sits inside the try so permission and path
        # errors surface as a MergeExcelError rather than a raw traceback.
        output_file.parent.mkdir(parents=True, exist_ok=True)

        if suffix == ".csv":
            dataframe.to_csv(output_file, index=False, encoding="utf-8-sig")
            return

        sheet = sheet_name if sheet_name else "Merged"
        with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
            dataframe.to_excel(writer, index=False, sheet_name=sheet[:31])
    except Exception as exc:  # pragma: no cover - filesystem/openpyxl errors vary
        raise MergeExcelError(f"Failed to write output file {output_file}: {exc}") from exc
|
|
|
|
|
|
def main() -> None:
    """Run the merge CLI: parse arguments, merge the inputs, write the result.

    Exits with status 1 (message on stderr) when a ``MergeExcelError`` is
    raised, and with status 130 when interrupted from the keyboard.
    """
    args = parse_args()

    try:
        input_files, output_file = resolve_paths(args)

        # --sheet and --mode sheet must be used together.
        sheet_mode = args.mode == "sheet"
        if sheet_mode and not args.sheet:
            raise MergeExcelError("--sheet is required when --mode sheet is used.")
        if args.sheet and not sheet_mode:
            raise MergeExcelError("--sheet can only be used with --mode sheet.")

        # Modes that merge whole files; any other mode merges a named sheet.
        whole_file_mergers = {"row": merge_by_rows, "col": merge_by_columns}
        merger = whole_file_mergers.get(args.mode)
        if merger is not None:
            merged = merger(input_files)
            target_sheet = None
        else:
            merged = merge_specific_sheet(input_files, args.sheet)
            target_sheet = args.sheet

        write_output(merged, output_file, target_sheet)

        print(
            f"Successfully merged {len(input_files)} files in {args.mode} mode -> {output_file}"
        )
    except MergeExcelError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        raise SystemExit(1) from exc
    except KeyboardInterrupt:
        print("Error: operation cancelled by user.", file=sys.stderr)
        raise SystemExit(130)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|