feat: 添加 OpenClaw Auth Runtime Python 实现

基于 ~/clawd/skills/_shared/auth-runtime 的 TypeScript 实现:
- auth_runtime.py: 核心认证运行时
  - CLIENT_KEY 认证
  - 令牌缓存(可配置 TTL)
  - 自动刷新过期令牌
  - 401/403 自动重试
- test_auth.py: 测试脚本
- 更新 .env.example 添加 CLIENT_KEY 配置
- 更新 requirements.txt 添加 requests 依赖
- AUTH_RUNTIME.md: 完整使用文档
This commit is contained in:
ivanberry 2026-03-11 20:21:05 +08:00
parent e5e2ce9671
commit 2eff4a6033
5 changed files with 708 additions and 5 deletions

View File

@ -1,8 +1,25 @@
# Gemini API Key 配置
# =============================================================================
# Excel Toolkit 环境变量配置
# =============================================================================
# -----------------------------------------------------------------------------
# Gemini API 配置(用于翻译功能)
# 获取 API Key: https://aistudio.google.com/app/apikey
# -----------------------------------------------------------------------------
GEMINI_API_KEY=your-gemini-api-key-here
# 方式 1: 使用 Gemini API Key推荐
GEMINI_API_KEY=your-api-key-here
# -----------------------------------------------------------------------------
# OpenClaw Auth Runtime 配置(用于 API 访问)
# 从 ~/clawd/skills/_shared 获取 CLIENT_KEY
# -----------------------------------------------------------------------------
# 认证基础 URL默认https://api-gw-test.yuanwei-lnc.com
# AUTH_BASE=https://api-gw-test.yuanwei-lnc.com
# 方式 2: 使用 Google API Key备选
# GOOGLE_API_KEY=your-google-api-key-here
# 客户端密钥(必需)- 从 ~/clawd/skills/_shared 获取
# CLIENT_KEY=your-client-key-here
# 缓存目录(默认:/tmp/skill-auth-cache
# AUTH_CACHE_DIR=/tmp/skill-auth-cache
# 最小令牌 TTL 秒数默认60
# AUTH_MIN_TTL_SEC=60

215
AUTH_RUNTIME.md Normal file
View File

