#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Preview the contents of an Excel (.xlsx) or CSV file from the command line.

The file is loaded into a row-major matrix, trailing empty rows/columns are
trimmed, the first non-empty row is taken as the header, and a per-column
type summary plus a preview of the data is printed as text or JSON.

pandas is required for all inputs; openpyxl is required only for .xlsx files.
Both are imported lazily so a missing dependency yields a readable error.
"""

from __future__ import annotations

import argparse
import json
import math
import sys
from datetime import date, datetime, time
from pathlib import Path
from typing import Any


def format_cell_value(value: Any) -> Any:
    """Convert a raw cell value into a display/JSON-friendly value.

    Datetimes become ``YYYY-MM-DD HH:MM:SS`` style strings, dates and times
    become their ISO strings, float NaN becomes ``None``; anything else is
    returned unchanged.
    """
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        return value.isoformat(sep=" ")
    if isinstance(value, (date, time)):
        return value.isoformat()
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def is_empty_value(value: Any) -> bool:
    """Return True for ``None``, float NaN, and blank/whitespace-only strings."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return isinstance(value, str) and value.strip() == ""


def trim_matrix(matrix: list[list[Any]]) -> list[list[Any]]:
    """Drop trailing empty rows/columns and format the remaining cells.

    Returns an empty list when the matrix has no non-empty cell. Rows shorter
    than the last used column are kept at their own (shorter) length.
    """
    if not matrix:
        return []

    last_row = -1
    last_col = -1
    for row_index, row in enumerate(matrix):
        for col_index, value in enumerate(row):
            if not is_empty_value(value):
                last_row = max(last_row, row_index)
                last_col = max(last_col, col_index)

    if last_row == -1 or last_col == -1:
        return []

    return [
        [format_cell_value(value) for value in row[: last_col + 1]]
        for row in matrix[: last_row + 1]
    ]


def find_header_row(matrix: list[list[Any]]) -> int | None:
    """Return the index of the first row with a non-empty cell, or ``None``."""
    for index, row in enumerate(matrix):
        if any(not is_empty_value(value) for value in row):
            return index
    return None


def normalize_header_row(row: list[Any]) -> list[str]:
    """Turn a raw header row into a list of unique, non-empty column names.

    Empty cells are named ``列<position>`` (1-based). Repeated names receive a
    numeric suffix (``name``, ``name_2``, ``name_3`` ...). The suffix is bumped
    until the result does not clash with any name already emitted, so an
    input such as ``["a", "a", "a_2"]`` still yields distinct names.
    """
    headers: list[str] = []
    occurrences: dict[str, int] = {}
    taken: set[str] = set()

    for index, value in enumerate(row, start=1):
        base = "" if is_empty_value(value) else str(value).strip()
        if not base:
            base = f"列{index}"

        count = occurrences.get(base, 0)
        candidate = base if count == 0 else f"{base}_{count + 1}"
        # A generated name may collide with a literal header (or vice versa);
        # keep advancing the suffix until the name is really unused.
        while candidate in taken:
            count += 1
            candidate = f"{base}_{count + 1}"

        occurrences[base] = count + 1
        taken.add(candidate)
        headers.append(candidate)

    return headers


def infer_scalar_type(value: Any) -> str:
    """Classify a single cell value as a short type name.

    Native Python values map to their own type name. Strings are probed in
    order: bool literal, int, float, date, datetime, time; the first parser
    that accepts the stripped text wins, otherwise the result is ``"str"``.
    """
    if is_empty_value(value):
        return "empty"
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        return "datetime"
    if isinstance(value, date):
        return "date"
    if isinstance(value, time):
        return "time"

    if isinstance(value, str):
        text = value.strip()
        if text.lower() in {"true", "false"}:
            return "bool"
        # date is probed before datetime because datetime.fromisoformat also
        # accepts date-only text, which would otherwise hide every date.
        parsers = (
            (int, "int"),
            (float, "float"),
            (date.fromisoformat, "date"),
            (datetime.fromisoformat, "datetime"),
            (time.fromisoformat, "time"),
        )
        for parser, type_name in parsers:
            try:
                parser(text)
            except ValueError:
                continue
            return type_name
        return "str"

    return type(value).__name__


def infer_column_types(rows: list[list[Any]], headers: list[str]) -> dict[str, str]:
    """Summarise the scalar types observed in each column.

    Empty cells are ignored. A column is ``"empty"`` when it has no values,
    the single type name when homogeneous, and ``"mixed(a, b, ...)"`` (names
    sorted) otherwise.
    """
    column_types: dict[str, str] = {}
    for col_index, header in enumerate(headers):
        observed = {
            infer_scalar_type(row[col_index])
            for row in rows
            if col_index < len(row) and not is_empty_value(row[col_index])
        }
        if not observed:
            column_types[header] = "empty"
        elif len(observed) == 1:
            column_types[header] = observed.pop()
        else:
            column_types[header] = "mixed(" + ", ".join(sorted(observed)) + ")"
    return column_types


