LangGraph多Agent协作死锁亲历:订单系统里我靠超时检测和补偿事务救回每天2万笔卡死的交易

2026年5月5日

30秒速览

  • 死锁检测别塞进LLM的prompt里,它分不清慢和卡死的区别,得用Wait-For Graph做确定性检测
  • 心跳机制把死锁发现从2秒压到0.6秒,关键是心跳要携带等待资源信息,不只是"我还活着"
  • 解锁Agent用LLM生成补偿计划挺靠谱,但必须挂安全校验器,不然容易出数据不一致的骚操作
  • 混沌实验真的会翻车——两阶段提交在补偿事务里虽然重但值得,部分执行导致的漏操作坑了我两天
  • 安全校验器投入了60%的开发时间,拦截了4.6%的危险操作,这个ROI比写业务代码高多了

凌晨三点,订单系统突然不干活了

那是上周四凌晨,我被运维电话叫醒。打开监控面板一看,订单处理吞吐量从每秒300笔掉到了3笔,几乎停滞。不是数据库挂了,不是Redis炸了,而是我们的多Agent订单处理系统陷入了大规模死锁——16个Agent互相等待对方释放资源,像早高峰十字路口四个方向的车都往前顶了一米,结果谁也动不了。

这事让我回想起十年前刚入行时,在分布式数据库里第一次碰到两阶段锁死锁的场景。当时Oracle检测到死锁就随机杀掉一个事务,粗暴但有效。这么多年过去,我们在LangGraph上搭建的订单处理流水线,本质上遇到了同一个问题,只是这次”事务”变成了Agent之间通过图状态传递的上下文,”锁”变成了对共享资源(库存、优惠券、用户额度)的独占访问。

我们的订单系统每天处理十几万笔订单,长链路包括验仓、风控、优惠计算、支付路由、库存扣减、物流分配六个阶段,每个阶段由独立的Agent负责。这些Agent通过LangGraph的状态图串联,正常情况下走得很顺:

# 简化版订单处理图——看起来人畜无害对吧?错了
from langgraph.graph import StateGraph, END
from typing import TypedDict

class OrderState(TypedDict):
    order_id: str
    user_id: str
    items: list
    warehouse_locked: bool
    coupon_applied: bool
    payment_authorized: bool
    stock_reserved: bool

graph = StateGraph(OrderState)

# 六个节点按顺序执行,看起来是完美的DAG
graph.add_node("warehouse_check", warehouse_agent)
graph.add_node("risk_assessment", risk_agent)
graph.add_node("coupon_apply", coupon_agent)
graph.add_node("payment_route", payment_agent)
graph.add_node("stock_reserve", stock_agent)
graph.add_node("logistics_assign", logistics_agent)

# 线性依赖,没毛病
graph.add_edge("warehouse_check", "risk_assessment")
graph.add_edge("risk_assessment", "coupon_apply")
graph.add_edge("coupon_apply", "payment_route")
graph.add_edge("payment_route", "stock_reserve")
graph.add_edge("stock_reserve", "logistics_assign")
graph.add_edge("logistics_assign", END)

问题出在哪?出在优惠券Agent和支付路由Agent都需要访问用户额度服务,而我们为了性能给这个服务加了分布式锁——谁先拿到谁先用,用完释放。正常情况下Agent按顺序执行,A用完B才用,相安无事。但一旦有异常(网络抖动导致锁没释放、Agent内部异常没走到finally、或者超时配置不合理),下一个Agent就开始傻等。更可怕的是,LangGraph默认的节点执行是同步阻塞的——一个节点卡住,整条链路瘫痪。

更头疼的是,我们还做了并发优化:为了提高吞吐,我们对同一用户的不同订单允许并行处理(毕竟验仓和风控可以独立做)。这意味着两个订单的Agent可能同时争抢同一个用户的额度锁。死锁概率直接指数级上升。

