我用三个LangChain Agent搭了个“多部门协作”的智能客服,踩坑踩到怀疑人生

2026年4月27日

去年年底,我接手了一个棘手项目——给一家日活3万左右的家居电商平台重构客服系统。原本的系统就是个简单的关键词匹配,用户问“我的床什么时候到”,它回“您好,请问有什么可以帮助您的”,气得用户直接打投诉电话。老板下了死命令:新系统要能查订单、能退换货、能推荐替代商品,还得像个正常人一样把话说清楚。

起初我想偷懒,就扔给一个GPT-4o,把功能描述、数据库schema全塞进system prompt。结果第一天就翻车了:用户问“我上周买的床头柜能退吗”,模型直接编了个订单号出来,还信誓旦旦地说“您的订单号是ORD-2024-0888,退款预计3个工作日到账”。实际上这订单根本不存在,用户也没买过床头柜。客服主管发截图给我,配文:“你家AI是诈骗犯?”

这事让我意识到,单靠一个LLM做端到端推理是找死。复杂业务流程需要结构化拆解——得有专门的“角色”负责查询订单、专门的角色处理退款、专门的角色生成回复。正好那段时间LangChain的Agent框架更新了0.3版本,Tool装饰器和RunnableBranch的组合拳看起来很对味,我决定用多Agent编排重新搞。

这篇文章记录的就是整个架构设计、踩坑填坑的过程。不教基础概念,只聊实战中真正管用的东西。

30秒速览

  • 多Agent架构解决单Agent幻觉问题但对延迟有帮助,gpt-4o-mini做路由够了
  • 退款流程硬编码流水线,准确率拉到100%,LLM只管润色语气
  • 消息总线(共享上下文)比Agent间传话靠谱,但耦合度高
  • 别信框架最佳实践,你的业务场景会教你做人
  • Agent边界在于“不能错的事”,把确定性逻辑用代码写死

一个Agent搞不定的事,三个Agent就能搞定?别天真了

我先说结论:多Agent架构确实解决了单Agent幻觉和高延迟的问题,但引入的协调成本比你想象的高一个数量级。这半年我踩的坑,70%都在Agent之间的通信上。

先看业务需求拆解。这个客服系统要处理三类请求:

  • 订单查询:用户报手机号或订单号,查出订单状态、物流信息。听起来简单,但实际订单状态有17种(待付款、待发货、已发货、运输中、派送中、已签收、已拒收、部分退货、全部退货、退款中、已退款、换货中、已换货、异常、已取消、已完成、超时关闭),每种要返回不同的话术。
  • 售后处理:发起退款、换货、维修。涉及库存检查、金额计算、退货地址匹配。一个退款请求要写3张表:退款单、退货入库单、财务流水。
  • 商品推荐:根据用户购买记录和当前咨询意图,推荐关联商品。用协同过滤+LLM二次排序,不能让模型瞎编不存在的SKU。

我最初的设计是搞一个超级Agent,把所有工具函数(查订单、查库存、写退款单、调推荐接口)全挂上去。理论上LLM可以自己决策调用哪个工具。实际跑起来延迟高到爆炸——每次决策要发1次LLM call拆解意图,再发1次call选择工具,再发1次call组织回复。一个退款请求三个round trip,用户等了8秒才收到回复。老板的原话:“这比人工客服还慢。”

而且OpenAI的function calling在处理5个以上工具时,出错率显著上升。我统计了100次测试:挂7个工具时,选错工具的概率是8%;挂10个工具时飙升到23%。尤其“查订单”和“查退款单”两个工具,Schema太像了,模型经常搞混。

折腾了一周,我改成了三个专职Agent各管一摊:

Agent 职责 挂载工具数 平均决策时间
OrderAgent 订单查询、物流追踪 3个 1.2s
AfterSalesAgent 退款、换货、维修 5个 1.8s
RecommendAgent 商品推荐、优惠匹配 4个 1.5s

但别以为拆开就万事大吉。Agent之间怎么交接任务?用户说“我想退掉上周买的枕头,再看看有没有更舒服的推荐”,这需要AfterSalesAgent和RecommendAgent协同。我最开始用了一个无比愚蠢的方式——在Supervisor里写if-else:


# 千万别这么干——硬编码路由逻辑
def stupid_router(user_input: str) -> str:
    if "退" in user_input or "换" in user_input:
        return "after_sales"
    elif "推荐" in user_input or "有什么" in user_input:
        return "recommend"
    elif "订单" in user_input or "物流" in user_input:
        return "order"
    else:
        return "order"  # 默认走订单

上线第一天就炸了。用户说“我不退,我就问问退货政策”,被硬生生路由到AfterSalesAgent,Agent一看“不退”直接懵了,开始瞎编政策。更离谱的是“推荐”和“订单”同时出现时,路由随机选一个,把另一半需求丢了。

