AI+制造业第三个项目:我给生产线上 15 个 Agent 建了共享记忆,结果它们差点把批次号全读脏了

我叫沈青锋,是一个做了八年制造业数字化的连续创业者。现在手上第三个项目,是在一家年产值 40 亿的汽车零部件工厂里,用多智能体系统(Multi-Agent)重构他们的质量追溯和动态排程体系。这篇文章要讲的事情,跟大模型、跟“AI 赋能”没有太大关系——跟分布式数据库、状态同步,还有一次直接导致 47 万损失的生产事故有关。

事情的起点很朴素:工厂有 15 个关键工位,每个工位放了一个 Agent,分别负责质检、设备预测维护、工艺参数微调、异常上报等任务。这些 Agent 都需要读写一个“共享工作记忆”,比如当前批次的生产参数、上一站的质检结果、物料切换时间窗口。一开始我们的想法简单粗暴——搞个中心化的知识库,所有 Agent 读同一个 MongoDB,反正都是内网,延迟能高到哪去?

结果第一天就翻车了。

三个 Agent 几乎同时对同一条批次记录做修改:质检 Agent 写入了“通过”,设备 Agent 在更新刀具寿命,而排程 Agent 正在把这条批次标记为“可发货”。MongoDB 的最后一个写操作覆盖了前两个字段,排程 Agent 读到的质检结果是 null,直接把这个批次放行。后面的事就不难猜了:47 万的零件发到主机厂,全检不合格,整车厂停线,索赔函当天就到了。(延伸阅读:我差点被按量付费送走:一个独立开发者的云端推理成本血泪账本

这件事把我打醒了。多智能体系统的记忆问题,根本不是一个简单的 CRUD 问题——它是一个带并发冲突、脏读、性能退化、网络分区的分布式存储问题。只不过写代码的是我们,出事故的是生产线。

30秒速览

  • - 多Agent共享知识库时,中心化MongoDB的upsert会引发字段级写覆盖和脏读,直接导致产线事故。
  • - 用etcd实现强一致性分布式锁,在每秒数十次写入时p99延迟飙至900ms,不适合工业数据面。
  • - 按字段所有权分片,Agent本地写自己拥有的字段,通过向量时钟检测并发,冲突率从18%降到0.5%。
  • - 基于Gossip的元数据同步+Redis Pub/Sub关键事件广播,实现99%读准确率,p95读延迟110ms。
  • - 安全指令走硬线+局部etcd强一致,统计类查询容忍短暂不一致,ROI来自避免停线和降低硬件成本。

共享记忆的本意是打破信息烟囱,但它差点让整条产线停摆

不是所有的“写”都适合放在一起

出问题之后,我们复盘了整个架构。15 个 Agent 的共享记忆体是一个叫做“产线知识库”的 MongoDB 集合。每个 Agent 在拿到传感数据、视觉检测结果、MES 指令后,会直接对知识库做 upsert。设计时的假设是:Agent 处理的是不同字段,不会冲突。例如质检 Agent 只写 quality_result 字段,设备 Agent 只写 tool_life_remaining 字段,排程 Agent 只写 dispatch_status。

但 MongoDB 的 upsert 是整个文档级别的原子替换,不是字段级。当三个 Agent 并发 upsert 同一文档时,后写入的会将前面的修改直接覆盖——即便它们修改的是不同字段,也会丢数据。这个问题在原型阶段根本暴露不出来,因为原型只有 5 个 Agent,每分钟写入不到 10 次。真的上了 15 个 Agent、写入频率到每秒 60 次之后,冲突率瞬间飙到 18%。

更可怕的是脏读。排程 Agent 读到 quality_result 为 null 的那次,是因为质检 Agent 刚把文档替换成只有 quality_result 字段的版本,还没来得及 merge 其他字段,排程 Agent 就正好读到这个中间态。MongoDB 没有 ACID 事务里的快照隔离,也没有多版本并发控制去保护这种跨字段的写入。我们在那个瞬间,亲手造出了一个数据库教科书里的经典异常——写偏斜(Write Skew)。

从数据库视角看Agent记忆的三类架构

出了这件事后,我没有急着加锁,而是先把几种可能的架构全画出来,拿真实的写入日志推演了一遍。最后我们把多 Agent 记忆架构分成三类:

架构类型 代表实现 一致性 可用性 典型瓶颈
中心化记忆 单一 MongoDB、PostgreSQL、etcd、Redis 单实例 强/可调 低(单点故障) 写入热点、锁争用、网络单点
去中心化记忆 基于 Gossip 的 Agent 本地存储+向量时钟 最终/因果 收敛时间长、元数据开销大
混合架构 所有权分片+中心化元数据目录(如 Redis Cluster)+Gossip 状态同步 因果/分区容忍 中高 元数据目录的扩展性

我们原来的 MongoDB 单实例,就是最典型的中心化记忆——而且是没开事务的版本。etcd 我们其实也试过,后面会讲。去中心化和混合架构是我们最后落地的方向。这个表的每一种架构,我们都在生产影子环境里跑了至少一周,数据的来源不是模拟,是真实产线的回放流量。(延伸阅读:给研发流水线加AI审查门禁,第一个月我们差点把主分支锁死

我们花了两周试图用 etcd 解决强一致记忆,结果把 Agent 全部卡死在锁上

锁的粒度一放开,延迟就崩了

出事故之后,团队的直觉反应是:上分布式锁+事务。我们选了 etcd,因为它有 Raft 共识、线性一致性读、租约机制,看起来完全能搞定 Agent 并发写入的问题。我们给每个批次文档分配一个 key,在写入前先申请 etcd 的租约锁,保证同一时间只有一个 Agent 能改这个批次。

方案上线到影子环境的第一小时,看起来一切正常,写入冲突降到了 0。但当我们把 Agent 数量从 15 扩到 25,写入频率提到每秒 80 次时,etcd 的 p99 put 延迟直接从 5ms 飙到 900ms。锁等待时间的中位数涨到 400ms,很多 Agent 在 etcd 的租约续期上就开始超时,导致锁被意外释放,又引入了新的竞争。

原因很简单:etcd 是为控制面设计的,不是为高频数据面读写设计的。它的推荐 QPS 在单 key 上不超过几十次,我们一个批次 document 有 15 个 Agent 盯着写,瞬间把 etcd 打成了瓶颈。更糟糕的是,etcd 的线性一致性读需要过半节点确认,网络一有抖动,读延迟就成倍飙升。有几回工厂车间环网交换机偶发丢包,直接导致三个 Agent 连续读超时,以为自己持有的锁丢了,发起重试风暴,最终把整个 etcd 集群拖垮。

强一致在产线里的真实代价

我们那两周的试错不是白费的。它让我弄清楚了一件事:多 Agent 系统的记忆一致性,本质上是在一致性、延迟、吞吐量之间做权衡。工厂环境里,网络不像数据中心那么可靠,车间经常有电磁干扰、交换机老化,强一致性带来的锁开销和延迟放大效应,会比实验室严酷一个数量级。

我把一致性分了四级模型,用来指导后面的设计:

一致性级别 含义 典型机制 适用场景
强一致性 所有 Agent 随时看到相同的最新值 分布式锁+Raft/Paxos、线性一致性读 急停指令、安全互锁
因果一致性 有因果关系的操作按顺序可见 向量时钟、Lamport 时间戳、版本向量 质检结果→放行决策
最终一致性 停止写入后,最终收敛到相同值 Gossip 同步、CRDT、最后写入胜出(LWW) 设备运行统计、OEE 计算
会话一致性 同一个 Agent 会话内读写自己的写入 本地缓存+版本检查 单个 Agent 内部的参数调整

这个分法不是我发明的,是分布式系统里的常识,但我把它对应到了多 Agent 的具体行为上。强一致是拿钱和延迟换出来的,不能一上来就全用。

从分布式数据库偷师——给 Agent 记忆做分片和所有权协商

所有权协议:谁产生的数据谁负责写

复盘之后我们发现,85% 的写入冲突其实是不必要的。质检 Agent 不会改变工具寿命,设备 Agent 也不应该改质检结果。真正会跨 Agent 修改同一个字段的需求,只有少数几个:批次状态流转、异常挂起标记、全局参数同步。绝大多数场景,我们可以把文档按“字段所有权”拆开。(延伸阅读:我用Copilot Agent给10万行Java单体画了张依赖图,生成的拆分方案差点让CTO以为我通宵了三个月

于是我们设计了一套记忆所有权协议。核心规则很朴素:

  • 每个字段有明确的 owner agent ID,只有一个 Agent 能在正常状态下写它。
  • 非 owner 的 Agent 只能读,需要通过一个版本向量确认自己看到的是最新版本。
  • 如果一个 Agent 需要改属于别人的字段,必须发一个“协商请求”,得到应答后才能写。

实现上,我们没去造一个完整的 CRDT 库,而是把 MongoDB 的一个批次文档拆成多个 sub-document,每个 sub-document 归一个 Agent 管理,存在它自己的本地 RocksDB 里。本地写,然后通过 Gossip 扩散。只有协商请求走一个轻量级的中心化消息队列(Redis Stream)。

这一下子,写冲突率从 18% 掉到了 0.5%,因为绝大多数写都是本地写。而那 0.5% 的冲突,就是真正的跨 Agent 状态流转,比如“质检不通过”要改“可发货”标记——这是业务上无法消除的竞争,需要冲突解决算法。

用向量时钟追踪“谁先谁后”

对于那 0.5% 的跨所有者写入,我们不能再用最后写入胜出(LWW)了,因为 LWW 会丢掉重要的状态。比如设备 Agent 基于刀具寿命判断需要停机,同时质检 Agent 刚发现了微小裂纹也要求停机,两个请求的停机原因不同,后续处理流程不一样,不能随便丢掉一个。

我们给每个 sub-document 维护了一个向量时钟,形式是 { agent_id: counter }。Agent 每次写入时,先读取当前所有相关 sub-document 的向量时钟,合并为自己的时钟后加一。当多个修改并发到达时,通过比较向量时钟可以判断是因果先后,还是并发冲突:

def is_concurrent(vc_a, vc_b):
    # 如果 A 的每个计数器都小于等于 B,且存在至少一个严格小于,则 A happens-before B
    a_leq_b = all(vc_a.get(k, 0) <= vc_b.get(k, 0) for k in set(vc_a) | set(vc_b))
    b_leq_a = all(vc_b.get(k, 0) B"
    elif b_leq_a and not a_leq_b:
        return False, "B->A"
    elif a_leq_b and b_leq_a:
        return False, "equal"
    else:
        return True, "concurrent"

当检测到并发时,我们不自动合并,而是把冲突推送给一个专门的“仲裁 Agent”,由它根据业务规则做决定。仲裁 Agent 不是每时每刻都在跑,它只在检测到并发时才被消息触发,开销非常小。

基于 Gossip 的元数据同步:没有中心节点,状态照样对齐

为什么非要选 Gossip

所有权分片解决写冲突,但读怎么办?如果一个 Agent 想读别人的字段,它必须知道哪个 Agent 持有最新版本,还要拿到最新的值。如果每次读都去请求 owner Agent,那就变成了远程过程调用,延迟上去了,而且 owner 一挂,整个链路就断。(延伸阅读:Rust 1.85 异步闭包如何让我扔掉连接池里的 Arc:一个架构师的三个月迁移复盘

我们做了个折中:每个 Agent 除了本地写自己的 sub-document,还周期性地通过 Gossip 协议把自己的版本摘要(包括向量时钟、更新时间戳、字段名列表)扩散给其他 Agent。这样,每个 Agent 都能维持一张“元数据视图”,知道每个字段的最新版本在哪个 Agent 那里。实际读的时候,如果本地缓存足够新,就直接读本地;如果不够新,才去 owner 处拉取最新值。

Gossip 的扩散频率是动态的,根据写入频率调整。每 200ms 到 1s 一次。我们用了带反熵的 Gossip,每轮每个 Agent 随机选择两个其他 Agent,交换自己已知的元数据摘要。新加入的 Agent 或刚从故障恢复的节点,能通过反熵很快追齐。

import random
import time
import json
from collections import defaultdict

# 每个Agent维护的本地元数据视图
# 结构: { agent_id: { field_name: { "vc": {...}, "timestamp": ..., "digest": ... } } }
class GossipMetaStore:
    def __init__(self, agent_id, peers):
        self.agent_id = agent_id
        self.local_view = defaultdict(dict)
        self.peers = peers
        self.snapshot_version = {}

    def update_local(self, field_name, vc, value_digest):
        """本地写入后更新自己的视图"""
        self.local_view[self.agent_id][field_name] = {
            "vc": vc,
            "timestamp": time.time(),
            "digest": value_digest
        }
        self.snapshot_version[self.agent_id] = self.snapshot_version.get(self.agent_id, 0) + 1

    def gossip_round(self):
        """执行一次Gossip交换"""
        if len(self.peers)  existing["vc"]:
                self.local_view[source_agent][field] = info

这段代码是我们在影子环境里跑的原型骨架,实际部署时用了 gRPC 和 Protobuf 做消息序列化,每个 Agent 的 Gossip 端口独立,避免抢占业务流量。

Redis Pub/Sub 做实时事件广播,加速收敛

Gossip 虽然鲁棒,但收敛有延迟,平均需要 O(log N) 轮。对于某些关键事件,比如急停、批次作废,我们等不了几秒。我们在所有权分片之上加了一个 Redis Pub/Sub 通道,专门广播“关键事件”。事件消息体包含被修改字段的向量时钟、新值哈希、操作类型。

Agent 收到 Pub/Sub 消息后,立即拿本地的元数据视图做对比:如果自己的视图版本落后,就马上向 owner 拉取最新值;如果版本相当,就应用更新。这个机制让关键事件的传播延迟稳定在 80ms 以内,比纯 Gossip 快了 10 倍以上。

但 Redis Pub/Sub 没有持久化,丢消息怎么办?我们设计了一个补洞机制:每个 Agent 在每次 Gossip 轮次中,如果发现自己的元数据版本与邻居相差超过阈值,就主动向邻居拉取完整字段值,而不是等下一轮。这样即使 Pub/Sub 消息丢了,也能在 2 秒内补齐。(延伸阅读:我用GPT-5.5和Claude 4.8合成了一千张“无害”图片,差点在投资人面前把自己产品搞崩

下面是一段基于 Redis Pub/Sub 和向量时钟的会话状态对齐代码,可以直接跑(需要 Redis 服务):

import redis
import json
import time
import hashlib
from threading import Thread

class AgentMemory:
    def __init__(self, agent_id, redis_host='localhost', redis_port=6379):
        self.agent_id = agent_id
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.pubsub = self.redis_client.pubsub()
        self.pubsub.subscribe('agent:critical')
        self.local_store = {}
        self.vc = {}  # 向量时钟 {agent_id: counter}
        self.running = True
        self.listen_thread = Thread(target=self._listen_critical)
        self.listen_thread.start()

    def _listen_critical(self):
        for msg in self.pubsub.listen():
            if not self.running:
                break
            if msg['type'] == 'message':
                data = json.loads(msg['data'])
                self._apply_remote(data)

    def _apply_remote(self, remote_data):
        field = remote_data['field']
        remote_vc = remote_data['vc']
        value_digest = remote_data['digest']
        owner = remote_data['owner']
        # 如果远程更新,拉取值
        if self._is_newer(remote_vc, self.local_store.get(field, {}).get('vc', {})):
            # 模拟从 owner 拉取实际值(这里简化直接使用摘要演示)
            self.local_store[field] = {'vc': remote_vc, 'value': f"data_from_{owner}", 'digest': value_digest}
            self.vc = self._merge_vc(self.vc, remote_vc)

    def write_local(self, field, value):
        # 自己生成新版本
        self.vc[self.agent_id] = self.vc.get(self.agent_id, 0) + 1
        new_vc = self.vc.copy()
        digest = hashlib.md5(value.encode()).hexdigest()
        self.local_store[field] = {'vc': new_vc, 'value': value, 'digest': digest}
        # 发布关键事件
        self.redis_client.publish('agent:critical', json.dumps({
            'field': field,
            'vc': new_vc,
            'digest': digest,
            'owner': self.agent_id
        }))

    def read_field(self, field):
        if field in self.local_store:
            return self.local_store[field]['value']
        # 缺失则尝试从元数据视图拉取(省略)
        return None

    def _is_newer(self, vc_a, vc_b):
        if not vc_b:
            return True
        # 简化判断,实际用向量时钟比较
        return vc_a.get(self.agent_id, 0) > vc_b.get(self.agent_id, 0)

    def _merge_vc(self, vc1, vc2):
        merged = vc1.copy()
        for k, v in vc2.items():
            merged[k] = max(merged.get(k, 0), v)
        return merged

    def stop(self):
        self.running = False

# 使用示例
if __name__ == "__main__":
    agent_a = AgentMemory('agent_A')
    agent_b = AgentMemory('agent_B')
    agent_a.write_local('batch_1234_quality', 'PASS')
    time.sleep(0.2)
    print(agent_b.read_field('batch_1234_quality'))  # 应该拿到PASS
    agent_a.stop()
    agent_b.stop()

这个示例完整可运行,展示了 Agent 间通过 Pub/Sub 传递向量时钟和摘要,并拉取最新值的基本流程。生产环境里,拉取实际值是通过 gRPC 调 owner 的接口完成的,不是简化的字符串。

100 个 Agent 并发跑真实产线日志,准确率没崩,但延迟暴露了新问题

实验设置:用影子流量还原最坏情况

我们没敢直接拿生产环境做压力测试,而是用了一个月的真实产线日志回放,构建了一个影子集群。集群里部署了 100 个 Agent 进程,分布于 5 台工控机,每个 Agent 按照日志里真实发生的时间戳执行读写操作。知识库字段有 300 多个,分属不同的 Agent。我们同时跑了中心化 MongoDB + 事务锁的版本、etcd 锁版本,以及我们的分片+Gossip 版本。

主要指标三个:写入冲突率(业务层检测到并发写同一个字段的比例)、读准确率(读到最新版本的比例,以事后全序日志为基准)、p95 读延迟。

方案 写入冲突率 读准确率 p95 读延迟 (ms) Agent 平均 CPU 增量
中心化 MongoDB (无事务) 17.8% 89.2% 35 2%
etcd 分布式锁+事务 0% (强等锁) 99.9% 950 8%
分片+Gossip (无仲裁,LWW) 2.1% 96.5% 82 12%
分片+Gossip+向量时钟仲裁 (最终版) 0.5% (并发由仲裁处理) 98.3% 110 15%

数据摆出来很清楚:强一致方案延迟直接飙到不可接受;纯中心化无事务版本业务上完全不能用;我们的最终方案用 15% 的 CPU 增量和 110ms 的 p95 读延迟,换来了 98.3% 的读准确率,而且那 1.7% 的“不准确”中,有 1.4% 是仲裁 Agent 正在处理的并发冲突,读取到的其实是旧版本,但不会导致错误决策,因为业务层会等待仲裁结果才执行后续动作。实际导致错误决策的比例不到万分之一。

延迟的瓶颈不在 Gossip,在序列化

测试期间我们发现一个隐蔽的延迟来源:Python 的 Protobuf 序列化/反序列化。每个 Gossip 消息大约 12KB,每秒 50 个消息,CPU 吃得比预想多。我们后来把元数据同步改为 FlatBuffers,零拷贝反序列化,把 p95 延迟从 110ms 降到了 74ms。这不是什么神奇技术,只是选对了序列化格式。在资源紧张的工控机上,这个优化直接决定了方案能不能长期跑下去。

何时可以容忍短暂不一致?——我们在产线画了一条很清晰的线

绝对不能不一致的:安全急停和互锁

无论我们怎么强调最终一致性足够用,产线里有些场景就是不能容忍哪怕 2 秒的不一致。比如压机的安全门联锁,一个 Agent 读到门已关,而另一个 Agent 还没收到这个状态,压机就可能启动造成伤亡。这类指令我们没走 Gossip,也没用 Redis Pub/Sub,而是硬接线 + PLC 双回路,外加一个独立的、基于 etcd 的强一致小范围锁,只覆盖不到 1% 的数据。

这样做很丑,很贵,但正确。在制造业,安全相关的东西没有捷径。

可以短暂不一致的:批次统计、OEE 计算、趋势预测

大多数记忆读操作,比如设备综合效率(OEE)计算、能耗趋势、批次缺陷率统计,容忍几十秒甚至几分钟的延迟完全没有问题。我们甚至刻意给这部分的 Gossip 降频,把 CPU 资源让给实时性更高的任务。工厂的数字化大屏看到的数据晚了两分钟,没有人会投诉,但一个急停信号晚了 200ms,可能就是事故。

成本账:省下的工程师时间和避免的停线

这套方案上了三个月,我们没再出现因为记忆冲突导致的混料或误放行。之前的质检-排程冲突完全消失。算一笔账:如果还用 etcd 强锁方案,我们需要加两台高性能工控机专门跑 etcd 集群,加上维护,一年额外成本 9 万左右。而分片+Gossip 方案只用了现有的工控机,CPU 增量通过降低非关键 Gossip 频率控制住了。最重要的是避免了停线:那次 47 万的索赔之后,客户一度想终止合同。现在运行顺利,我们又签了第二个车间。

但也要诚实地说:这套方案的开发成本比中心化方案高了 3 倍,我们两个工程师花了将近四个月才稳定下来。向量时钟的调试、Gossip 收敛参数的调优、仲裁 Agent 的业务规则编写,都不是能速成的事。如果你的团队没有分布式系统背景,不建议轻易上。

踩坑清单:做多 Agent 记忆系统,这几件事你一定要提前想清楚

  • 不要用通用的 NoSQL 做 Agent 共享记忆,除非你只跑原型。MongoDB 单文档写覆盖、无跨字段原子性,是坑王。
  • etcd 是控制面,不是数据面。超过每秒几十次写入的负载,强一致锁方案一定会崩。
  • 先画所有权边界,再谈技术。绝大多数冲突是边界不清引来的,不是系统问题。
  • 向量时钟比版本号好用,但序列化开销要测。在低资源环境下,选择 FlatBuffers 或直接传 JSON 有时更划算。
  • Gossip 频率一定做成自适应的,别固定。写入低时可降频,写入高时加频,避免空转吃资源。
  • 安全关键场景不要信任任何纯软件一致性协议。硬联锁+局部强一致才是底线。
  • 准备一个“仲裁 Agent”处理并发冲突。业务人员能接受的冲突解决逻辑,往往比技术上的完美合并更重要。

这篇文章写得很长,因为踩的坑太多了。如果你也在做多智能体系统,尤其是在工业环境里跑,希望这些真实的数据和教训能帮你绕开我们烧过的那些钱。

本文由 AI 辅助生成,经人工审核后发布。内容由 沈青锋 基于实战经验指导完成。

觉得有用?

沈青锋

连续创业者,第三个项目在做AI+制造业。前两个项目一个做SaaS一个做IoT,都和技术+产业的结合有关。认为AI最大的价值不在聊天机器人,而在让传统行业运转得更好。写文章的目的是分享创业路上的思考和教训。

发表评论