那天凌晨就是典型的死锁场景:订单A的优惠券Agent拿到了用户额度锁,在等支付路由的响应;订单B的支付路由Agent在等同一个用户额度锁,而优惠券Agent已经提前占了一个库存锁。两个订单互相等对方释放锁,加上中间有个Redis节点短暂不可用导致锁超时没触发——经典的环形等待。

别指望LLM自己去解死锁,它连自己卡住了都不知道

我最开始的想法很天真:让Agent自己判断是不是死锁了,然后做决策。我们在每个Agent的prompt里加了一段:”如果发现等待某个资源超过5秒,检查是否是死锁,如果是则释放自己持有的锁并重试。”

这个方案上线两天就翻车了。LLM对”死锁”的判断极其不稳定,有时候网络慢个2秒它就觉得锁死了,主动释放锁——结果另一个Agent还在用那个锁,数据一致性直接炸裂。更离谱的是,有一次GPT-4产生了一段”死锁分析报告”,洋洋洒洒分析了2000字为什么它认为自己在死锁、应该如何预防,唯独没有真正去释放锁。prompt里写的”释放自己持有的锁”被它理解成了”写一篇关于释放锁的文章”。

踩坑记录:不要试图把分布式系统的核心可靠性问题交给LLM的推理能力。LLM在Agent里的职责是理解业务意图和做决策,但死锁检测这类确定性逻辑必须用确定性的代码实现。LLM可以参与补偿事务的设计(比如理解订单失败后需要回滚哪些操作),但检测和触发机制必须外挂。

最终我们的方案是:在LangGraph图执行引擎外部挂载一个独立的死锁检测器,它不依赖LLM,纯用超时计时器和资源等待图来判断。一旦检测到死锁,触发专门的”解锁Agent”执行补偿事务。这个解锁Agent可以用LLM(因为它只需要理解补偿逻辑,不需要实时性),但检测必须是确定性的。

基于超时的死锁检测器:用Wait-For Graph搞定环形等待

分布式系统经典的死锁检测算法就那么几个:超时法、Wait-For Graph(WFG)、路径推算法(Path-Pushing)、边缘追踪法(Edge-Chasing)。我们场景的特点是有中心化的图执行引擎(LangGraph本身),这意味着我们可以全局观察到所有Agent的等待关系——这恰好是构建全局Wait-For Graph的前提。

核心思路:每个Agent在请求共享资源前,向检测器注册”我在等什么资源和锁”;获取资源后注册”我现在持有哪些锁”。检测器维护两个映射表:

  • waits_for: Agent A 在等资源 R 被释放
  • holds: 资源 R 当前被 Agent B 持有

然后每2秒跑一次环检测——在有向图中找环,有环就是死锁。这个在数据结构课上学过,DFS着色法O(V+E)复杂度,对于几十个Agent几百个资源完全够用。

# 死锁检测器核心实现
import threading
import time
from collections import defaultdict
from enum import Enum

class ResourceType(Enum):
    USER_CREDIT_LOCK = "user_credit"
    COUPON_LOCK = "coupon"
    STOCK_LOCK = "stock"
    WAREHOUSE_LOCK = "warehouse"

