#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import json
|
|||
|
|
import math
|
|||
|
|
import sys
|
|||
|
|
from datetime import date, datetime, time
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
|
|||
|
|
def format_cell_value(value: Any) -> Any:
    """Convert a raw cell value into a JSON-friendly representation.

    datetime values become "YYYY-MM-DD HH:MM:SS" strings, date and time
    values become their ISO strings, float NaN becomes None, and every
    other value passes through unchanged.
    """
    if isinstance(value, datetime):
        return value.isoformat(sep=" ")
    # datetime is handled above, so this matches plain date and time only.
    if isinstance(value, (date, time)):
        return value.isoformat()
    if isinstance(value, float) and math.isnan(value):
        return None
    return value
|
|||
|
|
|
|||
|
|
|
|||
|
|
def is_empty_value(value: Any) -> bool:
    """Report whether *value* counts as an empty cell.

    Empty means None, a float NaN, or a string that is blank after
    stripping whitespace. Zero and False are NOT empty.
    """
    if value is None:
        return True
    if isinstance(value, float):
        return math.isnan(value)
    return isinstance(value, str) and not value.strip()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def trim_matrix(matrix: list[list[Any]]) -> list[list[Any]]:
    """Drop trailing all-empty rows and columns, formatting kept cells.

    Scans for the bottom-right-most non-empty cell and keeps only the
    rectangle up to it, running each kept cell through
    format_cell_value. Returns [] when no cell is non-empty.
    """
    if not matrix:
        return []

    # Locate the last row/column that holds any non-empty cell.
    max_row = -1
    max_col = -1
    for r, row in enumerate(matrix):
        for c, cell in enumerate(row):
            if is_empty_value(cell):
                continue
            if r > max_row:
                max_row = r
            if c > max_col:
                max_col = c

    if max_row < 0 or max_col < 0:
        return []

    trimmed: list[list[Any]] = []
    for row in matrix[: max_row + 1]:
        trimmed.append([format_cell_value(cell) for cell in row[: max_col + 1]])
    return trimmed
|
|||
|
|
|
|||
|
|
|
|||
|
|
def find_header_row(matrix: list[list[Any]]) -> int | None:
    """Return the index of the first row with any non-empty cell.

    Returns None when every row is entirely empty (or the matrix is
    empty).
    """
    return next(
        (
            index
            for index, row in enumerate(matrix)
            if not all(is_empty_value(cell) for cell in row)
        ),
        None,
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def normalize_header_row(row: list[Any]) -> list[str]:
    """Turn a raw header row into clean, unique column names.

    Blank cells become positional placeholders ("列1", "列2", ...);
    duplicate names get a numeric suffix ("name", "name_2", "name_3").
    """
    seen: dict[str, int] = {}
    result: list[str] = []
    for position, cell in enumerate(row, start=1):
        label = "" if is_empty_value(cell) else str(cell).strip()
        label = label or f"列{position}"
        occurrences = seen.get(label, 0)
        seen[label] = occurrences + 1
        # First occurrence keeps the plain name; later ones are suffixed.
        result.append(label if occurrences == 0 else f"{label}_{occurrences + 1}")
    return result
|
|||
|
|
|
|||
|
|
|
|||
|
|
def infer_scalar_type(value: Any) -> str:
    """Best-effort classification of a single cell value.

    Returns one of "empty", "bool", "int", "float", "datetime", "date",
    "time", "str", or, for unrecognized objects, the value's type name.
    String values are probed with the stdlib parsers so e.g. "3.14"
    reports "float" and "2021-01-02" reports "date".
    """
    if is_empty_value(value):
        return "empty"
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        return "datetime"
    if isinstance(value, date):
        return "date"
    if isinstance(value, time):
        return "time"
    if isinstance(value, str):
        text = value.strip()
        if text.lower() in {"true", "false"}:
            return "bool"
        # Probe date BEFORE datetime: datetime.fromisoformat accepts a
        # date-only string (e.g. "2021-01-02"), which previously caused
        # pure dates to be misreported as "datetime". date.fromisoformat
        # rejects strings that carry a time part, so full datetimes still
        # fall through to the datetime parser.
        for parser, type_name in (
            (int, "int"),
            (float, "float"),
            (date.fromisoformat, "date"),
            (datetime.fromisoformat, "datetime"),
            (time.fromisoformat, "time"),
        ):
            try:
                parser(text)
            except Exception:
                continue
            return type_name
        return "str"
    return type(value).__name__
|
|||
|
|
|
|||
|
|
|
|||
|
|
def infer_column_types(rows: list[list[Any]], headers: list[str]) -> dict[str, str]:
    """Map each header to the inferred type of its column.

    A column with no non-empty cells gets "empty"; a single consistent
    type is reported as-is; otherwise "mixed(a, b, ...)" lists every
    type seen, sorted alphabetically.
    """
    result: dict[str, str] = {}
    for index, header in enumerate(headers):
        seen: set[str] = set()
        for row in rows:
            # Short rows may not reach this column; empty cells are ignored.
            if index >= len(row) or is_empty_value(row[index]):
                continue
            seen.add(infer_scalar_type(row[index]))
        if not seen:
            result[header] = "empty"
        elif len(seen) == 1:
            result[header] = seen.pop()
        else:
            result[header] = "mixed(" + ", ".join(sorted(seen)) + ")"
    return result
|
|||
|
|
|
|||
|
|
|
|||
|
|
def require_pandas():
    """Return the pandas module, raising RuntimeError with an install
    hint when it is not available."""
    try:
        import pandas  # type: ignore
    except ImportError as exc:
        raise RuntimeError("缺少依赖 pandas,请先安装:pip install pandas") from exc
    return pandas
|
|||
|
|
|
|||
|
|
|
|||
|
|
def require_openpyxl():
    """Return openpyxl's load_workbook callable, raising RuntimeError
    with an install hint when openpyxl is not available."""
    try:
        import openpyxl  # type: ignore
    except ImportError as exc:
        raise RuntimeError("缺少依赖 openpyxl,请先安装:pip install openpyxl") from exc
    return openpyxl.load_workbook
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_result(matrix: list[list[Any]], file_path: Path, file_type: str, sheet_names: list[str], selected_sheet: str | None) -> dict[str, Any]:
    """Assemble the parsed-file summary dict shared by both loaders.

    Detects the header row, normalizes the headers, pads/truncates each
    data row to the header width, and returns metadata plus a pandas
    DataFrame under the "dataframe" key.
    """
    pd = require_pandas()
    header_index = find_header_row(matrix)

    if header_index is None:
        headers: list[str] = []
        data_rows: list[list[Any]] = []
    else:
        headers = normalize_header_row(matrix[header_index])
        width = len(headers)
        data_rows = []
        for row in matrix[header_index + 1 :]:
            if len(row) < width:
                # Pad short rows so every record has one cell per header.
                data_rows.append(row + [None] * (width - len(row)))
            else:
                data_rows.append(row[:width])

    frame = pd.DataFrame(data_rows, columns=headers) if headers else pd.DataFrame()
    return {
        "file": str(file_path),
        "type": file_type,
        "sheets": sheet_names,
        "sheet": selected_sheet,
        "row_count": len(matrix),
        "column_count": max((len(row) for row in matrix), default=0),
        "header_row_index": header_index,
        "headers": headers,
        "column_types": infer_column_types(data_rows, headers),
        "dataframe": frame,
    }
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_excel(file_path: Path, sheet_name: str | None) -> dict[str, Any]:
    """Load one worksheet of an .xlsx file into the common result dict.

    data_only=True reads cached formula results instead of formulas.
    Merged regions are expanded so every covered cell carries the value
    of the region's top-left cell. Raises ValueError for an unknown
    sheet name.
    """
    load_workbook = require_openpyxl()
    workbook = load_workbook(file_path, data_only=True)
    sheet_names = workbook.sheetnames

    if sheet_name:
        if sheet_name not in sheet_names:
            raise ValueError(f"未找到工作表: {sheet_name}")
        worksheet = workbook[sheet_name]
    else:
        # Default to the first sheet and record its title.
        worksheet = workbook[sheet_names[0]]
        sheet_name = worksheet.title

    matrix = []
    for row_number in range(1, worksheet.max_row + 1):
        matrix.append(
            [
                worksheet.cell(row=row_number, column=col_number).value
                for col_number in range(1, worksheet.max_column + 1)
            ]
        )

    # openpyxl reports only the top-left cell of a merged region; copy
    # that value into every cell the region covers (bounds are 1-based).
    for merged in worksheet.merged_cells.ranges:
        first_col, first_row, last_col, last_row = merged.bounds
        fill = worksheet.cell(row=first_row, column=first_col).value
        for r in range(first_row - 1, last_row):
            for c in range(first_col - 1, last_col):
                matrix[r][c] = fill

    return build_result(
        trim_matrix(matrix),
        file_path=file_path,
        file_type="Excel (.xlsx)",
        sheet_names=sheet_names,
        selected_sheet=sheet_name,
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_csv(file_path: Path) -> dict[str, Any]:
    """Load a CSV file into the common result dict.

    Tries encodings in order — BOM-aware UTF-8 first, then plain UTF-8,
    then GB18030 for Chinese-locale files — and raises ValueError when
    none of them decodes the file. An entirely empty file yields an
    empty result rather than an error.
    """
    pd = require_pandas()
    decode_error: Exception | None = None
    for encoding in ("utf-8-sig", "utf-8", "gb18030"):
        try:
            frame = pd.read_csv(
                file_path,
                header=None,
                dtype=object,
                keep_default_na=False,
                encoding=encoding,
            )
        except UnicodeDecodeError as exc:
            decode_error = exc
            continue
        except pd.errors.EmptyDataError:
            return build_result([], file_path=file_path, file_type="CSV", sheet_names=[], selected_sheet=None)
        matrix = frame.where(pd.notna(frame), None).values.tolist()
        return build_result(
            trim_matrix(matrix),
            file_path=file_path,
            file_type="CSV",
            sheet_names=[],
            selected_sheet=None,
        )
    raise ValueError(f"无法读取 CSV 文件编码: {decode_error}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def preview_dataframe(df: Any, rows: int, show_all: bool) -> Any:
    """Return *df* unchanged when show_all is set, else its first *rows* rows."""
    return df if show_all else df.head(rows)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def print_text_output(result: dict[str, Any], rows: int, show_all: bool) -> None:
    """Render the parsed-file summary and a data preview as plain text.

    Sheet information is shown only for Excel files (non-empty
    result["sheets"]).
    """
    print(f"文件: {result['file']}")
    print(f"类型: {result['type']}")
    if result["sheets"]:
        print("Sheet 列表: [" + ", ".join(result["sheets"]) + "]")
        print(f"当前 Sheet: {result['sheet']}")
    print(f"总行数: {result['row_count']}, 总列数: {result['column_count']}")
    if result["headers"]:
        print("表头: [" + ", ".join(result["headers"]) + "]")
    else:
        print("表头: []")
    print("数据类型: " + json.dumps(result["column_types"], ensure_ascii=False))

    frame = result["dataframe"]
    print()
    if frame.empty:
        print("数据内容: 文件为空,或未检测到表头后的数据行。")
        return

    if show_all:
        title = "全部数据"
    else:
        title = f"前 {min(rows, len(frame))} 行数据"
    print(f"{title}:")
    print(preview_dataframe(frame, rows, show_all).to_string(index=False))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_json_output(result: dict[str, Any], rows: int, show_all: bool) -> dict[str, Any]:
    """Build a JSON-serializable dict mirroring the text output.

    The DataFrame preview is emitted as a list of records with NaN
    mapped to None so json.dumps produces null.
    """
    pd = require_pandas()
    preview = preview_dataframe(result["dataframe"], rows, show_all)
    records = preview.where(pd.notna(preview), None).to_dict(orient="records")

    # Copy the metadata fields through unchanged, in a stable order.
    payload = {
        key: result[key]
        for key in (
            "file",
            "type",
            "sheets",
            "sheet",
            "row_count",
            "column_count",
            "header_row_index",
            "headers",
            "column_types",
        )
    }
    payload["preview_row_count"] = len(preview)
    payload["data"] = records
    return payload
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(description="读取 Excel(.xlsx) 或 CSV 文件并显示内容。")
    # (flag, add_argument keyword arguments)
    options: list[tuple[str, dict[str, Any]]] = [
        ("--file", {"required": True, "help": "文件路径,支持 .xlsx 或 .csv"}),
        ("--sheet", {"help": "指定工作表名称,仅 Excel 可用"}),
        ("--rows", {"type": int, "default": 10, "help": "默认显示前 N 行,默认 10"}),
        ("--all", {"action": "store_true", "help": "显示全部数据"}),
        ("--json", {"action": "store_true", "help": "以 JSON 格式输出"}),
    ]
    for flag, kwargs in options:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> int:
    """CLI entry point.

    Validates arguments, dispatches on the file extension, prints either
    JSON or text output. Returns the process exit code: 0 on success,
    130 on Ctrl-C, 1 on any other error (message goes to stderr).
    """
    args = parse_args()
    file_path = Path(args.file).expanduser()

    try:
        # --- argument validation ---------------------------------------
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        if not file_path.is_file():
            raise ValueError(f"路径不是文件: {file_path}")
        if args.rows <= 0:
            raise ValueError("--rows 必须大于 0")

        # --- dispatch by extension -------------------------------------
        suffix = file_path.suffix.lower()
        if suffix == ".xlsx":
            result = load_excel(file_path, args.sheet)
        elif suffix == ".csv":
            if args.sheet:
                raise ValueError("CSV 文件不支持 --sheet 参数")
            result = load_csv(file_path)
        else:
            raise ValueError(f"不支持的文件类型: {suffix},仅支持 .xlsx 和 .csv")

        # --- output ----------------------------------------------------
        if args.json:
            payload = build_json_output(result, args.rows, args.all)
            print(json.dumps(payload, ensure_ascii=False, indent=2))
        else:
            print_text_output(result, args.rows, args.all)
        return 0
    except KeyboardInterrupt:
        print("已取消。", file=sys.stderr)
        return 130
    except Exception as exc:
        print(f"错误: {exc}", file=sys.stderr)
        return 1
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Script entry point: propagate main()'s return value as the exit code.
if __name__ == "__main__":
    sys.exit(main())
|