老老实实上LLM路由。LangChain 0.3的RunnableBranch可以定义条件分支,但我的路由逻辑太复杂(要考虑意图、上下文、历史对话),简单分支不够用。最后用了一个RouterAgent,专门负责意图分类和任务分发:


from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from typing import Literal

# RouterAgent的意图分类prompt——注意我加了“不确定时走order”的兜底
ROUTER_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """你是客服路由专家。分析用户意图,返回单一类别:
- order: 查询订单状态、物流信息
- after_sales: 退款、换货、维修、投诉
- recommend: 商品推荐、找相似产品
- hybrid: 同时包含多个意图(此时返回hybrid,我会并行处理)

规则:
1. 询问政策和流程属于after_sales,不是order
2. 抱怨但不要求动作属于after_sales
3. 比较商品属于recommend
4. 不确定时返回order(至少能查订单,给用户一个交代)

只返回类别名称,不要解释。
"""),
    ("user", "{input}"),
    ("placeholder", "{history}")
])

router_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # 路由必须temperature=0
router_chain = ROUTER_PROMPT | router_llm | StrOutputParser()

这里有个关键决策:RouterAgent用gpt-4o-mini而不是gpt-4o。理由是路由任务不需要强推理能力(分类准确率mini和4o只差1.2%),但延迟从800ms降到200ms,成本降到1/10。对于每次对话都要调一次的路由来说,这个trade-off很值。

OrderAgent的定义过程:把“订单查询”拆成17种状态的决策树

OrderAgent看起来最简单——不就是查数据库返回结果吗?我一开始也这么想,然后就踩了最大的坑。

问题出在订单状态的复杂性上。数据库里order_status字段是枚举值:pending_payment, pending_shipment, shipped, in_transit, out_for_delivery, signed, rejected, partial_return, full_return, refunding, refunded, exchanging, exchanged, abnormal, cancelled, completed, closed。一共17种。

老板的要求是:“物流到哪了要精确到城市”、“退款到哪一步了要说清楚”、“异常订单要告诉用户原因”。这意味着OrderAgent不能只返回状态码,要根据状态码+上下文生成人性化的回复。

我第一版直接把17种状态的解释全写进system prompt,结果prompt膨胀到1200个token,Agent反应慢不说,还经常搞混“refunding”(退款中)和“refunded”(已退款)。用户问“钱什么时候到账”,Agent查出来是refunding状态,回复说“您的退款已到账”——用户查了银行卡发现没到账,反手就是一个投诉。

重构方案:把状态解释逻辑从prompt里抽出来,变成结构化数据+模板引擎:


# order_status_template.py —— 状态到回复模板的映射
# 每个状态对应一个模板函数,接收order对象,返回自然语言
from datetime import datetime
from typing import Dict, Any

def _format_logistics(order: Dict[str, Any]) -> str:
    """物流信息格式化——根据物流商API返回的轨迹动态生成"""
    traces = order.get("logistics_traces", [])
    if not traces:
        return "物流信息暂未更新,请稍后再查。"
    latest = traces[-1]
    return f"您的快递已到达【{latest['location']}】,{latest['status']}。预计{order.get('estimated_delivery', '近期')}送达。"

STATUS_TEMPLATES = {
    "pending_payment": lambda o: f"订单{o['order_id']}待付款,请在{o.get('payment_deadline', '24小时内')}完成支付,否则将自动取消。",
    "pending_shipment": lambda o: f"订单{o['order_id']}已支付,仓库正在备货,预计{o.get('ship_time', '1-3个工作日')}内发货。",
    "shipped": lambda o: f"订单{o['order_id']}已发货,{_format_logistics(o)}",
    "in_transit": lambda o: f"订单{o['order_id']}运输中,{_format_logistics(o)}",
    "out_for_delivery": lambda o: f"订单{o['order_id']}正在派送中,快递员{o.get('courier_name', '')},电话{o.get('courier_phone', '')}。",
    "signed": lambda o: f"订单{o['order_id']}已于{o.get('sign_time', '')}签收。",
    "refunding": lambda o: f"订单{o['order_id']}退款处理中,预计{o.get('refund_estimate', '3-7个工作日')}到账。",
    "refunded": lambda o: f"订单{o['order_id']}已退款{o.get('refund_amount', '')}元,请查收。若未到账请联系银行。",
    "abnormal": lambda o: f"订单{o['order_id']}状态异常:{o.get('abnormal_reason', '请联系人工客服')}。",
    "cancelled": lambda o: f"订单{o['order_id']}已取消" + (f",原因:{o.get('cancel_reason', '')}" if o.get('cancel_reason') else "") + "。",
    # ... 其他状态同理
}