class WaitForGraph:
    """维护Agent之间的等待关系,定期检测环"""
    
    def __init__(self):
        # agent_name -> set of resources it's waiting for
        self.waits_for = defaultdict(set)
        # resource_key -> agent_name that holds it
        self.holds = {}
        self.lock = threading.Lock()
        # 死锁历史记录,用于避免重复处理
        self.deadlock_history = set()
        
    def register_wait(self, agent_id: str, resource_key: str):
        """Agent说自己开始等某个资源"""
        with self.lock:
            self.waits_for[agent_id].add(resource_key)
            
    def register_hold(self, agent_id: str, resource_key: str):
        """Agent说自己持有了某个资源"""
        with self.lock:
            self.holds[resource_key] = agent_id
            
    def release(self, agent_id: str, resource_key: str = None):
        """Agent释放资源(正常释放或强制释放都走这个)"""
        with self.lock:
            if resource_key:
                if resource_key in self.holds:
                    del self.holds[resource_key]
                self.waits_for[agent_id].discard(resource_key)
            else:
                # 释放该Agent所有相关记录
                self.waits_for.pop(agent_id, None)
                to_delete = [k for k, v in self.holds.items() if v == agent_id]
                for k in to_delete:
                    del self.holds[k]
                    
    def detect_cycles(self) -> list:
        """DFS着色法检测有向图中的环,返回死锁环的Agent列表"""
        with self.lock:
            # 构建邻接表:如果A在等资源R,且R被B持有,则存在A->B的边
            graph = defaultdict(list)
            for agent_a, resources in self.waits_for.items():
                for res in resources:
                    if res in self.holds:
                        agent_b = self.holds[res]
                        if agent_b != agent_a:  # 避免自环(自己等自己,虽然不是死锁但也是问题)
                            graph[agent_a].append(agent_b)
            
            # DFS检测环
            WHITE, GRAY, BLACK = 0, 1, 2
            color = {node: WHITE for node in graph}
            cycles = []
            
            def dfs(node, path):
                color[node] = GRAY
                path.append(node)
                for neighbor in graph[node]:
                    if color[neighbor] == GRAY:
                        # 找到环,从path中提取环路
                        cycle_start = path.index(neighbor)
                        cycle = tuple(sorted(path[cycle_start:]))
                        if cycle not in self.deadlock_history:
                            cycles.append(cycle)
                    elif color[neighbor] == WHITE:
                        dfs(neighbor, path)
                path.pop()
                color[node] = BLACK
                
            for node in graph:
                if color[node] == WHITE:
                    dfs(node, [])
                    
            # 记录已处理的死锁环,避免重复处理
            for cycle in cycles:
                self.deadlock_history.add(cycle)
                
            return cycles

这里有个细节:deadlock_history 用来去重。因为检测器每2秒跑一次,同一个死锁环可能连续几轮都被检测到,如果每次都触发解锁逻辑会造成冗余操作。但这里有个坑——如果解锁失败(比如网络问题导致补偿事务没执行成功),死锁还在,但已经被历史记录过滤了。所以加了个TTL逻辑,超过30秒的死锁记录自动过期:

# 加上过期机制的改进版
from datetime import datetime, timedelta

class WaitForGraph:
    def __init__(self):
        # ... 前面的代码 ...
        # deadlock_history 改成带时间戳的
        self.deadlock_history = {}  # cycle_tuple -> detected_at
        self.history_ttl = timedelta(seconds=30)
        
    def detect_cycles(self) -> list:
        # ... 省略DFS代码 ...
        now = datetime.now()
        # 清理过期的历史记录
        expired = [k for k, v in self.deadlock_history.items() 
                   if now - v > self.history_ttl]
        for k in expired:
            del self.deadlock_history[k]
            
        for cycle in cycles:
            if cycle not in self.deadlock_history:
                self.deadlock_history[cycle] = now
        return cycles

这个检测器作为独立线程跑在LangGraph引擎旁边,不侵入业务逻辑。Agent通过一个轻量的客户端库来注册等待和持有事件:

# Agent使用的资源管理客户端
class ResourceManager:
    """每个Agent节点调用这个来管理锁资源"""
    
    def __init__(self, agent_id: str, wfg: WaitForGraph):
        self.agent_id = agent_id
        self.wfg = wfg
        self.held_resources = []
        
    def acquire(self, resource_type: ResourceType, resource_id: str, 
                timeout_ms: int = 5000) -> bool:
        """尝试获取资源锁,带超时"""
        resource_key = f"{resource_type.value}:{resource_id}"
        
        # 先注册等待意向——这很重要,让检测器知道我在等
        self.wfg.register_wait(self.agent_id, resource_key)
        
        # 实际获取锁(这里用Redis分布式锁举例)
        lock = redis_client.lock(resource_key, timeout=timeout_ms / 1000)
        acquired = lock.acquire(blocking=True, blocking_timeout=timeout_ms / 1000)
        
        if acquired:
            # 拿到锁了,更新状态:不再等,改为持有
            self.wfg.register_hold(self.agent_id, resource_key)
            self.held_resources.append((resource_key, lock))
        else:
            # 超时没拿到,从等待列表中移除
            self.wfg.release(self.agent_id, resource_key)
            
        return acquired
        
    def release_all(self):
        """释放所有持有的资源"""
        for resource_key, lock in self.held_resources:
            try:
                lock.release()
            except:
                pass  # 锁可能已经过期了
            self.wfg.release(self.agent_id, resource_key)
        self.held_resources.clear()

