excel-toolkit/scripts/read_excel.py

325 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import json
import math
import sys
from datetime import date, datetime, time
from pathlib import Path
from typing import Any
def format_cell_value(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat(sep=" ")
if isinstance(value, date):
return value.isoformat()
if isinstance(value, time):
return value.isoformat()
if isinstance(value, float) and math.isnan(value):
return None
return value
def is_empty_value(value: Any) -> bool:
if value is None:
return True
if isinstance(value, float) and math.isnan(value):
return True
if isinstance(value, str) and value.strip() == "":
return True
return False
def trim_matrix(matrix: list[list[Any]]) -> list[list[Any]]:
if not matrix:
return []
last_row = -1
last_col = -1
for row_index, row in enumerate(matrix):
for col_index, value in enumerate(row):
if not is_empty_value(value):
last_row = max(last_row, row_index)
last_col = max(last_col, col_index)
if last_row == -1 or last_col == -1:
return []
return [
[format_cell_value(value) for value in row[: last_col + 1]]
for row in matrix[: last_row + 1]
]
def find_header_row(matrix: list[list[Any]]) -> int | None:
for index, row in enumerate(matrix):
if any(not is_empty_value(value) for value in row):
return index
return None
def normalize_header_row(row: list[Any]) -> list[str]:
headers: list[str] = []
used: dict[str, int] = {}
for index, value in enumerate(row, start=1):
name = "" if is_empty_value(value) else str(value).strip()
if not name:
name = f"{index}"
count = used.get(name, 0)
used[name] = count + 1
headers.append(name if count == 0 else f"{name}_{count + 1}")
return headers
def infer_scalar_type(value: Any) -> str:
if is_empty_value(value):
return "empty"
if isinstance(value, bool):
return "bool"
if isinstance(value, int) and not isinstance(value, bool):
return "int"
if isinstance(value, float):
return "float"
if isinstance(value, datetime):
return "datetime"
if isinstance(value, date):
return "date"
if isinstance(value, time):
return "time"
if isinstance(value, str):
text = value.strip()
lower = text.lower()
if lower in {"true", "false"}:
return "bool"
for parser, type_name in (
(int, "int"),
(float, "float"),
(datetime.fromisoformat, "datetime"),
(date.fromisoformat, "date"),
(time.fromisoformat, "time"),
):
try:
parser(text)
return type_name
except Exception:
continue
return "str"
return type(value).__name__
def infer_column_types(rows: list[list[Any]], headers: list[str]) -> dict[str, str]:
column_types: dict[str, str] = {}
for col_index, header in enumerate(headers):
observed = {
infer_scalar_type(row[col_index])
for row in rows
if col_index < len(row) and not is_empty_value(row[col_index])
}
if not observed:
column_types[header] = "empty"
elif len(observed) == 1:
column_types[header] = observed.pop()
else:
column_types[header] = "mixed(" + ", ".join(sorted(observed)) + ")"
return column_types
def require_pandas():
try:
import pandas as pd # type: ignore
except ImportError as exc:
raise RuntimeError("缺少依赖 pandas请先安装pip install pandas") from exc
return pd
def require_openpyxl():
try:
from openpyxl import load_workbook # type: ignore
except ImportError as exc:
raise RuntimeError("缺少依赖 openpyxl请先安装pip install openpyxl") from exc
return load_workbook
def build_result(matrix: list[list[Any]], file_path: Path, file_type: str, sheet_names: list[str], selected_sheet: str | None) -> dict[str, Any]:
pd = require_pandas()
header_row_index = find_header_row(matrix)
if header_row_index is None:
headers: list[str] = []
data_rows: list[list[Any]] = []
else:
headers = normalize_header_row(matrix[header_row_index])
data_rows = [
row + [None] * (len(headers) - len(row)) if len(row) < len(headers) else row[: len(headers)]
for row in matrix[header_row_index + 1 :]
]
df = pd.DataFrame(data_rows, columns=headers) if headers else pd.DataFrame()
return {
"file": str(file_path),
"type": file_type,
"sheets": sheet_names,
"sheet": selected_sheet,
"row_count": len(matrix),
"column_count": max((len(row) for row in matrix), default=0),
"header_row_index": header_row_index,
"headers": headers,
"column_types": infer_column_types(data_rows, headers),
"dataframe": df,
}
def load_excel(file_path: Path, sheet_name: str | None) -> dict[str, Any]:
load_workbook = require_openpyxl()
workbook = load_workbook(file_path, data_only=True)
sheet_names = workbook.sheetnames
if sheet_name:
if sheet_name not in workbook.sheetnames:
raise ValueError(f"未找到工作表: {sheet_name}")
worksheet = workbook[sheet_name]
else:
worksheet = workbook[workbook.sheetnames[0]]
sheet_name = worksheet.title
matrix = [
[worksheet.cell(row=row, column=col).value for col in range(1, worksheet.max_column + 1)]
for row in range(1, worksheet.max_row + 1)
]
for merged_range in worksheet.merged_cells.ranges:
min_col, min_row, max_col, max_row = merged_range.bounds
top_left_value = worksheet.cell(row=min_row, column=min_col).value
for row in range(min_row - 1, max_row):
for col in range(min_col - 1, max_col):
matrix[row][col] = top_left_value
return build_result(
trim_matrix(matrix),
file_path=file_path,
file_type="Excel (.xlsx)",
sheet_names=sheet_names,
selected_sheet=sheet_name,
)
def load_csv(file_path: Path) -> dict[str, Any]:
pd = require_pandas()
last_error: Exception | None = None
for encoding in ("utf-8-sig", "utf-8", "gb18030"):
try:
df = pd.read_csv(file_path, header=None, dtype=object, keep_default_na=False, encoding=encoding)
matrix = df.where(pd.notna(df), None).values.tolist()
return build_result(
trim_matrix(matrix),
file_path=file_path,
file_type="CSV",
sheet_names=[],
selected_sheet=None,
)
except UnicodeDecodeError as exc:
last_error = exc
continue
except pd.errors.EmptyDataError:
return build_result([], file_path=file_path, file_type="CSV", sheet_names=[], selected_sheet=None)
raise ValueError(f"无法读取 CSV 文件编码: {last_error}")
def preview_dataframe(df: Any, rows: int, show_all: bool) -> Any:
if show_all:
return df
return df.head(rows)
def print_text_output(result: dict[str, Any], rows: int, show_all: bool) -> None:
print(f"文件: {result['file']}")
print(f"类型: {result['type']}")
if result["sheets"]:
print("Sheet 列表: [" + ", ".join(result["sheets"]) + "]")
print(f"当前 Sheet: {result['sheet']}")
print(f"总行数: {result['row_count']}, 总列数: {result['column_count']}")
print("表头: [" + ", ".join(result["headers"]) + "]" if result["headers"] else "表头: []")
print("数据类型: " + json.dumps(result["column_types"], ensure_ascii=False))
df = result["dataframe"]
print()
if df.empty:
print("数据内容: 文件为空,或未检测到表头后的数据行。")
return
title = "全部数据" if show_all else f"{min(rows, len(df))} 行数据"
print(f"{title}:")
print(preview_dataframe(df, rows, show_all).to_string(index=False))
def build_json_output(result: dict[str, Any], rows: int, show_all: bool) -> dict[str, Any]:
pd = require_pandas()
df = result["dataframe"]
preview_df = preview_dataframe(df, rows, show_all)
return {
"file": result["file"],
"type": result["type"],
"sheets": result["sheets"],
"sheet": result["sheet"],
"row_count": result["row_count"],
"column_count": result["column_count"],
"header_row_index": result["header_row_index"],
"headers": result["headers"],
"column_types": result["column_types"],
"preview_row_count": len(preview_df),
"data": preview_df.where(pd.notna(preview_df), None).to_dict(orient="records"),
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="读取 Excel(.xlsx) 或 CSV 文件并显示内容。")
parser.add_argument("--file", required=True, help="文件路径,支持 .xlsx 或 .csv")
parser.add_argument("--sheet", help="指定工作表名称,仅 Excel 可用")
parser.add_argument("--rows", type=int, default=10, help="默认显示前 N 行,默认 10")
parser.add_argument("--all", action="store_true", help="显示全部数据")
parser.add_argument("--json", action="store_true", help="以 JSON 格式输出")
return parser.parse_args()
def main() -> int:
args = parse_args()
file_path = Path(args.file).expanduser()
try:
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
if not file_path.is_file():
raise ValueError(f"路径不是文件: {file_path}")
if args.rows <= 0:
raise ValueError("--rows 必须大于 0")
suffix = file_path.suffix.lower()
if suffix == ".xlsx":
result = load_excel(file_path, args.sheet)
elif suffix == ".csv":
if args.sheet:
raise ValueError("CSV 文件不支持 --sheet 参数")
result = load_csv(file_path)
else:
raise ValueError(f"不支持的文件类型: {suffix},仅支持 .xlsx 和 .csv")
if args.json:
print(json.dumps(build_json_output(result, args.rows, args.all), ensure_ascii=False, indent=2))
else:
print_text_output(result, args.rows, args.all)
return 0
except KeyboardInterrupt:
print("已取消。", file=sys.stderr)
return 130
except Exception as exc:
print(f"错误: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())