把AI SDK 3.0搬上2000并发流式战场:我用连接池、背压和流量染色死磕了三周才没翻车

30秒速览

  • Serverless跑AI SDK流式接口有硬伤,冷启动和长连接限制能把体验拉跨,得上Redis Stream做消息中枢,还得用长进程跑生成服务。
  • 单用户很容易打爆大模型额度,必须做个感知流式会话生命周期的配额系统,用Redis原子计数管住活跃流数量,甚至能根据消费lag给慢客户端降速。
  • 模型切换不能赌运气,得用一致性哈希染色分流,再搭个影子生成做实时流式diff,发现语义漂移或输出截断立刻告警回滚,不然用户骂了你都不知道。

从演示到崩溃:为什么我在Serverless上跑AI SDK流式接口3分钟就跪了

第一次在本地把AI SDK 3.0的流式生成跑通时,我还觉得这事儿太简单了。就一个useChat hook,几行Route Handler,浏览器里逐字冒出来的回复简直像魔法。可当我把它部署到Vercel,连上公司的生产数据库,准备做个内部客服机器人小范围试用的时候,刚过三分钟监控就炸了——不是模型返回错误,是服务器直接503,而且所有在途的流式连接几乎同时断开。Vercel的函数日志里只有一行冷冰冰的“FUNCTION_INVOCATION_TIMEOUT”,连个堆栈都没留下。

那次我大概只放了十几个同事进去测试,并发撑死不到50,完全不是预想的2000并发。可Serverless环境的限制比我想的狠得多。Vercel的Edge Function或者普通Serverless Function都有硬性的执行时长上限,专业版也是60秒,而我用的GPT-4o经常一个长回复要吐一分半。更要命的是,每个HTTP请求被抽象成一个独立的函数调用,根本没有长连接的概念。AI SDK 3.0默认用的是Server-Sent Events(SSE),在本地开发时就是个长连接一直挂着,但上了Serverless,每发一个token底层都可能触发一个新的函数实例冷启动。这就导致冷启动的延迟直接叠加在流式延迟上,用户那边看到的就是:先白屏等1.2秒,然后唰一下冒出一堆字,卡住,再冒一堆字。体验稀烂。

最让我头疼的是连接池的缺失。在前端,AI SDK的useChat默认会为每个聊天会话创建一个EventSource连接,如果用户刷新页面或者关掉标签页,连接就断了,但服务端的生成可能还在跑——白白浪费大模型额度。而如果我用WebSocket来做,就又得自己造一套连接管理的东西,维护心跳、重连、背压,跟AI SDK 3.0内置的流式机制完全割裂。于是我陷入了两难:要么硬改AI SDK的底层传输层,要么就在Serverless上强行维持“看起来像长连接”的东西。

后来我选了后者,但做了一层适配:在Route Handler里不再直接返回streamText的响应,而是把生成流先接进一个内存队列,通过一个集中的消息分发服务(其实就是用Redis Stream搭了个简易的消息总线)往外推。这样每个Serverless函数实例只负责一段时间的生成,而不是整个会话的生命周期。具体来说,我在next.js的route handler里这样写:

