
IngestionService Reference

IngestionService is the document-ingestion orchestration service, responsible for the full pipeline from download to indexing.

Overview

IngestionService acts as a high-level orchestrator that chains the following steps:

  1. Download the raw document from MinIO
  2. Parse the document to extract text
  3. Chunk the text (multiple strategies supported)
  4. Generate embedding vectors
  5. Write chunks to PostgreSQL
  6. Write vectors to Qdrant

It also handles error recovery, progress updates, and timeout control, and supports both single-document and batch jobs.

Configuration
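The six steps above can be sketched as a single orchestration function. This is a minimal illustration only: the helper callables are stand-ins injected as parameters, not the service's real internals.

```python
# Minimal sketch of the orchestration order. All helpers are injected
# stand-ins, not the real service internals.
def ingest(document_id, *, download, parse, chunk, embed, store_pg, store_qdrant):
    raw = download(document_id)         # 1. download the raw file from MinIO
    text = parse(raw)                   # 2. extract text from the document
    chunks = chunk(text)                # 3. split the text into chunks
    vectors = embed(chunks)             # 4. generate one vector per chunk
    store_pg(document_id, chunks)       # 5. persist chunks in PostgreSQL
    store_qdrant(document_id, vectors)  # 6. index vectors in Qdrant
    return len(chunks), len(vectors)    # mirrors (chunks_created, chunks_indexed)
```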

| Parameter | Type | Default | Description | Environment Variable |
| --- | --- | --- | --- | --- |
| ingestion_document_seconds | int | 600 | Per-document processing timeout | TIMEOUT_INGESTION_DOCUMENT_SECONDS |
| ingestion_job_seconds | int | 3600 | Overall job timeout | TIMEOUT_INGESTION_JOB_SECONDS |

The service also consumes the configuration of its dependency services (chunking, embedding, storage).
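For example, both timeouts can be overridden through the environment variables listed above (the values shown are the documented defaults):

```shell
# Override the ingestion timeouts (values shown are the defaults)
export TIMEOUT_INGESTION_DOCUMENT_SECONDS=600
export TIMEOUT_INGESTION_JOB_SECONDS=3600
```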

API Reference

Class: IngestionService

class IngestionService:
    """Service for orchestrating document ingestion."""

    def __init__(
        self,
        *,
        minio_service: Optional[MinIOStorageService] = None,
        qdrant_service: Optional[QdrantVectorService] = None,
        chunking_service: Optional[ChunkingService] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ) -> None:
        """Initialize the ingestion service.

        Args:
            minio_service: MinIO service instance
            qdrant_service: Qdrant service instance
            chunking_service: Chunking service instance
            embedding_service: Embedding service instance
        """

Method: process_document

Runs the full ingestion pipeline for a single document.

def process_document(
    self,
    *,
    document_id: str,
    agent_id: Optional[str] = None,
    job_id: Optional[str] = None,
    chunking_strategy_name: Optional[str] = None,
    chunking_params_json: Optional[str] = None,
) -> tuple[int, int]:
    """Process a single document through the ingestion pipeline.

    Args:
        document_id: Document identifier
        agent_id: Optional agent ID for metadata
        job_id: Optional job ID for progress updates
        chunking_strategy_name: Chunking strategy to use
        chunking_params_json: JSON string of chunking parameters

    Returns:
        Tuple of chunks_created and chunks_indexed

    Raises:
        ValueError: If document not found or cannot be processed
    """

Processing Stages

  1. Download the file from MinIO
  2. Parse the document
  3. Chunk the text (using the selected strategy)
  4. Generate embedding vectors
  5. Write to PostgreSQL
  6. Write to Qdrant
  7. Update the document status

Chunking Strategy Selection Priority

  1. The strategy passed in to process_document
  2. The knowledge source's default strategy (source.default_chunking_strategy)
  3. The system default strategy (fixed_size)
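The priority order above amounts to a simple fallback chain; a minimal sketch (hypothetical helper, not part of the service's API):

```python
from typing import Optional

SYSTEM_DEFAULT_STRATEGY = "fixed_size"

def resolve_chunking_strategy(
    requested: Optional[str],
    source_default: Optional[str],
) -> str:
    """Resolve in priority order: explicit request > source default > system default."""
    return requested or source_default or SYSTEM_DEFAULT_STRATEGY
```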

Method: run_ingestion_job

Processes all uploaded documents of a knowledge source as a batch.

def run_ingestion_job(self, *, job_id: str) -> None:
    """Run an ingestion job to process all uploaded documents.

    Args:
        job_id: Ingestion job identifier

    Raises:
        ValueError: If job not found
        TimeoutError: If job exceeds timeout
    """

Characteristics

  • Processes every document in the UPLOADED state
  • Updates job progress after each document
  • A timeout on a single document does not abort the job
  • The overall job timeout terminates the whole run
  • Uses the chunking strategy specified by the knowledge source or the job
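The interplay of the two timeouts can be sketched as follows. This is illustrative only, with hypothetical names; the real service tracks state in the database rather than in local counters.

```python
import time

def run_job(documents, process_one, *, doc_timeout: float, job_timeout: float):
    """Sketch: per-document errors are contained, the job deadline is not."""
    deadline = time.monotonic() + job_timeout
    done = failed = 0
    for doc in documents:
        if time.monotonic() > deadline:
            # the overall job timeout terminates the whole run
            raise TimeoutError("job exceeded ingestion_job_seconds")
        try:
            process_one(doc, timeout=doc_timeout)
            done += 1
        except Exception:
            failed += 1  # a single document's failure/timeout does not abort the job
    return done, failed
```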

Factory Function: get_ingestion_service

def get_ingestion_service() -> IngestionService:
    """Get or create the singleton ingestion service instance."""

Utility Function: cleanup_stale_ingestion_jobs

def cleanup_stale_ingestion_jobs() -> int:
    """Find and mark stale RUNNING jobs as FAILED on startup.

    Returns:
        Number of stale jobs cleaned up
    """

This function is used to clean up stuck jobs after a service restart.

Usage Examples

Process a Single Document (With an Explicit Strategy)

import json
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()

# Use the sentence-level chunking strategy
chunking_params = {
    "target_size": 512,
    "max_sentences_per_chunk": 10
}

try:
    chunks_created, chunks_indexed = ingestion_service.process_document(
        document_id="doc-123",
        agent_id="agent-456",
        chunking_strategy_name="sentence",
        chunking_params_json=json.dumps(chunking_params)
    )
    print(f"Success: {chunks_created} chunks created")
except ValueError as e:
    print(f"Document error: {e}")
except Exception as e:
    print(f"Processing failed: {e}")

Process a Single Document (Using the Knowledge Source's Default Strategy)

from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()

# No strategy specified: the knowledge source's default_chunking_strategy is used
try:
    chunks_created, chunks_indexed = ingestion_service.process_document(
        document_id="doc-123",
        agent_id="agent-456"
    )
    print(f"Success: {chunks_created} chunks created")
except Exception as e:
    print(f"Processing failed: {e}")

Run a Batch Job

try:
    ingestion_service.run_ingestion_job(job_id="job-789")
    print("Job completed successfully")
except TimeoutError as e:
    print(f"Job timed out: {e}")
except Exception as e:
    print(f"Job failed: {e}")

Custom Dependencies

from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import ChunkingService

custom_chunking = ChunkingService(chunk_size=500, chunk_overlap=100)
ingestion_service = IngestionService(chunking_service=custom_chunking)

chunks_created, _ = ingestion_service.process_document(document_id="doc-123")

Pipeline Flow

Document Processing State Machine

stateDiagram-v2
    [*] --> UPLOADED: Document uploaded
    UPLOADED --> PROCESSING: Start processing
    PROCESSING --> Download: Download from MinIO
    Download --> Parse: Extract text
    Parse --> Chunk: Chunk text (strategy selection)
    Chunk --> Embed: Generate vectors
    Embed --> Store: Write to storage
    Store --> INDEXED: Completed successfully

    Download --> FAILED: Failure
    Parse --> FAILED: Failure
    Chunk --> FAILED: Failure
    Embed --> FAILED: Failure
    Store --> FAILED: Failure

    INDEXED --> [*]
    FAILED --> [*]

Job Processing Flow

sequenceDiagram
    participant Job
    participant Ingestion
    participant Document
    participant Strategy

    Job->>Ingestion: run_ingestion_job
    Ingestion->>Ingestion: Query UPLOADED documents

    loop For each document
        Ingestion->>Ingestion: Resolve chunking_strategy_name
        alt Strategy specified
            Ingestion->>Strategy: Use the specified strategy
        else No strategy specified
            Ingestion->>Strategy: Use the knowledge source default
        end
        Ingestion->>Document: process_document
        Document-->>Ingestion: chunks_created, chunks_indexed
        Ingestion->>Ingestion: Update progress
    end

    Ingestion->>Job: Set status to SUCCEEDED

Chunking Strategy Selection

Setting a Default Strategy at the Knowledge-Source Level

Specify a default chunking strategy when creating a knowledge source:

from ai_service.storage.models import create_knowledge_source_record
from ai_service.utils.database import SessionLocal
import json

db = SessionLocal()
source = create_knowledge_source_record(
    db_session=db,
    name="Technical Documentation Library",
    description="Product technical documentation",
    default_chunking_strategy="paragraph",
    default_chunking_params=json.dumps({
        "target_size": 1024,
        "max_paragraphs_per_chunk": 5
    })
)
db.close()

Overriding the Strategy at the Job Level

Override the chunking strategy when creating an ingestion job:

from ai_service.storage.models import create_ingestion_job_record
from ai_service.utils.database import SessionLocal
import json

db = SessionLocal()
job = create_ingestion_job_record(
    db_session=db,
    source_id="source-123",
    chunking_strategy="sentence",
    chunking_params=json.dumps({
        "target_size": 512,
        "max_sentences_per_chunk": 8
    })
)
db.close()

Overriding the Strategy at the Document Level

Specify a strategy when processing a single document:

import json

ingestion_service.process_document(
    document_id="doc-123",
    chunking_strategy_name="recursive",
    chunking_params_json=json.dumps({
        "chunk_size": 512,
        "chunk_overlap": 50
    })
)

Error Handling

Document-Level Errors

  • The document status is set to FAILED
  • The error message is written to the document record
  • The job continues with the remaining documents

Job-Level Errors

  • The job status is set to FAILED
  • Documents that already succeeded keep their indexed results
  • A failed job can be re-run

Timeout Control

# Default: 600 seconds
config.timeouts.ingestion_document_seconds = 600

# Default: 3600 seconds
config.timeouts.ingestion_job_seconds = 3600

Progress Updates

Job progress is written to the database in real time:

job.documents_total = 10      # total number of documents
job.documents_done = 5        # documents completed
job.chunks_total = 150        # total number of chunks
job.chunks_done = 150         # chunks indexed
job.status_message = "Processing: document.pdf"  # current stage description
job.chunking_strategy = "sentence"  # chunking strategy in use
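A trivial helper (hypothetical, not part of the service) shows how these counters translate into a display percentage:

```python
# Hypothetical helper: derive a display percentage from the job counters above.
def progress_pct(done: int, total: int) -> float:
    if total == 0:
        return 0.0  # avoid division by zero before documents_total is known
    return round(100.0 * done / total, 1)
```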

Best Practices

Pre-Flight Check Before Starting a Job

from ai_service.services.embedding import get_embedding_service

result = get_embedding_service().check_readiness()
if not result.is_ready:
    print(f"Model not ready: {result.error_message}")

Choose an Appropriate Chunking Strategy

| Scenario | Recommended Strategy | Typical Parameters |
| --- | --- | --- |
| General documents | recursive | chunk_size=512, overlap=50 |
| News / articles | sentence | target_size=512, max_sentences=10 |
| Technical documents | paragraph | target_size=1024, max_paragraphs=5 |
| Fast processing | fixed_size | chunk_size=512, overlap=50 |

Job Monitoring

from ai_service.storage.models import get_ingestion_job_record
from ai_service.utils.database import SessionLocal

db = SessionLocal()
job = get_ingestion_job_record(db, job_id="job-789")
print(f"Progress: {job.documents_done}/{job.documents_total}")
print(f"Status: {job.status_message}")
print(f"Chunking Strategy: {job.chunking_strategy}")
db.close()

Retrying Failures

from ai_service.storage.models import list_documents_by_status, DocumentStatus
from ai_service.utils.database import SessionLocal

db = SessionLocal()
failed_docs = list_documents_by_status(
    db,
    source_id="source-123",
    status=DocumentStatus.FAILED.value
)

for doc in failed_docs:
    try:
        ingestion_service.process_document(document_id=doc.id)
    except Exception as e:
        print(f"Retry failed: {e}")

Performance Reference

| Stage | Duration | Notes |
| --- | --- | --- |
| Download | 0.5–2 s | Depends on file size |
| Parse | 1–5 s | Depends on format and size |
| Chunk | <1 s | In-memory; strategy-dependent |
| Embed | 0.1–0.5 s per chunk | Batched |
| Store | <1 s | PostgreSQL + Qdrant |
| Total | 2–10 s | Typical range |

Chunking strategy performance comparison (10,000-character document):

| Strategy | Duration | Notes |
| --- | --- | --- |
| fixed_size | <10 ms | Fastest |
| sentence | <50 ms | NLTK sentence splitting |
| paragraph | <20 ms | Simple splitting |
| recursive | <100 ms | Multi-level processing |
| semantic | <100 ms | Currently a simplified implementation |

Common Issues

Issue: Document Processing Times Out

Symptom: TimeoutError

Cause: very large documents, or slow responses from dependency services

Solutions:

  • Increase ingestion_document_seconds
  • Split large documents before uploading
  • Check network connectivity to MinIO and Qdrant

Issue: Job Stuck in RUNNING

Cause: the service crashed mid-run

Solution:

from ai_service.services.ingestion import cleanup_stale_ingestion_jobs
cleaned = cleanup_stale_ingestion_jobs()
print(f"Cleaned up {cleaned} stale jobs")

Issue: Parsing Fails

Cause: unsupported file format

Solution: confirm the file is one of the supported formats: .txt, .pdf, .docx, .md

Issue: No Chunks Produced

Cause: parsing produced empty output

Solution: check the document content and the parsing logs.

Issue: Invalid Chunking Strategy

Symptom: ValueError: Unknown chunking strategy

Solution: check the strategy name and use one of the following: fixed_size, sentence, paragraph, semantic, recursive

# Valid strategy names
valid_strategies = ["fixed_size", "sentence", "paragraph", "semantic", "recursive"]
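A small guard built on that list (illustrative only, not a service API) can fail fast before a job is submitted:

```python
# Illustrative guard: validate the strategy name before starting ingestion.
VALID_STRATEGIES = {"fixed_size", "sentence", "paragraph", "semantic", "recursive"}

def validate_strategy(name: str) -> None:
    """Raise early instead of failing mid-ingestion."""
    if name not in VALID_STRATEGIES:
        raise ValueError(f"Unknown chunking strategy: {name}")
```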

Related Documentation