excel-toolkit/scripts/read_excel.py

325 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import json
import math
import sys
from datetime import date, datetime, time
from pathlib import Path
from typing import Any
def format_cell_value(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat(sep=" ")
if isinstance(value, date):
return value.isoformat()
if isinstance(value, time):
return value.isoformat()
if isinstance(value, float) and math.isnan(value):
return None
return value
def is_empty_value(value: Any) -> bool:
if value is None:
return True
if isinstance(value, float) and math.isnan(value):
return True
if isinstance(value, str) and value.strip() == "":
return True
return False
def trim_matrix(matrix: list[list[Any]]) -> list[list[Any]]:
if not matrix:
return []
last_row = -1
last_col = -1
for row_index, row in enumerate(matrix):
for col_index, value in enumerate(row):
if not is_empty_value(value):
last_row = max(last_row, row_index)
last_col = max(last_col, col_index)
if last_row == -1 or last_col == -1:
return []
return [
[format_cell_value(value) for value in row[: last_col + 1]]
for row in matrix[: last_row + 1]
]
def find_header_row(matrix: list[list[Any]]) -> int | None:
for index, row in enumerate(matrix):
if any(not is_empty_value(value) for value in row):
return index
return None
def normalize_header_row(row: list[Any]) -> list[str]:
headers: list[str] = []
used: dict[str, int] = {}
for index, value in enumerate(row, start=1):
name = "" if is_empty_value(value) else str(value).strip()
if not name:
name = f"{index}"
count = used.get(name, 0)
used[name] = count + 1
headers.append(name if count == 0 else f"{name}_{count + 1}")
return headers
def infer_scalar_type(value: Any) -> str:
if is_empty_value(value):
return "empty"
if isinstance(value, bool):
return "bool"
if isinstance(value, int) and not isinstance(value, bool):
return "int"
if isinstance(value, float):
return "float"
if isinstance(value, datetime):
return "datetime"
if isinstance(value, date):
return "date"
if isinstance(value, time):
return "time"
if isinstance(value, str):
text = value.strip()
lower = text.lower()
if lower in {"true", "false"}:
return "bool"
for parser, type_name in (
(int, "int"),
(float, "float"),
(datetime.fromisoformat, "datetime"),
(date.fromisoformat, "date"),
(time.fromisoformat, "time"),
):
try:
parser(text)
return type_name
except Exception:
continue
return "str"
return type(value).__name__
def infer_column_types(rows: list[list[Any]], headers: list[str]) -> dict[str, str]:
column_types: dict[str, str] = {}
for col_index, header in enumerate(headers):
observed = {
infer_scalar_type(row[col_index])
for row in rows
if col_index < len(row) and not is_empty_value(row[col_index])
}
if not observed:
column_types[header] = "empty"
elif len(observed) == 1:
column_types[header] = observed.pop()
else:
column_types[header] = "mixed(" + ", ".join(sorted(observed)) + ")"
return column_types
def require_pandas():
try:
import pandas as pd # type: ignore
except ImportError as exc:
raise RuntimeError("缺少依赖 pandas请先安装pip install pandas") from exc
return pd
def require_openpyxl():
try:
from openpyxl import load_workbook # type: ignore
except ImportError as exc:
raise RuntimeError("缺少依赖 openpyxl请先安装pip install openpyxl") from exc
return load_workbook
def build_result(matrix: list[list[Any]], file_path: Path, file_type: str, sheet_names: list[str], selected_sheet: str | None) -> dict[str, Any]:
pd = require_pandas()
header_row_index = find_header_row(matrix)
if header_row_index is None:
headers: list[str] = []
data_rows: list[list[Any]] = []
else:
headers = normalize_header_row(matrix[header_row_index])
data_rows = [
row + [None] * (len(headers) - len(row)) if len(row) < len(headers) else row[: len(headers)]
for row in matrix[header_row_index + 1 :]
]
df = pd.DataFrame(data_rows, columns=headers) if headers else pd.DataFrame()
return {
"file": str(file_path),
"type": file_type,
"sheets": sheet_names,
"sheet": selected_sheet,
"row_count": len(matrix),
"column_count": max((len(row) for row in matrix), default=0),
"header_row_index": header_row_index,
"headers": headers,
"column_types": infer_column_types(data_rows, headers),
"dataframe": df,
}
def load_excel(file_path: Path, sheet_name: str | None) -> dict[str, Any]:
load_workbook = require_openpyxl()
workbook = load_workbook(file_path, data_only=True)
sheet_names = workbook.sheetnames
if sheet_name:
if sheet_name not in workbook.sheetnames:
raise ValueError(f"未找到工作表: {sheet_name}")
worksheet = workbook[sheet_name]
else:
worksheet = workbook[workbook.sheetnames[0]]
sheet_name = worksheet.title
matrix = [
[worksheet.cell(row=row, column=col).value for col in range(1, worksheet.max_column + 1)]
for row in range(1, worksheet.max_row + 1)
]
for merged_range in worksheet.merged_cells.ranges:
min_col, min_row, max_col, max_row = merged_range.bounds
top_left_value = worksheet.cell(row=min_row, column=min_col).value
for row in range(min_row - 1, max_row):
for col in range(min_col - 1, max_col):
matrix[row][col] = top_left_value
return build_result(
trim_matrix(matrix),
file_path=file_path,
file_type="Excel (.xlsx)",
sheet_names=sheet_names,
selected_sheet=sheet_name,
)
def load_csv(file_path: Path) -> dict[str, Any]:
pd = require_pandas()
last_error: Exception | None = None
for encoding in ("utf-8-sig", "utf-8", "gb18030"):
try:
df = pd.read_csv(file_path, header=None, dtype=object, keep_default_na=False, encoding=encoding)
matrix = df.where(pd.notna(df), None).values.tolist()
return build_result(
trim_matrix(matrix),
file_path=file_path,
file_type="CSV",
sheet_names=[],
selected_sheet=None,
)
except UnicodeDecodeError as exc:
last_error = exc
continue
except pd.errors.EmptyDataError:
return build_result([], file_path=file_path, file_type="CSV", sheet_names=[], selected_sheet=None)
raise ValueError(f"无法读取 CSV 文件编码: {last_error}")
def preview_dataframe(df: Any, rows: int, show_all: bool) -> Any:
if show_all:
return df
return df.head(rows)
def print_text_output(result: dict[str, Any], rows: int, show_all: bool) -> None:
print(f"文件: {result['file']}")
print(f"类型: {result['type']}")
if result["sheets"]:
print("Sheet 列表: [" + ", ".join(result["sheets"]) + "]")
print(f"当前 Sheet: {result['sheet']}")
print(f"总行数: {result['row_count']}, 总列数: {result['column_count']}")
print("表头: [" + ", ".join(result["headers"]) + "]" if result["headers"] else "表头: []")
print("数据类型: " + json.dumps(result["column_types"], ensure_ascii=False))
df = result["dataframe"]
print()
if df.empty:
print("数据内容: 文件为空,或未检测到表头后的数据行。")
return
title = "全部数据" if show_all else f"{min(rows, len(df))} 行数据"
print(f"{title}:")
print(preview_dataframe(df, rows, show_all).to_string(index=False))
def build_json_output(result: dict[str, Any], rows: int, show_all: bool) -> dict[str, Any]:
pd = require_pandas()
df = result["dataframe"]
preview_df = preview_dataframe(df, rows, show_all)
return {
"file": result["file"],
"type": result["type"],
"sheets": result["sheets"],
"sheet": result["sheet"],
"row_count": result["row_count"],
"column_count": result["column_count"],
"header_row_index": result["header_row_index"],
"headers": result["headers"],
"column_types": result["column_types"],
"preview_row_count": len(preview_df),
"data": preview_df.where(pd.notna(preview_df), None).to_dict(orient="records"),
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="读取 Excel(.xlsx) 或 CSV 文件并显示内容。")
parser.add_argument("--file", required=True, help="文件路径,支持 .xlsx 或 .csv")
parser.add_argument("--sheet", help="指定工作表名称,仅 Excel 可用")
parser.add_argument("--rows", type=int, default=10, help="默认显示前 N 行,默认 10")
parser.add_argument("--all", action="store_true", help="显示全部数据")
parser.add_argument("--json", action="store_true", help="以 JSON 格式输出")
return parser.parse_args()
def main() -> int:
args = parse_args()
file_path = Path(args.file).expanduser()
try:
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
if not file_path.is_file():
raise ValueError(f"路径不是文件: {file_path}")
if args.rows <= 0:
raise ValueError("--rows 必须大于 0")
suffix = file_path.suffix.lower()
if suffix == ".xlsx":
result = load_excel(file_path, args.sheet)
elif suffix == ".csv":
if args.sheet:
raise ValueError("CSV 文件不支持 --sheet 参数")
result = load_csv(file_path)
else:
raise ValueError(f"不支持的文件类型: {suffix},仅支持 .xlsx 和 .csv")
if args.json:
print(json.dumps(build_json_output(result, args.rows, args.all), ensure_ascii=False, indent=2))
else:
print_text_output(result, args.rows, args.all)
return 0
except KeyboardInterrupt:
print("已取消。", file=sys.stderr)
return 130
except Exception as exc:
print(f"错误: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())