跳转至

Services 模块架构

本文档描述 ai_service/services 的整体架构、数据流与服务依赖关系,聚焦已实现的摄取与检索流程。

总体概览

RAG 处理链路由两个核心流程组成:

  1. 摄取流程:把文档转成可检索的文本块与向量
  2. 检索流程:把用户查询转成上下文供 LLM 使用
graph TB
    subgraph 摄取流程
        A[文档上传] --> B[MinIO 存储]
        B --> C[IngestionService]
        C --> D[Parser]
        D --> E[ChunkingStrategy]
        E --> F[EmbeddingService]
        F --> G[PostgreSQL]
        F --> H[Qdrant]
    end

    subgraph 检索流程
        I[用户查询] --> J[RAGRetrievalService]
        J --> K[EmbeddingService]
        K --> L[查询向量]
        L --> M[Qdrant 检索]
        M --> N[PostgreSQL 查询]
        N --> O[上下文格式化]
    end

    style C fill:#e1f5ff
    style J fill:#e1f5ff
    style E fill:#fff4e1
    style F fill:#fff4e1
    style K fill:#fff4e1

摄取流程

流程阶段

摄取流程包含以下阶段:

  1. 文档上传:API 写入文档记录,文件存储在 MinIO。
  2. 下载原文件IngestionService 从 MinIO 拉取文件内容。
  3. 文档解析:Parser 将字节流解析为纯文本与元数据。
  4. 文本分块ChunkingStrategy 生成可检索文本块(支持多种策略)。
  5. 向量生成EmbeddingService 批量生成向量。
  6. 存储写入:文本块写入 PostgreSQL,向量写入 Qdrant。

分块策略选择

系统支持五种分块策略,按优先级选择:

  1. 请求级process_document 传入的 chunking_strategy_name
  2. 知识源级:知识源的 default_chunking_strategy
  3. 系统默认fixed_size
sequenceDiagram
    participant Ingestion
    participant StrategyFactory
    participant ChunkingStrategy

    Ingestion->>Ingestion: 检查 chunking_strategy_name 参数
    alt 指定了策略
        Ingestion->>StrategyFactory: get_chunking_strategy(name, params)
    else 未指定策略
        Ingestion->>Ingestion: 读取知识源 default_chunking_strategy
        Ingestion->>StrategyFactory: get_chunking_strategy(source_default, params)
    end
    StrategyFactory-->>Ingestion: 返回策略实例
    Ingestion->>ChunkingStrategy: chunk_text(text, ...)
    ChunkingStrategy-->>Ingestion: 返回分块结果

数据流时序图

sequenceDiagram
    participant API
    participant MinIO
    participant Ingestion
    participant Parser
    participant StrategyFactory
    participant ChunkingStrategy
    participant Embedding
    participant PostgreSQL
    participant Qdrant

    API->>MinIO: 上传文档
    API->>PostgreSQL: 创建文档记录

    Ingestion->>PostgreSQL: 获取文档记录
    Ingestion->>MinIO: 下载文件
    MinIO-->>Ingestion: 返回文件字节

    Ingestion->>Parser: 解析文档
    Parser-->>Ingestion: 返回文本

    Ingestion->>StrategyFactory: 获取分块策略
    StrategyFactory-->>Ingestion: 返回策略实例
    Ingestion->>ChunkingStrategy: 文本分块
    ChunkingStrategy-->>Ingestion: 返回分块

    Ingestion->>Embedding: 批量生成向量
    Embedding-->>Ingestion: 返回向量

    Ingestion->>PostgreSQL: 写入文本块
    Ingestion->>Qdrant: 写入向量

    Ingestion->>PostgreSQL: 更新文档状态为 INDEXED

错误处理

  • 文档级错误:单个文档失败会标记为 FAILED,记录错误信息。
  • 任务级超时:任务超过总超时会整体失败。
  • 单文档超时:某文档超时不阻塞其他文档继续处理。
  • 无自动重试:需要手工重试失败文档或任务。

性能注意

  • 批量嵌入:按分块批量生成向量以提升效率。
  • 模型缓存:嵌入模型在内存中复用。
  • 顺序处理:当前为串行处理,后续可扩展并行。

检索流程

流程阶段

  1. 查询向量:将用户查询转成向量。
  2. 向量检索:在 Qdrant 中按相似度检索。
  3. 文本回表:根据 chunk_id 从 PostgreSQL 取回文本。
  4. 上下文格式化:拼接并控制上下文长度。

数据流时序图

sequenceDiagram
    participant Orchestrator
    participant RAGRetrieval
    participant Embedding
    participant PostgreSQL
    participant Qdrant

    Orchestrator->>RAGRetrieval: retrieve_context
    RAGRetrieval->>PostgreSQL: 查询 Agent 挂载的知识源
    PostgreSQL-->>RAGRetrieval: 返回 source_id 列表

    RAGRetrieval->>Embedding: 生成查询向量
    Embedding-->>RAGRetrieval: 返回向量

    RAGRetrieval->>Qdrant: 向量检索
    Qdrant-->>RAGRetrieval: 返回候选 chunk_id

    RAGRetrieval->>PostgreSQL: 批量查询文本块
    PostgreSQL-->>RAGRetrieval: 返回文本块

    RAGRetrieval->>RAGRetrieval: 格式化上下文
    RAGRetrieval-->>Orchestrator: 返回上下文字符串

过滤与排序

  • 知识源过滤:只检索 Agent 挂载的 source_id。
  • 阈值过滤score_threshold 过滤低相似度结果。
  • Top-K 限制top_k 控制返回数量。