@ -0,0 +1,215 @@
# OpenClaw Auth Runtime Python 实现
基于 `~/clawd/skills/_shared/auth-runtime` 的 TypeScript 实现,提供 Python 版本的客户端密钥认证。
## 功能特性
- ✅ 使用 CLIENT_KEY 获取访问令牌
- ✅ 支持令牌缓存(可配置 TTL
- ✅ 自动刷新过期令牌
- ✅ 401/403 自动重试
- ✅ 从 .env 文件自动加载配置
## 快速开始
### 1. 配置环境变量
`.env` 文件中设置 CLIENT_KEY
```bash
# 从 ~/clawd/skills/_shared 获取 CLIENT_KEY
CLIENT_KEY=your-client-key-here
```
### 2. 获取访问令牌
```python
from auth_runtime import create_env_config, get_access_token
# 创建配置(自动从环境变量加载)
config = create_env_config()
# 获取访问令牌(带缓存)
token = get_access_token(dry_run=False, config=config)
```
### 3. 发送 API 请求
```python
from auth_runtime import request_api_with_auto_refresh
# 发送请求(自动处理令牌刷新)
response = request_api_with_auto_refresh(
method="POST",
url="https://api-gw-test.yuanwei-lnc.com/your-api-endpoint",
dry_run=False,
config=config,
body={"key": "value"},
)
print(f"状态码:{response.status}")
print(f"响应体:{response.body}")
```
## API 参考
### 配置
#### `create_env_config() -> EnvConfig`
从环境变量创建配置:
| 环境变量 | 默认值 | 说明 |
|---------|--------|------|
| `AUTH_BASE` | `https://api-gw-test.yuanwei-lnc.com` | 认证基础 URL |
| `CLIENT_KEY` | **必需** | 客户端密钥 |
| `AUTH_CACHE_DIR` | `/tmp/skill-auth-cache` | 缓存目录 |
| `AUTH_MIN_TTL_SEC` | `60` | 最小令牌 TTL |
### 令牌管理
#### `get_access_token(dry_run, config) -> str`
获取访问令牌(带缓存):
1. 检查缓存
2. 如果缓存有效,返回缓存令牌
3. 否则请求新令牌并缓存
#### `refresh_access_token(dry_run, config) -> str`
刷新访问令牌(绕过缓存)
### API 请求
#### `request_api(method, url, auth_token, body) -> ApiResponse`
发送 HTTP 请求
#### `request_api_with_auto_refresh(method, url, dry_run, config, body) -> ApiResponse`
发送 API 请求并自动刷新令牌:
1. 使用当前令牌发送请求
2. 如果是 401/403 错误,刷新令牌并重试一次
### 便捷函数
#### `load_client_key_from_env(env_path) -> str`
从 .env 文件加载 CLIENT_KEY
优先级:
1. 环境变量 `CLIENT_KEY`
2. .env 文件中的 `CLIENT_KEY`
3. 抛出异常
## 使用示例
### 示例 1: 简单的 API 调用
```python
from auth_runtime import create_env_config, request_api_with_auto_refresh
config = create_env_config()
response = request_api_with_auto_refresh(
method="POST",
url=f"{config.auth_base}/your-endpoint",
dry_run=False,
config=config,
body={"url": "https://example.com"},
)
if response.status == 200:
print("成功:", response.body)
else:
print("失败:", response.status, response.body)
```
### 示例 2: 结合 Excel 翻译使用
```python
from auth_runtime import create_env_config, request_api_with_auto_refresh
import json
config = create_env_config()
# 调用 1688 API 获取商品详情
response = request_api_with_auto_refresh(
method="POST",
url=f"{config.auth_base}/ecom/tasks/scrape",
dry_run=False,
config=config,
body={"url": "https://detail.1688.com/offer/123.html"},
)
if response.status == 200:
data = json.loads(response.body)
print("商品价格:", data["price"])
```
## 缓存机制
令牌缓存位置:`/tmp/skill-auth-cache/token_<hash>.json`
缓存文件内容:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIs...",
"expires_at": 1234567890.123,
"expires_in": 900
}
```
缓存策略:
- 令牌在过期前 `AUTH_MIN_TTL_SEC` 秒内视为无效
- 默认最小 TTL 为 60 秒
- 缓存文件自动创建和删除
## 错误处理
### 常见错误
| 错误 | 原因 | 解决方案 |
|------|------|---------|
| `CLIENT_KEY is required` | 未设置 CLIENT_KEY | 在 .env 文件或环境变量中设置 |
| `Auth session request failed` | 认证请求失败 | 检查 CLIENT_KEY 是否有效 |
| `Missing accessToken` | 认证响应缺少令牌 | 检查认证服务是否正常 |
### 自动重试
以下情况会自动刷新令牌并重试一次:
- HTTP 401 Unauthorized
- HTTP 403 Forbidden
- 响应体包含 "expired"、"unauthorized" 等标记
## 测试
运行测试脚本:
```bash
cd /Users/xiaolongxia/Documents/ai-build-app/skills/excel-toolkit
uv run python scripts/test_auth.py
```
## 与 TypeScript 版本的差异
| 特性 | TypeScript | Python |
|------|-----------|--------|
| HTTP 客户端 | 原生 fetch | requests |
| 缓存文件 | Bun/Node fs | pathlib |
| 环境变量 | process.env | os.getenv |
| 类型系统 | TypeScript | dataclass |
功能完全对等,可以互换使用。
## 获取 CLIENT_KEY
CLIENT_KEY 存储在 `~/clawd/skills/_shared` 目录中。
查看相关文档:
- `~/clawd/skills/_shared/AUTH_RUNTIME_IMPLEMENTATION.md`
- `~/clawd/skills/_shared/auth-runtime/README.md`
## 许可证
与 OpenClaw 项目保持一致。

View File

@ -1,3 +1,4 @@
pandas>=1.5.0
openpyxl>=3.0.0
google-generativeai>=0.8.0
requests>=2.28.0

395
scripts/auth_runtime.py Normal file
View File

@ -0,0 +1,395 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Python 版本的 OpenClaw Auth Runtime
基于 ~/clawd/skills/_shared/auth-runtime TypeScript 实现
功能
- 使用 CLIENT_KEY 获取访问令牌
- 支持令牌缓存可配置 TTL
- 自动刷新过期令牌
- 401/403 自动重试
"""
import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
import requests # type: ignore
@dataclass
class EnvConfig:
"""环境配置"""
auth_base: str = "https://api-gw-test.yuanwei-lnc.com"
client_key: str = ""
auth_cache_dir: str = "/tmp/skill-auth-cache"
auth_min_ttl_sec: int = 60
@dataclass
class SessionResponse:
"""会话响应"""
access_token: str
hook_url: Optional[str] = None
hook_token: Optional[str] = None
expires_in: int = 900
@dataclass
class CachedTokenData:
"""缓存的令牌数据"""
access_token: str
expires_at: float
@dataclass
class ApiResponse:
"""HTTP 响应"""
status: int
body: str
headers: dict[str, str]
# 可重试的状态码
RETRYABLE_STATUS = {401, 403}
# 可重试的响应体标记
RETRYABLE_BODY_MARKERS = [
'session not found or expired',
'invalid or expired token',
'unauthorized',
'client key expired',
'client key revoked',
]
def create_env_config() -> EnvConfig:
"""
从环境变量创建配置
环境变量
- AUTH_BASE: 认证基础 URL默认https://api-gw-test.yuanwei-lnc.com
- CLIENT_KEY: 客户端密钥必需
- AUTH_CACHE_DIR: 缓存目录默认/tmp/skill-auth-cache
- AUTH_MIN_TTL_SEC: 最小令牌 TTL 秒数默认60
"""
auth_base = os.getenv("AUTH_BASE", "https://api-gw-test.yuanwei-lnc.com").rstrip("/")
client_key = os.getenv("CLIENT_KEY", "")
auth_cache_dir = os.getenv("AUTH_CACHE_DIR", "/tmp/skill-auth-cache")
auth_min_ttl_sec = int(os.getenv("AUTH_MIN_TTL_SEC", "60"))
return EnvConfig(
auth_base=auth_base,
client_key=client_key,
auth_cache_dir=auth_cache_dir,
auth_min_ttl_sec=auth_min_ttl_sec,
)
def get_cache_file(auth_base: str, client_key: str, cache_dir: str) -> Path:
"""
获取缓存文件路径
使用 auth_base client_key 的哈希值生成唯一的缓存文件名
"""
cache_path = Path(cache_dir)
cache_path.mkdir(parents=True, exist_ok=True)
# 生成缓存文件名的哈希
key_str = f"{auth_base}:{client_key}"
hash_value = hashlib.sha256(key_str.encode()).hexdigest()[:16]
return cache_path / f"token_{hash_value}.json"
def read_cached_token(cache_file: Path, min_ttl_sec: int) -> Optional[str]:
"""
读取缓存的令牌
如果令牌存在且剩余 TTL 大于最小 TTL则返回令牌
"""
if not cache_file.exists():
return None
try:
with open(cache_file, "r", encoding="utf-8") as f:
data = json.load(f)
cached = CachedTokenData(
access_token=data["access_token"],
expires_at=data["expires_at"],
)
# 检查是否过期(考虑最小 TTL
if time.time() + min_ttl_sec < cached.expires_at:
return cached.access_token
else:
# 令牌已过期或即将过期,删除缓存
cache_file.unlink(missing_ok=True)
return None
except (json.JSONDecodeError, KeyError, Exception):
# 缓存损坏,删除并重新获取
cache_file.unlink(missing_ok=True)
return None
def write_cache(cache_file: Path, session: SessionResponse) -> None:
"""
写入缓存
缓存令牌和过期时间
"""
cache_file.parent.mkdir(parents=True, exist_ok=True)
data = {
"access_token": session.access_token,
"expires_at": time.time() + session.expires_in,
"expires_in": session.expires_in,
}
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def delete_cache(cache_file: Path) -> None:
"""删除缓存"""
cache_file.unlink(missing_ok=True)
def fetch_session_json(dry_run: bool, config: EnvConfig) -> SessionResponse:
"""
从认证端点获取会话 JSON
使用 CLIENT_KEY 请求访问令牌
"""
if dry_run:
return SessionResponse(
access_token="<dry-run-token>",
hook_url="<dry-run-hook-url>",
hook_token="<dry-run-hook-token>",
expires_in=900,
)
if not config.client_key:
raise ValueError("CLIENT_KEY is required")
# 请求认证端点
payload = {"clientKey": config.client_key}
url = f"{config.auth_base}/auth/skill-credit/session"
response = requests.post(url, json=payload, timeout=30)
if response.status_code < 200 or response.status_code >= 300:
raise RuntimeError(
f"Auth session request failed: HTTP {response.status_code} - {response.text}"
)
session_data = response.json()
if not session_data.get("accessToken"):
raise RuntimeError(f"Missing accessToken in session response: {response.text}")
return SessionResponse(
access_token=session_data["accessToken"],
hook_url=session_data.get("hookUrl"),
hook_token=session_data.get("hookToken"),
expires_in=session_data.get("expiresIn", 900),
)
def get_access_token(dry_run: bool, config: EnvConfig) -> str:
"""
获取访问令牌带缓存
1. 检查缓存
2. 如果缓存有效返回缓存的令牌
3. 否则请求新令牌并缓存
"""
if dry_run:
return "<dry-run-token>"
if not config.client_key:
raise ValueError("CLIENT_KEY is required")
cache_file = get_cache_file(config.auth_base, config.client_key, config.auth_cache_dir)
cached_token = read_cached_token(cache_file, config.auth_min_ttl_sec)
if cached_token:
print(f"✓ 使用缓存的访问令牌")
return cached_token
print(f"🔑 请求新的访问令牌...")
session = fetch_session_json(dry_run, config)
write_cache(cache_file, session)
print(f"✓ 令牌已缓存到:{cache_file}")
return session.access_token
def refresh_access_token(dry_run: bool, config: EnvConfig) -> str:
"""
刷新访问令牌绕过缓存
删除缓存并重新请求新令牌
"""
if dry_run:
return "<dry-run-token>"
if not config.client_key:
raise ValueError("CLIENT_KEY is required")
cache_file = get_cache_file(config.auth_base, config.client_key, config.auth_cache_dir)
delete_cache(cache_file)
print(f"🔄 刷新访问令牌...")
return get_access_token(dry_run, config)
def is_retryable_session_error(response: ApiResponse) -> bool:
"""
检查响应是否表示会话过期/无效
如果是 401/403 或响应体包含特定标记则可以重试
"""
if response.status not in RETRYABLE_STATUS:
return False
body = (response.body or "").lower()
if not body:
return True
return any(marker in body for marker in RETRYABLE_BODY_MARKERS)
def request_api(
method: str,
url: str,
auth_token: Optional[str] = None,
body: Optional[dict[str, Any]] = None,
timeout: int = 30,
) -> ApiResponse:
"""
发送 HTTP 请求
Args:
method: HTTP 方法GET, POST, PUT, DELETE
url: 请求 URL
auth_token: 访问令牌可选
body: 请求体可选自动序列化为 JSON
timeout: 超时时间
Returns:
ApiResponse: 响应对象
"""
headers = {"Content-Type": "application/json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
try:
if method.upper() == "GET":
response = requests.get(url, headers=headers, timeout=timeout)
elif method.upper() == "POST":
response = requests.post(
url, headers=headers, json=body, timeout=timeout
)
elif method.upper() == "PUT":
response = requests.put(
url, headers=headers, json=body, timeout=timeout
)
elif method.upper() == "DELETE":
response = requests.delete(url, headers=headers, timeout=timeout)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
return ApiResponse(
status=response.status_code,
body=response.text,
headers=dict(response.headers),
)
except requests.exceptions.RequestException as e:
return ApiResponse(
status=0,
body=str(e),
headers={},
)
def request_api_with_auto_refresh(
method: str,
url: str,
dry_run: bool,
config: EnvConfig,
body: Optional[dict[str, Any]] = None,
access_token: Optional[str] = None,
) -> ApiResponse:
"""
发送 API 请求并自动刷新令牌
1. 使用当前令牌发送请求
2. 如果是 401/403 错误刷新令牌并重试一次
"""
token = access_token or get_access_token(dry_run, config)
# 第一次请求
first = request_api(method, url, token, body)
# 检查是否需要重试
if not is_retryable_session_error(first):
return first
# 刷新令牌并重试
print(f"⚠️ 检测到会话过期,刷新令牌后重试...")
fresh_token = refresh_access_token(dry_run, config)
return request_api(method, url, fresh_token, body)
# 便捷函数:从 .env 文件加载 CLIENT_KEY
def load_client_key_from_env(env_path: Optional[Path] = None) -> str:
"""
.env 文件加载 CLIENT_KEY
优先级
1. 环境变量 CLIENT_KEY
2. .env 文件中的 CLIENT_KEY
3. 抛出异常
"""
# 先检查环境变量
client_key = os.getenv("CLIENT_KEY")
if client_key:
return client_key
# 查找 .env 文件
if env_path is None:
possible_paths = [
Path(".env"),
Path.cwd() / ".env",
Path(__file__).parent.parent / ".env",
]
for p in possible_paths:
if p.exists():
env_path = p
break
if env_path and env_path.exists():
with open(env_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
if key.strip() == "CLIENT_KEY":
return value.strip()
raise ValueError(
"CLIENT_KEY not found. Please set CLIENT_KEY environment variable "
"or add it to .env file"
)

75
scripts/test_auth.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试 OpenClaw Auth Runtime
"""
import sys
from pathlib import Path
# 添加当前目录到路径
sys.path.insert(0, str(Path(__file__).parent))
from auth_runtime import (
create_env_config,
get_access_token,
request_api_with_auto_refresh,
load_client_key_from_env,
)
def main():
print("=" * 60)
print("OpenClaw Auth Runtime 测试")
print("=" * 60)
# 方式 1: 从 .env 加载 CLIENT_KEY
try:
client_key = load_client_key_from_env()
print(f"✓ 从 .env 文件加载 CLIENT_KEY: {client_key[:10]}...")
except ValueError as e:
print(f"{e}")
print("\n请在 .env 文件中设置 CLIENT_KEY")
print("或设置环境变量export CLIENT_KEY=your-key")
return 1
# 创建配置
config = create_env_config()
print(f"✓ 配置创建成功")
print(f" AUTH_BASE: {config.auth_base}")
print(f" AUTH_CACHE_DIR: {config.auth_cache_dir}")
# 获取访问令牌
print("\n【测试 1】获取访问令牌...")
try:
token = get_access_token(dry_run=False, config=config)
print(f"✓ 获取成功:{token[:20]}...")
except Exception as e:
print(f"❌ 失败:{e}")
return 1
# 测试 API 请求
print("\n【测试 2】测试 API 请求...")
try:
# 示例:请求认证端点
response = request_api_with_auto_refresh(
method="POST",
url=f"{config.auth_base}/auth/skill-credit/session",
dry_run=False,
config=config,
body={"clientKey": config.client_key},
)
print(f"✓ 响应状态:{response.status}")
print(f" 响应体:{response.body[:100]}...")
except Exception as e:
print(f"❌ 失败:{e}")
print("\n" + "=" * 60)
print("✅ 测试完成")
print("=" * 60)
return 0
if __name__ == "__main__":
sys.exit(main())