超时值的设置是个经验活。设太短,正常网络抖动就被判定为死锁;设太长,死锁持续时间长,影响面大。我们的订单链路峰值QPS是300,每个Agent处理平均耗时200ms,所以超时设5秒——给10倍正常处理时间的buffer。这个数字不是拍脑袋的,是根据P99延迟(1.8秒)乘以3算的。后面混沌测试验证了这个值在大多数场景下合理。

自动化解锁Agent:让补偿事务不是事后补救而是自动触发

检测到死锁只是第一步,关键是解锁。粗暴的做法是随机kill掉环中的一个Agent并回滚它的操作。但我们做的是电商订单,有些操作不能简单回滚——优惠券已经核销了、库存已经预占了、支付已经授权了。必须执行补偿事务,而这些补偿逻辑本身就有业务复杂度。

最开始的解锁Agent是一个硬编码的规则引擎:根据死锁环中每个Agent的操作类型,查表执行对应的补偿操作。维护到第3个版本时已经难以管理了——我们的业务规则每周都在变,补偿逻辑也跟着变,规则引擎的配置文件变成了3000行的YAML地狱。

后来我们把解锁Agent改成了LLM驱动的,但加了三道保险:

  • 解锁Agent不直接操作数据库,只生成补偿操作列表
  • 补偿操作列表经过安全校验器(检查是否会导致余额负数、重复退款等)
  • 实际执行由确定性的补偿执行器来做

这样LLM的职责被限定在”理解死锁上下文并制定补偿计划”,而数据安全性由代码保证。实际效果出乎意料好:LLM生成的补偿计划比人工维护的规则覆盖了更多边缘情况,比如它能理解”虽然A订单的优惠券应该回滚,但如果B订单可以复用这张优惠券(同一用户),就不需要回滚,直接转移锁的持有者更高效”——这种灵活的决策靠规则引擎很难穷举。

# 解锁Agent的核心逻辑
import json
from langgraph.prebuilt import ToolNode

class DeadlockResolver:
    """检测到死锁后,生成并执行补偿计划"""
    
    def __init__(self, llm, safety_checker, executor):
        self.llm = llm
        self.safety_checker = safety_checker
        self.executor = executor
        
    def resolve(self, deadlock_cycle: list, wfg: WaitForGraph) -> dict:
        """
        deadlock_cycle: 死锁环中的Agent列表
        返回: 解锁结果
        """
        # Step 1: 收集死锁上下文
        context = self._collect_context(deadlock_cycle)
        
        # Step 2: 让LLM生成补偿计划
        compensation_plan = self._generate_plan(context)
        
        # Step 3: 安全校验(这一层绝对不能跳过!)
        validated_plan = self.safety_checker.validate(compensation_plan)
        if validated_plan["blocked"]:
            # 如果校验不通过,走fallback:随机选一个Agent完整回滚
            return self._fallback_rollback(deadlock_cycle[0])
            
        # Step 4: 执行补偿计划
        result = self.executor.execute(validated_plan)
        
        # Step 5: 清理死锁检测器状态
        for agent_id in deadlock_cycle:
            wfg.release(agent_id)
            
        return result
        
    def _generate_plan(self, context: dict) -> dict:
        """用LLM生成补偿计划,注意prompt的设计"""
        prompt = f"""你是订单系统的死锁解锁专家。当前发现以下Agent形成死锁:

死锁上下文:
{json.dumps(context, indent=2, ensure_ascii=False)}

请制定补偿计划,目标是打破死锁的同时最小化业务损失。
规则:
1. 优先考虑转移锁而不是回滚(例如优惠券可以转给死锁环中的另一个订单)
2. 如果必须回滚,优先回滚进度最少的订单(减少补偿成本)
3. 库存锁如果已经预占,尽量保留(因为库存是稀缺资源)
4. 支付授权是最重的操作,尽量避免回滚已授权的支付

输出JSON格式的补偿计划:
{{
    "strategy": "lock_transfer" | "partial_rollback" | "full_rollback",
    "target_agent": "被选中的Agent名",
    "actions": [
        {{"type": "release_lock", "resource": "...", "agent": "..."}},
        {{"type": "rollback_coupon", "coupon_id": "...", "order_id": "..."}}
    ]
}}
"""
        response = self.llm.invoke(prompt)
        # 这里用LangChain的结构化输出解析器保证格式正确
        return json.loads(response.content)

