RAG系统优化实战:延迟从3.2s降到0.8s,我交的学费全在这里了

2026年4月27日

上周五晚上11点,我正在家里打游戏,突然收到运维的告警:客服RAG接口延迟飙到8秒,超时率30%。我心里一凉,赶紧连上VPN开始查。这个系统上线才两周,怎么突然就崩了?后来发现,罪魁祸首是向量数据库的索引参数没调优,并发一上来就CPU打满。这让我意识到,RAG应用看似搭积木,但每个环节稍有不慎就会掉坑。本文就聊聊我在这个电商客服RAG项目里,从选型到优化踩过的坑,以及如何把端到端延迟从3.2秒压到0.8秒。技术栈是bge-large-zh-v1.5做向量化、Qdrant做向量库、混合检索+重排序、通义千问生成答案,日活20万用户的电商场景。如果你也在搞RAG,希望这些血泪经验能帮你省几根头发。

30秒速览

  • RAG别图省事用框架,自己写检索逻辑反而更稳,200行代码解决
  • bge-large-zh-v1.5好用但记得开归一化,不然Qdrant报错到怀疑人生
  • 混合检索用RRF融合BM25和语义,别直接用权重,分数尺度不一样
  • 重排序模型从large切base精度只掉2%但延迟砍半,值得换
  • Qdrant上线前一定调优段合并参数,不然促销流量一来直接跪

别信“开箱即用”的RAG框架——自研检索管线才发现那些坑

项目一开始,我图省事直接上了LangChain。毕竟网上一堆教程都在吹它“几行代码搭建RAG”。结果第一个Demo跑通后,稍微想定制一下检索逻辑,就陷入噩梦。LangChain的RetrievalQA链把检索、提示拼装、LLM调用全包在一起,我要加一个BM25关键字检索和分数融合,发现只能去翻它的源码,改BaseRetriever的子类,然后注册进去。这中间的抽象层多到令人发指,光是搞清楚vectorstore、retriever、document_loader的关系就浪费了我一整天。更要命的是,它的文档写得……怎么说呢,每次看都能发现新的“惊喜”,比如突然某个版本API参数名改了,而教程还停留在去年。

后来我换了LlamaIndex,它的数据连接器设计确实比LangChain清晰,但中文分句节点解析器有个诡异bug:遇到全角标点会丢句子,导致chunk切得乱七八糟。我提了个issue,两周没人回。最终我决定,核心检索管道自己写,只用Qdrant的Python客户端加上几层薄薄的封装。事实证明,自研的灵活性远高于框架,而且代码量其实不到200行。

下面是我自己封装的一个检索函数,直接调Qdrant的search实现dense检索,同时用一个BM25函数做稀疏检索,最后用RRF融合。没有任何黑盒抽象,调试时一目了然:

from qdrant_client import QdrantClient
from qdrant_client.models import SearchRequest, Filter, FieldCondition, MatchValue
import numpy as np
from rank_bm25 import BM25Okapi
import jieba

client = QdrantClient(host="localhost", port=6333)
COLLECTION_NAME = "faq_vectors"

def dense_search(query_embedding: np.ndarray, top_k: int = 20) -> list:
    """语义向量检索,返回 (doc_id, score) 列表"""
    results = client.search(
        collection_name=COLLECTION_NAME,
        query_vector=query_embedding.tolist(),
        limit=top_k,
        with_payload=False  # 只取id,后面再统一获取文本,减少传输
    )
    # 注意:Qdrant返回的score默认是余弦相似度(需向量归一化),越大越相关
    return [(hit.id, hit.score) for hit in results]

def bm25_search(query: str, corpus: list, doc_ids: list, top_k: int = 20) -> list:
    """BM25关键词检索,corpus为分词后的文本列表,doc_ids为其在Qdrant中的ID"""
    tokenized_query = jieba.lcut(query)
    # 构建BM25模型(corpus需预先离线分词并保存)
    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(tokenized_query)
    # 排序取top_k
    top_indices = np.argsort(scores)[::-1][:top_k]
    return [(doc_ids[i], scores[i]) for i in top_indices]