服务依赖关系

graph TD
    A[IngestionService] --> B[ChunkingStrategy]
    A --> C[EmbeddingService]
    A --> D[MinIO]
    A --> E[PostgreSQL]
    A --> F[Qdrant]
    A --> G[Parser 子系统]

    H[RAGRetrievalService] --> C
    H --> E
    H --> F

    B --> B1[FixedSizeStrategy]
    B --> B2[SentenceStrategy]
    B --> B3[ParagraphStrategy]
    B --> B4[RecursiveStrategy]
    B --> B5[SemanticStrategy]

    style A fill:#e1f5ff
    style H fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#fff4e1
    style G fill:#fff4e1

服务职责

服务 职责 依赖
IngestionService 文档摄取编排 ChunkingStrategy, EmbeddingService, Parser, MinIO, PostgreSQL, Qdrant
ChunkingStrategy 文本分块策略 无(纯逻辑),支持 5 种策略
EmbeddingService 向量生成 sentence-transformers
RAGRetrievalService 知识检索 EmbeddingService, PostgreSQL, Qdrant
Parser 子系统 文档解析 具体格式库(PyPDF2, python-docx 等)

分块策略层次

graph TD
    A[BaseChunkingStrategy] --> B[FixedSizeChunkingStrategy]
    A --> C[SentenceChunkingStrategy]
    A --> D[ParagraphChunkingStrategy]
    A --> E[RecursiveChunkingStrategy]
    A --> F[SemanticChunkingStrategy]

    G[ChunkingService] --> B

    H[get_chunking_strategy] --> A

    style A fill:#e1f5ff
    style G fill:#fff4e1
    style H fill:#fff4e1
策略 说明 适用场景
FixedSize 固定大小分块 通用场景,向后兼容
Sentence 句子边界分块 新闻、文章
Paragraph 段落边界分块 技术文档
Recursive 递归层级分块 综合性能最佳
Semantic 语义相似度分块 实验性

存储层集成

PostgreSQL

  • Documents:文档元信息与状态
  • Chunks:文本块与关联信息(含 chunking_strategy 字段)
  • Knowledge Sources:知识源与挂载关系(含 default_chunking_strategy)
  • Ingestion Jobs:摄取任务状态与进度(含 chunking_strategy)

Qdrant

  • Collection:统一集合存储所有向量
  • Vectors:默认 384 维向量
  • Payload:chunk_id、source_id、document_id 等

MinIO

  • Bucket:统一桶存储原始文件
  • Object Key:UUID 形式的对象键

一致性策略

  • chunk_id 对齐:PostgreSQL 与 Qdrant 共享 chunk_id。
  • 事务一致性:PostgreSQL 写入采用事务。
  • 最终一致:Qdrant 写入为最终一致。
  • 策略追溯:每个 chunk 记录使用的 chunking_strategy 便于追溯。

配置要点

# 分块配置
config.chunking.size = 512                    # 每块字符数
config.chunking.overlap = 50                  # 重叠字符数
config.chunking.default_strategy = "recursive"  # 默认分块策略

# 句子策略配置
config.chunking.sentence_target_size = 512
config.chunking.sentence_max_per_chunk = 10

# 段落策略配置
config.chunking.paragraph_target_size = 1024
config.chunking.paragraph_max_per_chunk = 5

# 语义策略配置
config.chunking.semantic_similarity_threshold = 0.7
config.chunking.semantic_min_chunk_size = 200
config.chunking.semantic_max_chunk_size = 1000

# 嵌入配置
config.embedding.model = "paraphrase-multilingual-MiniLM-L12-v2"
config.embedding.dim = 384
config.embedding.offline_mode = True

# 超时配置
config.timeouts.embedding_model_load_seconds = 300
config.timeouts.ingestion_document_seconds = 600
config.timeouts.ingestion_job_seconds = 3600

性能特性

摄取性能

  • 解析:1-5 秒 / 文档(取决于格式和大小)
  • 分块:< 1 秒 / 文档(策略相关)
  • 嵌入:0.1-0.5 秒 / 分块(批量)
  • 写入:< 1 秒(PostgreSQL + Qdrant)

分块策略性能对比

策略 1 万字耗时 特点
FixedSize <10ms 最快
Sentence <50ms NLTK 分句
Paragraph <20ms 简单分割
Recursive <100ms 多层级处理
Semantic <100ms 当前为简化版

检索性能

  • 查询嵌入:0.1-0.3 秒
  • 向量检索:0.01-0.1 秒
  • 回表查询:0.01-0.05 秒

设计模式

策略模式(Strategy Pattern)

分块模块采用策略模式,便于扩展新的分块算法:

# 抽象基类
class BaseChunkingStrategy(ABC):
    @abstractmethod
    def chunk_text(self, text, ...): ...

# 具体策略
class FixedSizeChunkingStrategy(BaseChunkingStrategy): ...
class SentenceChunkingStrategy(BaseChunkingStrategy): ...

# 工厂函数
_strategy_registry = {
    "fixed_size": FixedSizeChunkingStrategy,
    "sentence": SentenceChunkingStrategy,
    ...
}

def get_chunking_strategy(strategy_type, params): ...

向后兼容

ChunkingService 作为遗留类保留,包装 FixedSizeChunkingStrategy

class ChunkingService:
    def __init__(self, chunk_size=None, chunk_overlap=None):
        self._strategy = FixedSizeChunkingStrategy(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

下一步