去年年底,我接手了一个棘手项目——给一家日活3万左右的家居电商平台重构客服系统。原本的系统就是个简单的关键词匹配,用户问“我的床什么时候到”,它回“您好,请问有什么可以帮助您的”,气得用户直接打投诉电话。老板下了死命令:新系统要能查订单、能退换货、能推荐替代商品,还得像个正常人一样把话说清楚。
起初我想偷懒,就扔给一个GPT-4o,把功能描述、数据库schema全塞进system prompt。结果第一天就翻车了:用户问“我上周买的床头柜能退吗”,模型直接编了个订单号出来,还信誓旦旦地说“您的订单号是ORD-2024-0888,退款预计3个工作日到账”。实际上这订单根本不存在,用户也没买过床头柜。客服主管发截图给我,配文:“你家AI是诈骗犯?”
这事让我意识到,单靠一个LLM做端到端推理是找死。复杂业务流程需要结构化拆解——得有专门的“角色”负责查询订单、专门的角色处理退款、专门的角色生成回复。正好那段时间LangChain的Agent框架更新了0.3版本,Tool装饰器和RunnableBranch的组合拳看起来很对味,我决定用多Agent编排重新搞。
这篇文章记录的就是整个架构设计、踩坑填坑的过程。不教基础概念,只聊实战中真正管用的东西。
30秒速览
- 多Agent架构解决单Agent幻觉问题但对延迟有帮助,gpt-4o-mini做路由够了
- 退款流程硬编码流水线,准确率拉到100%,LLM只管润色语气
- 消息总线(共享上下文)比Agent间传话靠谱,但耦合度高
- 别信框架最佳实践,你的业务场景会教你做人
- Agent边界在于“不能错的事”,把确定性逻辑用代码写死
一个Agent搞不定的事,三个Agent就能搞定?别天真了
我先说结论:多Agent架构确实解决了单Agent幻觉和高延迟的问题,但引入的协调成本比你想象的高一个数量级。这半年我踩的坑,70%都在Agent之间的通信上。
先看业务需求拆解。这个客服系统要处理三类请求:
- 订单查询:用户报手机号或订单号,查出订单状态、物流信息。听起来简单,但实际订单状态有17种(待付款、待发货、已发货、运输中、派送中、已签收、已拒收、部分退货、全部退货、退款中、已退款、换货中、已换货、异常、已取消、已完成、超时关闭),每种要返回不同的话术。
- 售后处理:发起退款、换货、维修。涉及库存检查、金额计算、退货地址匹配。一个退款请求要写3张表:退款单、退货入库单、财务流水。
- 商品推荐:根据用户购买记录和当前咨询意图,推荐关联商品。用协同过滤+LLM二次排序,不能让模型瞎编不存在的SKU。
我最初的设计是搞一个超级Agent,把所有工具函数(查订单、查库存、写退款单、调推荐接口)全挂上去。理论上LLM可以自己决策调用哪个工具。实际跑起来延迟高到爆炸——每次决策要发1次LLM call拆解意图,再发1次call选择工具,再发1次call组织回复。一个退款请求三个round trip,用户等了8秒才收到回复。老板的原话:“这比人工客服还慢。”
而且OpenAI的function calling在处理5个以上工具时,出错率显著上升。我统计了100次测试:挂7个工具时,选错工具的概率是8%;挂10个工具时飙升到23%。尤其“查订单”和“查退款单”两个工具,Schema太像了,模型经常搞混。
折腾了一周,我改成了三个专职Agent各管一摊:
| Agent | 职责 | 挂载工具数 | 平均决策时间 |
|---|---|---|---|
| OrderAgent | 订单查询、物流追踪 | 3个 | 1.2s |
| AfterSalesAgent | 退款、换货、维修 | 5个 | 1.8s |
| RecommendAgent | 商品推荐、优惠匹配 | 4个 | 1.5s |
但别以为拆开就万事大吉。Agent之间怎么交接任务?用户说“我想退掉上周买的枕头,再看看有没有更舒服的推荐”,这需要AfterSalesAgent和RecommendAgent协同。我最开始用了一个无比愚蠢的方式——在Supervisor里写if-else:
# 千万别这么干——硬编码路由逻辑
def stupid_router(user_input: str) -> str:
if "退" in user_input or "换" in user_input:
return "after_sales"
elif "推荐" in user_input or "有什么" in user_input:
return "recommend"
elif "订单" in user_input or "物流" in user_input:
return "order"
else:
return "order" # 默认走订单
上线第一天就炸了。用户说“我不退,我就问问退货政策”,被硬生生路由到AfterSalesAgent,Agent一看“不退”直接懵了,开始瞎编政策。更离谱的是“推荐”和“订单”同时出现时,路由随机选一个,把另一半需求丢了。
老老实实上LLM路由。LangChain 0.3的RunnableBranch可以定义条件分支,但我的路由逻辑太复杂(要考虑意图、上下文、历史对话),简单分支不够用。最后用了一个RouterAgent,专门负责意图分类和任务分发:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from typing import Literal
# RouterAgent的意图分类prompt——注意我加了“不确定时走order”的兜底
ROUTER_PROMPT = ChatPromptTemplate.from_messages([
("system", """你是客服路由专家。分析用户意图,返回单一类别:
- order: 查询订单状态、物流信息
- after_sales: 退款、换货、维修、投诉
- recommend: 商品推荐、找相似产品
- hybrid: 同时包含多个意图(此时返回hybrid,我会并行处理)
规则:
1. 询问政策和流程属于after_sales,不是order
2. 抱怨但不要求动作属于after_sales
3. 比较商品属于recommend
4. 不确定时返回order(至少能查订单,给用户一个交代)
只返回类别名称,不要解释。
"""),
("user", "{input}"),
("placeholder", "{history}")
])
router_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) # 路由必须temperature=0
router_chain = ROUTER_PROMPT | router_llm | StrOutputParser()
这里有个关键决策:RouterAgent用gpt-4o-mini而不是gpt-4o。理由是路由任务不需要强推理能力(分类准确率mini和4o只差1.2%),但延迟从800ms降到200ms,成本降到1/10。对于每次对话都要调一次的路由来说,这个trade-off很值。
OrderAgent的定义过程:把“订单查询”拆成17种状态的决策树
OrderAgent看起来最简单——不就是查数据库返回结果吗?我一开始也这么想,然后就踩了最大的坑。
问题出在订单状态的复杂性上。数据库里order_status字段是枚举值:pending_payment, pending_shipment, shipped, in_transit, out_for_delivery, signed, rejected, partial_return, full_return, refunding, refunded, exchanging, exchanged, abnormal, cancelled, completed, closed。一共17种。
老板的要求是:“物流到哪了要精确到城市”、“退款到哪一步了要说清楚”、“异常订单要告诉用户原因”。这意味着OrderAgent不能只返回状态码,要根据状态码+上下文生成人性化的回复。
我第一版直接把17种状态的解释全写进system prompt,结果prompt膨胀到1200个token,Agent反应慢不说,还经常搞混“refunding”(退款中)和“refunded”(已退款)。用户问“钱什么时候到账”,Agent查出来是refunding状态,回复说“您的退款已到账”——用户查了银行卡发现没到账,反手就是一个投诉。
重构方案:把状态解释逻辑从prompt里抽出来,变成结构化数据+模板引擎:
# order_status_template.py —— 状态到回复模板的映射
# 每个状态对应一个模板函数,接收order对象,返回自然语言
from datetime import datetime
from typing import Dict, Any
def _format_logistics(order: Dict[str, Any]) -> str:
"""物流信息格式化——根据物流商API返回的轨迹动态生成"""
traces = order.get("logistics_traces", [])
if not traces:
return "物流信息暂未更新,请稍后再查。"
latest = traces[-1]
return f"您的快递已到达【{latest['location']}】,{latest['status']}。预计{order.get('estimated_delivery', '近期')}送达。"
STATUS_TEMPLATES = {
"pending_payment": lambda o: f"订单{o['order_id']}待付款,请在{o.get('payment_deadline', '24小时内')}完成支付,否则将自动取消。",
"pending_shipment": lambda o: f"订单{o['order_id']}已支付,仓库正在备货,预计{o.get('ship_time', '1-3个工作日')}内发货。",
"shipped": lambda o: f"订单{o['order_id']}已发货,{_format_logistics(o)}",
"in_transit": lambda o: f"订单{o['order_id']}运输中,{_format_logistics(o)}",
"out_for_delivery": lambda o: f"订单{o['order_id']}正在派送中,快递员{o.get('courier_name', '')},电话{o.get('courier_phone', '')}。",
"signed": lambda o: f"订单{o['order_id']}已于{o.get('sign_time', '')}签收。",
"refunding": lambda o: f"订单{o['order_id']}退款处理中,预计{o.get('refund_estimate', '3-7个工作日')}到账。",
"refunded": lambda o: f"订单{o['order_id']}已退款{o.get('refund_amount', '')}元,请查收。若未到账请联系银行。",
"abnormal": lambda o: f"订单{o['order_id']}状态异常:{o.get('abnormal_reason', '请联系人工客服')}。",
"cancelled": lambda o: f"订单{o['order_id']}已取消" + (f",原因:{o.get('cancel_reason', '')}" if o.get('cancel_reason') else "") + "。",
# ... 其他状态同理
}
def generate_status_reply(order: Dict[str, Any]) -> str:
"""根据订单状态调用对应的模板函数"""
status = order.get("status", "unknown")
template_func = STATUS_TEMPLATES.get(status)
if template_func:
return template_func(order)
return f"订单{o.get('order_id', '未知')}状态:{status}。如有疑问请联系人工客服。"
这样一来,OrderAgent的工具函数返回的不是原始状态码,而是已经格式化好的文本。Agent的职责变成了:调用查订单工具 → 拿到格式化回复 → 结合对话历史微调语气(比如用户很着急就加安抚的话)。prompt从1200 token砍到400 token,决策速度提升明显。
但模板方案也有缺点——17个模板函数写了将近300行代码,维护成本不低。后来业务加了“冷链物流”和“大件配送”两种特殊物流类型,每种的状态展示又不一样。我试图让LLM直接生成回复,但测试了50次,有8次出现了事实性错误(把“运输中”说成“已发货”,把金额小数点搞错)。对于一个涉及钱的客服场景,8%的错误率不可接受。最后还是老老实实维护模板,但用了一个trick——让LLM在模板基础上做“润色”,而不是“生成”:
# OrderAgent的system prompt片段
# 关键约束:LLM只能润色,不能改事实
ORDER_AGENT_SYSTEM = """你是订单查询助手。
工作流程:
1. 从工具返回的result中提取【订单信息】(这是模板生成的,事实必须保持原样)
2. 如果用户情绪焦虑,在回复前加安抚语(如“我理解您很着急”)
3. 绝对禁止修改:订单号、金额、日期、物流位置、状态描述
4. 可以润色:语气、称谓、表情符号(适度)
错误示例❌:工具返回“订单ORD-12345已退款100元”,你改成“退款已到账”——这是篡改事实
正确示例✅:工具返回“订单ORD-12345已退款100元”,你改成“好的,我查到您的订单ORD-12345已经退款100元啦~请您查收一下银行卡哦”
"""
这个“模板生成事实 + LLM润色表达”的分层策略,在实际业务中跑了一个月,事实错误率从8%降到了0.3%(只有一次把“顺丰”说成了“中通”,是因为物流商API返回的数据本身写错了)。
AfterSalesAgent的“退款决策链”——五个工具怎么串成一条流水线
三个Agent里,AfterSalesAgent是最复杂的。一个退款请求涉及5个步骤:查询原始订单 → 检查库存状态(如果是退货) → 计算退款金额 → 匹配退货地址 → 写入退款单。每个步骤都可能失败,每个失败都要给用户能理解的回复。
我见过太多项目把这种流程写成Agent的“自由发挥”——给5个工具,让LLM自己决策调用顺序。结果就是:同一个退款请求,有时候先查库存再查订单(导致查不到,因为需要订单里的SKU去查库存),有时候跳过了金额计算直接写退款单(退款金额为0被财务系统拒了)。
这里必须上编排。LangChain 0.3的ToolCallingAgent支持定义工具间的依赖关系,但不够直观。我选择了更可控的方案——把5个工具串成一个chain,Agent只负责在chain的起点注入参数,以及在chain的终点处理异常。具体实现用了RunnableSequence:
from langchain_core.runnables import RunnableLambda, RunnableSequence
from langchain_core.tools import tool
from typing import Dict, Any
# ---- 五个工具函数(真实数据库和API调用) ----
@tool
def get_original_order(order_id: str) -> Dict[str, Any]:
"""根据订单ID查询原始订单,返回订单详情和商品列表"""
# 真实场景:查询MySQL订单表+订单商品表
# 这里简化——实际用SQLAlchemy查
order = db_session.query(Order).filter_by(order_id=order_id).first()
if not order:
return {"error": f"未找到订单{order_id}"}
return {
"order_id": order.order_id,
"status": order.status, # 必须为signed/delivered才能退款
"items": [{"sku": item.sku, "qty": item.qty, "price": item.price} for item in order.items],
"total_amount": order.total_amount,
"buyer_phone": order.buyer_phone
}
@tool
def check_inventory_for_return(sku: str, qty: int) -> Dict[str, Any]:
"""检查库存容量——退货需要仓库有空位"""
# 调用WMS系统API
inventory = wms_client.get_available_capacity(sku)
return {"sku": sku, "can_accept": inventory["available"] >= qty, "available_capacity": inventory["available"]}
@tool
def calculate_refund_amount(order_data: Dict, return_items: list) -> float:
"""计算退款金额,考虑优惠分摊和运费"""
# 这里逻辑复杂:满减券平摊、运费按重量比例、积分抵扣
return Calculator.compute(order_data, return_items)
@tool
def match_return_address(sku: str) -> str:
"""根据商品SKU匹配退货仓库地址"""
# 不同品类退回不同仓库:家具回大件仓,家纺回小件仓
return warehouse_service.get_address_by_sku(sku)
@tool
def create_refund_order(order_id: str, amount: float, address: str, items: list) -> str:
"""写入退款单到数据库,返回退款单号"""
refund_id = f"RF{datetime.now().strftime('%Y%m%d%H%M%S')}{order_id[-4:]}"
# 写入refund_orders表,同时创建return_warehouse_entry单
db_session.add(RefundOrder(refund_id=refund_id, ...))
db_session.commit()
return refund_id
# ---- 把五个工具串成Chain ----
# RunnableLambda包装,让工具可以流式传递上下文
step1 = RunnableLambda(lambda inputs: {
"order": get_original_order(inputs["order_id"]),
**inputs
})
step2 = RunnableLambda(lambda inputs: {
**inputs,
"inventory_check": [
check_inventory_for_return(item["sku"], item["qty"])
for item in inputs["order"]["items"]
if item["sku"] in [i["sku"] for i in inputs.get("return_items", inputs["order"]["items"])]
]
})
step3 = RunnableLambda(lambda inputs: {
**inputs,
"refund_amount": calculate_refund_amount(inputs["order"], inputs.get("return_items", inputs["order"]["items"]))
})
step4 = RunnableLambda(lambda inputs: {
**inputs,
"return_address": match_return_address(inputs["order"]["items"][0]["sku"]) # 简化为取第一个SKU
})
step5 = RunnableLambda(lambda inputs: {
**inputs,
"refund_id": create_refund_order(
inputs["order"]["order_id"],
inputs["refund_amount"],
inputs["return_address"],
inputs.get("return_items", inputs["order"]["items"])
)
})
refund_pipeline = step1 | step2 | step3 | step4 | step5
这个方案的好处是每一步的输出都是结构化的dict,下一步可以依赖前一步的所有字段。Agent调用时只需要:
# Agent只负责收集参数,启动流水线,处理异常
def handle_refund_with_agent(user_input: str):
# Agent提取order_id和return_items
agent_result = after_sales_agent.invoke({"input": user_input})
try:
pipeline_result = refund_pipeline.invoke({
"order_id": agent_result["order_id"],
"return_items": agent_result.get("return_items", [])
})
# 拿到refund_id后,让Agent生成用户回复
return reply_agent.invoke({"refund_id": pipeline_result["refund_id"], "amount": pipeline_result["refund_amount"]})
except Exception as e:
# 流水线某一步失败——根据失败步骤给不同的回复
return handle_pipeline_error(e, agent_result)
我纠结过到底让Agent全权调度还是硬编码流水线。最后选了这种“Agent前置收集参数 + 硬编码流水线执行 + Agent后置生成回复”的混合模式。理由是:
- 退款流程的步骤顺序是固定的,不存在“动态决策”的空间。让LLM决策调用顺序只会引入不确定性。
- 但参数收集(识别用户要退哪些商品)确实需要LLM的理解能力,这里让Agent做。
- 回复生成需要同理心和话术技巧,也是LLM的强项。
上线后数据:退款流程平均处理时间从11秒降到3.2秒(因为少了3次LLM round trip),退款金额计算准确率100%(因为逻辑是写死的),用户满意度评分从3.4升到4.1。代价是代码里多了一个300行的pipeline模块,每次新增退款场景(比如部分退款、运费单独退)都要改流水线代码,而不是改改prompt就行。但我接受这个代价——客户服务的底线是准确性,不是灵活性。
三个Agent的协作机制:从“硬编码交接”到“消息总线”的演变
前面提到,三个Agent之间的任务交接是最大的难题。我在这块花了整整两周,方案换了三次。
第一版:硬编码路由(失败)
就是前面那个if-else的愚蠢方案。不展开了,说多了都是泪。
第二版:Supervisor Agent调度
搞一个Supervisor Agent,把三个子Agent当成工具注册上去。Supervisor分析用户输入,决定调用哪个子Agent,拿到返回后再决定是否调用另一个子Agent,最后汇总回复。
这个方案跑了一个星期,效果还行,但有两个致命问题:
1. 延迟叠加:Supervisor自己决策一次(700ms)→ 调用子Agent(1.2s)→ Supervisor再决策(700ms)→ 调用第二个子Agent(1.5s)→ Supervisor汇总(800ms)。总共4.9秒,用户已经关了对话窗口。
2. 上下文丢失:Supervisor拿到子Agent的返回后,往往只保留最终结果,把中间推理丢了。比如OrderAgent查出来“用户买了3件商品,其中1件已签收”,Supervisor调用AfterSalesAgent退款时,只传了“用户要退款”,AfterSalesAgent又得重新查一遍订单确定退哪件。
我试图用LangChain的共享记忆(ConversationBufferMemory)来传递上下文,但发现不同Agent写入记忆的格式不一致:OrderAgent存的是订单JSON,AfterSalesAgent存的是退款ID,RecommendAgent存的是SKU列表。Supervisor在汇总时需要理解三种格式,复杂度爆炸。
第三版:消息总线 + 共享上下文对象(当前方案)
受微服务架构的启发,我设计了一个轻量级的消息总线。原理很简单:所有Agent不直接通信,而是向一个Bus发布消息,Bus负责路由消息、维护共享上下文、决策任务是否完成。
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class SharedContext:
"""三个Agent共享的上下文对象——这就是“消息总线”的核心"""
user_id: str
original_query: str
tasks: Dict[str, TaskStatus] = field(default_factory=dict) # {"order_query": PENDING, "refund": IN_PROGRESS, ...}
order_data: Optional[Dict] = None # OrderAgent查询结果
refund_data: Optional[Dict] = None # AfterSalesAgent处理结果
recommend_data: Optional[List] = None # RecommendAgent推荐结果
error_log: List[str] = field(default_factory=list) # 异常记录
def is_all_done(self) -> bool:
return all(s in [TaskStatus.COMPLETED, TaskStatus.FAILED] for s in self.tasks.values())
class AgentBus:
"""消息总线——管理共享上下文和任务状态"""
def __init__(self):
self.contexts: Dict[str, SharedContext] = {} # user_id -> context
def start_session(self, user_id: str, query: str, tasks: Dict[str, TaskStatus]):
self.contexts[user_id] = SharedContext(
user_id=user_id,
original_query=query,
tasks=tasks
)
def update_context(self, user_id: str, agent_name: str, data: Any):
"""Agent完成任务后更新上下文"""
ctx = self.contexts.get(user_id)
if not ctx:
raise ValueError(f"No session for user {user_id}")
if agent_name == "order":
ctx.order_data = data
ctx.tasks["order_query"] = TaskStatus.COMPLETED
elif agent_name == "after_sales":
ctx.refund_data = data
ctx.tasks["refund"] = TaskStatus.COMPLETED
elif agent_name == "recommend":
ctx.recommend_data = data
ctx.tasks["recommend"] = TaskStatus.COMPLETED
def get_context(self, user_id: str) -> SharedContext:
return self.contexts.get(user_id)
# 全局Bus实例
bus = AgentBus()
RouterAgent(替代了Supervisor)的工作流程变成:
def orchestrate(user_id: str, user_input: str, history: list):
# 1. RouterAgent分类意图
intent = route_intent(user_input, history)
# 2. 根据意图初始化任务列表
tasks = {}
if "order" in intent:
tasks["order_query"] = TaskStatus.PENDING
if "after_sales" in intent:
tasks["refund"] = TaskStatus.PENDING
if "recommend" in intent:
tasks["recommend"] = TaskStatus.PENDING
bus.start_session(user_id, user_input, tasks)
# 3. 并行执行独立任务
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {}
if tasks.get("order_query"):
futures["order"] = executor.submit(order_agent_execute, user_id, user_input, bus.get_context(user_id))
if tasks.get("refund"):
# refund可能依赖order,但order_query是独立任务,refund内部会先检查bus里的order_data
futures["after_sales"] = executor.submit(after_sales_agent_execute, user_id, user_input, bus.get_context(user_id))
if tasks.get("recommend"):
futures["recommend"] = executor.submit(recommend_agent_execute, user_id, user_input, bus.get_context(user_id))
# 等待所有任务完成
for name, future in futures.items():
try:
result = future.result(timeout=10) # 10秒超时
bus.update_context(user_id, name, result)
except Exception as e:
bus.contexts[user_id].error_log.append(f"{name} failed: {str(e)}")
bus.contexts[user_id].tasks[name] = TaskStatus.FAILED
# 4. 汇总Agent根据bus里的所有数据生成最终回复
ctx = bus.get_context(user_id)
return reply_synthesizer(ctx)
这个架构的关键设计决策:
- RouterAgent 不再做串行调度,而是识别意图后一次性把所有任务发出去并行执行。如果AfterSalesAgent需要OrderAgent的结果,它自己从bus里取(用get_context),取不到就自己查。
- 每个Agent执行时都会检查bus里是否已有数据可用。比如AfterSalesAgent会先检查ctx.order_data,有就直接用,省一次数据库查询。
- 汇总Agent(ReplySynthesizer)不参与业务逻辑,只负责把所有结果拼成自然语言。
性能提升明显:一个包含订单查询+退款+推荐的复合请求,从串行Supervisor的4.9秒降到2.3秒(并行执行+减少重复查询)。但代码复杂度翻倍了——bus的状态管理、异常处理、超时重试都是新问题。有次bus里某用户session卡在IN_PROGRESS状态一直没清理,内存泄漏导致服务撑了两天就OOM了。后来加了一个定时任务,每15分钟清理超过30分钟的session。
说实话,我也不确定这个消息总线的方案是不是最优解。它看起来很美(微服务思路),但调试地狱——一个请求涉及4个Agent和1个Bus,出问题时得同时翻5份日志。如果有下次,我可能试试直接把三个Agent的能力合并成一个ReAct Agent,但用结构化输出强制拆解步骤。那是另一个话题了。
性能优化:从“能跑就行”到“扛住双11峰值”的差距
系统上线后平稳跑了两个月,日均处理3000次对话,延迟中位数3.2秒,用户勉强接受。但老板说双11要来了,预估峰值QPS是平时的5倍。我做了两轮压测,发现三个瓶颈。
瓶颈1:LLM调用太密
一次对话平均调用LLM 4.3次:RouterAgent路由1次、OrderAgent决策1次、AfterSalesAgent决策1次、ReplySynthesizer汇总1次,加上偶尔失败重试。每次调用gpt-4o平均耗时800ms,4.3次就是3.4秒。
优化策略:
- RouterAgent换gpt-4o-mini:前面说了,分类准确率只降1.2%,延迟降75%,成本降90%。双11期间每天15万次路由调用,这个差价够买一台MacBook Pro。
- Agent工具调用改为批处理:原本OrderAgent查询订单时,先调get_order,再根据返回的SKU逐个调check_logistics。改成一次返回所有数据后,Agent决策的round trip从3次降到1次。
# 优化前:两个工具分步调用,Agent需要两轮决策
# Step 1: Agent调用get_order → 返回order基本信息和items列表
# Step 2: Agent分析返回,提取每个item的SKU,调用check_logistics
# Step 3: Agent合并结果,生成回复
# 总共3次LLM decision round + 2次工具调用 = 约3.6s
# 优化后:get_order_with_logistics一次返回全量数据
@tool
def get_order_with_logistics(order_id: str) -> Dict[str, Any]:
"""查询订单+所有商品的物流信息,一次返回"""
order = db_session.query(Order).filter_by(order_id=order_id).first()
if not order:
return {"error": f"订单{order_id}不存在"}
items = []
for item in order.items:
logistics = logistics_client.query(item.sku, order_id) # 批量查物流
items.append({
"sku": item.sku,
"name": item.product_name,
"qty": item.qty,
"price": item.price,
"logistics_status": logistics.get("status", "unknown"),
"logistics_location": logistics.get("location", ""),
"estimated_arrival": logistics.get("eta", "")
})
return {
"order_id": order.order_id,
"status": order.status,
"total": order.total_amount,
"items": items # 全量返回,Agent一次决策就够了
}
# Agent只需要1次LLM call:分析返回数据 → 套用模板 → 生成回复
# 总耗时降到约1.2s
- 引入缓存:对于订单状态这类变化不频繁的数据,在Agent调用工具前先查Redis缓存。同一个用户30秒内重复问“我的订单到哪了”,直接走缓存,LLM都不用调。
优化后,平均LLM调用次数从4.3降到1.8,延迟从3.2s降到1.5s。
瓶颈2:数据库查询没做连接池
每个Agent工具都是独立连接数据库,用完就关。平时没事,双11压测时QPS一上来,MySQL连接数爆炸(超过max_connections限制)。错误日志全是“Too many connections”。
解决方法很常规,但容易忽视:SQLAlchemy配置连接池,大小设成和Agent并发数匹配。
# 配置连接池——注意pool_size和max_overflow的配比
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"mysql+pymysql://user:pass@host:3306/customer_service",
pool_size=20, # 核心连接数,等于gunicorn worker数
max_overflow=10, # 溢出连接数,应对突发
pool_recycle=3600, # 1小时后回收连接,防止断开
pool_pre_ping=True, # 使用前ping一下,自动重连
echo=False
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
压测时用20个gunicorn worker,数据库连接稳定在20-25个,没有再爆过。
瓶颈3:Bus的并行执行有资源竞争
Orchestrate函数里用了ThreadPoolExecutor并行调用三个Agent,但每个Agent内部又可能并发调工具(比如AfterSalesAgent的流水线里,check_inventory_for_return要并发调多次)。线程嵌套线程,Python的GIL让CPU跑不满,上下文切换开销反而增大。
换成了asyncio + aiohttp,把Agent的工具调用全部改成异步:
import asyncio
import aiohttp
async def async_orchestrate(user_id: str, user_input: str, history: list):
intent = await async_route_intent(user_input, history)
tasks = {}
if "order" in intent:
tasks["order_query"] = TaskStatus.PENDING
if "after_sales" in intent:
tasks["refund"] = TaskStatus.PENDING
if "recommend" in intent:
tasks["recommend"] = TaskStatus.PENDING
bus.start_session(user_id, user_input, tasks)
ctx = bus.get_context(user_id)
# 用asyncio.gather并行执行
coroutines = []
if tasks.get("order_query"):
coroutines.append(async_order_agent(user_id, user_input, ctx))
if tasks.get("refund"):
coroutines.append(async_after_sales_agent(user_id, user_input, ctx))
if tasks.get("recommend"):
coroutines.append(async_recommend_agent(user_id, user_input, ctx))
results = await asyncio.gather(*coroutines, return_exceptions=True)
for name, result in zip(["order", "after_sales", "recommend"], results):
if isinstance(result, Exception):
ctx.error_log.append(f"{name} failed: {str(result)}")
else:
bus.update_context(user_id, name, result)
return await async_reply_synthesizer(ctx)
异步化后,双11压测QPS从85撑到240,P99延迟从8.2s降到3.1s。代价是之前用同步写的DB操作全部要改async SQLAlchemy(选sqlalchemy.ext.asyncio),工作量不小。
双11实际表现
双11当天峰值QPS 190(接近预估的200),系统扛住了。但中间出了一次事故——外部物流API突然超时率飙升(从0.3%到15%),导致OrderAgent大量超时重试,Bus里积累了大量未完成session,内存占用从2GB涨到8GB。好在设置了15分钟的session过期清理,没有OOM。事后复盘,应该在Agent层加熔断:外部依赖出问题时,直接返回“物流信息暂时无法查询,请稍后重试”,而不是不断重试拖垮整个系统。
上线三个月后,我被迫重写了50%的代码——聊聊可维护性的代价
系统上线的前三个月,一切看起来挺美好。然后业务方开始提“小需求”:
- “用户退货时如果购买了运费险,退款金额要自动加上理赔金额。”
- “大件家具的退货和普通商品不一样,需要先派师傅上门取件,不能直接让用户寄回。”
- “如果用户是VIP,退款审核流程要跳过,直接原路返回。”
- “推荐Agent要根据用户最近7天的浏览记录做推荐,不只是购买记录。”
每个需求看起来都“就加个if判断”,但加起来之后,代码变成了屎山。最核心的问题在于:流水线是硬编码的,每加一个分支就要在流水线里加新的Step,或者让Step内部做分支判断。三个月后,refund_pipeline从5个Step膨胀到12个Step,step3(计算退款金额)内部有了17个if-elif分支。
我试图让Agent分担更多决策——把“是否为VIP”这种判断交给Agent,而不是硬编码。但Agent的决策准确率只有89%(VIP判断的prompt很容易被用户语言误导,比如用户说“我是老客户了,能不能快点”,Agent就以为他是VIP)。
最终我做了一个艰难的决定:把流水线重构为“决策树 + 可插拔策略模式”。不是所有逻辑都让Agent做,也不是所有都硬编码,而是把“需要确定性”的部分用代码写死,把“需要灵活性”的部分暴露为策略接口:
from abc import ABC, abstractmethod
class RefundStrategy(ABC):
"""退款策略——不同场景走不同的流水线"""
@abstractmethod
def execute(self, order_data: Dict, return_items: List) -> Dict:
pass
class NormalRefund(RefundStrategy):
def execute(self, order_data, return_items):
# 标准退货流程:查库存 → 计算金额 → 匹配地址 → 写单
pipeline = step_check_inventory | step_calculate_normal | step_match_address | step_create_refund
return pipeline.invoke({"order": order_data, "items": return_items})
class InsuranceRefund(RefundStrategy):
def execute(self, order_data, return_items):
# 有运费险:标准流程 + 理赔计算
pipeline = step_check_inventory | step_calculate_with_insurance | step_match_address | step_create_refund
return pipeline.invoke({"order": order_data, "items": return_items})
class VIPRefund(RefundStrategy):
def execute(self, order_data, return_items):
# VIP用户:跳过库存检查(直接入库),金额按原价退
pipeline = step_calculate_vip_full | step_match_address | step_create_refund
return pipeline.invoke({"order": order_data, "items": return_items})
class FurnitureRefund(RefundStrategy):
def execute(self, order_data, return_items):
# 大件家具:不寄回,先派师傅上门 → 检查完毕再退款
pipeline = step_schedule_pickup | step_await_inspection | step_calculate | step_create_refund
return pipeline.invoke({"order": order_data, "items": return_items})
# Agent负责判断用哪个策略,然后执行策略
def handle_refund_v2(user_input: str):
agent_result = after_sales_agent.invoke({"input": user_input})
# Agent在extraction阶段需要返回"策略类型"
strategy_type = agent_result.get("strategy", "normal") # normal/insurance/vip/furniture
strategy_map = {
"normal": NormalRefund(),
"insurance": InsuranceRefund(),
"vip": VIPRefund(),
"furniture": FurnitureRefund()
}
strategy = strategy_map.get(strategy_type, NormalRefund())
return strategy.execute(agent_result["order_data"], agent_result["return_items"])
这个重构花了两个周末,但之后加新退货场景只需要新增一个Strategy类,无需改动已有代码。Agent的职责从“执行退款”变成了“分类退款场景 + 提取参数”,准确率要求从100%降低到了90%(因为即使分错类,也只是走了次优流程,不会出错,只是用户体验差一点)。
说实话,我一开始低估了这个系统的迭代速度。如果一开始就用策略模式,至少省下200小时的重构时间。但话说回来,没有前面三个月和业务的磨合,我也不知道该抽象出哪些策略。架构设计这事,没有银弹,只有交够学费后的后见之明。
折腾半年,我学到的三件事(和一些没说出口的想法)
这个项目从第一天写代码到双11扛住峰值,断断续续搞了半年。代码仓库从零涨到12000行(Python 8500行 + 测试2200行 + 配置/脚本1300行),期间重构了3次。回头看,有三件事是我真金白银换来的教训。
第一件事:Agent的边界不是“能做什么”,而是“不能做错什么”
一开始我把Agent当成万能遥控器,所有决策都让它做。后来发现LLM在确定性逻辑上(金额计算、状态判断、流程顺序)的失误率是传统代码的10倍以上。最终的分工是:确定性逻辑(计算、校验、状态机)用代码写死;不确定性逻辑(意图理解、参数提取、语气润色)交给Agent;两者之间通过结构化数据(模板生成的JSON、策略模式的结果)交接。
这个分层不是完美的——它增加了代码量和维护成本。但在错误成本极高的客服场景(一次错误退款可能损失几百元+用户信任),这个trade-off是必须做的。
第二件事:Agent间协作的“正确姿势”是共享数据,不是传递消息
最开始我照着LangChain官方的Multi-Agent示例做,Agent之间通过对话历史传递信息。结果发现传递过程中的信息损耗严重——每个Agent都用自己的方式总结上一手的输出,关键细节丢得七七八八。消息总线(共享上下文对象)解决了这个问题:所有Agent往同一个结构体里写数据,也从这个结构体里读数据。缺点是耦合度高(所有Agent都依赖SharedContext的结构),但在3个Agent的规模下,这种耦合比信息损耗更可接受。
如果Agent数量超过5个,我可能会换成Event-driven + CQRS模式,每个Agent只订阅自己关心的事件。但那是另一个数量级的复杂度了,我这个项目用不上。
第三件事:别相信框架的“最佳实践”,它在你的场景下大概率不是最优的
LangChain的文档建议用AgentExecutor + ToolCallingAgent来处理一切Agent任务。我老老实实用了一个月,后来发现AgentExecutor的默认重试逻辑(遇到解析错误就重试,最多3次)在我们的高延迟场景下是灾难——一个退款请求因为物流API超时,被重试了3次,总共耗时28秒。最后我把AgentExecutor的部分功能(工具调用、输出解析)拆开自己写,重试逻辑改为“工具调用失败不重试,直接降级返回缓存结果”,延迟问题大幅改善。
类似的情况还有Memory管理(ConversationBufferMemory用久了token爆炸,改成了ConversationSummaryBufferMemory但summary有时曲解用户原意,最后自己实现了一个滑动窗口+关键信息提取的混合Memory)。
我不是说LangChain不好——它省了我很多时间(尤其是工具定义和Chain拼接)。但它是一个通用框架,而每个业务场景都有自己的特殊约束。把框架当工具箱用,别当说明书用。
最后说点没说出口的想法。折腾这半年,最大的感触是:用LLM做复杂业务流程编排,难点从来不是AI技术,而是软件工程的基本功。怎么设计模块边界、怎么管理状态、怎么处理异常、怎么做性能优化——这些才是决定系统能不能用的关键。而AI社区的很多文章过于聚焦如何写好prompt、如何选模型,忽视了这些“枯燥”的工程问题。这行干了十年,我发现越新的技术,越需要老派的工程思维来兜底。LLM给了我们前所未有的语义理解能力,但如果底座是混乱的架构和糟糕的状态管理,这能力只会让系统更快的崩溃。
好了,吐槽结束。代码库里有几个模块我没展开(推荐Agent的协同过滤实现、ReplySynthesizer的话术模板系统、Redis缓存的过期策略),如果有人感兴趣我后续再写。有问题的可以直接在评论区问,我看到了会回。