def rrf_fusion(dense_results: list, bm25_results: list, k: int = 60) -> list:
    """倒数排名融合,结合两种检索结果"""
    doc_scores = {}
    # 处理dense结果,排名从1开始
    for rank, (doc_id, _) in enumerate(dense_results, 1):
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k + rank)
    for rank, (doc_id, _) in enumerate(bm25_results, 1):
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k + rank)
    # 按融合分数降序排列
    fused = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
    # 只返回doc_id列表,分数后续可以用于重排序
    return [doc_id for doc_id, _ in fused]

这套代码完全透明,想在哪里加个过滤条件或日志都非常简单。相比之下,LangChain里你都不知道中间检索出的Document究竟带了哪些元数据。我不是说框架完全没用,它们适合快速PoC,但一旦业务复杂起来,自己动手的长期收益远超掉头发的短期成本。

向量模型选型:为了省几毫秒,我掉了多少头发

选embedding模型时,我一开始想当然地用了text2vec-large-chinese,因为它在MTEB中文榜单上分数还行,模型不大(326MB)。但实际跑起来发现两个问题:一是最大输入长度只有256 token,我们很多商品描述加政策文档一段就有300多token,强行截断导致漏掉关键信息;二是推理速度在CPU上跑一篇文章要50ms,20条候选就得1秒,扛不住并发。后来换成m3e-base,长度提到512,但语义表示能力明显比bge差,尤其处理同义词时经常检索错(比如“退货”和“退款”区分不开)。最后试了BAAI的bge-large-zh-v1.5,效果立竿见影,但坑也就埋下了。

bge-large-zh-v1.5输出1024维向量,默认做了L2归一化,用点积就能得到余弦相似度。但我一开始用sentence-transformers加载模型时没注意归一化参数,直接用model.encode输出原始向量,然后算余弦相似度时又手动除了L2范数,结果因为浮点误差,相似度居然出现了大于1的值,Qdrant直接报错(它要求相似度在-1到1间)。折腾了两小时才意识到是重复归一化的问题。后来看了FlagEmbedding的官方推荐用法,直接设置normalize_embeddings=True,一步到位:

from FlagEmbedding import FlagModel

# 加载模型,指定是否归一化输出
model = FlagModel('BAAI/bge-large-zh-v1.5',
                  query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                  use_fp16=True,
                  normalize_embeddings=True)  # 关键参数,输出归一化向量

# 对查询编码
query_emb = model.encode("如何申请退货?")
# 对文档库批量编码,使用float16节省内存
docs = ["退货流程详解...", "我们的退款政策是..."]
doc_embs = model.encode(docs)
# 此时直接用np.dot算点积就是余弦相似度
scores = np.dot(query_emb, doc_embs.T)

另一个大坑是批量推理时的显存管理。我们知识库有2万篇文档,首次建库时我天真地写了个循环一次encode所有文档,结果32GB内存的机器直接OOM,因为默认会加载全部文档到内存形成一个大矩阵。正确的做法是分批处理,并开启fp16:

def embed_documents_in_batches(doc_texts: list, batch_size: int = 64) -> list:
    """分批向量化,避免内存爆炸"""
    all_embeddings = []
    for i in range(0, len(doc_texts), batch_size):
        batch = doc_texts[i:i+batch_size]
        # 注意:model.encode内部会自动使用GPU(如果可用),batch_size不要设太大
        batch_embs = model.encode(batch)
        all_embeddings.append(batch_embs)
    return np.concatenate(all_embeddings, axis=0)

性能上,bge-large-zh编码一条512 token文本在T4 GPU上大约12ms,CPU上约60ms。线上我用了GPU,单次查询编码加上检索可以控制在40ms内。

混合检索不是银弹——BM25加语义检索的结果翻车和补救

纯dense检索上线第一周,客服反馈说搜“SKU2024X”经常出不来对应商品,因为向量模型没在训练语料里见过这种编号,语义相近的都是些乱七八糟的SKU。这就是dense检索的老毛病:低频专有名词匹配极差。于是我引入了BM25做关键词检索,心想这下稳了。结果又翻车。

rank_bm25库默认用空格分词,中文直接被拆成单个汉字,BM25退化成了“单字匹配”,准确率还不如不用。我试了用jieba分词自定义tokenizer:

import jieba
from rank_bm25 import BM25Okapi

def tokenize(text: str) -> list:
    # 去除标点并分词
    return [w for w in jieba.lcut(text) if w.strip()]