def require_pandas():
    """Import and return the pandas module.

    Raises:
        RuntimeError: if pandas is not installed.
    """
    try:
        import pandas as pd  # type: ignore
    except ImportError as exc:
        raise RuntimeError("缺少依赖 pandas,请先安装:pip install pandas") from exc
    return pd


def require_openpyxl():
    """Import openpyxl and return its ``load_workbook`` function.

    Raises:
        RuntimeError: if openpyxl is not installed.
    """
    try:
        from openpyxl import load_workbook  # type: ignore
    except ImportError as exc:
        raise RuntimeError("缺少依赖 openpyxl,请先安装:pip install openpyxl") from exc
    return load_workbook


def build_result(
    matrix: list[list[Any]],
    file_path: Path,
    file_type: str,
    sheet_names: list[str],
    selected_sheet: str | None,
) -> dict[str, Any]:
    """Assemble the summary dict for an already trimmed cell matrix.

    The first non-empty row becomes the header; every following row is padded
    with ``None`` or truncated to the header width and loaded into a pandas
    DataFrame stored under ``"dataframe"``. ``row_count`` counts all matrix
    rows, including the header row.
    """
    pd = require_pandas()

    header_row_index = find_header_row(matrix)
    if header_row_index is None:
        headers: list[str] = []
        data_rows: list[list[Any]] = []
    else:
        headers = normalize_header_row(matrix[header_row_index])
        width = len(headers)
        data_rows = [
            row + [None] * (width - len(row)) if len(row) < width else row[:width]
            for row in matrix[header_row_index + 1 :]
        ]

    df = pd.DataFrame(data_rows, columns=headers) if headers else pd.DataFrame()

    return {
        "file": str(file_path),
        "type": file_type,
        "sheets": sheet_names,
        "sheet": selected_sheet,
        "row_count": len(matrix),
        "column_count": max((len(row) for row in matrix), default=0),
        "header_row_index": header_row_index,
        "headers": headers,
        "column_types": infer_column_types(data_rows, headers),
        "dataframe": df,
    }


def load_excel(file_path: Path, sheet_name: str | None) -> dict[str, Any]:
    """Load one worksheet of an .xlsx workbook and summarise it.

    The first worksheet is used when ``sheet_name`` is empty. Cached formula
    results are read (``data_only=True``), and every cell of a merged range is
    filled with the range's top-left value.

    Raises:
        RuntimeError: if openpyxl or pandas is missing.
        ValueError: if ``sheet_name`` does not exist in the workbook.
    """
    load_workbook = require_openpyxl()
    workbook = load_workbook(file_path, data_only=True)
    try:
        sheet_names = workbook.sheetnames
        if sheet_name:
            if sheet_name not in sheet_names:
                raise ValueError(f"未找到工作表: {sheet_name}")
            worksheet = workbook[sheet_name]
        else:
            worksheet = workbook[sheet_names[0]]
            sheet_name = worksheet.title

        # Explicit bounds keep matrix indices aligned with 1-based sheet
        # coordinates, which the merged-range fill below relies on.
        matrix = [
            list(row)
            for row in worksheet.iter_rows(
                min_row=1,
                max_row=worksheet.max_row,
                min_col=1,
                max_col=worksheet.max_column,
                values_only=True,
            )
        ]

        for merged_range in worksheet.merged_cells.ranges:
            min_col, min_row, max_col, max_row = merged_range.bounds
            top_left_value = matrix[min_row - 1][min_col - 1]
            for row in range(min_row - 1, max_row):
                for col in range(min_col - 1, max_col):
                    matrix[row][col] = top_left_value
    finally:
        workbook.close()

    return build_result(
        trim_matrix(matrix),
        file_path=file_path,
        file_type="Excel (.xlsx)",
        sheet_names=sheet_names,
        selected_sheet=sheet_name,
    )