安全校验器是整个解锁链路里最重要的组件。LLM有时候会”创造性”地提出一些危险操作,比如有一次它建议”把库存锁直接删除”来解决死锁——这会导致超卖。校验器会检查:

  • 释放锁的操作,对应的Agent确实持有该锁
  • 回滚优惠券时,优惠券确实被该订单使用且未过期
  • 库存回滚不会导致库存数超过仓库容量
  • 支付退款金额不超过原支付金额
  • 任何操作执行后,相关数据保持一致性(用预演模式模拟执行并检查不变式)
# 安全校验器的关键逻辑
class CompensationSafetyChecker:
    """在执行补偿计划前,验证其安全性"""
    
    def validate(self, plan: dict) -> dict:
        issues = []
        
        for action in plan.get("actions", []):
            if action["type"] == "release_lock":
                # 验证锁的持有者确实是目标Agent
                lock_info = redis_client.get(f"lock:{action['resource']}")
                if lock_info and lock_info.get("holder") != action["agent"]:
                    issues.append(f"锁{action['resource']}不是由{action['agent']}持有,不能强制释放")
                    
            elif action["type"] == "rollback_coupon":
                # 验证优惠券状态
                coupon = db.query("SELECT * FROM coupon_usage WHERE coupon_id=?", 
                                  action["coupon_id"])
                if coupon and coupon["status"] == "used_by_other":
                    issues.append(f"优惠券{action['coupon_id']}已被其他订单使用,无法回滚")
                    
            elif action["type"] == "restore_stock":
                # 预演库存恢复后的值
                current_stock = db.query("SELECT quantity FROM inventory WHERE sku=?",
                                         action["sku"])
                after_restore = current_stock + action["quantity"]
                if after_restore > MAX_STOCK_PER_WAREHOUSE:
                    issues.append(f"库存恢复后超过仓库容量上限")
                    
        if issues:
            return {"blocked": True, "reasons": issues}
        
        # 预演模式:在沙箱中模拟执行,检查最终状态
        simulation_result = self._simulate(plan)
        if simulation_result["inconsistencies"]:
            return {"blocked": True, "reasons": simulation_result["inconsistencies"]}
            
        return {"blocked": False, "plan": plan}

说实话,安全校验器占了整个解锁模块开发时间的60%。LLM生成计划那部分两天就搞定了,但把所有可能的危险场景枚举出来并建立校验规则,花了整整两周。这个投入是值得的——上线三个月,解锁Agent处理了237次死锁,安全校验器拦截了11次潜在的数据不一致操作,拦截率4.6%。如果没有这层防护,这11次问题都可能演变成严重的线上事故。

心跳机制让死锁发现从被动变主动

