对话端点使用教程¶
对话端点是智能客服系统对外提供问答能力的核心入口,提供**同步**与 **SSE 流式**两套接口,共享同一套会话管理与多 Agent 编排逻辑,仅在最终生成阶段是否流式吐出上有所差异。
前置条件
- 服务已启动(默认
http://localhost:8000) - 端点前缀统一为
/api/v1/chat - 鉴权头:
X-API-Key。开发模式下API_KEY=空即免鉴权,生产环境需在.env配置API_KEY
端点概览¶
| 端点 | 方法 | 说明 | 响应类型 |
|---|---|---|---|
/api/v1/chat |
POST | 同步返回完整回复 | application/json |
/api/v1/chat/stream |
POST | SSE 流式逐 Token 吐出 | text/event-stream |
两个端点入参完全一致,业务侧可按是否需要"打字效果"自由切换。
同步对话:POST /api/v1/chat¶
请求体¶
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
message |
string | 是 | 用户消息内容 |
session_id |
string | 否 | 会话 ID,首次对话可不传,系统自动创建并返回 |
channel |
string | 否 | 接入渠道,默认 api,可选 web/app/wechat/dingtalk/api |
user_id |
string | 否 | 用户标识,用于会员识别与个性化 |
响应体¶
{
"session_id": "sess-9f3c2a1b",
"reply": "您可以登录「我的订单」页面查看物流状态...",
"status": "ok",
"data": {
"intent": "knowledge_qa",
"sources": ["产品FAQ.md", "物流说明.md"],
"escalate_to_human": false,
"escalation_card": null,
"turn_count": 2,
"failed_attempts": 0,
"emotion_score": 0.85,
"sub_tasks": []
}
}
关键字段说明
intent:意图识别结果,可能值为chitchat / knowledge_qa / business_query / emotion_sensitive / transfer_to_human / ticket等escalate_to_human:是否触发转人工,为true时escalation_card非空,可直接传递给坐席工作台sources:RAG 命中的知识来源文件名,未命中时为空数组turn_count:当前会话已进行的对话轮数failed_attempts:连续未解决次数,达到阈值会触发转人工
示例¶
import httpx
# 通过 session_id 续接多轮对话,缺失时由服务端自动创建
resp = httpx.post(
"http://localhost:8000/api/v1/chat",
headers={
"Content-Type": "application/json",
"X-API-Key": "", # 开发模式留空
},
json={
"message": "我的订单什么时候发货?",
"channel": "web",
"user_id": "u_10086",
},
timeout=30.0,
)
data = resp.json()
# 保存 session_id 以便后续多轮对话续接
session_id = data["session_id"]
print(data["reply"])
print("命中来源:", data["data"]["sources"])
SSE 流式对话:POST /api/v1/chat/stream¶
流式端点返回 text/event-stream,逐 Token 下发,适合前端实现打字机效果,首 Token < 1 秒。
事件类型¶
| 事件 | 触发时机 | data 关键字段 |
|---|---|---|
meta |
编排开始时(含意图与来源) | intent / sources / escalate |
token |
每次吐出一段文本(多次) | content |
done |
流正常结束 | turn_count / escalate / answer |
error |
任一阶段异常 | message |
首事件先于 LLM 调用
流式端点会先 yield meta 事件,让前端在 LLM 调用前就能展示意图,把首 Token 控制在 200ms 内。闲聊/转人工等快通道意图命中时直接跳过 LLM。
curl 流式接收¶
# -N 关闭缓冲,保证 token 实时下发到终端
curl -N -X POST http://localhost:8000/api/v1/chat/stream \
-H "Content-Type: application/json" \
-H "X-API-Key: ${API_KEY}" \
-d '{"message": "介绍一下你们的退换货政策"}'
输出示例(每行一个 SSE 事件):
event: meta
data: {"intent": "knowledge_qa", "sources": ["return_policy.md"]}
event: token
data: {"content": "我们的退换货政策如下:"}
event: token
data: {"content": "自签收之日起 7 天内可申请退货..."}
event: done
data: {"turn_count": 1, "escalate": false, "answer": "我们的退换货政策..."}
Python 流式接收¶
import httpx
import json
def stream_chat(message: str, session_id: str = None):
"""流式接收 SSE 事件,逐 token 拼装回复。"""
with httpx.stream(
"POST",
"http://localhost:8000/api/v1/chat/stream",
headers={"Content-Type": "application/json", "X-API-Key": ""},
json={"message": message, "session_id": session_id},
timeout=60.0,
) as resp:
event_type = None
full_answer = []
for line in resp.iter_lines():
if not line:
continue
# SSE 协议:event: 与 data: 前缀分别标识事件类型与负载
if line.startswith("event:"):
event_type = line.split(":", 1)[1].strip()
elif line.startswith("data:"):
payload = json.loads(line.split(":", 1)[1].strip())
if event_type == "token":
print(payload["content"], end="", flush=True)
full_answer.append(payload["content"])
elif event_type == "done":
return payload
elif event_type == "error":
raise RuntimeError(payload["message"])
stream_chat("介绍一下你们的退换货政策")
import httpx
from sseclient import SSEClient
# sseclient 自动处理 event/data 解析,代码更简洁
resp = httpx.post(
"http://localhost:8000/api/v1/chat/stream",
headers={"Content-Type": "application/json", "X-API-Key": ""},
json={"message": "介绍一下你们的退换货政策"},
timeout=60.0,
)
client = SSEClient(resp.iter_lines())
for event in client.events():
data = json.loads(event.data)
if event.event == "token":
print(data["content"], end="", flush=True)
elif event.event == "done":
print("\n完整回复:", data["answer"])
break
nginx 缓冲关闭
生产环境若经过 nginx 反代,需确保上游响应头 X-Accel-Buffering: no 生效(端点已默认下发),否则 token 会被缓冲到流结束才一次性吐出,丧失流式体验。
多轮对话:上下文自动管理¶
系统通过 SessionManager 自动维护对话上下文,无需业务侧手动拼接历史:
- 首次对话不传
session_id,响应体返回新创建的session_id - 后续对话把该
session_id回传,系统自动加载历史并写入新轮次 - 每轮
turn_count自增,便于前端展示"已进行 N 轮对话"
# 多轮对话示例:复用 session_id 续接上下文
session_id = None
questions = [
"我想查询订单状态", # 第 1 轮
"订单号是 ORD-2024-001", # 第 2 轮:承接上文"订单"
"那它的物流到哪了?", # 第 3 轮:上下文指代 ORD-2024-001
]
for question in questions:
resp = httpx.post(
"http://localhost:8000/api/v1/chat",
headers={"X-API-Key": ""},
json={"message": question, "session_id": session_id},
timeout=30.0,
)
data = resp.json()
session_id = data["session_id"] # 始终回传,保持同一会话
print(f"[轮次 {data['data']['turn_count']}] {data['reply']}")
会话过期与清理
会话默认存储在内存中,进程重启会清空。生产环境建议配置 REDIS_URL 持久化会话。长时间无活动的会话由系统按内部策略自动回收,业务侧无需关心。
会话管理¶
会话的创建、查询、续接均由 SessionManager 内部完成,业务侧只需关注 session_id 的传递。
会话生命周期¶
sequenceDiagram
participant Client as 客户端
participant API as 对话端点
participant SM as SessionManager
Client->>API: POST /chat (无 session_id)
API->>SM: get_or_create(session_id=None)
SM-->>API: 新建 session_id
API->>SM: increment_turn + append_history
API-->>Client: {session_id, reply, ...}
Client->>API: POST /chat (回传 session_id)
API->>SM: get_or_create(session_id=xxx)
SM-->>API: 返回现有会话(含历史)
API->>SM: 追加本轮 user/assistant 消息
API-->>Client: {session_id, reply, ...}
会话状态字段¶
会话内部维护的关键状态(部分透出到响应 data 字段):
turn_count:当前轮次failed_attempts:连续未解决次数,归零表示本轮已解决current_intent:最近一次识别的意图emotion_score:用户情绪得分,0-1 之间,越低越激动agent_status:坐席侧状态,转接后变为pending/assigned/resolved
HotQueryCache 命中场景¶
系统内置热点查询缓存(HotQueryCache),对**重复且已解决**的知识问答直接返回缓存结果,跳过整套多 Agent 编排。
命中性能
- 命中条件:相同 query + 相同上下文指纹(session_id/intent/turn_count/user_id)
- 命中延迟:首 Token < 30ms,跳过意图识别、检索、生成全链路
- 同步与流式端点共享缓存,任一端点命中过的查询在另一端点也能命中
命中流程¶
flowchart LR
A[请求进入] --> B{HotQueryCache 命中?}
B -- 命中 --> C[直接返回缓存回复<br/>跳过编排 <30ms]
B -- 未命中 --> D[意图识别 → 检索 → 生成]
D --> E{是否已解决?}
E -- 是 --> F[写入 HotQueryCache]
E -- 否 --> G[不缓存, failed_attempts+1]
F --> H[返回回复]
G --> H
验证命中¶
import time
import httpx
query = "退换货政策是什么?"
# 首次查询:未命中,走完整编排(约 1-2 秒)
t1 = time.perf_counter()
httpx.post("http://localhost:8000/api/v1/chat",
headers={"X-API-Key": ""}, json={"message": query})
print(f"首次:{(time.perf_counter()-t1)*1000:.0f}ms")
# 再次查询:命中缓存(首 Token <30ms,整体 <50ms)
t2 = time.perf_counter()
httpx.post("http://localhost:8000/api/v1/chat",
headers={"X-API-Key": ""}, json={"message": query})
print(f"命中:{(time.perf_counter()-t2)*1000:.0f}ms")
知识库更新后必须清缓存
知识库内容变更后,旧缓存可能返回过期回复。请调用 POST /api/v1/performance/cache/invalidate 清空热点缓存,详见 性能优化教程。
错误处理¶
429 限流¶
系统对 LLM 调用做了并发限流(默认 MAX_CONCURRENT_LLM_CALLS=10)。超限时返回 429:
客户端处理建议:指数退避重试 2-3 次,间隔 1s / 2s / 4s。
500 服务异常¶
LLM 服务不可用、向量库异常等内部错误返回 500,响应体含 detail 字段。系统已内置多重降级:
- LLM 不可用 → 熔断器打开,返回兜底话术
- 向量库异常 → 降级为 BM25 关键词检索
- 全部失败 → 返回"未找到相关内容"提示并累加
failed_attempts
import httpx
import time
def chat_with_retry(message, max_retries=3):
"""带重试的对话调用,处理 429/5xx 临时性错误。"""
for attempt in range(max_retries):
resp = httpx.post(
"http://localhost:8000/api/v1/chat",
headers={"X-API-Key": ""},
json={"message": message},
timeout=30.0,
)
if resp.status_code == 200:
return resp.json()
if resp.status_code == 429 and attempt < max_retries - 1:
# 指数退避:1s, 2s, 4s
time.sleep(2 ** attempt)
continue
# 5xx 或重试耗尽,抛出供上层处理
resp.raise_for_status()
raise RuntimeError("对话请求重试耗尽")
流式端点的错误¶
SSE 协议约定 HTTP 状态保持 200,错误通过 error 事件下发。客户端收到 error 事件后应停止读取并展示错误信息: