上周五晚上11点,我正在家里打游戏,突然收到运维的告警:客服RAG接口延迟飙到8秒,超时率30%。我心里一凉,赶紧连上VPN开始查。这个系统上线才两周,怎么突然就崩了?后来发现,罪魁祸首是向量数据库的索引参数没调优,并发一上来就CPU打满。这让我意识到,RAG应用看似搭积木,但每个环节稍有不慎就会掉坑。本文就聊聊我在这个电商客服RAG项目里,从选型到优化踩过的坑,以及如何把端到端延迟从3.2秒压到0.8秒。技术栈是bge-large-zh-v1.5做向量化、Qdrant做向量库、混合检索+重排序、通义千问生成答案,日活20万用户的电商场景。如果你也在搞RAG,希望这些血泪经验能帮你省几根头发。
30秒速览
- RAG别图省事用框架,自己写检索逻辑反而更稳,200行代码解决
- bge-large-zh-v1.5好用但记得开归一化,不然Qdrant报错到怀疑人生
- 混合检索用RRF融合BM25和语义,别直接用权重,分数尺度不一样
- 重排序模型从large切base精度只掉2%但延迟砍半,值得换
- Qdrant上线前一定调优段合并参数,不然促销流量一来直接跪
别信“开箱即用”的RAG框架——自研检索管线才发现那些坑
项目一开始,我图省事直接上了LangChain。毕竟网上一堆教程都在吹它“几行代码搭建RAG”。结果第一个Demo跑通后,稍微想定制一下检索逻辑,就陷入噩梦。LangChain的RetrievalQA链把检索、提示拼装、LLM调用全包在一起,我要加一个BM25关键字检索和分数融合,发现只能去翻它的源码,改BaseRetriever的子类,然后注册进去。这中间的抽象层多到令人发指,光是搞清楚vectorstore、retriever、document_loader的关系就浪费了我一整天。更要命的是,它的文档写得……怎么说呢,每次看都能发现新的“惊喜”,比如突然某个版本API参数名改了,而教程还停留在去年。
后来我换了LlamaIndex,它的数据连接器设计确实比LangChain清晰,但中文分句节点解析器有个诡异bug:遇到全角标点会丢句子,导致chunk切得乱七八糟。我提了个issue,两周没人回。最终我决定,核心检索管道自己写,只用Qdrant的Python客户端加上几层薄薄的封装。事实证明,自研的灵活性远高于框架,而且代码量其实不到200行。
下面是我自己封装的一个检索函数,直接调Qdrant的search实现dense检索,同时用一个BM25函数做稀疏检索,最后用RRF融合。没有任何黑盒抽象,调试时一目了然:
from qdrant_client import QdrantClient
from qdrant_client.models import SearchRequest, Filter, FieldCondition, MatchValue
import numpy as np
from rank_bm25 import BM25Okapi
import jieba
client = QdrantClient(host="localhost", port=6333)
COLLECTION_NAME = "faq_vectors"
def dense_search(query_embedding: np.ndarray, top_k: int = 20) -> list:
"""语义向量检索,返回 (doc_id, score) 列表"""
results = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_embedding.tolist(),
limit=top_k,
with_payload=False # 只取id,后面再统一获取文本,减少传输
)
# 注意:Qdrant返回的score默认是余弦相似度(需向量归一化),越大越相关
return [(hit.id, hit.score) for hit in results]
def bm25_search(query: str, corpus: list, doc_ids: list, top_k: int = 20) -> list:
"""BM25关键词检索,corpus为分词后的文本列表,doc_ids为其在Qdrant中的ID"""
tokenized_query = jieba.lcut(query)
# 构建BM25模型(corpus需预先离线分词并保存)
bm25 = BM25Okapi(corpus)
scores = bm25.get_scores(tokenized_query)
# 排序取top_k
top_indices = np.argsort(scores)[::-1][:top_k]
return [(doc_ids[i], scores[i]) for i in top_indices]
def rrf_fusion(dense_results: list, bm25_results: list, k: int = 60) -> list:
"""倒数排名融合,结合两种检索结果"""
doc_scores = {}
# 处理dense结果,排名从1开始
for rank, (doc_id, _) in enumerate(dense_results, 1):
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k + rank)
for rank, (doc_id, _) in enumerate(bm25_results, 1):
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k + rank)
# 按融合分数降序排列
fused = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
# 只返回doc_id列表,分数后续可以用于重排序
return [doc_id for doc_id, _ in fused]
这套代码完全透明,想在哪里加个过滤条件或日志都非常简单。相比之下,LangChain里你都不知道中间检索出的Document究竟带了哪些元数据。我不是说框架完全没用,它们适合快速PoC,但一旦业务复杂起来,自己动手的长期收益远超掉头发的短期成本。
向量模型选型:为了省几毫秒,我掉了多少头发
选embedding模型时,我一开始想当然地用了text2vec-large-chinese,因为它在MTEB中文榜单上分数还行,模型不大(326MB)。但实际跑起来发现两个问题:一是最大输入长度只有256 token,我们很多商品描述加政策文档一段就有300多token,强行截断导致漏掉关键信息;二是推理速度在CPU上跑一篇文章要50ms,20条候选就得1秒,扛不住并发。后来换成m3e-base,长度提到512,但语义表示能力明显比bge差,尤其处理同义词时经常检索错(比如“退货”和“退款”区分不开)。最后试了BAAI的bge-large-zh-v1.5,效果立竿见影,但坑也就埋下了。
bge-large-zh-v1.5输出1024维向量,默认做了L2归一化,用点积就能得到余弦相似度。但我一开始用sentence-transformers加载模型时没注意归一化参数,直接用model.encode输出原始向量,然后算余弦相似度时又手动除了L2范数,结果因为浮点误差,相似度居然出现了大于1的值,Qdrant直接报错(它要求相似度在-1到1间)。折腾了两小时才意识到是重复归一化的问题。后来看了FlagEmbedding的官方推荐用法,直接设置normalize_embeddings=True,一步到位:
from FlagEmbedding import FlagModel
# 加载模型,指定是否归一化输出
model = FlagModel('BAAI/bge-large-zh-v1.5',
query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
use_fp16=True,
normalize_embeddings=True) # 关键参数,输出归一化向量
# 对查询编码
query_emb = model.encode("如何申请退货?")
# 对文档库批量编码,使用float16节省内存
docs = ["退货流程详解...", "我们的退款政策是..."]
doc_embs = model.encode(docs)
# 此时直接用np.dot算点积就是余弦相似度
scores = np.dot(query_emb, doc_embs.T)
另一个大坑是批量推理时的显存管理。我们知识库有2万篇文档,首次建库时我天真地写了个循环一次encode所有文档,结果32GB内存的机器直接OOM,因为默认会加载全部文档到内存形成一个大矩阵。正确的做法是分批处理,并开启fp16:
def embed_documents_in_batches(doc_texts: list, batch_size: int = 64) -> list:
"""分批向量化,避免内存爆炸"""
all_embeddings = []
for i in range(0, len(doc_texts), batch_size):
batch = doc_texts[i:i+batch_size]
# 注意:model.encode内部会自动使用GPU(如果可用),batch_size不要设太大
batch_embs = model.encode(batch)
all_embeddings.append(batch_embs)
return np.concatenate(all_embeddings, axis=0)
性能上,bge-large-zh编码一条512 token文本在T4 GPU上大约12ms,CPU上约60ms。线上我用了GPU,单次查询编码加上检索可以控制在40ms内。
混合检索不是银弹——BM25加语义检索的结果翻车和补救
纯dense检索上线第一周,客服反馈说搜“SKU2024X”经常出不来对应商品,因为向量模型没在训练语料里见过这种编号,语义相近的都是些乱七八糟的SKU。这就是dense检索的老毛病:低频专有名词匹配极差。于是我引入了BM25做关键词检索,心想这下稳了。结果又翻车。
rank_bm25库默认用空格分词,中文直接被拆成单个汉字,BM25退化成了“单字匹配”,准确率还不如不用。我试了用jieba分词自定义tokenizer:
import jieba
from rank_bm25 import BM25Okapi
def tokenize(text: str) -> list:
# 去除标点并分词
return [w for w in jieba.lcut(text) if w.strip()]
# 构建语料时对原始文档分词
corpus_tokenized = [tokenize(doc) for doc in all_docs]
bm25 = BM25Okapi(corpus_tokenized)
# 查询时同样分词
query_tokens = tokenize("SKU2024X 退货政策")
scores = bm25.get_scores(query_tokens)
这样BM25效果好了很多,但新的问题来了:怎么把语义相似度和BM25分数融合?直接加权相加的话,二者分数分布差异太大——语义分数通常在0.5到0.99之间,BM25分数则没有上界。试了Z-score归一化,但实时查询时无法预知全局均值和方差。最后我采用了RRF(Reciprocal Rank Fusion),只关注排序而非绝对分数,效果稳定不依赖归一化。前面的代码已经展示了RRF的实现。
融合后的召回率确实提升了,但Top10中的精确率还不够,因为有些文档语义相关但实际没用。于是又加了重排序模型:bge-reranker-large。这是个Cross-Encoder,输入(query, doc)对,输出相关性分数,准确度远高于双塔模型。但性能代价不小:一个候选对在CPU上推理需要40ms,前20条就得800ms。为了兼顾速度和效果,我做了两个优化:
- 只对RRF融合后的Top10进行重排序,数量减半
- 将reranker模型转为ONNX格式,用ONNX Runtime推理,延迟降到单条15ms
转换代码:
from optimum.onnxruntime import ORTModelForSequenceClassification
from transformers import AutoTokenizer
model_name = "BAAI/bge-reranker-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 加载并导出ONNX,动态量化
ort_model = ORTModelForSequenceClassification.from_pretrained(
model_name, export=True, provider="CUDAExecutionProvider"
)
# 推理时
pairs = [[query, doc_text] for doc_text in top_docs]
inputs = tokenizer(pairs, padding=True, truncation=True, return_tensors="pt")
# 注意:ONNX模型输入输出名称需匹配,可通过model.config确认
outputs = ort_model(**inputs)
logits = outputs.logits[:, 0] # shape: (batch_size,)
实测对比:仅dense检索时Top3命中率62%,加入BM25+RRF后提到74%,再加重排序提到83%。额外延迟约200ms(10条),在可接受范围。
| 检索策略 | Top3命中率 | Top10命中率 | 额外延迟(ms) |
|---|---|---|---|
| 纯Dense (bge) | 62% | 78% | 40 |
| Dense + BM25 (RRF融合) | 74% | 88% | 120 (BM25检索+融合) |
| Dense + BM25 + Reranker | 83% | 93% | 320 (含rerank 200ms) |
Chunk切分的学问:重叠长度调了300次以后,我悟了
文档怎么切直接影响检索效果。起初我按照LangChain教程,用RecursiveCharacterTextSplitter设置chunk_size=500, chunk_overlap=50。但遇到FAQ文档,本来是一问一答非常完整,被硬生生切在答案中间,检索出来的片段只有半个答案,LLM只能胡编。后来我改为根据文档类型用不同策略:
- FAQ类:按问答对切分,每个问答作为一个chunk,利用“问”和“答”关键词拆分
- 商品详情页:按段落(双换行)切分,然后对过长段落再用句子分界(句号、问号)二次切割
- 政策文档:采用固定大小512 token,但重叠设为100,以保留上下文衔接
自己写切割器也不复杂:
import re
def split_by_qa(text: str) -> list:
"""将FAQ文本拆成问答对"""
qa_pairs = re.split(r'n(?=(?:问|Q)[::])', text)
chunks = []
for pair in qa_pairs:
if pair.strip():
chunks.append(pair.strip())
return chunks
def split_by_paragraph_and_sentence(text: str, max_length: int = 500) -> list:
"""先按段落,再按句子,确保最大长度"""
paragraphs = [p.strip() for p in text.split('nn') if p.strip()]
chunks = []
for para in paragraphs:
if len(para) <= max_length:
chunks.append(para)
else:
# 按句末标点切分
sentences = re.split(r'(?<=[。!?.!?])', para)
current = ""
for sent in sentences:
if len(current) + len(sent) <= max_length:
current += sent
else:
if current:
chunks.append(current)
current = sent
if current:
chunks.append(current)
return chunks
重叠是个双刃剑。设太小,关键信息可能被割裂;设太大,多个检索结果内容重复严重,浪费上下文窗口。我对着2000条测试问题反复实验,最终发现重叠长度为chunk大小的10-15%最好。比如chunk=500 token,重叠50-75。再多对召回率提升微乎其微,反而增加冗余。
另外,metadata很重要。每个chunk我都附带了原始文档ID、标题、类别,这些信息在检索后可以用于过滤和给LLM做引用,极大降低幻觉。Qdrant存储时使用payload:
from qdrant_client.models import PointStruct
points = [
PointStruct(
id=idx,
vector=emb.tolist(),
payload={
"text": chunk_text,
"doc_id": doc_id,
"title": title,
"category": "FAQ" # 或"product"等
}
) for idx, (emb, chunk_text, doc_id, title) in enumerate(zip(embeddings, chunks, ids, titles))
]
client.upsert(collection_name=COLLECTION_NAME, points=points)
一个Prompt模板提升了12%准确率,但前提是别犯这些错
LLM生成环节,起初我用的提示词是:“根据以下参考内容回答用户问题:{context}。问题:{query}”。结果模型经常无视上下文,自己造答案,还经常不引用来源。后来我改成严格的指令模板,要求必须基于给定信息,不知道就说不知道,并明确要求引用:
def build_prompt(query: str, contexts: list) -> str:
"""构建结构化提示,要求引用"""
context_str = ""
for i, ctx in enumerate(contexts, 1):
# 截断太长的片段,保留前400字
short_text = ctx['text'][:400] + ("..." if len(ctx['text']) > 400 else "")
context_str += f"[{i}] 标题:{ctx['title']}n{short_text}nn"
prompt = f"""你是一个电商客服助手,仅根据下面提供的参考信息回答问题。如果信息不足,请明确说“根据现有资料无法回答”。
参考信息:
{context_str}
用户问题:{query}
请生成回答,并在适当位置引用参考信息编号,例如 [1]。
回答:"""
return prompt
这个模板在500条测试集上把准确率从76%提到了88%。但有个细节:如果context总token数超过了模型上下文窗口(qwen-turbo是8k),必须做截断或压缩。我的处理方法是:按重排序后的分数倒序取context,累积token数不超过2500(留给问题和回答一些空间),超出的直接丢弃,因为分数低的本来就不那么相关。也可以用LLM对长文档做摘要,但增加了延迟和成本,没采用。
另外,我还加了few-shot示例在系统消息里,固化输出格式。比如:
messages = [
{"role": "system", "content": "你是一个严谨的客服助手,回答格式如下:...这里放示例... 务必引用编号。"},
{"role": "user", "content": final_prompt}
]
经过这些调整,幻觉率从大约15%降到了5%以下,而代价仅仅是多消耗了约100个 token 的指令。
上线当天Qdrant跪了:索引优化和并发调优实录
系统上线第一周很平稳,QPS也就个位数。但两周后运营搞了个促销活动,流量翻了5倍,突然客服接口开始大量超时。我一查,Qdrant的查询延迟从平时的20ms飙到了500ms以上,CPU使用率100%。登录Qdrant容器,发现它一直在做段合并(segment merging),磁盘IO爆表。
原来,我们持续写入新文档(客服每天会增加新的FAQ),Qdrant默认会把小段逐渐合并成大段以提高查询效率,但合并过程很吃资源,恰好与查询争抢。解决方案是调整优化器参数,让合并在低峰期进行,并给查询更高的优先级:
# 创建collection时设定优化参数
from qdrant_client.models import CollectionConfig, OptimizersConfigDiff, HnswConfigDiff
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config=models.VectorParams(size=1024, distance=models.Distance.COSINE),
hnsw_config=HnswConfigDiff(m=16, ef_construct=200, ef_search=64), # 索引参数
optimizers_config=OptimizersConfigDiff(
default_segment_number=2, # 初始段数,减少合并频率
max_segment_size=50000, # 段大小阈值,适当增大以减少段数
flush_interval_sec=30,
indexing_threshold=10000 # 积累这么多向量后才建索引,避免小批量反复建
)
)
另外,ef_search默认是128,我们调成了64,查询速度提升了30%,精度损失几乎测不出来(Top10命中率从93%变成92.8%)。如果精度要求高,可以动态调大ef,比如对于复杂问题设置ef=128,简单问题64,但会增加代码复杂度,暂时没做。
我还犯了一个低级错误:每次查询都创建一个新的QdrantClient实例,没有复用连接。在高并发下,TCP连接数暴涨导致端口耗尽。改成全局单例后:
# 全局客户端,不要每次请求创建
client = QdrantClient(host="localhost", port=6333, timeout=10)
# 或者用异步客户端配合FastAPI
async_client = QdrantClient(host="localhost", port=6333, prefer_grpc=True)
关于向量量化,我开始没开,后来启用了Scalar Quantization(标量量化),将float32向量压缩成uint8,内存和磁盘占用直接降了75%,查询性能也略有提升(因为数据量小了)。代价是极小的精度损失。配置参数:
from qdrant_client.models import ScalarQuantizationConfig, ScalarType
client.update_collection(
collection_name=COLLECTION_NAME,
quantization_config=ScalarQuantizationConfig(
scalar=ScalarType.INT8,
always_ram=True # 量化后的向量也保留在内存中,加快查询
)
)
这一波调整后,Qdrant稳定在了20-40ms查询延迟,CPU峰值不超过60%。
从3.2秒到0.8秒的完整链路优化记录
我记录了整个RAG管道的每个步骤耗时(最初的版本,单次请求):
- Embedding(查询向量化): 520ms(CPU)
- Dense向量检索: 200ms(Qdrant)
- BM25检索: 120ms(含分词)
- 结果融合: <1ms
- 重排序(bge-reranker-large on CPU): 650ms (20条)
- 大模型生成(qwen-plus): 1700ms
- 总计约3190ms
看着这个数字,我知道不优化根本没脸见人。优化过程:
- GPU化:把embedding和reranker都搬到T4 GPU上,embedding降到15ms,reranker降到200ms(批量20条时)。
- 并行化:dense检索和BM25检索互不依赖,直接并发执行。用asyncio包装:
import asyncio
async def parallel_retrieval(query_emb, query_text, corpus, doc_ids):
dense_task = loop.run_in_executor(None, dense_search, query_emb)
bm25_task = loop.run_in_executor(None, bm25_search, query_text, corpus, doc_ids)
dense_res, bm25_res = await asyncio.gather(dense_task, bm25_task)
return dense_res, bm25_res
这样总检索时间从320ms降到 max(200, 120) ≈ 200ms。
import hashlib, redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_cache_or_compute(query: str, compute_func):
key = "rag:" + hashlib.md5(query.encode()).hexdigest()
cached = r.get(key)
if cached:
return cached
result = compute_func(query)
r.setex(key, 3600, result)
return result
优化后的耗时分布:
| 步骤 | 优化前 (ms) | 优化后 (ms) |
|---|---|---|
| 查询向量化 | 520 | 15 (GPU) |
| 检索(并行) | 320 | 200 (并行后取max) |
| 重排序 | 650 | 80 (换base模型) |
| LLM生成 | 1700 | 800 (换turbo) |
| 总计(含少量开销) | ~3190 | ~1100 (冷) / ~20 (缓存命中) |
注意,1100ms 包括了100ms的额外开销(JSON解析、网络等)。如果加上缓存命中,99%分位延迟在20ms。这已经远远超过了业务要求。
说实话,RAG优化是个无底洞,后续我还想引入多模态(图片搜索商品)、Agent查询数据库等,但至少在目前,这个系统已经在生产环境扛住了日活20万用户的正常流量。
最后想说,别迷信任何现成的RAG框架或“最佳实践”,每一个组件都值得你自己去折腾一遍,因为文档里不会告诉你实际数据量下的真实表现。我掉过的坑,希望你能绕开。