Services 模块架构
本文档描述 ai_service/services 的整体架构、数据流与服务依赖关系,聚焦已实现的摄取与检索流程。
总体概览
RAG 处理链路由两个核心流程组成:
- 摄取流程:把文档转成可检索的文本块与向量
- 检索流程:把用户查询转成上下文供 LLM 使用
graph TB
subgraph 摄取流程
A[文档上传] --> B[MinIO 存储]
B --> C[IngestionService]
C --> D[Parser]
D --> E[ChunkingStrategy]
E --> F[EmbeddingService]
F --> G[PostgreSQL]
F --> H[Qdrant]
end
subgraph 检索流程
I[用户查询] --> J[RAGRetrievalService]
J --> K[EmbeddingService]
K --> L[查询向量]
L --> M[Qdrant 检索]
M --> N[PostgreSQL 查询]
N --> O[上下文格式化]
end
style C fill:#e1f5ff
style J fill:#e1f5ff
style E fill:#fff4e1
style F fill:#fff4e1
style K fill:#fff4e1
摄取流程
流程阶段
摄取流程包含以下阶段:
- 文档上传:API 写入文档记录,文件存储在 MinIO。
- 下载原文件:
IngestionService从 MinIO 拉取文件内容。 - 文档解析:Parser 将字节流解析为纯文本与元数据。
- 文本分块:
ChunkingStrategy生成可检索文本块(支持多种策略)。 - 向量生成:
EmbeddingService批量生成向量。 - 存储写入:文本块写入 PostgreSQL,向量写入 Qdrant。
分块策略选择
系统支持五种分块策略,按优先级选择:
- 请求级:
process_document传入的chunking_strategy_name - 知识源级:知识源的
default_chunking_strategy - 系统默认:
fixed_size
sequenceDiagram
participant Ingestion
participant StrategyFactory
participant ChunkingStrategy
Ingestion->>Ingestion: 检查 chunking_strategy_name 参数
alt 指定了策略
Ingestion->>StrategyFactory: get_chunking_strategy(name, params)
else 未指定策略
Ingestion->>Ingestion: 读取知识源 default_chunking_strategy
Ingestion->>StrategyFactory: get_chunking_strategy(source_default, params)
end
StrategyFactory-->>Ingestion: 返回策略实例
Ingestion->>ChunkingStrategy: chunk_text(text, ...)
ChunkingStrategy-->>Ingestion: 返回分块结果
数据流时序图
sequenceDiagram
participant API
participant MinIO
participant Ingestion
participant Parser
participant StrategyFactory
participant ChunkingStrategy
participant Embedding
participant PostgreSQL
participant Qdrant
API->>MinIO: 上传文档
API->>PostgreSQL: 创建文档记录
Ingestion->>PostgreSQL: 获取文档记录
Ingestion->>MinIO: 下载文件
MinIO-->>Ingestion: 返回文件字节
Ingestion->>Parser: 解析文档
Parser-->>Ingestion: 返回文本
Ingestion->>StrategyFactory: 获取分块策略
StrategyFactory-->>Ingestion: 返回策略实例
Ingestion->>ChunkingStrategy: 文本分块
ChunkingStrategy-->>Ingestion: 返回分块
Ingestion->>Embedding: 批量生成向量
Embedding-->>Ingestion: 返回向量
Ingestion->>PostgreSQL: 写入文本块
Ingestion->>Qdrant: 写入向量
Ingestion->>PostgreSQL: 更新文档状态为 INDEXED
错误处理
- 文档级错误:单个文档失败会标记为
FAILED,记录错误信息。 - 任务级超时:任务超过总超时会整体失败。
- 单文档超时:某文档超时不阻塞其他文档继续处理。
- 无自动重试:需要手工重试失败文档或任务。
性能注意
- 批量嵌入:按分块批量生成向量以提升效率。
- 模型缓存:嵌入模型在内存中复用。
- 顺序处理:当前为串行处理,后续可扩展并行。
检索流程
流程阶段
- 查询向量:将用户查询转成向量。
- 向量检索:在 Qdrant 中按相似度检索。
- 文本回表:根据 chunk_id 从 PostgreSQL 取回文本。
- 上下文格式化:拼接并控制上下文长度。
数据流时序图
sequenceDiagram
participant Orchestrator
participant RAGRetrieval
participant Embedding
participant PostgreSQL
participant Qdrant
Orchestrator->>RAGRetrieval: retrieve_context
RAGRetrieval->>PostgreSQL: 查询 Agent 挂载的知识源
PostgreSQL-->>RAGRetrieval: 返回 source_id 列表
RAGRetrieval->>Embedding: 生成查询向量
Embedding-->>RAGRetrieval: 返回向量
RAGRetrieval->>Qdrant: 向量检索
Qdrant-->>RAGRetrieval: 返回候选 chunk_id
RAGRetrieval->>PostgreSQL: 批量查询文本块
PostgreSQL-->>RAGRetrieval: 返回文本块
RAGRetrieval->>RAGRetrieval: 格式化上下文
RAGRetrieval-->>Orchestrator: 返回上下文字符串
过滤与排序
- 知识源过滤:只检索 Agent 挂载的 source_id。
- 阈值过滤:
score_threshold过滤低相似度结果。 - Top-K 限制:
top_k控制返回数量。
服务依赖关系
graph TD
A[IngestionService] --> B[ChunkingStrategy]
A --> C[EmbeddingService]
A --> D[MinIO]
A --> E[PostgreSQL]
A --> F[Qdrant]
A --> G[Parser 子系统]
H[RAGRetrievalService] --> C
H --> E
H --> F
B --> B1[FixedSizeStrategy]
B --> B2[SentenceStrategy]
B --> B3[ParagraphStrategy]
B --> B4[RecursiveStrategy]
B --> B5[SemanticStrategy]
style A fill:#e1f5ff
style H fill:#e1f5ff
style B fill:#fff4e1
style C fill:#fff4e1
style G fill:#fff4e1
服务职责
| 服务 | 职责 | 依赖 |
|---|---|---|
| IngestionService | 文档摄取编排 | ChunkingStrategy, EmbeddingService, Parser, MinIO, PostgreSQL, Qdrant |
| ChunkingStrategy | 文本分块策略 | 无(纯逻辑),支持 5 种策略 |
| EmbeddingService | 向量生成 | sentence-transformers |
| RAGRetrievalService | 知识检索 | EmbeddingService, PostgreSQL, Qdrant |
| Parser 子系统 | 文档解析 | 具体格式库(PyPDF2, python-docx 等) |
分块策略层次
graph TD
A[BaseChunkingStrategy] --> B[FixedSizeChunkingStrategy]
A --> C[SentenceChunkingStrategy]
A --> D[ParagraphChunkingStrategy]
A --> E[RecursiveChunkingStrategy]
A --> F[SemanticChunkingStrategy]
G[ChunkingService] --> B
H[get_chunking_strategy] --> A
style A fill:#e1f5ff
style G fill:#fff4e1
style H fill:#fff4e1
| 策略 | 说明 | 适用场景 |
|---|---|---|
| FixedSize | 固定大小分块 | 通用场景,向后兼容 |
| Sentence | 句子边界分块 | 新闻、文章 |
| Paragraph | 段落边界分块 | 技术文档 |
| Recursive | 递归层级分块 | 综合性能最佳 |
| Semantic | 语义相似度分块 | 实验性 |
存储层集成
PostgreSQL
- Documents:文档元信息与状态
- Chunks:文本块与关联信息(含 chunking_strategy 字段)
- Knowledge Sources:知识源与挂载关系(含 default_chunking_strategy)
- Ingestion Jobs:摄取任务状态与进度(含 chunking_strategy)
Qdrant
- Collection:统一集合存储所有向量
- Vectors:默认 384 维向量
- Payload:chunk_id、source_id、document_id 等
MinIO
- Bucket:统一桶存储原始文件
- Object Key:UUID 形式的对象键
一致性策略
- chunk_id 对齐:PostgreSQL 与 Qdrant 共享 chunk_id。
- 事务一致性:PostgreSQL 写入采用事务。
- 最终一致:Qdrant 写入为最终一致。
- 策略追溯:每个 chunk 记录使用的 chunking_strategy 便于追溯。
配置要点
# 分块配置
config.chunking.size = 512 # 每块字符数
config.chunking.overlap = 50 # 重叠字符数
config.chunking.default_strategy = "recursive" # 默认分块策略
# 句子策略配置
config.chunking.sentence_target_size = 512
config.chunking.sentence_max_per_chunk = 10
# 段落策略配置
config.chunking.paragraph_target_size = 1024
config.chunking.paragraph_max_per_chunk = 5
# 语义策略配置
config.chunking.semantic_similarity_threshold = 0.7
config.chunking.semantic_min_chunk_size = 200
config.chunking.semantic_max_chunk_size = 1000
# 嵌入配置
config.embedding.model = "paraphrase-multilingual-MiniLM-L12-v2"
config.embedding.dim = 384
config.embedding.offline_mode = True
# 超时配置
config.timeouts.embedding_model_load_seconds = 300
config.timeouts.ingestion_document_seconds = 600
config.timeouts.ingestion_job_seconds = 3600
性能特性
摄取性能
- 解析:1-5 秒 / 文档(取决于格式和大小)
- 分块:< 1 秒 / 文档(策略相关)
- 嵌入:0.1-0.5 秒 / 分块(批量)
- 写入:< 1 秒(PostgreSQL + Qdrant)
分块策略性能对比
| 策略 | 1 万字耗时 | 特点 |
|---|---|---|
| FixedSize | <10ms | 最快 |
| Sentence | <50ms | NLTK 分句 |
| Paragraph | <20ms | 简单分割 |
| Recursive | <100ms | 多层级处理 |
| Semantic | <100ms | 当前为简化版 |
检索性能
- 查询嵌入:0.1-0.3 秒
- 向量检索:0.01-0.1 秒
- 回表查询:0.01-0.05 秒
设计模式
策略模式(Strategy Pattern)
分块模块采用策略模式,便于扩展新的分块算法:
# 抽象基类
class BaseChunkingStrategy(ABC):
@abstractmethod
def chunk_text(self, text, ...): ...
# 具体策略
class FixedSizeChunkingStrategy(BaseChunkingStrategy): ...
class SentenceChunkingStrategy(BaseChunkingStrategy): ...
# 工厂函数
_strategy_registry = {
"fixed_size": FixedSizeChunkingStrategy,
"sentence": SentenceChunkingStrategy,
...
}
def get_chunking_strategy(strategy_type, params): ...
向后兼容
ChunkingService 作为遗留类保留,包装 FixedSizeChunkingStrategy:
class ChunkingService:
def __init__(self, chunk_size=None, chunk_overlap=None):
self._strategy = FixedSizeChunkingStrategy(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
下一步
- ChunkingService - 分块算法与配置
- EmbeddingService - 嵌入模型加载
- IngestionService - 摄取编排
- RAGRetrievalService - 检索与上下文
- 配置参考 - 全量配置项