client-finder/scripts/cliet-finder.sh

461 lines
13 KiB
Bash
Raw Permalink Normal View History

2026-03-11 23:36:43 +00:00
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
SKILL_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
SKILLS_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
SHARED_AUTH_RUNTIME="$SKILLS_DIR/_shared/auth-runtime.sh"
[ ! -f "$SHARED_AUTH_RUNTIME" ] && { echo "Missing shared auth runtime: $SHARED_AUTH_RUNTIME" >&2; exit 1; }
source "$SHARED_AUTH_RUNTIME"
SKILL_ENV_FILE="${SKILL_ENV_FILE:-$SKILL_ROOT/.env.local}"
load_skill_env_file() {
if [ ! -f "$SKILL_ENV_FILE" ]; then
return
fi
set -a
# shellcheck disable=SC1090
. "$SKILL_ENV_FILE"
set +a
}
load_skill_env_file
usage() {
cat <<'EOF'
Usage:
cliet-finder.sh [--client-key=<sk_xxx.yyy>] [--auth-base=<url>] "<query>" [country] [--dry-run]
Examples:
cliet-finder.sh --client-key='sk_xxx.yyy' "office machine" "us"
cliet-finder.sh --auth-base='https://api-gw-test.yuanwei-lnc.com' --client-key='sk_xxx.yyy' "office machine" "us"
QUERY_EXPANSION_JSON='{"expandedQueries":["coffee shop us","coffee roastery us"],"primaryQuery":"coffee roastery us"}' \
cliet-finder.sh --client-key='sk_xxx.yyy' "coffee" "us"
cliet-finder.sh --client-key='sk_xxx.yyy' "office machine in US" --dry-run
EOF
}
DRY_RUN=0
CLIENT_KEY_INPUT=""
AUTH_BASE_INPUT=""
POSITIONALS=()
for arg in "$@"; do
case "$arg" in
--dry-run)
DRY_RUN=1
;;
--client-key=*)
CLIENT_KEY_INPUT="${arg#*=}"
;;
--auth-base=*)
AUTH_BASE_INPUT="${arg#*=}"
;;
-h|--help)
usage
exit 0
;;
*)
POSITIONALS+=("$arg")
;;
esac
done
QUERY="${POSITIONALS[0]:-}"
COUNTRY="${POSITIONALS[1]:-us}"
AUTH_BASE="${AUTH_BASE_INPUT:-${AUTH_BASE:-https://api-gw-test.yuanwei-lnc.com}}"
AUTH_BASE="${AUTH_BASE%/}"
CLIENT_KEY="${CLIENT_KEY_INPUT:-}"
QUERY_EXPANSION_JSON="${QUERY_EXPANSION_JSON:-}"
trim() {
local input="$1"
printf '%s' "$input" | sed -E 's/^[[:space:]]+|[[:space:]]+$//g'
}
normalize_query() {
local input
input="$(trim "$1")"
printf '%s' "$input" | sed -E 's/^[Cc][Oo][Ll][Dd]-[Oo][Uu][Tt][Rr][Ee][Aa][Cc][Hh]:[[:space:]]*//'
}
json_escape() {
local text="$1"
python3 - "$text" <<'PY'
import json
import sys
print(json.dumps(sys.argv[1]))
PY
}
json_array_value() {
local raw="$1"
local key="$2"
python3 - "$raw" "$key" <<'PY'
import json
import sys
raw = sys.argv[1]
key = sys.argv[2]
try:
data = json.loads(raw)
except Exception:
print("[]")
raise SystemExit(0)
value = data.get(key, [])
if not isinstance(value, list):
value = []
print(json.dumps(value))
PY
}
is_true() {
local value
value="$(printf '%s' "${1:-}" | tr '[:upper:]' '[:lower:]')"
case "$value" in
true|1|yes) return 0 ;;
*) return 1 ;;
esac
}
emit_result() {
local status="$1"
local error="$2"
local input_query="$3"
local expanded_json="$4"
local primary_query="$5"
local expansion_status="$6"
local expansion_source="$7"
local expansion_error="${8:-}"
local used_fallback_query="${9:-false}"
local run_id="${10:-}"
local workflow_id="${11:-}"
local workflow_status="${12:-}"
local businesses_count="${13:-0}"
local contacts_count="${14:-0}"
local unique_domains="${15:-0}"
local billing_reserve_status="${16:-}"
local billing_finalize_status="${17:-}"
python3 - \
"$status" \
"$error" \
"$input_query" \
"$expanded_json" \
"$primary_query" \
"$expansion_status" \
"$expansion_source" \
"$expansion_error" \
"$used_fallback_query" \
"$run_id" \
"$workflow_id" \
"$workflow_status" \
"$businesses_count" \
"$contacts_count" \
"$unique_domains" \
"$billing_reserve_status" \
"$billing_finalize_status" <<'PY'
import json
import sys
(
status,
error,
input_query,
expanded_raw,
primary_query,
expansion_status,
expansion_source,
expansion_error,
used_fallback_query,
run_id,
workflow_id,
workflow_status,
businesses_count,
contacts_count,
unique_domains,
billing_reserve_status,
billing_finalize_status,
) = sys.argv[1:]
try:
expanded_queries = json.loads(expanded_raw) if expanded_raw else []
except Exception:
expanded_queries = []
def as_int(raw):
try:
return int(float(raw))
except Exception:
return 0
def as_bool(raw):
if not isinstance(raw, str):
return False
return raw.strip().lower() in ("true", "1", "yes")
result = {
"status": status,
"error": None if error in ("", "null", "None") else error,
"inputQuery": input_query,
"expandedQueries": expanded_queries,
"primaryQuery": primary_query,
"expansionStatus": expansion_status,
"expansionSource": expansion_source,
"expansionError": None if expansion_error in ("", "null", "None") else expansion_error,
"usedFallbackQuery": as_bool(used_fallback_query),
"runId": run_id,
"workflowId": workflow_id,
"workflowStatus": workflow_status,
"businessesCount": as_int(businesses_count),
"contactsCount": as_int(contacts_count),
"uniqueContactDomains": as_int(unique_domains),
"billingReserveStatus": billing_reserve_status,
"billingFinalizeStatus": billing_finalize_status,
}
print(json.dumps(result, ensure_ascii=False))
PY
}
single_item_array_json() {
local input="$1"
python3 - "$input" <<'PY'
import json
import sys
item = (sys.argv[1] or "").strip()
print(json.dumps([item] if item else []))
PY
}
resolve_expansion() {
local raw_query="$1"
local country_upper="$2"
local llm_expansion="$3"
python3 - "$raw_query" "$country_upper" "$llm_expansion" <<'PY'
import json
import re
import sys
raw_query = (sys.argv[1] or "").strip()
country_upper = (sys.argv[2] or "").strip().upper() or "US"
llm_expansion = sys.argv[3] or ""
def compact(value):
if not isinstance(value, str):
return ""
return re.sub(r"\s+", " ", value).strip()
def dedupe_keep_order(items):
seen = set()
output = []
for item in items:
cleaned = compact(item)
if not cleaned:
continue
key = cleaned.lower()
if key in seen:
continue
seen.add(key)
output.append(cleaned)
return output
def fail(message):
print(json.dumps({
"ok": False,
"error": message,
"expandedQueries": [],
"primaryQuery": "",
"expansionSource": ""
}))
if not raw_query:
fail("query is empty after normalization")
raise SystemExit(0)
if llm_expansion.strip():
try:
parsed = json.loads(llm_expansion)
except Exception:
fail("QUERY_EXPANSION_JSON is not valid JSON")
raise SystemExit(0)
if isinstance(parsed, list):
expanded = dedupe_keep_order(parsed)
primary = expanded[0] if expanded else ""
elif isinstance(parsed, dict):
expanded = dedupe_keep_order(
parsed.get("expandedQueries")
or parsed.get("queries")
or []
)
primary = compact(
parsed.get("primaryQuery")
or parsed.get("primary_query")
or (expanded[0] if expanded else "")
)
else:
fail("QUERY_EXPANSION_JSON must be an array or object")
raise SystemExit(0)
if not expanded:
fail("expandedQueries is empty")
raise SystemExit(0)
if not primary:
fail("primaryQuery is empty")
raise SystemExit(0)
if primary.lower() not in {q.lower() for q in expanded}:
expanded.insert(0, primary)
print(json.dumps({
"ok": True,
"error": "",
"expandedQueries": expanded,
"primaryQuery": primary,
"expansionSource": "llm"
}))
raise SystemExit(0)
rule_candidates = []
base = compact(raw_query)
rule_candidates.extend([
f"{base} {country_upper}",
f"{base} supplier {country_upper}",
f"{base} wholesale {country_upper}",
f"{base} distributor {country_upper}",
f"{base} b2b {country_upper}",
])
lower = base.lower()
if "coffee" in lower:
rule_candidates.extend([
f"coffee shop {country_upper}",
f"coffee roastery {country_upper}",
f"specialty coffee wholesale {country_upper}",
])
if "office machine" in lower or "office equipment" in lower:
rule_candidates.extend([
f"office equipment supplier {country_upper}",
f"office machine distributor {country_upper}",
])
expanded = dedupe_keep_order(rule_candidates)
if not expanded:
fail("failed to build expanded queries")
raise SystemExit(0)
print(json.dumps({
"ok": True,
"error": "",
"expandedQueries": expanded[:8],
"primaryQuery": expanded[0],
"expansionSource": "rule",
}))
PY
}
# --- Extract workflow_id from ecom start response ---
parse_workflow_id() {
local raw="$1"
python3 - "$raw" <<'PY'
import json
import sys
raw = sys.argv[1]
try:
data = json.loads(raw)
except Exception:
print("")
raise SystemExit(0)
wf_id = data.get("workflow_id") or data.get("workflowId") or data.get("id") or ""
print(str(wf_id))
PY
}
if [ -z "$QUERY" ]; then
emit_result "failed" "missing query argument" "" "[]" "" "failed" "" "missing query argument" "false" "" "" "" "0" "0" "0" "" ""
exit 1
fi
RAW_QUERY="$(normalize_query "$QUERY")"
COUNTRY_UPPER="$(printf '%s' "$COUNTRY" | tr '[:lower:]' '[:upper:]')"
COUNTRY_LOWER="$(printf '%s' "$COUNTRY" | tr '[:upper:]' '[:lower:]')"
EXPANSION_RESPONSE="$(resolve_expansion "$RAW_QUERY" "$COUNTRY_UPPER" "$QUERY_EXPANSION_JSON")"
EXPANSION_OK="$(auth_runtime_json_get "$EXPANSION_RESPONSE" "ok")"
EXPANSION_RAW_ERROR="$(auth_runtime_json_get "$EXPANSION_RESPONSE" "error")"
EXPANDED_QUERIES_JSON="$(json_array_value "$EXPANSION_RESPONSE" "expandedQueries")"
PRIMARY_QUERY="$(auth_runtime_json_get "$EXPANSION_RESPONSE" "primaryQuery")"
EXPANSION_SOURCE="$(auth_runtime_json_get "$EXPANSION_RESPONSE" "expansionSource")"
EXPANSION_STATUS="success"
EXPANSION_ERROR=""
USED_FALLBACK_QUERY="false"
if ! is_true "$EXPANSION_OK"; then
EXPANSION_STATUS="failed"
if [ -n "$RAW_QUERY" ]; then
PRIMARY_QUERY="$RAW_QUERY"
EXPANDED_QUERIES_JSON="$(single_item_array_json "$RAW_QUERY")"
EXPANSION_SOURCE="raw_query"
EXPANSION_ERROR="query expansion failed: $EXPANSION_RAW_ERROR; fallback to raw query"
USED_FALLBACK_QUERY="true"
else
PRIMARY_QUERY=""
EXPANDED_QUERIES_JSON="[]"
EXPANSION_SOURCE=""
EXPANSION_ERROR="query expansion failed: $EXPANSION_RAW_ERROR"
USED_FALLBACK_QUERY="false"
fi
fi
if [ -z "$PRIMARY_QUERY" ]; then
if [ -z "$EXPANSION_ERROR" ]; then
EXPANSION_ERROR="query expansion failed: primary query is empty"
fi
emit_result "failed" "$EXPANSION_ERROR" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "" "" "0" "0" "0" "" ""
exit 1
fi
if [ "$DRY_RUN" -eq 1 ]; then
emit_result "success" "" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "" "dry_run" "0" "0" "0" "DRY_RUN" "DRY_RUN"
exit 0
fi
CLIENT_KEY="$(trim "$CLIENT_KEY")"
if [ -z "$CLIENT_KEY" ]; then
emit_result "failed" "missing required input: --client-key=<...>" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "" "" "0" "0" "0" "" ""
exit 1
fi
# =============================================================================
# Step 1: Exchange client key for skill runtime token
# =============================================================================
ACCESS_TOKEN="$(auth_runtime_get_access_token 0)" || {
emit_result "failed" "failed to exchange skill session token" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "" "" "0" "0" "0" "" ""
exit 1
}
# =============================================================================
# Step 2: Start workflow via ecom flow API
# =============================================================================
START_PAYLOAD="{\"query\":$(json_escape "$PRIMARY_QUERY"),\"country\":$(json_escape "$COUNTRY_LOWER")}"
START_RAW="$(auth_runtime_request_api POST "$AUTH_BASE/ecom/cold-outreach/run-flow" "$ACCESS_TOKEN" "$START_PAYLOAD" || true)"
START_RESPONSE="$(auth_runtime_extract_body "$START_RAW")"
WORKFLOW_ID="$(parse_workflow_id "$START_RESPONSE")"
if [ -z "$WORKFLOW_ID" ]; then
START_ERROR="$(auth_runtime_json_get "$START_RESPONSE" "error")"
if [ -z "$START_ERROR" ]; then
START_ERROR="$(auth_runtime_json_get "$START_RESPONSE" "message")"
fi
if [ -z "$START_ERROR" ]; then
START_ERROR="failed to start workflow (missing workflowId)"
fi
emit_result "failed" "start failed: $START_ERROR" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "" "" "0" "0" "0" "SKIPPED" "SKIPPED"
exit 1
fi
# =============================================================================
# Step 3: Return accepted immediately.
# woo-data-scrawler will push terminal webhook using client-key hook config.
# =============================================================================
emit_result "success" "" "$RAW_QUERY" "$EXPANDED_QUERIES_JSON" "$PRIMARY_QUERY" "$EXPANSION_STATUS" "$EXPANSION_SOURCE" "$EXPANSION_ERROR" "$USED_FALLBACK_QUERY" "" "$WORKFLOW_ID" "accepted" "0" "0" "0" "SKIPPED" "SKIPPED"