前面说的死锁检测依赖中心化的Wait-For Graph分析,每2秒跑一次。这意味着最坏情况下死锁可以持续2秒才被发现。对于高峰期每秒300笔订单的系统,2秒意味着600个订单可能被牵连。能不能更快发现?

我们给每个长时间运行的Agent加了心跳机制。思路是:Agent在执行关键操作(获取锁、等待外部服务、执行补偿)时,定期向心跳服务上报自己的状态。心跳服务维护一个”健康度”计数器,如果某个Agent超过N秒没有心跳,就标记为”疑似死锁”,并主动触发检测。

这个设计参考了分布式系统中常见的lease机制,但做了简化。Agent的心跳不是简单的”我还活着”,而是携带了当前等待的资源信息——这样心跳服务本身就可以做初步的死锁判断,不需要等到定时检测。

# Agent心跳客户端
import asyncio
import time

class AgentHeartbeat:
    """每个Agent实例的心跳管理器"""
    
    def __init__(self, agent_id: str, heartbeat_service_url: str):
        self.agent_id = agent_id
        self.service_url = heartbeat_service_url
        self.heartbeat_interval = 1.0  # 每秒一次心跳
        self.running = False
        # 记录最后一次成功操作的时间,用于外部判断
        self.last_activity = time.time()
        
    async def start(self):
        """启动心跳协程"""
        self.running = True
        while self.running:
            try:
                await self._send_heartbeat()
                await asyncio.sleep(self.heartbeat_interval)
            except Exception as e:
                # 心跳失败不能导致Agent崩溃
                logging.warning(f"心跳发送失败: {e}")
                
    async def _send_heartbeat(self):
        """发送心跳,携带当前状态"""
        payload = {
            "agent_id": self.agent_id,
            "timestamp": time.time(),
            "current_operation": self._get_current_operation(),
            "waiting_for": self._get_waiting_resources(),
            "idle_time": time.time() - self.last_activity,
            "state": "RUNNING" | "WAITING" | "BLOCKED"
        }
        # 这里用aiohttp发送HTTP心跳到心跳聚合服务
        async with aiohttp.ClientSession() as session:
            await session.post(f"{self.service_url}/heartbeat", json=payload)
            
    def update_activity(self):
        """Agent完成了某个操作,更新活跃时间"""
        self.last_activity = time.time()

心跳聚合服务端维护一个滑动窗口,记录每个Agent最近N次心跳。如果发现某个Agent连续3次心跳都处于WAITING状态且等待的资源没有变化,就判定为”疑似死锁”,立即触发WFG检测。

# 心跳服务端的死锁预判逻辑
class HeartbeatAggregator:
    def __init__(self, wfg: WaitForGraph):
        self.agent_heartbeats = defaultdict(list)  # agent_id -> list of heartbeats
        self.suspicion_threshold = 3  # 连续几次相同等待状态就怀疑死锁
        self.wfg = wfg
        
    def receive_heartbeat(self, heartbeat: dict):
        agent_id = heartbeat["agent_id"]
        self.agent_heartbeats[agent_id].append(heartbeat)
        
        # 只保留最近10次心跳
        if len(self.agent_heartbeats[agent_id]) > 10:
            self.agent_heartbeats[agent_id].pop(0)
            
        # 检查是否连续等待同一资源
        recent = self.agent_heartbeats[agent_id][-self.suspicion_threshold:]
        if len(recent) >= self.suspicion_threshold:
            all_waiting = all(h["state"] == "WAITING" for h in recent)
            same_resource = all(
                h.get("waiting_for") == recent[0].get("waiting_for") 
                for h in recent
            )
            
            if all_waiting and same_resource and recent[0].get("waiting_for"):
                # 疑似死锁!触发立即检测而不是等定时器
                logging.warning(f"Agent {agent_id} 疑似死锁,资源: {recent[0]['waiting_for']}")
                cycles = self.wfg.detect_cycles()
                if cycles:
                    # 触发解锁流程
                    asyncio.create_task(trigger_deadlock_resolution(cycles))

