excel-toolkit/scripts/auto_script.py

342 lines
9.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Excel Toolkit - Auto Script Generator
自扩展脚本引擎分析自然语言需求自动生成并执行 Python 脚本
"""
import argparse
import json
import os
import sys
import subprocess
import hashlib
from pathlib import Path
from datetime import datetime
# 配置路径
SCRIPT_DIR = Path(__file__).parent
SKILL_DIR = SCRIPT_DIR.parent
TEMPLATES_DIR = SKILL_DIR / "script_templates"
TEMP_SCRIPTS_DIR = SKILL_DIR / "temp_scripts"
# 需求到模板的映射
DEMAND_MAPPING = {
"货币": "currency_convert.py",
"汇率": "currency_convert.py",
"转换": "currency_convert.py",
"currency": "currency_convert.py",
"convert": "currency_convert.py",
"透视": "pivot_summary.py",
"汇总": "pivot_summary.py",
"聚合": "pivot_summary.py",
"pivot": "pivot_summary.py",
"summary": "pivot_summary.py",
"清洗": "data_clean.py",
"去空": "data_clean.py",
"格式化": "data_clean.py",
"clean": "data_clean.py",
"计算": "column_calc.py",
"加减乘除": "column_calc.py",
"公式": "column_calc.py",
"calc": "column_calc.py",
"calculate": "column_calc.py",
"合并列": "merge_columns.py",
"拆分": "merge_columns.py",
"split": "merge_columns.py",
"join": "merge_columns.py",
}
def hash_demand(demand: str) -> str:
"""生成需求的哈希值,用于脚本复用"""
return hashlib.md5(demand.encode()).hexdigest()[:16]
def find_template(demand: str) -> str:
"""根据需求查找对应的模板"""
demand_lower = demand.lower()
for keyword, template_file in DEMAND_MAPPING.items():
if keyword.lower() in demand_lower:
return template_file
return None
def check_temp_script(demand_hash: str) -> Path:
"""检查是否有可复用的临时脚本"""
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
if temp_script.exists():
return temp_script
return None
def generate_script_from_template(template_file: str, params: dict) -> str:
"""从模板生成脚本"""
template_path = TEMPLATES_DIR / template_file
if not template_path.exists():
raise FileNotFoundError(f"模板不存在: {template_file}")
with open(template_path, 'r', encoding='utf-8') as f:
template_content = f.read()
# 简单的参数替换(使用 Python 的 format 方法)
try:
script_content = template_content.format(**params)
except KeyError as e:
raise ValueError(f"缺少必要参数: {e}")
return script_content
def generate_script_auto(demand: str, params: dict) -> str:
"""自动生成脚本(当没有匹配的模板时)"""
# 基于需求生成简单的通用脚本模板
script = f'''#!/usr/bin/env python3
"""
Auto-generated script for: {demand}
Generated at: {datetime.now().isoformat()}
"""
import pandas as pd
import sys
from pathlib import Path
def main():
# 参数配置
file_path = "{params.get('file', 'data.xlsx')}"
output_path = "{params.get('output', 'output.xlsx')}"
# 读取文件
df = pd.read_excel(file_path)
# TODO: 实现具体逻辑
# 当前需求: {demand}
# 请根据需求补充实现代码
# 示例:打印数据信息
print(f"文件: {{file_path}}")
print(f"行数: {{len(df)}}")
print(f"列数: {{len(df.columns)}}")
print(f"列名: {{list(df.columns)}}")
# 保存结果
df.to_excel(output_path, index=False)
print(f"结果已保存到: {{output_path}}")
if __name__ == "__main__":
main()
'''
return script
def save_temp_script(script_content: str, demand_hash: str) -> Path:
"""保存临时脚本"""
TEMP_SCRIPTS_DIR.mkdir(exist_ok=True)
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
with open(temp_script, 'w', encoding='utf-8') as f:
f.write(script_content)
# 添加执行权限
os.chmod(temp_script, 0o755)
return temp_script
def execute_script(script_path: Path, params: dict) -> dict:
"""执行脚本并返回结果"""
try:
# 准备环境变量传递参数
env = os.environ.copy()
for key, value in params.items():
env[f"AUTO_PARAM_{key.upper()}"] = str(value)
# 执行脚本
result = subprocess.run(
[sys.executable, str(script_path)],
capture_output=True,
text=True,
env=env,
timeout=300 # 5分钟超时
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"stdout": "",
"stderr": "脚本执行超时5分钟",
"returncode": -1
}
def analyze_demand(demand: str, params: dict) -> dict:
"""分析需求并生成/执行脚本"""
result = {
"demand": demand,
"template_used": None,
"script_generated": False,
"script_reused": False,
"script_path": None,
"execution": None
}
# 生成需求哈希
demand_hash = hash_demand(demand)
# 检查是否有可复用的临时脚本
temp_script = check_temp_script(demand_hash)
if temp_script:
result["script_reused"] = True
result["script_path"] = str(temp_script)
result["script_generated"] = False
else:
# 查找模板
template_file = find_template(demand)
if template_file:
result["template_used"] = template_file
script_content = generate_script_from_template(template_file, params)
else:
# 自动生成脚本
script_content = generate_script_auto(demand, params)
# 保存临时脚本
temp_script = save_temp_script(script_content, demand_hash)
result["script_generated"] = True
result["script_path"] = str(temp_script)
# 执行脚本
result["execution"] = execute_script(temp_script, params)
return result
def main():
parser = argparse.ArgumentParser(
description="Excel Toolkit - Auto Script Generator",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# 简单需求
python auto_script.py "把金额列从美元转换为人民币"
# 带参数
python auto_script.py "计算总价" --file data.xlsx --output result.xlsx
# 使用模板
python auto_script.py "数据透视汇总" --file sales.xlsx --group_by "地区,产品"
# JSON 输出
python auto_script.py "清洗数据" --output json
# 仅生成不执行
python auto_script.py "计算利润" --dry-run
"""
)
parser.add_argument(
"demand",
help="自然语言需求描述"
)
parser.add_argument(
"--file", "-f",
help="输入文件路径"
)
parser.add_argument(
"--output", "-o",
help="输出文件路径"
)
parser.add_argument(
"--param",
action="append",
help="额外参数,格式: key=value",
default=[]
)
parser.add_argument(
"--dry-run",
action="store_true",
help="仅生成脚本不执行"
)
parser.add_argument(
"--output-format",
choices=["text", "json"],
default="text",
help="输出格式"
)
args = parser.parse_args()
# 收集参数
params = {
"file": args.file or "data.xlsx",
"output": args.output or "output.xlsx"
}
# 解析额外参数
for param in args.param:
if "=" in param:
key, value = param.split("=", 1)
params[key] = value
# 分析需求
result = analyze_demand(args.demand, params)
if args.dry_run:
# 仅显示生成的脚本路径
if args.output_format == "json":
print(json.dumps({
"demand": result["demand"],
"template_used": result["template_used"],
"script_path": result["script_path"],
"script_generated": result["script_generated"],
"script_reused": result["script_reused"]
}, indent=2, ensure_ascii=False))
else:
print(f"需求: {result['demand']}")
print(f"模板: {result['template_used'] or '自动生成'}")
print(f"脚本路径: {result['script_path']}")
print(f"状态: {'复用已存在' if result['script_reused'] else '新生成'}")
else:
# 显示执行结果
if args.output_format == "json":
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(f"=== 需求分析 ===")
print(f"需求: {result['demand']}")
print(f"模板: {result['template_used'] or '自动生成'}")
print(f"脚本: {result['script_path']}")
print(f"复用: {'' if result['script_reused'] else ''}")
print()
print(f"=== 执行结果 ===")
exec_result = result["execution"]
if exec_result["stdout"]:
print(exec_result["stdout"])
if exec_result["stderr"]:
print(f"错误输出:\n{exec_result['stderr']}", file=sys.stderr)
if exec_result["success"]:
print(f"✓ 脚本执行成功", file=sys.stderr)
else:
print(f"✗ 脚本执行失败 (返回码: {exec_result['returncode']})", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()