30秒速览
- 死锁检测别塞进LLM的prompt里,它分不清慢和卡死的区别,得用Wait-For Graph做确定性检测
- 心跳机制把死锁发现从2秒压到0.6秒,关键是心跳要携带等待资源信息,不只是"我还活着"
- 解锁Agent用LLM生成补偿计划挺靠谱,但必须挂安全校验器,不然容易出数据不一致的骚操作
- 混沌实验真的会翻车——两阶段提交在补偿事务里虽然重但值得,部分执行导致的漏操作坑了我两天
- 安全校验器投入了60%的开发时间,拦截了4.6%的危险操作,这个ROI比写业务代码高多了
凌晨三点,订单系统突然不干活了
那是上周四凌晨,我被运维电话叫醒。打开监控面板一看,订单处理吞吐量从每秒300笔掉到了3笔,几乎停滞。不是数据库挂了,不是Redis炸了,而是我们的多Agent订单处理系统陷入了大规模死锁——16个Agent互相等待对方释放资源,像早高峰十字路口四个方向的车都往前顶了一米,结果谁也动不了。
这事让我回想起十年前刚入行时,在分布式数据库里第一次碰到两阶段锁死锁的场景。当时Oracle检测到死锁就随机杀掉一个事务,粗暴但有效。这么多年过去,我们在LangGraph上搭建的订单处理流水线,本质上遇到了同一个问题,只是这次”事务”变成了Agent之间通过图状态传递的上下文,”锁”变成了对共享资源(库存、优惠券、用户额度)的独占访问。
我们的订单系统每天处理十几万笔订单,长链路包括验仓、风控、优惠计算、支付路由、库存扣减、物流分配六个阶段,每个阶段由独立的Agent负责。这些Agent通过LangGraph的状态图串联,正常情况下走得很顺:
# 简化版订单处理图——看起来人畜无害对吧?错了
from langgraph.graph import StateGraph, END
from typing import TypedDict
class OrderState(TypedDict):
order_id: str
user_id: str
items: list
warehouse_locked: bool
coupon_applied: bool
payment_authorized: bool
stock_reserved: bool
graph = StateGraph(OrderState)
# 六个节点按顺序执行,看起来是完美的DAG
graph.add_node("warehouse_check", warehouse_agent)
graph.add_node("risk_assessment", risk_agent)
graph.add_node("coupon_apply", coupon_agent)
graph.add_node("payment_route", payment_agent)
graph.add_node("stock_reserve", stock_agent)
graph.add_node("logistics_assign", logistics_agent)
# 线性依赖,没毛病
graph.add_edge("warehouse_check", "risk_assessment")
graph.add_edge("risk_assessment", "coupon_apply")
graph.add_edge("coupon_apply", "payment_route")
graph.add_edge("payment_route", "stock_reserve")
graph.add_edge("stock_reserve", "logistics_assign")
graph.add_edge("logistics_assign", END)
问题出在哪?出在优惠券Agent和支付路由Agent都需要访问用户额度服务,而我们为了性能给这个服务加了分布式锁——谁先拿到谁先用,用完释放。正常情况下Agent按顺序执行,A用完B才用,相安无事。但一旦有异常(网络抖动导致锁没释放、Agent内部异常没走到finally、或者超时配置不合理),下一个Agent就开始傻等。更可怕的是,LangGraph默认的节点执行是同步阻塞的——一个节点卡住,整条链路瘫痪。
更头疼的是,我们还做了并发优化:为了提高吞吐,我们对同一用户的不同订单允许并行处理(毕竟验仓和风控可以独立做)。这意味着两个订单的Agent可能同时争抢同一个用户的额度锁。死锁概率直接指数级上升。
那天凌晨就是典型的死锁场景:订单A的优惠券Agent拿到了用户额度锁,在等支付路由的响应;订单B的支付路由Agent在等同一个用户额度锁,而优惠券Agent已经提前占了一个库存锁。两个订单互相等对方释放锁,加上中间有个Redis节点短暂不可用导致锁超时没触发——经典的环形等待。
别指望LLM自己去解死锁,它连自己卡住了都不知道
我最开始的想法很天真:让Agent自己判断是不是死锁了,然后做决策。我们在每个Agent的prompt里加了一段:”如果发现等待某个资源超过5秒,检查是否是死锁,如果是则释放自己持有的锁并重试。”
这个方案上线两天就翻车了。LLM对”死锁”的判断极其不稳定,有时候网络慢个2秒它就觉得锁死了,主动释放锁——结果另一个Agent还在用那个锁,数据一致性直接炸裂。更离谱的是,有一次GPT-4产生了一段”死锁分析报告”,洋洋洒洒分析了2000字为什么它认为自己在死锁、应该如何预防,唯独没有真正去释放锁。prompt里写的”释放自己持有的锁”被它理解成了”写一篇关于释放锁的文章”。
踩坑记录:不要试图把分布式系统的核心可靠性问题交给LLM的推理能力。LLM在Agent里的职责是理解业务意图和做决策,但死锁检测这类确定性逻辑必须用确定性的代码实现。LLM可以参与补偿事务的设计(比如理解订单失败后需要回滚哪些操作),但检测和触发机制必须外挂。
最终我们的方案是:在LangGraph图执行引擎外部挂载一个独立的死锁检测器,它不依赖LLM,纯用超时计时器和资源等待图来判断。一旦检测到死锁,触发专门的”解锁Agent”执行补偿事务。这个解锁Agent可以用LLM(因为它只需要理解补偿逻辑,不需要实时性),但检测必须是确定性的。
基于超时的死锁检测器:用Wait-For Graph搞定环形等待
分布式系统经典的死锁检测算法就那么几个:超时法、Wait-For Graph(WFG)、路径推算法(Path-Pushing)、边缘追踪法(Edge-Chasing)。我们场景的特点是有中心化的图执行引擎(LangGraph本身),这意味着我们可以全局观察到所有Agent的等待关系——这恰好是构建全局Wait-For Graph的前提。
核心思路:每个Agent在请求共享资源前,向检测器注册”我在等什么资源和锁”;获取资源后注册”我现在持有哪些锁”。检测器维护两个映射表:
waits_for: Agent A 在等资源 R 被释放holds: 资源 R 当前被 Agent B 持有
然后每2秒跑一次环检测——在有向图中找环,有环就是死锁。这个在数据结构课上学过,DFS着色法O(V+E)复杂度,对于几十个Agent几百个资源完全够用。
# 死锁检测器核心实现
import threading
import time
from collections import defaultdict
from enum import Enum
class ResourceType(Enum):
USER_CREDIT_LOCK = "user_credit"
COUPON_LOCK = "coupon"
STOCK_LOCK = "stock"
WAREHOUSE_LOCK = "warehouse"
class WaitForGraph:
"""维护Agent之间的等待关系,定期检测环"""
def __init__(self):
# agent_name -> set of resources it's waiting for
self.waits_for = defaultdict(set)
# resource_key -> agent_name that holds it
self.holds = {}
self.lock = threading.Lock()
# 死锁历史记录,用于避免重复处理
self.deadlock_history = set()
def register_wait(self, agent_id: str, resource_key: str):
"""Agent说自己开始等某个资源"""
with self.lock:
self.waits_for[agent_id].add(resource_key)
def register_hold(self, agent_id: str, resource_key: str):
"""Agent说自己持有了某个资源"""
with self.lock:
self.holds[resource_key] = agent_id
def release(self, agent_id: str, resource_key: str = None):
"""Agent释放资源(正常释放或强制释放都走这个)"""
with self.lock:
if resource_key:
if resource_key in self.holds:
del self.holds[resource_key]
self.waits_for[agent_id].discard(resource_key)
else:
# 释放该Agent所有相关记录
self.waits_for.pop(agent_id, None)
to_delete = [k for k, v in self.holds.items() if v == agent_id]
for k in to_delete:
del self.holds[k]
def detect_cycles(self) -> list:
"""DFS着色法检测有向图中的环,返回死锁环的Agent列表"""
with self.lock:
# 构建邻接表:如果A在等资源R,且R被B持有,则存在A->B的边
graph = defaultdict(list)
for agent_a, resources in self.waits_for.items():
for res in resources:
if res in self.holds:
agent_b = self.holds[res]
if agent_b != agent_a: # 避免自环(自己等自己,虽然不是死锁但也是问题)
graph[agent_a].append(agent_b)
# DFS检测环
WHITE, GRAY, BLACK = 0, 1, 2
color = {node: WHITE for node in graph}
cycles = []
def dfs(node, path):
color[node] = GRAY
path.append(node)
for neighbor in graph[node]:
if color[neighbor] == GRAY:
# 找到环,从path中提取环路
cycle_start = path.index(neighbor)
cycle = tuple(sorted(path[cycle_start:]))
if cycle not in self.deadlock_history:
cycles.append(cycle)
elif color[neighbor] == WHITE:
dfs(neighbor, path)
path.pop()
color[node] = BLACK
for node in graph:
if color[node] == WHITE:
dfs(node, [])
# 记录已处理的死锁环,避免重复处理
for cycle in cycles:
self.deadlock_history.add(cycle)
return cycles
这里有个细节:deadlock_history 用来去重。因为检测器每2秒跑一次,同一个死锁环可能连续几轮都被检测到,如果每次都触发解锁逻辑会造成冗余操作。但这里有个坑——如果解锁失败(比如网络问题导致补偿事务没执行成功),死锁还在,但已经被历史记录过滤了。所以加了个TTL逻辑,超过30秒的死锁记录自动过期:
# 加上过期机制的改进版
from datetime import datetime, timedelta
class WaitForGraph:
def __init__(self):
# ... 前面的代码 ...
# deadlock_history 改成带时间戳的
self.deadlock_history = {} # cycle_tuple -> detected_at
self.history_ttl = timedelta(seconds=30)
def detect_cycles(self) -> list:
# ... 省略DFS代码 ...
now = datetime.now()
# 清理过期的历史记录
expired = [k for k, v in self.deadlock_history.items()
if now - v > self.history_ttl]
for k in expired:
del self.deadlock_history[k]
for cycle in cycles:
if cycle not in self.deadlock_history:
self.deadlock_history[cycle] = now
return cycles
这个检测器作为独立线程跑在LangGraph引擎旁边,不侵入业务逻辑。Agent通过一个轻量的客户端库来注册等待和持有事件:
# Agent使用的资源管理客户端
class ResourceManager:
"""每个Agent节点调用这个来管理锁资源"""
def __init__(self, agent_id: str, wfg: WaitForGraph):
self.agent_id = agent_id
self.wfg = wfg
self.held_resources = []
def acquire(self, resource_type: ResourceType, resource_id: str,
timeout_ms: int = 5000) -> bool:
"""尝试获取资源锁,带超时"""
resource_key = f"{resource_type.value}:{resource_id}"
# 先注册等待意向——这很重要,让检测器知道我在等
self.wfg.register_wait(self.agent_id, resource_key)
# 实际获取锁(这里用Redis分布式锁举例)
lock = redis_client.lock(resource_key, timeout=timeout_ms / 1000)
acquired = lock.acquire(blocking=True, blocking_timeout=timeout_ms / 1000)
if acquired:
# 拿到锁了,更新状态:不再等,改为持有
self.wfg.register_hold(self.agent_id, resource_key)
self.held_resources.append((resource_key, lock))
else:
# 超时没拿到,从等待列表中移除
self.wfg.release(self.agent_id, resource_key)
return acquired
def release_all(self):
"""释放所有持有的资源"""
for resource_key, lock in self.held_resources:
try:
lock.release()
except:
pass # 锁可能已经过期了
self.wfg.release(self.agent_id, resource_key)
self.held_resources.clear()
超时值的设置是个经验活。设太短,正常网络抖动就被判定为死锁;设太长,死锁持续时间长,影响面大。我们的订单链路峰值QPS是300,每个Agent处理平均耗时200ms,所以超时设5秒——给10倍正常处理时间的buffer。这个数字不是拍脑袋的,是根据P99延迟(1.8秒)乘以3算的。后面混沌测试验证了这个值在大多数场景下合理。
自动化解锁Agent:让补偿事务不是事后补救而是自动触发
检测到死锁只是第一步,关键是解锁。粗暴的做法是随机kill掉环中的一个Agent并回滚它的操作。但我们做的是电商订单,有些操作不能简单回滚——优惠券已经核销了、库存已经预占了、支付已经授权了。必须执行补偿事务,而这些补偿逻辑本身就有业务复杂度。
最开始的解锁Agent是一个硬编码的规则引擎:根据死锁环中每个Agent的操作类型,查表执行对应的补偿操作。维护到第3个版本时已经难以管理了——我们的业务规则每周都在变,补偿逻辑也跟着变,规则引擎的配置文件变成了3000行的YAML地狱。
后来我们把解锁Agent改成了LLM驱动的,但加了三道保险:
- 解锁Agent不直接操作数据库,只生成补偿操作列表
- 补偿操作列表经过安全校验器(检查是否会导致余额负数、重复退款等)
- 实际执行由确定性的补偿执行器来做
这样LLM的职责被限定在”理解死锁上下文并制定补偿计划”,而数据安全性由代码保证。实际效果出乎意料好:LLM生成的补偿计划比人工维护的规则覆盖了更多边缘情况,比如它能理解”虽然A订单的优惠券应该回滚,但如果B订单可以复用这张优惠券(同一用户),就不需要回滚,直接转移锁的持有者更高效”——这种灵活的决策靠规则引擎很难穷举。
# 解锁Agent的核心逻辑
import json
from langgraph.prebuilt import ToolNode
class DeadlockResolver:
"""检测到死锁后,生成并执行补偿计划"""
def __init__(self, llm, safety_checker, executor):
self.llm = llm
self.safety_checker = safety_checker
self.executor = executor
def resolve(self, deadlock_cycle: list, wfg: WaitForGraph) -> dict:
"""
deadlock_cycle: 死锁环中的Agent列表
返回: 解锁结果
"""
# Step 1: 收集死锁上下文
context = self._collect_context(deadlock_cycle)
# Step 2: 让LLM生成补偿计划
compensation_plan = self._generate_plan(context)
# Step 3: 安全校验(这一层绝对不能跳过!)
validated_plan = self.safety_checker.validate(compensation_plan)
if validated_plan["blocked"]:
# 如果校验不通过,走fallback:随机选一个Agent完整回滚
return self._fallback_rollback(deadlock_cycle[0])
# Step 4: 执行补偿计划
result = self.executor.execute(validated_plan)
# Step 5: 清理死锁检测器状态
for agent_id in deadlock_cycle:
wfg.release(agent_id)
return result
def _generate_plan(self, context: dict) -> dict:
"""用LLM生成补偿计划,注意prompt的设计"""
prompt = f"""你是订单系统的死锁解锁专家。当前发现以下Agent形成死锁:
死锁上下文:
{json.dumps(context, indent=2, ensure_ascii=False)}
请制定补偿计划,目标是打破死锁的同时最小化业务损失。
规则:
1. 优先考虑转移锁而不是回滚(例如优惠券可以转给死锁环中的另一个订单)
2. 如果必须回滚,优先回滚进度最少的订单(减少补偿成本)
3. 库存锁如果已经预占,尽量保留(因为库存是稀缺资源)
4. 支付授权是最重的操作,尽量避免回滚已授权的支付
输出JSON格式的补偿计划:
{{
"strategy": "lock_transfer" | "partial_rollback" | "full_rollback",
"target_agent": "被选中的Agent名",
"actions": [
{{"type": "release_lock", "resource": "...", "agent": "..."}},
{{"type": "rollback_coupon", "coupon_id": "...", "order_id": "..."}}
]
}}
"""
response = self.llm.invoke(prompt)
# 这里用LangChain的结构化输出解析器保证格式正确
return json.loads(response.content)
安全校验器是整个解锁链路里最重要的组件。LLM有时候会”创造性”地提出一些危险操作,比如有一次它建议”把库存锁直接删除”来解决死锁——这会导致超卖。校验器会检查:
- 释放锁的操作,对应的Agent确实持有该锁
- 回滚优惠券时,优惠券确实被该订单使用且未过期
- 库存回滚不会导致库存数超过仓库容量
- 支付退款金额不超过原支付金额
- 任何操作执行后,相关数据保持一致性(用预演模式模拟执行并检查不变式)
# 安全校验器的关键逻辑
class CompensationSafetyChecker:
"""在执行补偿计划前,验证其安全性"""
def validate(self, plan: dict) -> dict:
issues = []
for action in plan.get("actions", []):
if action["type"] == "release_lock":
# 验证锁的持有者确实是目标Agent
lock_info = redis_client.get(f"lock:{action['resource']}")
if lock_info and lock_info.get("holder") != action["agent"]:
issues.append(f"锁{action['resource']}不是由{action['agent']}持有,不能强制释放")
elif action["type"] == "rollback_coupon":
# 验证优惠券状态
coupon = db.query("SELECT * FROM coupon_usage WHERE coupon_id=?",
action["coupon_id"])
if coupon and coupon["status"] == "used_by_other":
issues.append(f"优惠券{action['coupon_id']}已被其他订单使用,无法回滚")
elif action["type"] == "restore_stock":
# 预演库存恢复后的值
current_stock = db.query("SELECT quantity FROM inventory WHERE sku=?",
action["sku"])
after_restore = current_stock + action["quantity"]
if after_restore > MAX_STOCK_PER_WAREHOUSE:
issues.append(f"库存恢复后超过仓库容量上限")
if issues:
return {"blocked": True, "reasons": issues}
# 预演模式:在沙箱中模拟执行,检查最终状态
simulation_result = self._simulate(plan)
if simulation_result["inconsistencies"]:
return {"blocked": True, "reasons": simulation_result["inconsistencies"]}
return {"blocked": False, "plan": plan}
说实话,安全校验器占了整个解锁模块开发时间的60%。LLM生成计划那部分两天就搞定了,但把所有可能的危险场景枚举出来并建立校验规则,花了整整两周。这个投入是值得的——上线三个月,解锁Agent处理了237次死锁,安全校验器拦截了11次潜在的数据不一致操作,拦截率4.6%。如果没有这层防护,这11次问题都可能演变成严重的线上事故。
心跳机制让死锁发现从被动变主动
前面说的死锁检测依赖中心化的Wait-For Graph分析,每2秒跑一次。这意味着最坏情况下死锁可以持续2秒才被发现。对于高峰期每秒300笔订单的系统,2秒意味着600个订单可能被牵连。能不能更快发现?
我们给每个长时间运行的Agent加了心跳机制。思路是:Agent在执行关键操作(获取锁、等待外部服务、执行补偿)时,定期向心跳服务上报自己的状态。心跳服务维护一个”健康度”计数器,如果某个Agent超过N秒没有心跳,就标记为”疑似死锁”,并主动触发检测。
这个设计参考了分布式系统中常见的lease机制,但做了简化。Agent的心跳不是简单的”我还活着”,而是携带了当前等待的资源信息——这样心跳服务本身就可以做初步的死锁判断,不需要等到定时检测。
# Agent心跳客户端
import asyncio
import time
class AgentHeartbeat:
"""每个Agent实例的心跳管理器"""
def __init__(self, agent_id: str, heartbeat_service_url: str):
self.agent_id = agent_id
self.service_url = heartbeat_service_url
self.heartbeat_interval = 1.0 # 每秒一次心跳
self.running = False
# 记录最后一次成功操作的时间,用于外部判断
self.last_activity = time.time()
async def start(self):
"""启动心跳协程"""
self.running = True
while self.running:
try:
await self._send_heartbeat()
await asyncio.sleep(self.heartbeat_interval)
except Exception as e:
# 心跳失败不能导致Agent崩溃
logging.warning(f"心跳发送失败: {e}")
async def _send_heartbeat(self):
"""发送心跳,携带当前状态"""
payload = {
"agent_id": self.agent_id,
"timestamp": time.time(),
"current_operation": self._get_current_operation(),
"waiting_for": self._get_waiting_resources(),
"idle_time": time.time() - self.last_activity,
"state": "RUNNING" | "WAITING" | "BLOCKED"
}
# 这里用aiohttp发送HTTP心跳到心跳聚合服务
async with aiohttp.ClientSession() as session:
await session.post(f"{self.service_url}/heartbeat", json=payload)
def update_activity(self):
"""Agent完成了某个操作,更新活跃时间"""
self.last_activity = time.time()
心跳聚合服务端维护一个滑动窗口,记录每个Agent最近N次心跳。如果发现某个Agent连续3次心跳都处于WAITING状态且等待的资源没有变化,就判定为”疑似死锁”,立即触发WFG检测。
# 心跳服务端的死锁预判逻辑
class HeartbeatAggregator:
def __init__(self, wfg: WaitForGraph):
self.agent_heartbeats = defaultdict(list) # agent_id -> list of heartbeats
self.suspicion_threshold = 3 # 连续几次相同等待状态就怀疑死锁
self.wfg = wfg
def receive_heartbeat(self, heartbeat: dict):
agent_id = heartbeat["agent_id"]
self.agent_heartbeats[agent_id].append(heartbeat)
# 只保留最近10次心跳
if len(self.agent_heartbeats[agent_id]) > 10:
self.agent_heartbeats[agent_id].pop(0)
# 检查是否连续等待同一资源
recent = self.agent_heartbeats[agent_id][-self.suspicion_threshold:]
if len(recent) >= self.suspicion_threshold:
all_waiting = all(h["state"] == "WAITING" for h in recent)
same_resource = all(
h.get("waiting_for") == recent[0].get("waiting_for")
for h in recent
)
if all_waiting and same_resource and recent[0].get("waiting_for"):
# 疑似死锁!触发立即检测而不是等定时器
logging.warning(f"Agent {agent_id} 疑似死锁,资源: {recent[0]['waiting_for']}")
cycles = self.wfg.detect_cycles()
if cycles:
# 触发解锁流程
asyncio.create_task(trigger_deadlock_resolution(cycles))
心跳机制上线后,死锁平均发现时间从1.8秒降到了0.6秒。这看起来提升不大,但在高峰期的实际意义是:从影响约540个订单降到影响约180个订单。而且心跳数据还附带了一个好处——我们可以做死锁预警。通过分析历史心跳数据,我们发现某些Agent组合在高峰期有15%的概率发生死锁,于是提前给这些Agent分配独立的资源池,预防性降低了死锁概率。
混沌工程下的韧性测试:主动制造死锁来验证系统免疫力
写完死锁检测和解锁机制后,说实话我心里没底。这些代码在正常情况下跑得挺好,但真实死锁场景往往伴随网络抖动、服务降级、数据库慢查询等并发问题。我需要在可控的混沌实验里验证:死锁检测器会不会误判?解锁Agent在高压下会不会引入新问题?补偿事务执行到一半失败了怎么办?
我们用Chaos Mesh搭建了混沌实验环境。测试集群是一个1:1的生产环境副本,包含32个Agent Pod、Redis集群、MySQL主从、Kafka消息队列。实验设计了三类故障注入:
| 实验场景 | 注入方式 | 预期结果 | 实际结果 |
|---|---|---|---|
| Redis主节点宕机导致锁未释放 | kill Redis master进程,观察sentinel切换 | 检测器在10秒内发现死锁,解锁Agent补偿释放锁 | 检测耗时6.3秒,解锁成功,但期间有23个订单进入重试队列 |
| 网络分区导致Agent间通信中断 | iptables阻断特定Pod间的TCP流量 | 心跳机制触发预警,WFG检测环并解锁 | 预警触发耗时2.1秒,检测发现3个死锁环全部解锁成功 |
| 数据库慢查询导致Agent超时 | 对MySQL注入200ms延迟 | 大量Agent进入等待状态但不形成死锁,检测器不会误判 | 误判率0%(没有把正常等待误判为死锁),但3个Agent触发了超时重试 |
| 补偿事务执行到一半时Agent崩溃 | 在解锁Agent执行补偿第三步时kill Pod | 补偿事务的幂等性设计保证重试不会造成双重扣减 | 翻车了!详见下文 |
第四个场景翻车了。我们模拟了解锁Agent在执行补偿事务到一半时崩溃——具体来说,它已经回滚了优惠券,但还没来得及释放库存锁就被kill了。重新调度后,解锁Agent重新执行补偿计划,发现优惠券已经被回滚过(我们的补偿执行器记录了操作日志),跳过了这一步。但它不知道原先释放库存锁的操作还没执行,直接标记为”完成”——结果库存锁一直在那没人释放,死锁又复现了。
这个bug的根本原因是:我们的补偿执行器支持幂等(通过操作日志去重),但在”部分执行”场景下,未执行的操作被错误地跳过了。修复方案是改为两阶段执行:
# 改进后的补偿执行器:两阶段提交
class TwoPhaseCompensationExecutor:
"""用两阶段思想保证补偿事务的原子性"""
def execute(self, plan: dict) -> dict:
plan_id = self._generate_plan_id(plan)
# Phase 1: 准备阶段——检查所有操作是否可执行,预留资源
prepared = []
for action in plan["actions"]:
try:
# 检查操作是否已经被执行过(通过操作日志的幂等键)
if self._is_already_done(plan_id, action):
continue # 跳过已执行的操作
# 预留执行所需资源(例如锁定要回滚的优惠券防止被其他进程使用)
reservation = self._prepare(action)
prepared.append((action, reservation))
except Exception as e:
# 准备失败,取消所有已准备的操作
self._cancel_prepared(prepared)
raise
# Phase 2: 提交阶段——真正执行所有操作
executed = []
try:
for action, reservation in prepared:
result = self._commit(action, reservation)
# 记录操作日志,标记为已完成
self._record_completion(plan_id, action, result)
executed.append(action)
except Exception as e:
# 提交失败——这里就很棘手了
# 已经执行的操作需要回滚吗?取决于操作是否幂等
# 对于非幂等操作,需要执行反向补偿
self._rollback_executed(executed, plan_id)
raise
return {"status": "completed", "executed_actions": len(executed)}
两阶段提交在分布式事务里是出了名的复杂和性能差,但用在补偿事务上其实还好——补偿事务本身就是低频操作(死锁概率本来就不高),而且我们的操作数量通常不超过5个。牺牲一点性能换来原子性保证,在这个场景下完全值得。
混沌实验跑了三轮,总共触发了47次死锁,自动化解锁成功43次,成功率91.5%。失败的4次中,有2次是网络完全隔离导致解锁Agent也无法访问数据库(这种情况只能人工介入),另外2次是补偿事务中涉及第三方支付回调的超时——支付服务商的回调接口不幂等,我们不敢重复调用。
这些实验数据让我们对系统韧性有了量化的认知。之前PM问”死锁对业务影响有多大”,我只能说”应该不大”。现在我可以给出具体数字:在正常故障率下,死锁引起的订单失败率是0.07%,自动解锁成功率91.5%,人工介入的平均恢复时间是4分钟。这些数据也成为我们SLA承诺的基准。
这些经验不只在LangGraph里有用
写完这套体系后,我发现它解决的其实是多Agent系统的一个通用问题:有状态Agent之间的资源竞争。LangGraph提供了状态图这个抽象,让Agent间依赖关系变得显式化,但同时也让资源等待模式更集中、死锁概率更高——因为所有Agent共享同一个状态对象,任何节点卡住都会阻塞整个图的执行。
如果你也在用类似的框架(AutoGen、CrewAI、甚至自己用Celery搭的Agent流水线),以下几条经验值得参考:
- 死锁检测必须外挂,别指望Agent自省。 Agent的LLM没有全局视角,它看到的是一个点,而检测器看到的是整个图。
- 超时值要基于生产数据的百分位延迟来设定。 我们设的是P99延迟的3倍。别用固定值,每两周根据监控数据自动调整一次。
- 解锁逻辑是业务逻辑,不是基础设施逻辑。 不同的业务有不同的事务边界和补偿策略,把解锁Agent当成一个普通的业务Agent来设计和测试。
- 安全校验器投入再多时间都不过分。 解锁Agent本质上在做”破坏性操作”——它在强制终止其他Agent的操作。没有校验器,你的系统不是在解决死锁,而是在制造数据不一致。
- 混沌实验不是可选项,是验证死锁处理的唯一手段。 生产环境的死锁场景往往伴随多重故障叠加,仅靠单元测试和集成测试覆盖不了。
有人问我为什么不直接用无锁设计或者乐观锁来从根本避免死锁。说实话我们试过。订单链路的资源类型太多(库存、优惠、额度、运力),有些资源之间确实存在天然的依赖顺序。乐观锁在低竞争场景下很好用,但双11期间同一个用户的订单可能同时来20多笔,乐观锁的重试风暴能把数据库打挂。分布式锁+死锁检测这套方案在有状态多Agent系统中,是一个现实的折衷——接受了死锁可能发生,但保证了发生后能快速恢复。
未来我们计划把死锁检测器的Wait-For Graph持久化到ETCD,这样即使检测器本身挂了重启也不会丢失等待关系。另外解锁Agent的LLM prompt持续优化空间很大——目前用的还是few-shot,计划把历史上成功的237个解锁案例做成RAG知识库,让LLM在生成补偿计划时参考相似的历史案例。这个工作已经在路上了。
写这篇文章最想传递的是:多Agent系统的可靠性工程是个新领域,很多问题是分布式系统老问题在Agent范式下的重新演绎。死锁、脑裂、惊群、雪崩——这些名词不会因为用了LangGraph或者AutoGen就消失。框架帮你解决了编排问题,但容错设计还得自己扛。