# 构建语料时对原始文档分词
corpus_tokenized = [tokenize(doc) for doc in all_docs]
bm25 = BM25Okapi(corpus_tokenized)

# 查询时同样分词
query_tokens = tokenize("SKU2024X 退货政策")
scores = bm25.get_scores(query_tokens)

这样BM25效果好了很多,但新的问题来了:怎么把语义相似度和BM25分数融合?直接加权相加的话,二者分数分布差异太大——语义分数通常在0.5到0.99之间,BM25分数则没有上界。试了Z-score归一化,但实时查询时无法预知全局均值和方差。最后我采用了RRF(Reciprocal Rank Fusion),只关注排序而非绝对分数,效果稳定不依赖归一化。前面的代码已经展示了RRF的实现。

融合后的召回率确实提升了,但Top10中的精确率还不够,因为有些文档语义相关但实际没用。于是又加了重排序模型:bge-reranker-large。这是个Cross-Encoder,输入(query, doc)对,输出相关性分数,准确度远高于双塔模型。但性能代价不小:一个候选对在CPU上推理需要40ms,前20条就得800ms。为了兼顾速度和效果,我做了两个优化:

  • 只对RRF融合后的Top10进行重排序,数量减半
  • 将reranker模型转为ONNX格式,用ONNX Runtime推理,延迟降到单条15ms

转换代码:

from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import AutoTokenizer

model_name = "BAAI/bge-reranker-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 加载并导出ONNX,动态量化
ort_model = ORTModelForSequenceClassification.from_pretrained(
    model_name, export=True, provider="CUDAExecutionProvider"
)
# 推理时
pairs = [[query, doc_text] for doc_text in top_docs]
inputs = tokenizer(pairs, padding=True, truncation=True, return_tensors="pt")
# 注意:ONNX模型输入输出名称需匹配,可通过model.config确认
outputs = ort_model(**inputs)
logits = outputs.logits[:, 0]  # shape: (batch_size,)

实测对比:仅dense检索时Top3命中率62%,加入BM25+RRF后提到74%,再加重排序提到83%。额外延迟约200ms(10条),在可接受范围。

检索策略 Top3命中率 Top10命中率 额外延迟(ms)
纯Dense (bge) 62% 78% 40
Dense + BM25 (RRF融合) 74% 88% 120 (BM25检索+融合)
Dense + BM25 + Reranker 83% 93% 320 (含rerank 200ms)

Chunk切分的学问:重叠长度调了300次以后,我悟了

文档怎么切直接影响检索效果。起初我按照LangChain教程,用RecursiveCharacterTextSplitter设置chunk_size=500, chunk_overlap=50。但遇到FAQ文档,本来是一问一答非常完整,被硬生生切在答案中间,检索出来的片段只有半个答案,LLM只能胡编。后来我改为根据文档类型用不同策略:

  • FAQ类:按问答对切分,每个问答作为一个chunk,利用“问”和“答”关键词拆分
  • 商品详情页:按段落(双换行)切分,然后对过长段落再用句子分界(句号、问号)二次切割
  • 政策文档:采用固定大小512 token,但重叠设为100,以保留上下文衔接

自己写切割器也不复杂:

import re

def split_by_qa(text: str) -> list:
    """将FAQ文本拆成问答对"""
    qa_pairs = re.split(r'n(?=(?:问|Q)[::])', text)
    chunks = []
    for pair in qa_pairs:
        if pair.strip():
            chunks.append(pair.strip())
    return chunks

def split_by_paragraph_and_sentence(text: str, max_length: int = 500) -> list:
    """先按段落,再按句子,确保最大长度"""
    paragraphs = [p.strip() for p in text.split('nn') if p.strip()]
    chunks = []
    for para in paragraphs:
        if len(para) <= max_length:
            chunks.append(para)
        else:
            # 按句末标点切分
            sentences = re.split(r'(?<=[。!?.!?])', para)
            current = ""
            for sent in sentences:
                if len(current) + len(sent) <= max_length:
                    current += sent
                else:
                    if current:
                        chunks.append(current)
                    current = sent
            if current:
                chunks.append(current)
    return chunks