def generate_status_reply(order: Dict[str, Any]) -> str:
    """根据订单状态调用对应的模板函数"""
    status = order.get("status", "unknown")
    template_func = STATUS_TEMPLATES.get(status)
    if template_func:
        return template_func(order)
    return f"订单{o.get('order_id', '未知')}状态:{status}。如有疑问请联系人工客服。"

这样一来,OrderAgent的工具函数返回的不是原始状态码,而是已经格式化好的文本。Agent的职责变成了:调用查订单工具 → 拿到格式化回复 → 结合对话历史微调语气(比如用户很着急就加安抚的话)。prompt从1200 token砍到400 token,决策速度提升明显。

但模板方案也有缺点——17个模板函数写了将近300行代码,维护成本不低。后来业务加了“冷链物流”和“大件配送”两种特殊物流类型,每种的状态展示又不一样。我试图让LLM直接生成回复,但测试了50次,有8次出现了事实性错误(把“运输中”说成“已发货”,把金额小数点搞错)。对于一个涉及钱的客服场景,8%的错误率不可接受。最后还是老老实实维护模板,但用了一个trick——让LLM在模板基础上做“润色”,而不是“生成”:


# OrderAgent的system prompt片段
# 关键约束:LLM只能润色,不能改事实
ORDER_AGENT_SYSTEM = """你是订单查询助手。

工作流程:
1. 从工具返回的result中提取【订单信息】(这是模板生成的,事实必须保持原样)
2. 如果用户情绪焦虑,在回复前加安抚语(如“我理解您很着急”)
3. 绝对禁止修改:订单号、金额、日期、物流位置、状态描述
4. 可以润色:语气、称谓、表情符号(适度)

错误示例❌:工具返回“订单ORD-12345已退款100元”,你改成“退款已到账”——这是篡改事实
正确示例✅:工具返回“订单ORD-12345已退款100元”,你改成“好的,我查到您的订单ORD-12345已经退款100元啦~请您查收一下银行卡哦”
"""

这个“模板生成事实 + LLM润色表达”的分层策略,在实际业务中跑了一个月,事实错误率从8%降到了0.3%(只有一次把“顺丰”说成了“中通”,是因为物流商API返回的数据本身写错了)。

AfterSalesAgent的“退款决策链”——五个工具怎么串成一条流水线

三个Agent里,AfterSalesAgent是最复杂的。一个退款请求涉及5个步骤:查询原始订单 → 检查库存状态(如果是退货) → 计算退款金额 → 匹配退货地址 → 写入退款单。每个步骤都可能失败,每个失败都要给用户能理解的回复。

我见过太多项目把这种流程写成Agent的“自由发挥”——给5个工具,让LLM自己决策调用顺序。结果就是:同一个退款请求,有时候先查库存再查订单(导致查不到,因为需要订单里的SKU去查库存),有时候跳过了金额计算直接写退款单(退款金额为0被财务系统拒了)。

这里必须上编排。LangChain 0.3的ToolCallingAgent支持定义工具间的依赖关系,但不够直观。我选择了更可控的方案——把5个工具串成一个chain,Agent只负责在chain的起点注入参数,以及在chain的终点处理异常。具体实现用了RunnableSequence:


from langchain_core.runnables import RunnableLambda, RunnableSequence
from langchain_core.tools import tool
from typing import Dict, Any

# ---- 五个工具函数(真实数据库和API调用) ----

@tool
def get_original_order(order_id: str) -> Dict[str, Any]:
    """根据订单ID查询原始订单,返回订单详情和商品列表"""
    # 真实场景:查询MySQL订单表+订单商品表
    # 这里简化——实际用SQLAlchemy查
    order = db_session.query(Order).filter_by(order_id=order_id).first()
    if not order:
        return {"error": f"未找到订单{order_id}"}
    return {
        "order_id": order.order_id,
        "status": order.status,  # 必须为signed/delivered才能退款
        "items": [{"sku": item.sku, "qty": item.qty, "price": item.price} for item in order.items],
        "total_amount": order.total_amount,
        "buyer_phone": order.buyer_phone
    }

@tool
def check_inventory_for_return(sku: str, qty: int) -> Dict[str, Any]:
    """检查库存容量——退货需要仓库有空位"""
    # 调用WMS系统API
    inventory = wms_client.get_available_capacity(sku)
    return {"sku": sku, "can_accept": inventory["available"] >= qty, "available_capacity": inventory["available"]}

@tool
def calculate_refund_amount(order_data: Dict, return_items: list) -> float:
    """计算退款金额,考虑优惠分摊和运费"""
    # 这里逻辑复杂:满减券平摊、运费按重量比例、积分抵扣
    return Calculator.compute(order_data, return_items)

@tool
def match_return_address(sku: str) -> str:
    """根据商品SKU匹配退货仓库地址"""
    # 不同品类退回不同仓库:家具回大件仓,家纺回小件仓
    return warehouse_service.get_address_by_sku(sku)