心跳机制上线后,死锁平均发现时间从1.8秒降到了0.6秒。这看起来提升不大,但在高峰期的实际意义是:从影响约540个订单降到影响约180个订单。而且心跳数据还附带了一个好处——我们可以做死锁预警。通过分析历史心跳数据,我们发现某些Agent组合在高峰期有15%的概率发生死锁,于是提前给这些Agent分配独立的资源池,预防性降低了死锁概率。

混沌工程下的韧性测试:主动制造死锁来验证系统免疫力

写完死锁检测和解锁机制后,说实话我心里没底。这些代码在正常情况下跑得挺好,但真实死锁场景往往伴随网络抖动、服务降级、数据库慢查询等并发问题。我需要在可控的混沌实验里验证:死锁检测器会不会误判?解锁Agent在高压下会不会引入新问题?补偿事务执行到一半失败了怎么办?

我们用Chaos Mesh搭建了混沌实验环境。测试集群是一个1:1的生产环境副本,包含32个Agent Pod、Redis集群、MySQL主从、Kafka消息队列。实验设计了三类故障注入:

实验场景 注入方式 预期结果 实际结果
Redis主节点宕机导致锁未释放 kill Redis master进程,观察sentinel切换 检测器在10秒内发现死锁,解锁Agent补偿释放锁 检测耗时6.3秒,解锁成功,但期间有23个订单进入重试队列
网络分区导致Agent间通信中断 iptables阻断特定Pod间的TCP流量 心跳机制触发预警,WFG检测环并解锁 预警触发耗时2.1秒,检测发现3个死锁环全部解锁成功
数据库慢查询导致Agent超时 对MySQL注入200ms延迟 大量Agent进入等待状态但不形成死锁,检测器不会误判 误判率0%(没有把正常等待误判为死锁),但3个Agent触发了超时重试
补偿事务执行到一半时Agent崩溃 在解锁Agent执行补偿第三步时kill Pod 补偿事务的幂等性设计保证重试不会造成双重扣减 翻车了!详见下文

第四个场景翻车了。我们模拟了解锁Agent在执行补偿事务到一半时崩溃——具体来说,它已经回滚了优惠券,但还没来得及释放库存锁就被kill了。重新调度后,解锁Agent重新执行补偿计划,发现优惠券已经被回滚过(我们的补偿执行器记录了操作日志),跳过了这一步。但它不知道原先释放库存锁的操作还没执行,直接标记为”完成”——结果库存锁一直在那没人释放,死锁又复现了。

这个bug的根本原因是:我们的补偿执行器支持幂等(通过操作日志去重),但在”部分执行”场景下,未执行的操作被错误地跳过了。修复方案是改为两阶段执行:

# 改进后的补偿执行器:两阶段提交
class TwoPhaseCompensationExecutor:
    """用两阶段思想保证补偿事务的原子性"""
    
    def execute(self, plan: dict) -> dict:
        plan_id = self._generate_plan_id(plan)
        
        # Phase 1: 准备阶段——检查所有操作是否可执行,预留资源
        prepared = []
        for action in plan["actions"]:
            try:
                # 检查操作是否已经被执行过(通过操作日志的幂等键)
                if self._is_already_done(plan_id, action):
                    continue  # 跳过已执行的操作
                    
                # 预留执行所需资源(例如锁定要回滚的优惠券防止被其他进程使用)
                reservation = self._prepare(action)
                prepared.append((action, reservation))
            except Exception as e:
                # 准备失败,取消所有已准备的操作
                self._cancel_prepared(prepared)
                raise
                
        # Phase 2: 提交阶段——真正执行所有操作
        executed = []
        try:
            for action, reservation in prepared:
                result = self._commit(action, reservation)
                # 记录操作日志,标记为已完成
                self._record_completion(plan_id, action, result)
                executed.append(action)
        except Exception as e:
            # 提交失败——这里就很棘手了
            # 已经执行的操作需要回滚吗?取决于操作是否幂等
            # 对于非幂等操作,需要执行反向补偿
            self._rollback_executed(executed, plan_id)
            raise
            
        return {"status": "completed", "executed_actions": len(executed)}

