# IngestionService Reference

IngestionService is the document-ingestion orchestration service, responsible for the full pipeline from download to indexing.
## Overview

IngestionService acts as a high-level orchestrator that chains the following steps:

- Download the raw document from MinIO
- Parse the document to extract text
- Chunk the text (multiple strategies supported)
- Generate embeddings
- Write to PostgreSQL
- Write to Qdrant

It also handles error recovery, progress updates, and timeout control, and supports both single-document and batch jobs.
## Configuration

| Parameter | Type | Default | Description | Environment variable |
|---|---|---|---|---|
| `ingestion_document_seconds` | int | 600 | Per-document processing timeout | `TIMEOUT_INGESTION_DOCUMENT_SECONDS` |
| `ingestion_job_seconds` | int | 3600 | Overall job timeout | `TIMEOUT_INGESTION_JOB_SECONDS` |

The service also uses the configuration of its dependency services (chunking, embedding, storage).
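As a minimal sketch, assuming the service reads the environment variables from the table above once at startup, the timeouts could be raised before launching:

```python
import os

# Assumption: these variables are read once at service startup.
os.environ["TIMEOUT_INGESTION_DOCUMENT_SECONDS"] = "900"   # per-document timeout (default 600)
os.environ["TIMEOUT_INGESTION_JOB_SECONDS"] = "7200"       # overall job timeout (default 3600)
```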
## API Reference

### Class: IngestionService
```python
class IngestionService:
    """Service for orchestrating document ingestion."""

    def __init__(
        self,
        *,
        minio_service: Optional[MinIOStorageService] = None,
        qdrant_service: Optional[QdrantVectorService] = None,
        chunking_service: Optional[ChunkingService] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ) -> None:
        """Initialize the ingestion service.

        Args:
            minio_service: MinIO service instance
            qdrant_service: Qdrant service instance
            chunking_service: Chunking service instance
            embedding_service: Embedding service instance
        """
```
### Method: process_document

Runs the full ingestion pipeline for a single document.
```python
def process_document(
    self,
    *,
    document_id: str,
    agent_id: Optional[str] = None,
    job_id: Optional[str] = None,
    chunking_strategy_name: Optional[str] = None,
    chunking_params_json: Optional[str] = None,
) -> tuple[int, int]:
    """Process a single document through the ingestion pipeline.

    Args:
        document_id: Document identifier
        agent_id: Optional agent ID for metadata
        job_id: Optional job ID for progress updates
        chunking_strategy_name: Chunking strategy to use
        chunking_params_json: JSON string of chunking parameters

    Returns:
        Tuple of chunks_created and chunks_indexed

    Raises:
        ValueError: If document not found or cannot be processed
    """
```
Processing stages:

- Download the file from MinIO
- Parse the document
- Chunk the text (using the selected strategy)
- Generate embeddings
- Write to PostgreSQL
- Write to Qdrant
- Update the document status

Chunking strategy selection priority (illustrated in the sketch below):

1. The strategy passed via the `process_document` parameter
2. The knowledge source's default strategy (`source.default_chunking_strategy`)
3. The system default strategy (`fixed_size`)
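A hypothetical helper showing this resolution order; `resolve_chunking_strategy` and the `source` shape are illustrative, not the service's actual internals:

```python
from typing import Optional

def resolve_chunking_strategy(explicit_name: Optional[str], source) -> str:
    # 1. An explicit strategy passed to process_document wins
    if explicit_name:
        return explicit_name
    # 2. Fall back to the knowledge source's default, if set
    if source is not None and source.default_chunking_strategy:
        return source.default_chunking_strategy
    # 3. System default
    return "fixed_size"
```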
### Method: run_ingestion_job

Processes all uploaded documents of a knowledge source in batch.
```python
def run_ingestion_job(self, *, job_id: str) -> None:
    """Run an ingestion job to process all uploaded documents.

    Args:
        job_id: Ingestion job identifier

    Raises:
        ValueError: If job not found
        TimeoutError: If job exceeds timeout
    """
```
Characteristics (see the sketch after this list):

- Processes every document in `UPLOADED` status
- Updates job progress after each document
- A single document's timeout does not abort the job
- The overall job timeout terminates the whole run
- Uses the chunking strategy specified by the knowledge source or the job
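A simplified sketch of the per-document error isolation described above; `process_all` and the `documents` iterable are illustrative, not the service's actual internals:

```python
from ai_service.services.ingestion import get_ingestion_service

def process_all(documents, job_id: str) -> None:
    """Illustrative loop: per-document failures are isolated so the job continues."""
    service = get_ingestion_service()
    done = 0
    for doc in documents:  # assumed: document records in UPLOADED status
        try:
            service.process_document(document_id=doc.id, job_id=job_id)
        except Exception as exc:
            # The document is marked FAILED; the job itself keeps going
            print(f"Document {doc.id} failed: {exc}")
        done += 1
        print(f"Progress: {done}/{len(documents)}")
```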
### Factory function: get_ingestion_service
```python
def get_ingestion_service() -> IngestionService:
    """Get or create the singleton ingestion service instance."""
```
### Utility function: cleanup_stale_ingestion_jobs
```python
def cleanup_stale_ingestion_jobs() -> int:
    """Find and mark stale RUNNING jobs as FAILED on startup.

    Returns:
        Number of stale jobs cleaned up
    """
```
This function cleans up jobs that were left stuck after a service restart.
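A minimal sketch of wiring this into service startup; the `on_startup` hook is an assumption about your application framework, not part of this module:

```python
from ai_service.services.ingestion import cleanup_stale_ingestion_jobs

def on_startup() -> None:
    # Assumed startup hook: mark jobs stuck in RUNNING as FAILED before serving
    cleaned = cleanup_stale_ingestion_jobs()
    if cleaned:
        print(f"Cleaned up {cleaned} stale jobs")
```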
## Usage Examples

### Process a single document (explicit strategy)
```python
import json
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()

# Use the sentence-level chunking strategy
chunking_params = {
    "target_size": 512,
    "max_sentences_per_chunk": 10
}

try:
    chunks_created, chunks_indexed = ingestion_service.process_document(
        document_id="doc-123",
        agent_id="agent-456",
        chunking_strategy_name="sentence",
        chunking_params_json=json.dumps(chunking_params)
    )
    print(f"Success: {chunks_created} chunks created")
except ValueError as e:
    print(f"Document error: {e}")
except Exception as e:
    print(f"Processing failed: {e}")
```
### Process a single document (knowledge source default strategy)
```python
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()

# No strategy specified: the knowledge source's default_chunking_strategy is used
try:
    chunks_created, chunks_indexed = ingestion_service.process_document(
        document_id="doc-123",
        agent_id="agent-456"
    )
    print(f"Success: {chunks_created} chunks created")
except Exception as e:
    print(f"Processing failed: {e}")
```
### Run a batch job
```python
try:
    ingestion_service.run_ingestion_job(job_id="job-789")
    print("Job completed successfully")
except TimeoutError as e:
    print(f"Job timed out: {e}")
except Exception as e:
    print(f"Job failed: {e}")
```
### Custom dependencies
```python
from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import ChunkingService

custom_chunking = ChunkingService(chunk_size=500, chunk_overlap=100)
ingestion_service = IngestionService(chunking_service=custom_chunking)
chunks_created, _ = ingestion_service.process_document(document_id="doc-123")
```
## Process Flows

### Document processing state machine
```mermaid
stateDiagram-v2
    [*] --> UPLOADED: Document uploaded
    UPLOADED --> PROCESSING: Processing starts
    PROCESSING --> Download: Download from MinIO
    Download --> Parse: Extract text
    Parse --> Chunk: Chunk text (strategy selection)
    Chunk --> Embed: Generate embeddings
    Embed --> Store: Write to storage
    Store --> INDEXED: Completed successfully
    Download --> FAILED: Failure
    Parse --> FAILED: Failure
    Chunk --> FAILED: Failure
    Embed --> FAILED: Failure
    Store --> FAILED: Failure
    INDEXED --> [*]
    FAILED --> [*]
```
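As a sketch of consuming these states, a caller might poll the document record until it reaches a terminal state. Note that `get_document_record` and the string status values are assumptions modeled on the accessors shown elsewhere in this document:

```python
import time
from ai_service.storage.models import get_document_record  # assumed accessor
from ai_service.utils.database import SessionLocal

db = SessionLocal()
try:
    while True:
        doc = get_document_record(db, document_id="doc-123")
        if doc.status in ("INDEXED", "FAILED"):  # terminal states from the diagram
            break
        time.sleep(2)
    print(f"Final status: {doc.status}")
finally:
    db.close()
```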
### Job processing flow
```mermaid
sequenceDiagram
    participant Job
    participant Ingestion
    participant Document
    participant Strategy
    Job->>Ingestion: run_ingestion_job
    Ingestion->>Ingestion: Query UPLOADED documents
    loop For each document
        Ingestion->>Ingestion: Resolve chunking_strategy_name
        alt Strategy specified
            Ingestion->>Strategy: Use the specified strategy
        else No strategy specified
            Ingestion->>Strategy: Use the knowledge source default
        end
        Ingestion->>Document: process_document
        Document-->>Ingestion: chunks_created, chunks_indexed
        Ingestion->>Ingestion: Update progress
    end
    Ingestion->>Job: Set status to SUCCEEDED
```
## Chunking Strategy Selection

### Set a default strategy at the knowledge-source level

Specify a default chunking strategy when creating a knowledge source:
```python
import json

from ai_service.storage.models import create_knowledge_source_record
from ai_service.utils.database import SessionLocal

db = SessionLocal()
source = create_knowledge_source_record(
    db_session=db,
    name="Technical documentation library",
    description="Product technical documentation",
    default_chunking_strategy="paragraph",
    default_chunking_params=json.dumps({
        "target_size": 1024,
        "max_paragraphs_per_chunk": 5
    })
)
db.close()
```
### Override the strategy at the job level

Override the chunking strategy when creating an ingestion job:
```python
import json

from ai_service.storage.models import create_ingestion_job_record
from ai_service.utils.database import SessionLocal

db = SessionLocal()
job = create_ingestion_job_record(
    db_session=db,
    source_id="source-123",
    chunking_strategy="sentence",
    chunking_params=json.dumps({
        "target_size": 512,
        "max_sentences_per_chunk": 8
    })
)
db.close()
```
### Override the strategy at the document level

Specify a strategy when processing a single document:
```python
import json

ingestion_service.process_document(
    document_id="doc-123",
    chunking_strategy_name="recursive",
    chunking_params_json=json.dumps({
        "chunk_size": 512,
        "chunk_overlap": 50
    })
)
```
## Error Handling

### Document-level errors

- The document status is set to `FAILED`
- The error message is written to the document record
- The job continues processing the remaining documents
### Job-level errors

- The job status is set to `FAILED`
- Documents that already succeeded keep their indexed results
- A failed job can be re-run, as shown below
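A minimal re-run sketch, assuming (per the list above) that `run_ingestion_job` can simply be invoked again with the same `job_id`:

```python
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()
try:
    # Already-indexed documents are kept; remaining UPLOADED documents are processed
    ingestion_service.run_ingestion_job(job_id="job-789")
except Exception as e:
    print(f"Re-run failed: {e}")
```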
### Timeout control

```python
# Default: 600 seconds
config.timeouts.ingestion_document_seconds = 600
# Default: 3600 seconds
config.timeouts.ingestion_job_seconds = 3600
```
### Progress updates

Job progress is written to the database in real time:

```python
job.documents_total = 10                         # total number of documents
job.documents_done = 5                           # documents completed
job.chunks_total = 150                           # total number of chunks
job.chunks_done = 150                            # chunks indexed
job.status_message = "Processing: document.pdf"  # current stage description
job.chunking_strategy = "sentence"               # chunking strategy in use
```
## Best Practices

### Check readiness before starting a job

```python
from ai_service.services.embedding import get_embedding_service

result = get_embedding_service().check_readiness()
if not result.is_ready:
    print(f"Model not ready: {result.error_message}")
```
### Choose an appropriate chunking strategy

| Scenario | Recommended strategy | Typical parameters |
|---|---|---|
| General documents | `recursive` | `chunk_size=512, overlap=50` |
| News / articles | `sentence` | `target_size=512, max_sentences=10` |
| Technical documentation | `paragraph` | `target_size=1024, max_paragraphs=5` |
| Fast processing | `fixed_size` | `chunk_size=512, overlap=50` |
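For instance, the general-documents row above maps onto a `process_document` call like this (parameter names follow the document-level override example earlier):

```python
import json
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()
ingestion_service.process_document(
    document_id="doc-123",
    chunking_strategy_name="recursive",
    chunking_params_json=json.dumps({"chunk_size": 512, "chunk_overlap": 50}),
)
```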
### Job monitoring
```python
from ai_service.storage.models import get_ingestion_job_record
from ai_service.utils.database import SessionLocal

db = SessionLocal()
job = get_ingestion_job_record(db, job_id="job-789")
print(f"Progress: {job.documents_done}/{job.documents_total}")
print(f"Status: {job.status_message}")
print(f"Chunking Strategy: {job.chunking_strategy}")
db.close()
```
### Retrying failed documents
```python
from ai_service.services.ingestion import get_ingestion_service
from ai_service.storage.models import list_documents_by_status, DocumentStatus
from ai_service.utils.database import SessionLocal

ingestion_service = get_ingestion_service()
db = SessionLocal()
failed_docs = list_documents_by_status(
    db,
    source_id="source-123",
    status=DocumentStatus.FAILED.value
)
for doc in failed_docs:
    try:
        ingestion_service.process_document(document_id=doc.id)
    except Exception as e:
        print(f"Retry failed: {e}")
db.close()
```
## Performance Reference

| Stage | Duration | Notes |
|---|---|---|
| Download | 0.5-2 s | Depends on file size |
| Parse | 1-5 s | Depends on format and size |
| Chunk | <1 s | In-memory; strategy-dependent |
| Embed | 0.1-0.5 s per chunk | Batched |
| Write | <1 s | PostgreSQL + Qdrant |
| Total | 2-10 s | Typical range |
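A quick way to check where a given document falls in this range is to time the call directly:

```python
import time
from ai_service.services.ingestion import get_ingestion_service

ingestion_service = get_ingestion_service()
start = time.perf_counter()
chunks_created, chunks_indexed = ingestion_service.process_document(document_id="doc-123")
elapsed = time.perf_counter() - start
print(f"{chunks_created} chunks created, {chunks_indexed} indexed in {elapsed:.1f}s")
```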
Chunking strategy performance comparison (10,000-character document):

| Strategy | Duration | Notes |
|---|---|---|
| fixed_size | <10 ms | Fastest |
| sentence | <50 ms | NLTK sentence splitting |
| paragraph | <20 ms | Simple splitting |
| recursive | <100 ms | Multi-level processing |
| semantic | <100 ms | Currently a simplified version |
## Common Issues

### Problem: document processing times out

Symptom: `TimeoutError`

Cause: oversized documents, or slow responses from dependent services

Fixes:

- Increase `ingestion_document_seconds`
- Split the document before uploading
- Check network connectivity to MinIO and Qdrant
### Problem: a job is stuck in RUNNING

Cause: the service crashed mid-run

Fix:

```python
from ai_service.services.ingestion import cleanup_stale_ingestion_jobs

cleaned = cleanup_stale_ingestion_jobs()
print(f"Cleaned up {cleaned} stale jobs")
```
### Problem: parsing fails

Cause: unsupported format

Fix: confirm the file is one of the supported formats: .txt, .pdf, .docx, or .md.
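A small pre-upload guard based on the formats listed above; the helper name is illustrative:

```python
from pathlib import Path

SUPPORTED_EXTENSIONS = {".txt", ".pdf", ".docx", ".md"}

def is_supported(filename: str) -> bool:
    # Reject files whose extension is not in the supported set above
    return Path(filename).suffix.lower() in SUPPORTED_EXTENSIONS
```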
### Problem: no chunks produced

Cause: parsing returned empty text

Fix: inspect the document content and the parsing logs.
### Problem: invalid chunking strategy

Symptom: `ValueError: Unknown chunking strategy`

Fix: check the strategy name and use one of the following:

```python
# Valid strategy names
valid_strategies = ["fixed_size", "sentence", "paragraph", "semantic", "recursive"]
```