@tool
def create_refund_order(order_id: str, amount: float, address: str, items: list) -> str:
    """写入退款单到数据库,返回退款单号"""
    refund_id = f"RF{datetime.now().strftime('%Y%m%d%H%M%S')}{order_id[-4:]}"
    # 写入refund_orders表,同时创建return_warehouse_entry单
    db_session.add(RefundOrder(refund_id=refund_id, ...))
    db_session.commit()
    return refund_id

# ---- 把五个工具串成Chain ----

# RunnableLambda包装,让工具可以流式传递上下文
step1 = RunnableLambda(lambda inputs: {
    "order": get_original_order(inputs["order_id"]),
    **inputs
})

step2 = RunnableLambda(lambda inputs: {
    **inputs,
    "inventory_check": [
        check_inventory_for_return(item["sku"], item["qty"]) 
        for item in inputs["order"]["items"] 
        if item["sku"] in [i["sku"] for i in inputs.get("return_items", inputs["order"]["items"])]
    ]
})

step3 = RunnableLambda(lambda inputs: {
    **inputs,
    "refund_amount": calculate_refund_amount(inputs["order"], inputs.get("return_items", inputs["order"]["items"]))
})

step4 = RunnableLambda(lambda inputs: {
    **inputs,
    "return_address": match_return_address(inputs["order"]["items"][0]["sku"])  # 简化为取第一个SKU
})

step5 = RunnableLambda(lambda inputs: {
    **inputs,
    "refund_id": create_refund_order(
        inputs["order"]["order_id"], 
        inputs["refund_amount"], 
        inputs["return_address"],
        inputs.get("return_items", inputs["order"]["items"])
    )
})

refund_pipeline = step1 | step2 | step3 | step4 | step5

这个方案的好处是每一步的输出都是结构化的dict,下一步可以依赖前一步的所有字段。Agent调用时只需要:


# Agent只负责收集参数,启动流水线,处理异常
def handle_refund_with_agent(user_input: str):
    # Agent提取order_id和return_items
    agent_result = after_sales_agent.invoke({"input": user_input})
    
    try:
        pipeline_result = refund_pipeline.invoke({
            "order_id": agent_result["order_id"],
            "return_items": agent_result.get("return_items", [])
        })
        # 拿到refund_id后,让Agent生成用户回复
        return reply_agent.invoke({"refund_id": pipeline_result["refund_id"], "amount": pipeline_result["refund_amount"]})
    except Exception as e:
        # 流水线某一步失败——根据失败步骤给不同的回复
        return handle_pipeline_error(e, agent_result)

我纠结过到底让Agent全权调度还是硬编码流水线。最后选了这种“Agent前置收集参数 + 硬编码流水线执行 + Agent后置生成回复”的混合模式。理由是:

  • 退款流程的步骤顺序是固定的,不存在“动态决策”的空间。让LLM决策调用顺序只会引入不确定性。
  • 但参数收集(识别用户要退哪些商品)确实需要LLM的理解能力,这里让Agent做。
  • 回复生成需要同理心和话术技巧,也是LLM的强项。

上线后数据:退款流程平均处理时间从11秒降到3.2秒(因为少了3次LLM round trip),退款金额计算准确率100%(因为逻辑是写死的),用户满意度评分从3.4升到4.1。代价是代码里多了一个300行的pipeline模块,每次新增退款场景(比如部分退款、运费单独退)都要改流水线代码,而不是改改prompt就行。但我接受这个代价——客户服务的底线是准确性,不是灵活性。

三个Agent的协作机制:从“硬编码交接”到“消息总线”的演变

前面提到,三个Agent之间的任务交接是最大的难题。我在这块花了整整两周,方案换了三次。

第一版:硬编码路由(失败)

就是前面那个if-else的愚蠢方案。不展开了,说多了都是泪。

第二版:Supervisor Agent调度

搞一个Supervisor Agent,把三个子Agent当成工具注册上去。Supervisor分析用户输入,决定调用哪个子Agent,拿到返回后再决定是否调用另一个子Agent,最后汇总回复。

这个方案跑了一个星期,效果还行,但有两个致命问题:

1. 延迟叠加:Supervisor自己决策一次(700ms)→ 调用子Agent(1.2s)→ Supervisor再决策(700ms)→ 调用第二个子Agent(1.5s)→ Supervisor汇总(800ms)。总共4.9秒,用户已经关了对话窗口。

2. 上下文丢失:Supervisor拿到子Agent的返回后,往往只保留最终结果,把中间推理丢了。比如OrderAgent查出来“用户买了3件商品,其中1件已签收”,Supervisor调用AfterSalesAgent退款时,只传了“用户要退款”,AfterSalesAgent又得重新查一遍订单确定退哪件。