两阶段提交在分布式事务里是出了名的复杂和性能差,但用在补偿事务上其实还好——补偿事务本身就是低频操作(死锁概率本来就不高),而且我们的操作数量通常不超过5个。牺牲一点性能换来原子性保证,在这个场景下完全值得。

混沌实验跑了三轮,总共触发了47次死锁,自动化解锁成功43次,成功率91.5%。失败的4次中,有2次是网络完全隔离导致解锁Agent也无法访问数据库(这种情况只能人工介入),另外2次是补偿事务中涉及第三方支付回调的超时——支付服务商的回调接口不幂等,我们不敢重复调用。

这些实验数据让我们对系统韧性有了量化的认知。之前PM问”死锁对业务影响有多大”,我只能说”应该不大”。现在我可以给出具体数字:在正常故障率下,死锁引起的订单失败率是0.07%,自动解锁成功率91.5%,人工介入的平均恢复时间是4分钟。这些数据也成为我们SLA承诺的基准。

这些经验不只在LangGraph里有用

写完这套体系后,我发现它解决的其实是多Agent系统的一个通用问题:有状态Agent之间的资源竞争。LangGraph提供了状态图这个抽象,让Agent间依赖关系变得显式化,但同时也让资源等待模式更集中、死锁概率更高——因为所有Agent共享同一个状态对象,任何节点卡住都会阻塞整个图的执行。

如果你也在用类似的框架(AutoGen、CrewAI、甚至自己用Celery搭的Agent流水线),以下几条经验值得参考:

  • 死锁检测必须外挂,别指望Agent自省。 Agent的LLM没有全局视角,它看到的是一个点,而检测器看到的是整个图。
  • 超时值要基于生产数据的百分位延迟来设定。 我们设的是P99延迟的3倍。别用固定值,每两周根据监控数据自动调整一次。
  • 解锁逻辑是业务逻辑,不是基础设施逻辑。 不同的业务有不同的事务边界和补偿策略,把解锁Agent当成一个普通的业务Agent来设计和测试。
  • 安全校验器投入再多时间都不过分。 解锁Agent本质上在做”破坏性操作”——它在强制终止其他Agent的操作。没有校验器,你的系统不是在解决死锁,而是在制造数据不一致。
  • 混沌实验不是可选项,是验证死锁处理的唯一手段。 生产环境的死锁场景往往伴随多重故障叠加,仅靠单元测试和集成测试覆盖不了。

有人问我为什么不直接用无锁设计或者乐观锁来从根本避免死锁。说实话我们试过。订单链路的资源类型太多(库存、优惠、额度、运力),有些资源之间确实存在天然的依赖顺序。乐观锁在低竞争场景下很好用,但双11期间同一个用户的订单可能同时来20多笔,乐观锁的重试风暴能把数据库打挂。分布式锁+死锁检测这套方案在有状态多Agent系统中,是一个现实的折衷——接受了死锁可能发生,但保证了发生后能快速恢复。

未来我们计划把死锁检测器的Wait-For Graph持久化到ETCD,这样即使检测器本身挂了重启也不会丢失等待关系。另外解锁Agent的LLM prompt持续优化空间很大——目前用的还是few-shot,计划把历史上成功的237个解锁案例做成RAG知识库,让LLM在生成补偿计划时参考相似的历史案例。这个工作已经在路上了。

写这篇文章最想传递的是:多Agent系统的可靠性工程是个新领域,很多问题是分布式系统老问题在Agent范式下的重新演绎。死锁、脑裂、惊群、雪崩——这些名词不会因为用了LangGraph或者AutoGen就消失。框架帮你解决了编排问题,但容错设计还得自己扛。

关于作者

发表评论