Files
nonye/back/ai_processor.py
2025-07-17 23:13:04 +08:00

164 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import websocket
import json
import hmac
import hashlib
import base64
import time
import ssl
from urllib.parse import urlencode
import datetime
from threading import Lock
from flask import current_app
# 全局处理状态锁
status_lock = Lock()
processing_status = {}
def process_ai_message(appid, api_key, api_secret, spark_url, domain, messages):
"""处理AI消息并返回回答优化超时处理和错误处理"""
max_retries = 3
retries = 0
last_error = None
while retries < max_retries:
try:
# 生成认证URL
url = generate_auth_url(spark_url, api_key, api_secret)
# 创建WebSocket连接设置超时为30秒
ws = websocket.create_connection(
url,
sslopt={"cert_reqs": ssl.CERT_NONE},
timeout=30
)
# 准备请求数据
request_data = {
"header": {
"app_id": appid,
"uid": f"user_{int(time.time())}_{retries}"
},
"parameter": {
"chat": {
"domain": domain,
"temperature": 0.7,
"max_tokens": 2048,
"top_k": 3
}
},
"payload": {
"message": {
"text": messages
}
}
}
# 发送请求
ws.send(json.dumps(request_data))
# 接收响应
answer = ""
start_time = time.time()
while True:
try:
response = ws.recv()
if not response:
break
data = json.loads(response)
if "payload" in data and "choices" in data["payload"]:
if "text" in data["payload"]["choices"]:
for item in data["payload"]["choices"]["text"]:
if "content" in item:
answer += item["content"]
# 检查是否是最后一条消息
if "header" in data and "status" in data["header"] and data["header"]["status"] == 2:
break
# 超时检查45秒超时
if time.time() - start_time > 45:
current_app.logger.warning("AI接口响应超时")
raise TimeoutError("AI接口响应超时")
except websocket.WebSocketTimeoutException:
current_app.logger.warning("WebSocket接收超时")
raise TimeoutError("WebSocket接收超时")
# 关闭连接
ws.close()
# 检查回答是否有效
if answer.strip():
return answer
retries += 1
last_error = "AI返回空回答"
current_app.logger.warning(f"AI返回空回答重试 {retries}/{max_retries}")
except TimeoutError as te:
retries += 1
last_error = str(te)
current_app.logger.error(f"AI接口超时 ({retries}/{max_retries}): {str(te)}")
if ws:
try:
ws.close()
except:
pass
except websocket.WebSocketException as we:
retries += 1
last_error = str(we)
current_app.logger.error(f"WebSocket错误 ({retries}/{max_retries}): {str(we)}")
except Exception as e:
retries += 1
last_error = str(e)
current_app.logger.error(f"处理错误 ({retries}/{max_retries}): {str(e)}")
if ws:
try:
ws.close()
except:
pass
return f"抱歉AI处理失败: {last_error or '未知错误'}"
def generate_auth_url(api_url, api_key, api_secret):
"""生成认证URL"""
from urllib.parse import urlparse
url = urlparse(api_url)
host = url.netloc
path = url.path
# 生成RFC1123格式的时间戳
now = time.time()
date = datetime.datetime.fromtimestamp(now, datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
# 构建签名原始字符串
signature_origin = f"host: {host}\ndate: {date}\nGET {path} HTTP/1.1"
# 计算HMAC-SHA256签名
signature_sha = hmac.new(
api_secret.encode('utf-8'),
signature_origin.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
# Base64编码
signature_sha_base64 = base64.b64encode(signature_sha).decode()
# 构建认证字符串
authorization_origin = (
f'api_key="{api_key}", algorithm="hmac-sha256", '
f'headers="host date request-line", signature="{signature_sha_base64}"'
)
# Base64编码认证字符串
authorization = base64.b64encode(authorization_origin.encode()).decode()
# 构建最终URL
query_params = {
'authorization': authorization,
'date': date,
'host': host
}
return f"{api_url}?{urlencode(query_params)}"