342 lines
9.6 KiB
Python
Executable File
342 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Excel Toolkit - Auto Script Generator
|
||
自扩展脚本引擎:分析自然语言需求,自动生成并执行 Python 脚本
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import hashlib
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
# 配置路径
|
||
SCRIPT_DIR = Path(__file__).parent
|
||
SKILL_DIR = SCRIPT_DIR.parent
|
||
TEMPLATES_DIR = SKILL_DIR / "script_templates"
|
||
TEMP_SCRIPTS_DIR = SKILL_DIR / "temp_scripts"
|
||
|
||
# 需求到模板的映射
|
||
DEMAND_MAPPING = {
|
||
"货币": "currency_convert.py",
|
||
"汇率": "currency_convert.py",
|
||
"转换": "currency_convert.py",
|
||
"currency": "currency_convert.py",
|
||
"convert": "currency_convert.py",
|
||
|
||
"透视": "pivot_summary.py",
|
||
"汇总": "pivot_summary.py",
|
||
"聚合": "pivot_summary.py",
|
||
"pivot": "pivot_summary.py",
|
||
"summary": "pivot_summary.py",
|
||
|
||
"清洗": "data_clean.py",
|
||
"去空": "data_clean.py",
|
||
"格式化": "data_clean.py",
|
||
"clean": "data_clean.py",
|
||
|
||
"计算": "column_calc.py",
|
||
"加减乘除": "column_calc.py",
|
||
"公式": "column_calc.py",
|
||
"calc": "column_calc.py",
|
||
"calculate": "column_calc.py",
|
||
|
||
"合并列": "merge_columns.py",
|
||
"拆分": "merge_columns.py",
|
||
"split": "merge_columns.py",
|
||
"join": "merge_columns.py",
|
||
}
|
||
|
||
|
||
def hash_demand(demand: str) -> str:
|
||
"""生成需求的哈希值,用于脚本复用"""
|
||
return hashlib.md5(demand.encode()).hexdigest()[:16]
|
||
|
||
|
||
def find_template(demand: str) -> str:
|
||
"""根据需求查找对应的模板"""
|
||
demand_lower = demand.lower()
|
||
for keyword, template_file in DEMAND_MAPPING.items():
|
||
if keyword.lower() in demand_lower:
|
||
return template_file
|
||
return None
|
||
|
||
|
||
def check_temp_script(demand_hash: str) -> Path:
|
||
"""检查是否有可复用的临时脚本"""
|
||
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
|
||
if temp_script.exists():
|
||
return temp_script
|
||
return None
|
||
|
||
|
||
def generate_script_from_template(template_file: str, params: dict) -> str:
|
||
"""从模板生成脚本"""
|
||
template_path = TEMPLATES_DIR / template_file
|
||
if not template_path.exists():
|
||
raise FileNotFoundError(f"模板不存在: {template_file}")
|
||
|
||
with open(template_path, 'r', encoding='utf-8') as f:
|
||
template_content = f.read()
|
||
|
||
# 简单的参数替换(使用 Python 的 format 方法)
|
||
try:
|
||
script_content = template_content.format(**params)
|
||
except KeyError as e:
|
||
raise ValueError(f"缺少必要参数: {e}")
|
||
|
||
return script_content
|
||
|
||
|
||
def generate_script_auto(demand: str, params: dict) -> str:
|
||
"""自动生成脚本(当没有匹配的模板时)"""
|
||
# 基于需求生成简单的通用脚本模板
|
||
script = f'''#!/usr/bin/env python3
|
||
"""
|
||
Auto-generated script for: {demand}
|
||
Generated at: {datetime.now().isoformat()}
|
||
"""
|
||
|
||
import pandas as pd
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
def main():
|
||
# 参数配置
|
||
file_path = "{params.get('file', 'data.xlsx')}"
|
||
output_path = "{params.get('output', 'output.xlsx')}"
|
||
|
||
# 读取文件
|
||
df = pd.read_excel(file_path)
|
||
|
||
# TODO: 实现具体逻辑
|
||
# 当前需求: {demand}
|
||
# 请根据需求补充实现代码
|
||
|
||
# 示例:打印数据信息
|
||
print(f"文件: {{file_path}}")
|
||
print(f"行数: {{len(df)}}")
|
||
print(f"列数: {{len(df.columns)}}")
|
||
print(f"列名: {{list(df.columns)}}")
|
||
|
||
# 保存结果
|
||
df.to_excel(output_path, index=False)
|
||
print(f"结果已保存到: {{output_path}}")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
'''
|
||
return script
|
||
|
||
|
||
def save_temp_script(script_content: str, demand_hash: str) -> Path:
|
||
"""保存临时脚本"""
|
||
TEMP_SCRIPTS_DIR.mkdir(exist_ok=True)
|
||
temp_script = TEMP_SCRIPTS_DIR / f"script_{demand_hash}.py"
|
||
|
||
with open(temp_script, 'w', encoding='utf-8') as f:
|
||
f.write(script_content)
|
||
|
||
# 添加执行权限
|
||
os.chmod(temp_script, 0o755)
|
||
|
||
return temp_script
|
||
|
||
|
||
def execute_script(script_path: Path, params: dict) -> dict:
|
||
"""执行脚本并返回结果"""
|
||
try:
|
||
# 准备环境变量传递参数
|
||
env = os.environ.copy()
|
||
for key, value in params.items():
|
||
env[f"AUTO_PARAM_{key.upper()}"] = str(value)
|
||
|
||
# 执行脚本
|
||
result = subprocess.run(
|
||
[sys.executable, str(script_path)],
|
||
capture_output=True,
|
||
text=True,
|
||
env=env,
|
||
timeout=300 # 5分钟超时
|
||
)
|
||
|
||
return {
|
||
"success": result.returncode == 0,
|
||
"stdout": result.stdout,
|
||
"stderr": result.stderr,
|
||
"returncode": result.returncode
|
||
}
|
||
except subprocess.TimeoutExpired:
|
||
return {
|
||
"success": False,
|
||
"stdout": "",
|
||
"stderr": "脚本执行超时(5分钟)",
|
||
"returncode": -1
|
||
}
|
||
|
||
|
||
def analyze_demand(demand: str, params: dict) -> dict:
|
||
"""分析需求并生成/执行脚本"""
|
||
result = {
|
||
"demand": demand,
|
||
"template_used": None,
|
||
"script_generated": False,
|
||
"script_reused": False,
|
||
"script_path": None,
|
||
"execution": None
|
||
}
|
||
|
||
# 生成需求哈希
|
||
demand_hash = hash_demand(demand)
|
||
|
||
# 检查是否有可复用的临时脚本
|
||
temp_script = check_temp_script(demand_hash)
|
||
if temp_script:
|
||
result["script_reused"] = True
|
||
result["script_path"] = str(temp_script)
|
||
result["script_generated"] = False
|
||
else:
|
||
# 查找模板
|
||
template_file = find_template(demand)
|
||
|
||
if template_file:
|
||
result["template_used"] = template_file
|
||
script_content = generate_script_from_template(template_file, params)
|
||
else:
|
||
# 自动生成脚本
|
||
script_content = generate_script_auto(demand, params)
|
||
|
||
# 保存临时脚本
|
||
temp_script = save_temp_script(script_content, demand_hash)
|
||
result["script_generated"] = True
|
||
result["script_path"] = str(temp_script)
|
||
|
||
# 执行脚本
|
||
result["execution"] = execute_script(temp_script, params)
|
||
|
||
return result
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="Excel Toolkit - Auto Script Generator",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
使用示例:
|
||
# 简单需求
|
||
python auto_script.py "把金额列从美元转换为人民币"
|
||
|
||
# 带参数
|
||
python auto_script.py "计算总价" --file data.xlsx --output result.xlsx
|
||
|
||
# 使用模板
|
||
python auto_script.py "数据透视汇总" --file sales.xlsx --group_by "地区,产品"
|
||
|
||
# JSON 输出
|
||
python auto_script.py "清洗数据" --output json
|
||
|
||
# 仅生成不执行
|
||
python auto_script.py "计算利润" --dry-run
|
||
"""
|
||
)
|
||
|
||
parser.add_argument(
|
||
"demand",
|
||
help="自然语言需求描述"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--file", "-f",
|
||
help="输入文件路径"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--output", "-o",
|
||
help="输出文件路径"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--param",
|
||
action="append",
|
||
help="额外参数,格式: key=value",
|
||
default=[]
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--dry-run",
|
||
action="store_true",
|
||
help="仅生成脚本不执行"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--output-format",
|
||
choices=["text", "json"],
|
||
default="text",
|
||
help="输出格式"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 收集参数
|
||
params = {
|
||
"file": args.file or "data.xlsx",
|
||
"output": args.output or "output.xlsx"
|
||
}
|
||
|
||
# 解析额外参数
|
||
for param in args.param:
|
||
if "=" in param:
|
||
key, value = param.split("=", 1)
|
||
params[key] = value
|
||
|
||
# 分析需求
|
||
result = analyze_demand(args.demand, params)
|
||
|
||
if args.dry_run:
|
||
# 仅显示生成的脚本路径
|
||
if args.output_format == "json":
|
||
print(json.dumps({
|
||
"demand": result["demand"],
|
||
"template_used": result["template_used"],
|
||
"script_path": result["script_path"],
|
||
"script_generated": result["script_generated"],
|
||
"script_reused": result["script_reused"]
|
||
}, indent=2, ensure_ascii=False))
|
||
else:
|
||
print(f"需求: {result['demand']}")
|
||
print(f"模板: {result['template_used'] or '自动生成'}")
|
||
print(f"脚本路径: {result['script_path']}")
|
||
print(f"状态: {'复用已存在' if result['script_reused'] else '新生成'}")
|
||
else:
|
||
# 显示执行结果
|
||
if args.output_format == "json":
|
||
print(json.dumps(result, indent=2, ensure_ascii=False))
|
||
else:
|
||
print(f"=== 需求分析 ===")
|
||
print(f"需求: {result['demand']}")
|
||
print(f"模板: {result['template_used'] or '自动生成'}")
|
||
print(f"脚本: {result['script_path']}")
|
||
print(f"复用: {'是' if result['script_reused'] else '否'}")
|
||
print()
|
||
|
||
print(f"=== 执行结果 ===")
|
||
exec_result = result["execution"]
|
||
if exec_result["stdout"]:
|
||
print(exec_result["stdout"])
|
||
|
||
if exec_result["stderr"]:
|
||
print(f"错误输出:\n{exec_result['stderr']}", file=sys.stderr)
|
||
|
||
if exec_result["success"]:
|
||
print(f"✓ 脚本执行成功", file=sys.stderr)
|
||
else:
|
||
print(f"✗ 脚本执行失败 (返回码: {exec_result['returncode']})", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|