#!/usr/bin/env python3 """ Excel Toolkit - Auto Script Generator 自扩展脚本引擎:分析自然语言需求,自动生成并执行 Python 脚本 """ import argparse import json import os import sys import subprocess import hashlib from pathlib import Path from datetime import datetime # 配置路径 SCRIPT_DIR = Path(__file__).parent SKILL_DIR = SCRIPT_DIR.parent TEMPLATES_DIR = SKILL_DIR / "script_templates" TEMP_SCRIPTS_DIR = SKILL_DIR / "temp_scripts" # 需求到模板的映射 DEMAND_MAPPING = { "货币": "currency_convert.py", "汇率": "currency_convert.py", "转换": "currency_convert.py", "currency": "currency_convert.py", "convert": "currency_convert.py", "透视": "pivot_summary.py", "汇总": "pivot_summary.py", "聚合": "pivot_summary.py", "pivot": "pivot_summary.py", "summary": "pivot_summary.py", "清洗": "data_clean.py", "去空": "data_clean.py", "格式化": "data_clean.py", "clean": "data_clean.py", "计算": "column_calc.py", "加减乘除": "column_calc.py", "公式": "column_calc.py", "calc": "column_calc.py", "calculate": "column_calc.py", "合并列": "merge_columns.py", "拆分": "merge_columns.py", "split": "merge_columns.py", "join": "merge_columns.py", } def hash_demand(demand: str) -> str: """生成需求的哈希值,用于脚本复用""" return hashlib.md5(demand.encode()).hexdigest()[:16] def find_template(demand: str) -> str: """根据需求查找对应的模板""" demand_lower = demand.lower() for keyword, template_file in DEMAND_MAPPING.items(): if keyword.lower() in demand_lower: return template_file return None def check_temp_script(demand_hash: str) -> Path: """检查是否有可复用的临时脚本""" temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py" if temp_script.exists(): return temp_script return None def generate_script_from_template(template_file: str, params: dict) -> str: """从模板生成脚本""" template_path = TEMPLATES_DIR / template_file if not template_path.exists(): raise FileNotFoundError(f"模板不存在: {template_file}") with open(template_path, 'r', encoding='utf-8') as f: template_content = f.read() # 简单的参数替换(使用 Python 的 format 方法) try: script_content = template_content.format(**params) except KeyError as e: raise ValueError(f"缺少必要参数: {e}") return script_content def generate_script_auto(demand: str, params: dict) -> str: """自动生成脚本(当没有匹配的模板时)""" # 基于需求生成简单的通用脚本模板 script = f'''#!/usr/bin/env python3 """ Auto-generated script for: {demand} Generated at: {datetime.now().isoformat()} """ import pandas as pd import sys from pathlib import Path def main(): # 参数配置 file_path = "{params.get('file', 'data.xlsx')}" output_path = "{params.get('output', 'output.xlsx')}" # 读取文件 df = pd.read_excel(file_path) # TODO: 实现具体逻辑 # 当前需求: {demand} # 请根据需求补充实现代码 # 示例:打印数据信息 print(f"文件: {{file_path}}") print(f"行数: {{len(df)}}") print(f"列数: {{len(df.columns)}}") print(f"列名: {{list(df.columns)}}") # 保存结果 df.to_excel(output_path, index=False) print(f"结果已保存到: {{output_path}}") if __name__ == "__main__": main() ''' return script def save_temp_script(script_content: str, demand_hash: str) -> Path: """保存临时脚本""" TEMP_SCRIPTS_DIR.mkdir(exist_ok=True) temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py" with open(temp_script, 'w', encoding='utf-8') as f: f.write(script_content) # 添加执行权限 os.chmod(temp_script, 0o755) return temp_script def execute_script(script_path: Path, params: dict) -> dict: """执行脚本并返回结果""" try: # 准备环境变量传递参数 env = os.environ.copy() for key, value in params.items(): env[f"AUTO_PARAM_{key.upper()}"] = str(value) # 执行脚本 result = subprocess.run( [sys.executable, str(script_path)], capture_output=True, text=True, env=env, timeout=300 # 5分钟超时 ) return { "success": result.returncode == 0, "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode } except subprocess.TimeoutExpired: return { "success": False, "stdout": "", "stderr": "脚本执行超时(5分钟)", "returncode": -1 } def analyze_demand(demand: str, params: dict) -> dict: """分析需求并生成/执行脚本""" result = { "demand": demand, "template_used": None, "script_generated": False, "script_reused": False, "script_path": None, "execution": None } # 生成需求哈希 demand_hash = hash_demand(demand) # 检查是否有可复用的临时脚本 temp_script = check_temp_script(demand_hash) if temp_script: result["script_reused"] = True result["script_path"] = str(temp_script) result["script_generated"] = False else: # 查找模板 template_file = find_template(demand) if template_file: result["template_used"] = template_file script_content = generate_script_from_template(template_file, params) else: # 自动生成脚本 script_content = generate_script_auto(demand, params) # 保存临时脚本 temp_script = save_temp_script(script_content, demand_hash) result["script_generated"] = True result["script_path"] = str(temp_script) # 执行脚本 result["execution"] = execute_script(temp_script, params) return result def main(): parser = argparse.ArgumentParser( description="Excel Toolkit - Auto Script Generator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 简单需求 python auto_script.py "把金额列从美元转换为人民币" # 带参数 python auto_script.py "计算总价" --file data.xlsx --output result.xlsx # 使用模板 python auto_script.py "数据透视汇总" --file sales.xlsx --group_by "地区,产品" # JSON 输出 python auto_script.py "清洗数据" --output json # 仅生成不执行 python auto_script.py "计算利润" --dry-run """ ) parser.add_argument( "demand", help="自然语言需求描述" ) parser.add_argument( "--file", "-f", help="输入文件路径" ) parser.add_argument( "--output", "-o", help="输出文件路径" ) parser.add_argument( "--param", action="append", help="额外参数,格式: key=value", default=[] ) parser.add_argument( "--dry-run", action="store_true", help="仅生成脚本不执行" ) parser.add_argument( "--output-format", choices=["text", "json"], default="text", help="输出格式" ) args = parser.parse_args() # 收集参数 params = { "file": args.file or "data.xlsx", "output": args.output or "output.xlsx" } # 解析额外参数 for param in args.param: if "=" in param: key, value = param.split("=", 1) params[key] = value # 分析需求 result = analyze_demand(args.demand, params) if args.dry_run: # 仅显示生成的脚本路径 if args.output_format == "json": print(json.dumps({ "demand": result["demand"], "template_used": result["template_used"], "script_path": result["script_path"], "script_generated": result["script_generated"], "script_reused": result["script_reused"] }, indent=2, ensure_ascii=False)) else: print(f"需求: {result['demand']}") print(f"模板: {result['template_used'] or '自动生成'}") print(f"脚本路径: {result['script_path']}") print(f"状态: {'复用已存在' if result['script_reused'] else '新生成'}") else: # 显示执行结果 if args.output_format == "json": print(json.dumps(result, indent=2, ensure_ascii=False)) else: print(f"=== 需求分析 ===") print(f"需求: {result['demand']}") print(f"模板: {result['template_used'] or '自动生成'}") print(f"脚本: {result['script_path']}") print(f"复用: {'是' if result['script_reused'] else '否'}") print() print(f"=== 执行结果 ===") exec_result = result["execution"] if exec_result["stdout"]: print(exec_result["stdout"]) if exec_result["stderr"]: print(f"错误输出:\n{exec_result['stderr']}", file=sys.stderr) if exec_result["success"]: print(f"✓ 脚本执行成功", file=sys.stderr) else: print(f"✗ 脚本执行失败 (返回码: {exec_result['returncode']})", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()