重叠是个双刃剑。设太小,关键信息可能被割裂;设太大,多个检索结果内容重复严重,浪费上下文窗口。我对着2000条测试问题反复实验,最终发现重叠长度为chunk大小的10-15%最好。比如chunk=500 token,重叠50-75。再多对召回率提升微乎其微,反而增加冗余。

另外,metadata很重要。每个chunk我都附带了原始文档ID、标题、类别,这些信息在检索后可以用于过滤和给LLM做引用,极大降低幻觉。Qdrant存储时使用payload:

from qdrant_client.models import PointStruct

points = [
    PointStruct(
        id=idx,
        vector=emb.tolist(),
        payload={
            "text": chunk_text,
            "doc_id": doc_id,
            "title": title,
            "category": "FAQ"  # 或"product"等
        }
    ) for idx, (emb, chunk_text, doc_id, title) in enumerate(zip(embeddings, chunks, ids, titles))
]
client.upsert(collection_name=COLLECTION_NAME, points=points)

一个Prompt模板提升了12%准确率,但前提是别犯这些错

LLM生成环节,起初我用的提示词是:“根据以下参考内容回答用户问题:{context}。问题:{query}”。结果模型经常无视上下文,自己造答案,还经常不引用来源。后来我改成严格的指令模板,要求必须基于给定信息,不知道就说不知道,并明确要求引用:

def build_prompt(query: str, contexts: list) -> str:
    """构建结构化提示,要求引用"""
    context_str = ""
    for i, ctx in enumerate(contexts, 1):
        # 截断太长的片段,保留前400字
        short_text = ctx['text'][:400] + ("..." if len(ctx['text']) > 400 else "")
        context_str += f"[{i}] 标题:{ctx['title']}n{short_text}nn"
    
    prompt = f"""你是一个电商客服助手,仅根据下面提供的参考信息回答问题。如果信息不足,请明确说“根据现有资料无法回答”。
参考信息:
{context_str}

用户问题:{query}

请生成回答,并在适当位置引用参考信息编号,例如 [1]。
回答:"""
    return prompt

这个模板在500条测试集上把准确率从76%提到了88%。但有个细节:如果context总token数超过了模型上下文窗口(qwen-turbo是8k),必须做截断或压缩。我的处理方法是:按重排序后的分数倒序取context,累积token数不超过2500(留给问题和回答一些空间),超出的直接丢弃,因为分数低的本来就不那么相关。也可以用LLM对长文档做摘要,但增加了延迟和成本,没采用。

另外,我还加了few-shot示例在系统消息里,固化输出格式。比如:

messages = [
    {"role": "system", "content": "你是一个严谨的客服助手,回答格式如下:...这里放示例... 务必引用编号。"},
    {"role": "user", "content": final_prompt}
]

经过这些调整,幻觉率从大约15%降到了5%以下,而代价仅仅是多消耗了约100个 token 的指令。

上线当天Qdrant跪了:索引优化和并发调优实录

系统上线第一周很平稳,QPS也就个位数。但两周后运营搞了个促销活动,流量翻了5倍,突然客服接口开始大量超时。我一查,Qdrant的查询延迟从平时的20ms飙到了500ms以上,CPU使用率100%。登录Qdrant容器,发现它一直在做段合并(segment merging),磁盘IO爆表。

原来,我们持续写入新文档(客服每天会增加新的FAQ),Qdrant默认会把小段逐渐合并成大段以提高查询效率,但合并过程很吃资源,恰好与查询争抢。解决方案是调整优化器参数,让合并在低峰期进行,并给查询更高的优先级:

# 创建collection时设定优化参数
from qdrant_client.models import CollectionConfig, OptimizersConfigDiff, HnswConfigDiff

client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE),
    hnsw_config=HnswConfigDiff(m=16, ef_construct=200, ef_search=64),  # 索引参数
    optimizers_config=OptimizersConfigDiff(
        default_segment_number=2,        # 初始段数,减少合并频率
        max_segment_size=50000,          # 段大小阈值,适当增大以减少段数
        flush_interval_sec=30,
        indexing_threshold=10000         # 积累这么多向量后才建索引,避免小批量反复建
    )
)

另外,ef_search默认是128,我们调成了64,查询速度提升了30%,精度损失几乎测不出来(Top10命中率从93%变成92.8%)。如果精度要求高,可以动态调大ef,比如对于复杂问题设置ef=128,简单问题64,但会增加代码复杂度,暂时没做。