我试图用LangChain的共享记忆(ConversationBufferMemory)来传递上下文,但发现不同Agent写入记忆的格式不一致:OrderAgent存的是订单JSON,AfterSalesAgent存的是退款ID,RecommendAgent存的是SKU列表。Supervisor在汇总时需要理解三种格式,复杂度爆炸。

第三版:消息总线 + 共享上下文对象(当前方案)

受微服务架构的启发,我设计了一个轻量级的消息总线。原理很简单:所有Agent不直接通信,而是向一个Bus发布消息,Bus负责路由消息、维护共享上下文、决策任务是否完成。


from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class SharedContext:
    """三个Agent共享的上下文对象——这就是“消息总线”的核心"""
    user_id: str
    original_query: str
    tasks: Dict[str, TaskStatus] = field(default_factory=dict)  # {"order_query": PENDING, "refund": IN_PROGRESS, ...}
    order_data: Optional[Dict] = None  # OrderAgent查询结果
    refund_data: Optional[Dict] = None  # AfterSalesAgent处理结果
    recommend_data: Optional[List] = None  # RecommendAgent推荐结果
    error_log: List[str] = field(default_factory=list)  # 异常记录
    
    def is_all_done(self) -> bool:
        return all(s in [TaskStatus.COMPLETED, TaskStatus.FAILED] for s in self.tasks.values())

class AgentBus:
    """消息总线——管理共享上下文和任务状态"""
    
    def __init__(self):
        self.contexts: Dict[str, SharedContext] = {}  # user_id -> context
    
    def start_session(self, user_id: str, query: str, tasks: Dict[str, TaskStatus]):
        self.contexts[user_id] = SharedContext(
            user_id=user_id, 
            original_query=query,
            tasks=tasks
        )
    
    def update_context(self, user_id: str, agent_name: str, data: Any):
        """Agent完成任务后更新上下文"""
        ctx = self.contexts.get(user_id)
        if not ctx:
            raise ValueError(f"No session for user {user_id}")
        
        if agent_name == "order":
            ctx.order_data = data
            ctx.tasks["order_query"] = TaskStatus.COMPLETED
        elif agent_name == "after_sales":
            ctx.refund_data = data
            ctx.tasks["refund"] = TaskStatus.COMPLETED
        elif agent_name == "recommend":
            ctx.recommend_data = data
            ctx.tasks["recommend"] = TaskStatus.COMPLETED
    
    def get_context(self, user_id: str) -> SharedContext:
        return self.contexts.get(user_id)

# 全局Bus实例
bus = AgentBus()

RouterAgent(替代了Supervisor)的工作流程变成:


def orchestrate(user_id: str, user_input: str, history: list):
    # 1. RouterAgent分类意图
    intent = route_intent(user_input, history)
    
    # 2. 根据意图初始化任务列表
    tasks = {}
    if "order" in intent:
        tasks["order_query"] = TaskStatus.PENDING
    if "after_sales" in intent:
        tasks["refund"] = TaskStatus.PENDING
    if "recommend" in intent:
        tasks["recommend"] = TaskStatus.PENDING
    
    bus.start_session(user_id, user_input, tasks)
    
    # 3. 并行执行独立任务
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {}
        if tasks.get("order_query"):
            futures["order"] = executor.submit(order_agent_execute, user_id, user_input, bus.get_context(user_id))
        if tasks.get("refund"):
            # refund可能依赖order,但order_query是独立任务,refund内部会先检查bus里的order_data
            futures["after_sales"] = executor.submit(after_sales_agent_execute, user_id, user_input, bus.get_context(user_id))
        if tasks.get("recommend"):
            futures["recommend"] = executor.submit(recommend_agent_execute, user_id, user_input, bus.get_context(user_id))
        
        # 等待所有任务完成
        for name, future in futures.items():
            try:
                result = future.result(timeout=10)  # 10秒超时
                bus.update_context(user_id, name, result)
            except Exception as e:
                bus.contexts[user_id].error_log.append(f"{name} failed: {str(e)}")
                bus.contexts[user_id].tasks[name] = TaskStatus.FAILED
    
    # 4. 汇总Agent根据bus里的所有数据生成最终回复
    ctx = bus.get_context(user_id)
    return reply_synthesizer(ctx)

这个架构的关键设计决策:

  • RouterAgent 不再做串行调度,而是识别意图后一次性把所有任务发出去并行执行。如果AfterSalesAgent需要OrderAgent的结果,它自己从bus里取(用get_context),取不到就自己查。
  • 每个Agent执行时都会检查bus里是否已有数据可用。比如AfterSalesAgent会先检查ctx.order_data,有就直接用,省一次数据库查询。
  • 汇总Agent(ReplySynthesizer)不参与业务逻辑,只负责把所有结果拼成自然语言。