import { StreamingTextResponse, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { Redis } from '@upstash/redis';

const redis = new Redis({ url: process.env.REDIS_URL, token: process.env.REDIS_TOKEN });

export async function POST(req: Request) {
  const { messages, sessionId } = await req.json();
  const result = await streamText({ model: openai('gpt-4o'), messages });
  
  const stream = 应使用 result.toDataStreamResponse() 或 result.toTextStreamResponse();
  const reader = stream.getReader();
  let chunk;
  while (!(chunk = await reader.read()).done) {
    await redis.xadd(`stream:${sessionId}`, '*', { chunk: chunk.value });
  }
  await redis.xadd(`stream:${sessionId}`, '*', { done: 'true' });
  return new Response('OK');
}

这个方案虽然把连接管理和生成过程解耦了,但本质上还是每次生成要独占一个函数实例,直到生成结束。如果并发上来,冷启动开销依然会吃掉不少性能。我后来把这套东西从Vercel迁到了Kubernetes,用长进程来跑生成服务,才真正解决了冷启动问题,但这就是另一个故事了。总之,从演示到生产,中间差了至少一个连接池和一轮架构重设计。别信那些五分钟部署就能抗高并发的鬼话。

我被迫给每个用户上了把锁——基于Redis Stream的全局流配额系统

既然决定用Redis Stream做消息中枢,我就顺手把另一颗定时炸弹拆了:大模型额度被单用户打爆。早期我根本没做任何用户级别的速率限制,想着反正公司内部用,大家自觉就好。结果产品经理为了测试,连发了三十条“给我生成一个完整的产品需求文档范文,要求5000字”,GPT-4o傻乎乎地一条条生成,十分钟烧掉了17美元。这还不是最可怕的,如果某个内部应用的token意外泄露,恶意调用能在两分钟内就干透一整个月的预算。

我一开始想用简单的令牌桶算法,每个用户一个Redis key记录剩余token数。但这只能做粗粒度的限流,没法感知到大模型正在进行的流式会话。因为流式生成是持续消耗额度,一个会话可能持续60秒,它占用的资源不是瞬时请求能衡量的。于是我用Redis Stream搭了一个全局流配额系统,核心思路是:把每个用户的活跃生成会话数当作分布式资源来管理,超过上限就直接拒绝新请求,而不是等生成到一半才发现超了。

实现上,我在用户发起生成请求时,先去Redis里用INCR原子性地增加他当前活跃流计数,并且设置一个过期时间作为兜底。如果计数超过阈值(比如普通用户最多同时3个流),就直接返回429和一段友好的降级提示。同时,在Redis Stream的消费端,每个流结束时(收到done标记),我会再DECR一次。这里有个坑:如果生成中途因为网络原因连接断了,Stream可能永远不会收到done,这个计数器就永远不会减下去,导致用户后续请求都被误杀。所以我给每个流都打了一个基于sessionId的存活标记,起一个后台定时任务扫描所有超过90秒没有新chunk的流,主动释放配额。

更细粒度的是按token消耗速率来控制。我从AI SDK的streamText返回的流里可以解析出每个chunk的token数量(用的是result.usage累积值),然后把这些信息写到Redis Stream,再起一个消费者专门做实时统计。如果某个用户的token消耗速率在15秒窗口内超过了预设阈值(比如每秒1000 token),我就往他的Stream里强行塞一条系统消息“您的生成频率过高,系统已自动降速”,同时在服务端对他的生成流主动注入延迟。这个延迟不是阻塞式的,而是在Redis Stream的消费端做XREAD时人为降低消费速率,相当于给这个用户的流降级到了慢车道。代码示意:

// 消费端伪代码
const tokenLimit = 500; // tokens per 15s
while (true) {
  const entries = await redis.xread(
    [{ key: `stream:${sessionId}`, id: lastId }],
    { count: 1, block: 1000 }
  );
  if (entries) {
    const currentTokens = await getTokenUsageInWindow(userId, 15000);
    if (currentTokens > tokenLimit) {
      await new Promise(resolve => setTimeout(resolve, 200)); // 人为降速
    }
    // 正常推送给客户端
    sendToClient(entries[0][1].chunk);
  }
}

这套机制上线后,财务部再也没找过我。更重要的是,它给了产品侧一个清晰的资源视图:每个用户的生成能力变成了一个可配置的资源配额,不同角色(免费用户、付费用户、内部测试)可以分开设置。比起直接用API层限流,这种感知流式会话生命周期的做法,才真正把大模型当成了一种持续消耗的资源来管理。

当用户的网速比我的生成还慢时,我让模型学会了暂停呼吸

配额问题刚解决,新的问题又冒出来了:移动端用户在电梯里用我们的AI客服,信号一断,服务端还在傻乎乎地拼命生成token,等到用户重新连上时,可能已经白白生成了三十秒的内容——这些内容永远不会被看到,却实实在在烧了钱。一开始我试图在前端用AbortController来取消请求,但移动端断开往往是底层TCP连接直接重置,根本来不及发abort信号。服务端那头,AI SDK 3.0的流没有任何办法感知到下游已经断开,它只会继续从OpenAI的HTTP流里读token,然后往已经关闭的Response里写,然后报一个write after end的错误。

我需要一种更底层的背压机制:让上游大模型的生成速度,能自动跟随下游客户端的消费速度。如果客户端读取慢了(意味着网络变差或用户切换了页面),生成就应该暂停或者减速;如果连接完全断开,生成就该立即终止。

在Next.js的Route Handler里,我本来可以直接用ReadableStream的pipeThrough加一个TransformStream来做背压,但AI SDK 3.0返回的应使用 result.toDataStreamResponse() 或 result.toTextStreamResponse()已经是一个ReadableStream了,我没办法再给它加背压控制,除非我不走SDK自己手动请求OpenAI API。但我又不想丢失SDK提供的错误处理和重试逻辑。最后我折中了一下:不碰SDK那一层,而是在把chunk写入Redis Stream之前,去检查一下下游客户端的消费lag。

具体做法是,在生成端(生产者)每写入一个chunk到Redis Stream后,就检查该Stream的lag(即未消费的消息数量)。如果lag超过了设定的阈值(比如20条),说明客户端消费太慢了,我就主动暂停从OpenAI流读取新的chunk,用一个while循环阻塞住,直到lag降下来。这个阻塞不是死等的,我每隔100毫秒检查一次,如果连续阻塞超过10秒(可能客户端已经彻底掉线),就直接终止生成,释放配额。这相当于用Redis Stream的消息积压作为背压信号,把下游的消费能力反向传递给了上游的生成速度。

const MAX_LAG = 20;
let stallCount = 0;
while (true) {
  const chunk = await reader.read();
  if (chunk.done) break;
  await redis.xadd(`stream:${sessionId}`, '*', { chunk: chunk.value });
  const lag = await redis.xlen(`stream:${sessionId}`) - consumedOffset;
  if (lag > MAX_LAG) {
    stallCount++;
    if (stallCount > 100) { // 10秒
      console.log(`Client disconnected, stopping generation for ${sessionId}`);
      break;
    }
    await sleep(100);
    continue;
  }
  stallCount = 0;
}

这个方法虽然有点粗糙,但确实有效。我后来还加了一个优化:当检测到客户端lag持续过大时,不只是暂停生成,还主动压缩消息——把多个小chunk合并成一个大chunk,减少写入次数,加快客户端的追赶速度。这样一来,即使是在地铁里信号飘忽的移动端用户,也不会因为流式响应本身的设计缺陷而被浪费太多额度。说实话,AI SDK 3.0本身提供了很棒的流式抽象,但在真实世界的网络环境下,这种背压处理还是得自己补一层。

多模型灰度上线那天,我用流量染色和实时diff监控守住了最后一条防线

最刺激的事情发生在一次模型切换。我们想从GPT-4o切到Claude 3.5 Sonnet,因为内部评测显示Sonnet在某些业务场景下回复质量更高,而且价格便宜40%。但问题来了:直接全量切换风险太大,万一某些query的表现反而变差,用户投诉会像潮水一样涌来。我需要一个丝滑的灰度发布方案:让一小部分用户先跑新模型,同时能实时对比新老模型的流式回复质量,而且不能让用户感知到任何延迟变化。

流式响应的灰度比普通API灰度复杂得多。普通请求只要看最终返回的JSON对不对就行了,但流式是动态的,同样的问题,两个模型可能第一个token出现的时机不同,token的速度不同,输出的文字内容也不同。怎么对比?用最终完整文本做相似度比较太滞后了,等全部生成完用户早就看到结果了。我需要的是在流式传输过程中就能发现质量劣化的信号。

我的方案是这样的:在网关层(我用的Envoy,但用nginx也能做)根据用户ID的哈希值做流量染色,将5%的用户路由到Claude模型组,其余95%照旧用GPT-4o。这里注意不能用随机路由,必须用用户ID一致性哈希,确保同一个用户在整个会话期间始终落在一组模型上,否则对话上下文会错乱。染色逻辑很简单:

const userIdHash = createHash('md5').update(userId).digest('hex');
const hashInt = parseInt(userIdHash.slice(0, 8), 16);
if (hashInt % 100 < 5) {
  model = 'claude-3-5-sonnet';
} else {
  model = 'gpt-4o';
}

接下来是质量对比的难点。我不想让灰度用户当小白鼠,所以我搭了一个“影子生成”管道:对于灰度用户的每个请求,除了让Claude正常生成并返回给用户之外,同时并行用GPT-4o生成一份影子回复,但这份回复不发给用户,只用来做实时对比。为了不额外增加延迟,影子生成完全异步,不阻塞主流程。我把两个模型的流式输出都打上标记(model: claudemodel: gpt4o)写入同一个Redis Stream,然后单独起一个对比消费者订阅这个Stream。

对比消费者做的事情是:实时缓存两个模型输出的前半部分文本(比如前200个token),然后跑一个轻量级的语义漂移检测。我用的不是昂贵的BLEU或BERTScore,因为那需要完整文本,而是用了一个滑动窗口内的困惑度差异,外加一个简单的情感倾向对比(用fastText做,模型文件才几MB)。如果Claude的输出在前200个token内频繁出现负面语气词,或者生成的文本明显比GPT-4o短很多(可能意味着提前中止),就立刻触发告警。代码大概是这样:

// 对比消费者伪代码
const baselineTokens = [];
const candidateTokens = [];
while (true) {
  const msg = await redis.xread(...);
  if (msg.model === 'gpt4o') {
    baselineTokens.push(msg.chunk);
  } else {
    candidateTokens.push(msg.chunk);
    const baselineText = baselineTokens.join('');
    const candidateText = candidateTokens.join('');
    const sentimentDiff = analyzeSentiment(candidateText) - analyzeSentiment(baselineText);
    if (Math.abs(sentimentDiff) > 0.5) {
      await sendAlert(`Sentiment drift detected for session ${sessionId}`);
    }
    if (candidateText.length  20) {
      await sendAlert(`Output truncation suspected for session ${sessionId}`);
    }
  }
}

光有告警不够,我还需要快速回滚。我设了一个手动开关,如果五分钟内收到超过3次质量告警,就一键把染色比例从5%降到0,并且让所有正在进行的Claude流立即中断,前端无感切回GPT-4o重试。前端用的useChat hook本身就会在连接断开后自动重连,我只要在服务端切断连接时返回一个特定错误码,前端就会自动发起一条新的completion请求。这样一来,那次模型切换我们最终确实发现Claude在某些法律条款生成上会出现幻觉,但因为灰度比例小,只有两个测试用户发现了异常,在告警后的三秒内我就切了回去,实际影响微乎其微。

这一整套灰度+监控的流式方案,让我们后续接Gemini、接自研模型都变得非常安全。我甚至把影子生成的对比扩展到了多模型并行,用一个dashboard实时展示不同模型在同一批query上的token消耗、首token延迟、中断率,简直像给模型开了个CT扫描。说实话,流式响应的灰度发布如果只靠用户反馈问卷,那跟盲人摸象没区别,真正有用的还是这种自动化、无感知的质量卫士。

本文由 AI 辅助生成,经人工审核后发布。内容由 林默 基于实战经验指导完成。

觉得有用?

林默

全栈开发者,写了8年代码,从jQuery时代一路写到AI Copilot。目前专注AI编程工具链的深度使用和评测,相信好的工具能让开发者事半功倍。喜欢用实际项目验证技术方案,不写没踩过坑的教程。