excel-toolkit/scripts/auto_script.py

342 lines
9.6 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Excel Toolkit - Auto Script Generator
自扩展脚本引擎:分析自然语言需求,自动生成并执行 Python 脚本
"""
import argparse
import json
import os
import sys
import subprocess
import hashlib
from pathlib import Path
from datetime import datetime
# 配置路径
SCRIPT_DIR = Path(__file__).parent
SKILL_DIR = SCRIPT_DIR.parent
TEMPLATES_DIR = SKILL_DIR / "script_templates"
TEMP_SCRIPTS_DIR = SKILL_DIR / "temp_scripts"
# 需求到模板的映射
DEMAND_MAPPING = {
"货币": "currency_convert.py",
"汇率": "currency_convert.py",
"转换": "currency_convert.py",
"currency": "currency_convert.py",
"convert": "currency_convert.py",
"透视": "pivot_summary.py",
"汇总": "pivot_summary.py",
"聚合": "pivot_summary.py",
"pivot": "pivot_summary.py",
"summary": "pivot_summary.py",
"清洗": "data_clean.py",
"去空": "data_clean.py",
"格式化": "data_clean.py",
"clean": "data_clean.py",
"计算": "column_calc.py",
"加减乘除": "column_calc.py",
"公式": "column_calc.py",
"calc": "column_calc.py",
"calculate": "column_calc.py",
"合并列": "merge_columns.py",
"拆分": "merge_columns.py",
"split": "merge_columns.py",
"join": "merge_columns.py",
}
def hash_demand(demand: str) -> str:
"""生成需求的哈希值,用于脚本复用"""
return hashlib.md5(demand.encode()).hexdigest()[:16]
def find_template(demand: str) -> str:
"""根据需求查找对应的模板"""
demand_lower = demand.lower()
for keyword, template_file in DEMAND_MAPPING.items():
if keyword.lower() in demand_lower:
return template_file
return None
def check_temp_script(demand_hash: str) -> Path:
"""检查是否有可复用的临时脚本"""
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
if temp_script.exists():
return temp_script
return None
def generate_script_from_template(template_file: str, params: dict) -> str:
"""从模板生成脚本"""
template_path = TEMPLATES_DIR / template_file
if not template_path.exists():
raise FileNotFoundError(f"模板不存在: {template_file}")
with open(template_path, 'r', encoding='utf-8') as f:
template_content = f.read()
# 简单的参数替换(使用 Python 的 format 方法)
try:
script_content = template_content.format(**params)
except KeyError as e:
raise ValueError(f"缺少必要参数: {e}")
return script_content
def generate_script_auto(demand: str, params: dict) -> str:
"""自动生成脚本(当没有匹配的模板时)"""
# 基于需求生成简单的通用脚本模板
script = f'''#!/usr/bin/env python3
"""
Auto-generated script for: {demand}
Generated at: {datetime.now().isoformat()}
"""
import pandas as pd
import sys
from pathlib import Path
def main():
# 参数配置
file_path = "{params.get('file', 'data.xlsx')}"
output_path = "{params.get('output', 'output.xlsx')}"
# 读取文件
df = pd.read_excel(file_path)
# TODO: 实现具体逻辑
# 当前需求: {demand}
# 请根据需求补充实现代码
# 示例:打印数据信息
print(f"文件: {{file_path}}")
print(f"行数: {{len(df)}}")
print(f"列数: {{len(df.columns)}}")
print(f"列名: {{list(df.columns)}}")
# 保存结果
df.to_excel(output_path, index=False)
print(f"结果已保存到: {{output_path}}")
if __name__ == "__main__":
main()
'''
return script
def save_temp_script(script_content: str, demand_hash: str) -> Path:
"""保存临时脚本"""
TEMP_SCRIPTS_DIR.mkdir(exist_ok=True)
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
with open(temp_script, 'w', encoding='utf-8') as f:
f.write(script_content)
# 添加执行权限
os.chmod(temp_script, 0o755)
return temp_script
def execute_script(script_path: Path, params: dict) -> dict:
"""执行脚本并返回结果"""
try:
# 准备环境变量传递参数
env = os.environ.copy()
for key, value in params.items():
env[f"AUTO_PARAM_{key.upper()}"] = str(value)
# 执行脚本
result = subprocess.run(
[sys.executable, str(script_path)],
capture_output=True,
text=True,
env=env,
timeout=300 # 5分钟超时
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"stdout": "",
"stderr": "脚本执行超时5分钟",
"returncode": -1
}
def analyze_demand(demand: str, params: dict) -> dict:
"""分析需求并生成/执行脚本"""
result = {
"demand": demand,
"template_used": None,
"script_generated": False,
"script_reused": False,
"script_path": None,
"execution": None
}
# 生成需求哈希
demand_hash = hash_demand(demand)
# 检查是否有可复用的临时脚本
temp_script = check_temp_script(demand_hash)
if temp_script:
result["script_reused"] = True
result["script_path"] = str(temp_script)
result["script_generated"] = False
else:
# 查找模板
template_file = find_template(demand)
if template_file:
result["template_used"] = template_file
script_content = generate_script_from_template(template_file, params)
else:
# 自动生成脚本
script_content = generate_script_auto(demand, params)
# 保存临时脚本
temp_script = save_temp_script(script_content, demand_hash)
result["script_generated"] = True
result["script_path"] = str(temp_script)
# 执行脚本
result["execution"] = execute_script(temp_script, params)
return result
def main():
parser = argparse.ArgumentParser(
description="Excel Toolkit - Auto Script Generator",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# 简单需求
python auto_script.py "把金额列从美元转换为人民币"
# 带参数
python auto_script.py "计算总价" --file data.xlsx --output result.xlsx
# 使用模板
python auto_script.py "数据透视汇总" --file sales.xlsx --group_by "地区,产品"
# JSON 输出
python auto_script.py "清洗数据" --output json
# 仅生成不执行
python auto_script.py "计算利润" --dry-run
"""
)
parser.add_argument(
"demand",
help="自然语言需求描述"
)
parser.add_argument(
"--file", "-f",
help="输入文件路径"
)
parser.add_argument(
"--output", "-o",
help="输出文件路径"
)
parser.add_argument(
"--param",
action="append",
help="额外参数,格式: key=value",
default=[]
)
parser.add_argument(
"--dry-run",
action="store_true",
help="仅生成脚本不执行"
)
parser.add_argument(
"--output-format",
choices=["text", "json"],
default="text",
help="输出格式"
)
args = parser.parse_args()
# 收集参数
params = {
"file": args.file or "data.xlsx",
"output": args.output or "output.xlsx"
}
# 解析额外参数
for param in args.param:
if "=" in param:
key, value = param.split("=", 1)
params[key] = value
# 分析需求
result = analyze_demand(args.demand, params)
if args.dry_run:
# 仅显示生成的脚本路径
if args.output_format == "json":
print(json.dumps({
"demand": result["demand"],
"template_used": result["template_used"],
"script_path": result["script_path"],
"script_generated": result["script_generated"],
"script_reused": result["script_reused"]
}, indent=2, ensure_ascii=False))
else:
print(f"需求: {result['demand']}")
print(f"模板: {result['template_used'] or '自动生成'}")
print(f"脚本路径: {result['script_path']}")
print(f"状态: {'复用已存在' if result['script_reused'] else '新生成'}")
else:
# 显示执行结果
if args.output_format == "json":
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(f"=== 需求分析 ===")
print(f"需求: {result['demand']}")
print(f"模板: {result['template_used'] or '自动生成'}")
print(f"脚本: {result['script_path']}")
print(f"复用: {'' if result['script_reused'] else ''}")
print()
print(f"=== 执行结果 ===")
exec_result = result["execution"]
if exec_result["stdout"]:
print(exec_result["stdout"])
if exec_result["stderr"]:
print(f"错误输出:\n{exec_result['stderr']}", file=sys.stderr)
if exec_result["success"]:
print(f"✓ 脚本执行成功", file=sys.stderr)
else:
print(f"✗ 脚本执行失败 (返回码: {exec_result['returncode']})", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()