性能提升明显:一个包含订单查询+退款+推荐的复合请求,从串行Supervisor的4.9秒降到2.3秒(并行执行+减少重复查询)。但代码复杂度翻倍了——bus的状态管理、异常处理、超时重试都是新问题。有次bus里某用户session卡在IN_PROGRESS状态一直没清理,内存泄漏导致服务撑了两天就OOM了。后来加了一个定时任务,每15分钟清理超过30分钟的session。

说实话,我也不确定这个消息总线的方案是不是最优解。它看起来很美(微服务思路),但调试地狱——一个请求涉及4个Agent和1个Bus,出问题时得同时翻5份日志。如果有下次,我可能试试直接把三个Agent的能力合并成一个ReAct Agent,但用结构化输出强制拆解步骤。那是另一个话题了。

性能优化:从“能跑就行”到“扛住双11峰值”的差距

系统上线后平稳跑了两个月,日均处理3000次对话,延迟中位数3.2秒,用户勉强接受。但老板说双11要来了,预估峰值QPS是平时的5倍。我做了两轮压测,发现三个瓶颈。

瓶颈1:LLM调用太密

一次对话平均调用LLM 4.3次:RouterAgent路由1次、OrderAgent决策1次、AfterSalesAgent决策1次、ReplySynthesizer汇总1次,加上偶尔失败重试。每次调用gpt-4o平均耗时800ms,4.3次就是3.4秒。

优化策略:

  • RouterAgent换gpt-4o-mini:前面说了,分类准确率只降1.2%,延迟降75%,成本降90%。双11期间每天15万次路由调用,这个差价够买一台MacBook Pro。
  • Agent工具调用改为批处理:原本OrderAgent查询订单时,先调get_order,再根据返回的SKU逐个调check_logistics。改成一次返回所有数据后,Agent决策的round trip从3次降到1次。

# 优化前:两个工具分步调用,Agent需要两轮决策
# Step 1: Agent调用get_order → 返回order基本信息和items列表
# Step 2: Agent分析返回,提取每个item的SKU,调用check_logistics
# Step 3: Agent合并结果,生成回复
# 总共3次LLM decision round + 2次工具调用 = 约3.6s

# 优化后:get_order_with_logistics一次返回全量数据
@tool
def get_order_with_logistics(order_id: str) -> Dict[str, Any]:
    """查询订单+所有商品的物流信息,一次返回"""
    order = db_session.query(Order).filter_by(order_id=order_id).first()
    if not order:
        return {"error": f"订单{order_id}不存在"}
    
    items = []
    for item in order.items:
        logistics = logistics_client.query(item.sku, order_id)  # 批量查物流
        items.append({
            "sku": item.sku,
            "name": item.product_name,
            "qty": item.qty,
            "price": item.price,
            "logistics_status": logistics.get("status", "unknown"),
            "logistics_location": logistics.get("location", ""),
            "estimated_arrival": logistics.get("eta", "")
        })
    
    return {
        "order_id": order.order_id,
        "status": order.status,
        "total": order.total_amount,
        "items": items  # 全量返回,Agent一次决策就够了
    }

# Agent只需要1次LLM call:分析返回数据 → 套用模板 → 生成回复
# 总耗时降到约1.2s
  • 引入缓存:对于订单状态这类变化不频繁的数据,在Agent调用工具前先查Redis缓存。同一个用户30秒内重复问“我的订单到哪了”,直接走缓存,LLM都不用调。

优化后,平均LLM调用次数从4.3降到1.8,延迟从3.2s降到1.5s。

瓶颈2:数据库查询没做连接池

每个Agent工具都是独立连接数据库,用完就关。平时没事,双11压测时QPS一上来,MySQL连接数爆炸(超过max_connections限制)。错误日志全是“Too many connections”。

解决方法很常规,但容易忽视:SQLAlchemy配置连接池,大小设成和Agent并发数匹配。


# 配置连接池——注意pool_size和max_overflow的配比
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    "mysql+pymysql://user:pass@host:3306/customer_service",
    pool_size=20,         # 核心连接数,等于gunicorn worker数
    max_overflow=10,      # 溢出连接数,应对突发
    pool_recycle=3600,    # 1小时后回收连接,防止断开
    pool_pre_ping=True,   # 使用前ping一下,自动重连
    echo=False
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

压测时用20个gunicorn worker,数据库连接稳定在20-25个,没有再爆过。

瓶颈3:Bus的并行执行有资源竞争

Orchestrate函数里用了ThreadPoolExecutor并行调用三个Agent,但每个Agent内部又可能并发调工具(比如AfterSalesAgent的流水线里,check_inventory_for_return要并发调多次)。线程嵌套线程,Python的GIL让CPU跑不满,上下文切换开销反而增大。

换成了asyncio + aiohttp,把Agent的工具调用全部改成异步:


import asyncio
import aiohttp