def load_csv(file_path: Path) -> dict[str, Any]:
    """Load a CSV file, trying several encodings, and summarise it.

    Every field is read as text (no header row, no NA conversion). An empty
    file produces an empty result rather than an error.

    Raises:
        RuntimeError: if pandas is missing.
        ValueError: if the file cannot be decoded with any tried encoding.
    """
    pd = require_pandas()
    last_error: Exception | None = None

    for encoding in ("utf-8-sig", "utf-8", "gb18030"):
        try:
            df = pd.read_csv(
                file_path,
                header=None,
                dtype=object,
                keep_default_na=False,
                encoding=encoding,
            )
        except UnicodeDecodeError as exc:
            last_error = exc
            continue
        except pd.errors.EmptyDataError:
            matrix: list[list[Any]] = []
        else:
            matrix = df.where(pd.notna(df), None).values.tolist()

        return build_result(
            trim_matrix(matrix),
            file_path=file_path,
            file_type="CSV",
            sheet_names=[],
            selected_sheet=None,
        )

    raise ValueError(f"无法读取 CSV 文件编码: {last_error}") from last_error


def preview_dataframe(df: Any, rows: int, show_all: bool) -> Any:
    """Return the whole DataFrame, or only its first ``rows`` rows."""
    if show_all:
        return df
    return df.head(rows)


def print_text_output(result: dict[str, Any], rows: int, show_all: bool) -> None:
    """Print the summary and a data preview as human-readable text."""
    print(f"文件: {result['file']}")
    print(f"类型: {result['type']}")
    if result["sheets"]:
        print("Sheet 列表: [" + ", ".join(result["sheets"]) + "]")
        print(f"当前 Sheet: {result['sheet']}")
    print(f"总行数: {result['row_count']}, 总列数: {result['column_count']}")
    print("表头: [" + ", ".join(result["headers"]) + "]" if result["headers"] else "表头: []")
    print("数据类型: " + json.dumps(result["column_types"], ensure_ascii=False))

    df = result["dataframe"]
    print()
    if df.empty:
        print("数据内容: 文件为空,或未检测到表头后的数据行。")
        return

    title = "全部数据" if show_all else f"前 {min(rows, len(df))} 行数据"
    print(f"{title}:")
    print(preview_dataframe(df, rows, show_all).to_string(index=False))


def build_json_output(result: dict[str, Any], rows: int, show_all: bool) -> dict[str, Any]:
    """Build the JSON-serialisable payload for ``--json`` output.

    Missing values in the preview are emitted as ``None`` (JSON ``null``).
    """
    pd = require_pandas()
    df = result["dataframe"]
    preview_df = preview_dataframe(df, rows, show_all)
    # Cast to object first: on numeric columns ``where(..., None)`` keeps NaN,
    # which json.dumps would emit as the invalid token ``NaN``.
    records = (
        preview_df.astype(object)
        .where(pd.notna(preview_df), None)
        .to_dict(orient="records")
    )
    return {
        "file": result["file"],
        "type": result["type"],
        "sheets": result["sheets"],
        "sheet": result["sheet"],
        "row_count": result["row_count"],
        "column_count": result["column_count"],
        "header_row_index": result["header_row_index"],
        "headers": result["headers"],
        "column_types": result["column_types"],
        "preview_row_count": len(preview_df),
        "data": records,
    }


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments."""
    parser = argparse.ArgumentParser(description="读取 Excel(.xlsx) 或 CSV 文件并显示内容。")
    parser.add_argument("--file", required=True, help="文件路径,支持 .xlsx 或 .csv")
    parser.add_argument("--sheet", help="指定工作表名称,仅 Excel 可用")
    parser.add_argument("--rows", type=int, default=10, help="默认显示前 N 行,默认 10")
    parser.add_argument("--all", action="store_true", help="显示全部数据")
    parser.add_argument("--json", action="store_true", help="以 JSON 格式输出")
    return parser.parse_args()


def main() -> int:
    """Run the CLI and return the process exit code.

    Returns 0 on success, 1 on any reported error, 130 when interrupted.
    """
    args = parse_args()
    file_path = Path(args.file).expanduser()

    try:
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        if not file_path.is_file():
            raise ValueError(f"路径不是文件: {file_path}")
        if args.rows <= 0:
            raise ValueError("--rows 必须大于 0")

        suffix = file_path.suffix.lower()
        if suffix == ".xlsx":
            result = load_excel(file_path, args.sheet)
        elif suffix == ".csv":
            if args.sheet:
                raise ValueError("CSV 文件不支持 --sheet 参数")
            result = load_csv(file_path)
        else:
            raise ValueError(f"不支持的文件类型: {suffix},仅支持 .xlsx 和 .csv")

        if args.json:
            payload = build_json_output(result, args.rows, args.all)
            print(json.dumps(payload, ensure_ascii=False, indent=2))
        else:
            print_text_output(result, args.rows, args.all)
        return 0
    except KeyboardInterrupt:
        print("已取消。", file=sys.stderr)
        return 130
    except Exception as exc:
        # Top-level boundary: report any failure as a one-line message.
        print(f"错误: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())