我还犯了一个低级错误:每次查询都创建一个新的QdrantClient实例,没有复用连接。在高并发下,TCP连接数暴涨导致端口耗尽。改成全局单例后:

# 全局客户端,不要每次请求创建
client = QdrantClient(host="localhost", port=6333, timeout=10)

# 或者用异步客户端配合FastAPI
async_client = QdrantClient(host="localhost", port=6333, prefer_grpc=True)

关于向量量化,我开始没开,后来启用了Scalar Quantization(标量量化),将float32向量压缩成uint8,内存和磁盘占用直接降了75%,查询性能也略有提升(因为数据量小了)。代价是极小的精度损失。配置参数:

from qdrant_client.models import ScalarQuantizationConfig, ScalarType

client.update_collection(
    collection_name=COLLECTION_NAME,
    quantization_config=ScalarQuantizationConfig(
        scalar=ScalarType.INT8,
        always_ram=True  # 量化后的向量也保留在内存中,加快查询
    )
)

这一波调整后,Qdrant稳定在了20-40ms查询延迟,CPU峰值不超过60%。

从3.2秒到0.8秒的完整链路优化记录

我记录了整个RAG管道的每个步骤耗时(最初的版本,单次请求):

  • Embedding(查询向量化): 520ms(CPU)
  • Dense向量检索: 200ms(Qdrant)
  • BM25检索: 120ms(含分词)
  • 结果融合: <1ms
  • 重排序(bge-reranker-large on CPU): 650ms (20条)
  • 大模型生成(qwen-plus): 1700ms
  • 总计约3190ms

看着这个数字,我知道不优化根本没脸见人。优化过程:

  1. GPU化:把embedding和reranker都搬到T4 GPU上,embedding降到15ms,reranker降到200ms(批量20条时)。
  2. 并行化:dense检索和BM25检索互不依赖,直接并发执行。用asyncio包装:
import asyncio

async def parallel_retrieval(query_emb, query_text, corpus, doc_ids):
    dense_task = loop.run_in_executor(None, dense_search, query_emb)
    bm25_task = loop.run_in_executor(None, bm25_search, query_text, corpus, doc_ids)
    dense_res, bm25_res = await asyncio.gather(dense_task, bm25_task)
    return dense_res, bm25_res

这样总检索时间从320ms降到 max(200, 120) ≈ 200ms。

  • 重排序优化:改用bge-reranker-base(从large换到base),精度仅降2个百分点,但延迟降到80ms。还试过用ONNX GPU推理,可以进一步降到50ms,但考虑维护成本没切。
  • LLM替换:从qwen-plus改为qwen-turbo,生成速度加快,1700ms→800ms。开启流式输出让用户感知更快,但端到端统计仍算完整的生成时间。
  • 热点缓存:80%的查询集中在高频问题(如退货、发货),对这些问题的最终答案做了Redis缓存。缓存键是query的MD5,值存答案文本,过期时间1小时。命中时直接返回,跳过整个RAG管道。代码很简单:
  • import hashlib, redis
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    def get_cache_or_compute(query: str, compute_func):
        key = "rag:" + hashlib.md5(query.encode()).hexdigest()
        cached = r.get(key)
        if cached:
            return cached
        result = compute_func(query)
        r.setex(key, 3600, result)
        return result
    

    优化后的耗时分布:

    步骤 优化前 (ms) 优化后 (ms)
    查询向量化 520 15 (GPU)
    检索(并行) 320 200 (并行后取max)
    重排序 650 80 (换base模型)
    LLM生成 1700 800 (换turbo)
    总计(含少量开销) ~3190 ~1100 (冷) / ~20 (缓存命中)

    注意,1100ms 包括了100ms的额外开销(JSON解析、网络等)。如果加上缓存命中,99%分位延迟在20ms。这已经远远超过了业务要求。

    说实话,RAG优化是个无底洞,后续我还想引入多模态(图片搜索商品)、Agent查询数据库等,但至少在目前,这个系统已经在生产环境扛住了日活20万用户的正常流量。

    最后想说,别迷信任何现成的RAG框架或“最佳实践”,每一个组件都值得你自己去折腾一遍,因为文档里不会告诉你实际数据量下的真实表现。我掉过的坑,希望你能绕开。

    关于作者

    发表评论