async def async_orchestrate(user_id: str, user_input: str, history: list):
    intent = await async_route_intent(user_input, history)
    tasks = {}
    if "order" in intent:
        tasks["order_query"] = TaskStatus.PENDING
    if "after_sales" in intent:
        tasks["refund"] = TaskStatus.PENDING
    if "recommend" in intent:
        tasks["recommend"] = TaskStatus.PENDING
    
    bus.start_session(user_id, user_input, tasks)
    ctx = bus.get_context(user_id)
    
    # 用asyncio.gather并行执行
    coroutines = []
    if tasks.get("order_query"):
        coroutines.append(async_order_agent(user_id, user_input, ctx))
    if tasks.get("refund"):
        coroutines.append(async_after_sales_agent(user_id, user_input, ctx))
    if tasks.get("recommend"):
        coroutines.append(async_recommend_agent(user_id, user_input, ctx))
    
    results = await asyncio.gather(*coroutines, return_exceptions=True)
    
    for name, result in zip(["order", "after_sales", "recommend"], results):
        if isinstance(result, Exception):
            ctx.error_log.append(f"{name} failed: {str(result)}")
        else:
            bus.update_context(user_id, name, result)
    
    return await async_reply_synthesizer(ctx)

异步化后,双11压测QPS从85撑到240,P99延迟从8.2s降到3.1s。代价是之前用同步写的DB操作全部要改async SQLAlchemy(选sqlalchemy.ext.asyncio),工作量不小。

双11实际表现

双11当天峰值QPS 190(接近预估的200),系统扛住了。但中间出了一次事故——外部物流API突然超时率飙升(从0.3%到15%),导致OrderAgent大量超时重试,Bus里积累了大量未完成session,内存占用从2GB涨到8GB。好在设置了15分钟的session过期清理,没有OOM。事后复盘,应该在Agent层加熔断:外部依赖出问题时,直接返回“物流信息暂时无法查询,请稍后重试”,而不是不断重试拖垮整个系统。

上线三个月后,我被迫重写了50%的代码——聊聊可维护性的代价

系统上线的前三个月,一切看起来挺美好。然后业务方开始提“小需求”:

  • “用户退货时如果购买了运费险,退款金额要自动加上理赔金额。”
  • “大件家具的退货和普通商品不一样,需要先派师傅上门取件,不能直接让用户寄回。”
  • “如果用户是VIP,退款审核流程要跳过,直接原路返回。”
  • “推荐Agent要根据用户最近7天的浏览记录做推荐,不只是购买记录。”

每个需求看起来都“就加个if判断”,但加起来之后,代码变成了屎山。最核心的问题在于:流水线是硬编码的,每加一个分支就要在流水线里加新的Step,或者让Step内部做分支判断。三个月后,refund_pipeline从5个Step膨胀到12个Step,step3(计算退款金额)内部有了17个if-elif分支。

我试图让Agent分担更多决策——把“是否为VIP”这种判断交给Agent,而不是硬编码。但Agent的决策准确率只有89%(VIP判断的prompt很容易被用户语言误导,比如用户说“我是老客户了,能不能快点”,Agent就以为他是VIP)。

最终我做了一个艰难的决定:把流水线重构为“决策树 + 可插拔策略模式”。不是所有逻辑都让Agent做,也不是所有都硬编码,而是把“需要确定性”的部分用代码写死,把“需要灵活性”的部分暴露为策略接口:


from abc import ABC, abstractmethod

class RefundStrategy(ABC):
    """退款策略——不同场景走不同的流水线"""
    @abstractmethod
    def execute(self, order_data: Dict, return_items: List) -> Dict:
        pass

class NormalRefund(RefundStrategy):
    def execute(self, order_data, return_items):
        # 标准退货流程:查库存 → 计算金额 → 匹配地址 → 写单
        pipeline = step_check_inventory | step_calculate_normal | step_match_address | step_create_refund
        return pipeline.invoke({"order": order_data, "items": return_items})

class InsuranceRefund(RefundStrategy):
    def execute(self, order_data, return_items):
        # 有运费险:标准流程 + 理赔计算
        pipeline = step_check_inventory | step_calculate_with_insurance | step_match_address | step_create_refund
        return pipeline.invoke({"order": order_data, "items": return_items})

class VIPRefund(RefundStrategy):
    def execute(self, order_data, return_items):
        # VIP用户:跳过库存检查(直接入库),金额按原价退
        pipeline = step_calculate_vip_full | step_match_address | step_create_refund
        return pipeline.invoke({"order": order_data, "items": return_items})

class FurnitureRefund(RefundStrategy):
    def execute(self, order_data, return_items):
        # 大件家具:不寄回,先派师傅上门 → 检查完毕再退款
        pipeline = step_schedule_pickup | step_await_inspection | step_calculate | step_create_refund
        return pipeline.invoke({"order": order_data, "items": return_items})

# Agent负责判断用哪个策略,然后执行策略
def handle_refund_v2(user_input: str):
    agent_result = after_sales_agent.invoke({"input": user_input})
    
    # Agent在extraction阶段需要返回"策略类型"
    strategy_type = agent_result.get("strategy", "normal")  # normal/insurance/vip/furniture
    
    strategy_map = {
        "normal": NormalRefund(),
        "insurance": InsuranceRefund(),
        "vip": VIPRefund(),
        "furniture": FurnitureRefund()
    }
    
    strategy = strategy_map.get(strategy_type, NormalRefund())
    return strategy.execute(agent_result["order_data"], agent_result["return_items"])

这个重构花了两个周末,但之后加新退货场景只需要新增一个Strategy类,无需改动已有代码。Agent的职责从“执行退款”变成了“分类退款场景 + 提取参数”,准确率要求从100%降低到了90%(因为即使分错类,也只是走了次优流程,不会出错,只是用户体验差一点)。

说实话,我一开始低估了这个系统的迭代速度。如果一开始就用策略模式,至少省下200小时的重构时间。但话说回来,没有前面三个月和业务的磨合,我也不知道该抽象出哪些策略。架构设计这事,没有银弹,只有交够学费后的后见之明。

折腾半年,我学到的三件事(和一些没说出口的想法)

这个项目从第一天写代码到双11扛住峰值,断断续续搞了半年。代码仓库从零涨到12000行(Python 8500行 + 测试2200行 + 配置/脚本1300行),期间重构了3次。回头看,有三件事是我真金白银换来的教训。

第一件事:Agent的边界不是“能做什么”,而是“不能做错什么”

一开始我把Agent当成万能遥控器,所有决策都让它做。后来发现LLM在确定性逻辑上(金额计算、状态判断、流程顺序)的失误率是传统代码的10倍以上。最终的分工是:确定性逻辑(计算、校验、状态机)用代码写死;不确定性逻辑(意图理解、参数提取、语气润色)交给Agent;两者之间通过结构化数据(模板生成的JSON、策略模式的结果)交接。

这个分层不是完美的——它增加了代码量和维护成本。但在错误成本极高的客服场景(一次错误退款可能损失几百元+用户信任),这个trade-off是必须做的。

第二件事:Agent间协作的“正确姿势”是共享数据,不是传递消息

最开始我照着LangChain官方的Multi-Agent示例做,Agent之间通过对话历史传递信息。结果发现传递过程中的信息损耗严重——每个Agent都用自己的方式总结上一手的输出,关键细节丢得七七八八。消息总线(共享上下文对象)解决了这个问题:所有Agent往同一个结构体里写数据,也从这个结构体里读数据。缺点是耦合度高(所有Agent都依赖SharedContext的结构),但在3个Agent的规模下,这种耦合比信息损耗更可接受。

如果Agent数量超过5个,我可能会换成Event-driven + CQRS模式,每个Agent只订阅自己关心的事件。但那是另一个数量级的复杂度了,我这个项目用不上。

第三件事:别相信框架的“最佳实践”,它在你的场景下大概率不是最优的

LangChain的文档建议用AgentExecutor + ToolCallingAgent来处理一切Agent任务。我老老实实用了一个月,后来发现AgentExecutor的默认重试逻辑(遇到解析错误就重试,最多3次)在我们的高延迟场景下是灾难——一个退款请求因为物流API超时,被重试了3次,总共耗时28秒。最后我把AgentExecutor的部分功能(工具调用、输出解析)拆开自己写,重试逻辑改为“工具调用失败不重试,直接降级返回缓存结果”,延迟问题大幅改善。

类似的情况还有Memory管理(ConversationBufferMemory用久了token爆炸,改成了ConversationSummaryBufferMemory但summary有时曲解用户原意,最后自己实现了一个滑动窗口+关键信息提取的混合Memory)。

我不是说LangChain不好——它省了我很多时间(尤其是工具定义和Chain拼接)。但它是一个通用框架,而每个业务场景都有自己的特殊约束。把框架当工具箱用,别当说明书用。

最后说点没说出口的想法。折腾这半年,最大的感触是:用LLM做复杂业务流程编排,难点从来不是AI技术,而是软件工程的基本功。怎么设计模块边界、怎么管理状态、怎么处理异常、怎么做性能优化——这些才是决定系统能不能用的关键。而AI社区的很多文章过于聚焦如何写好prompt、如何选模型,忽视了这些“枯燥”的工程问题。这行干了十年,我发现越新的技术,越需要老派的工程思维来兜底。LLM给了我们前所未有的语义理解能力,但如果底座是混乱的架构和糟糕的状态管理,这能力只会让系统更快的崩溃。

好了,吐槽结束。代码库里有几个模块我没展开(推荐Agent的协同过滤实现、ReplySynthesizer的话术模板系统、Redis缓存的过期策略),如果有人感兴趣我后续再写。有问题的可以直接在评论区问,我看到了会回。

关于作者

发表评论