集成指南
本页提供 Services 模块的常见集成模式与实践示例。
基础集成模式
模式 1:单文档摄取
from ai_service.services.ingestion import get_ingestion_service
def ingest_document(document_id: str, agent_id: str) -> bool:
    """Ingest a document for the specified agent.

    Args:
        document_id: ID of the uploaded document to process.
        agent_id: ID of the agent that will own the resulting chunks.

    Returns:
        True if ingestion succeeded, False otherwise.
    """
    try:
        ingestion_service = get_ingestion_service()
        chunks_created, chunks_indexed = ingestion_service.process_document(
            document_id=document_id,
            agent_id=agent_id
        )
        # The message reports the *indexed* count, so use chunks_indexed —
        # the original printed chunks_created, which can differ if some
        # chunks fail to index.
        print(f"Success: {chunks_indexed} chunks indexed")
        return True
    except Exception as e:
        print(f"Ingestion failed: {e}")
        return False
模式 2:RAG 增强查询
from ai_service.services.rag_retrieval import get_rag_service
def answer_with_context(query: str, agent_id: str) -> str:
    """Build a prompt for *query*, prepending retrieved context when available."""
    rag_service = get_rag_service()
    context = rag_service.retrieve_and_format(
        query=query,
        agent_id=agent_id,
        top_k=5,
        max_context_length=3000
    )
    # Fall back to a bare prompt when retrieval produced nothing.
    if not context:
        return f"Question: {query}\n\nAnswer:"
    return (
        "Context:\n"
        f"{context}\n"
        f"Question: {query}\n"
        "Answer based on the context above:"
    )
模式 3:批量摄取
from ai_service.services.ingestion import get_ingestion_service
from ai_service.storage.models import list_documents_by_status, DocumentStatus
from ai_service.utils.database import SessionLocal
def batch_ingest_source(source_id: str) -> dict:
    """Ingest every uploaded document under the given knowledge source.

    Returns a dict with "success" and "failed" counters.
    """
    session = SessionLocal()
    service = get_ingestion_service()
    outcome = {"success": 0, "failed": 0}
    try:
        pending = list_documents_by_status(
            session, source_id=source_id, status=DocumentStatus.UPLOADED.value
        )
        for document in pending:
            try:
                service.process_document(document_id=document.id)
            except Exception as exc:
                # Best-effort batch: log the failure and keep going.
                print(f"Failed {document.filename}: {exc}")
                outcome["failed"] += 1
            else:
                outcome["success"] += 1
        return outcome
    finally:
        session.close()
分块策略集成
使用特定分块策略摄取
import json
from ai_service.services.ingestion import get_ingestion_service
def ingest_with_sentence_chunking(document_id: str, agent_id: str) -> bool:
    """Ingest a document using the sentence-level chunking strategy.

    Args:
        document_id: ID of the uploaded document to process.
        agent_id: ID of the agent that will own the resulting chunks.

    Returns:
        True if ingestion succeeded, False otherwise.
    """
    try:
        ingestion_service = get_ingestion_service()
        # Parameters for sentence-level chunking.
        chunking_params = {
            "target_size": 512,
            "max_sentences_per_chunk": 10
        }
        chunks_created, chunks_indexed = ingestion_service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name="sentence",
            chunking_params_json=json.dumps(chunking_params)
        )
        # Report the indexed count to match the message wording — the
        # original printed chunks_created here, which can diverge from
        # what was actually indexed.
        print(f"Success: {chunks_indexed} chunks indexed using sentence strategy")
        return True
    except Exception as e:
        print(f"Ingestion failed: {e}")
        return False
使用递归分块策略
import json
from ai_service.services.ingestion import get_ingestion_service
def ingest_with_recursive_chunking(document_id: str, agent_id: str) -> bool:
    """Ingest a document using the recursive hierarchical chunking strategy.

    Args:
        document_id: ID of the uploaded document to process.
        agent_id: ID of the agent that will own the resulting chunks.

    Returns:
        True if ingestion succeeded, False otherwise.
    """
    try:
        ingestion_service = get_ingestion_service()
        # Parameters for recursive chunking: split on progressively finer
        # separators until chunks fit chunk_size.
        chunking_params = {
            "chunk_size": 512,
            "chunk_overlap": 50,
            "separators": ["\n\n", "\n", ". ", " ", ""]
        }
        chunks_created, chunks_indexed = ingestion_service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name="recursive",
            chunking_params_json=json.dumps(chunking_params)
        )
        # Report the indexed count to match the message wording — the
        # original printed chunks_created here.
        print(f"Success: {chunks_indexed} chunks indexed using recursive strategy")
        return True
    except Exception as e:
        print(f"Ingestion failed: {e}")
        return False
直接使用分块策略
from ai_service.services.chunking import get_chunking_strategy
def chunk_document(text: str, document_id: str, strategy_name: str = "fixed_size"):
    """Split *text* into chunks with the named strategy and report the result."""
    # Resolve the strategy instance from the factory.
    strategy = get_chunking_strategy(strategy_name)
    pieces = strategy.chunk_text(
        text=text,
        document_id=document_id,
        document_name="example.txt",
        source_id="source-123",
        extra_metadata={"processed_by": "integration-guide"}
    )
    print(f"Strategy: {strategy.strategy_name}")
    print(f"Created {len(pieces)} chunks")
    for piece in pieces:
        print(f" Chunk {piece.chunk_index}: {len(piece.content)} chars")
    return pieces
# Try out several strategies on the same document.
for strategy_name in ("sentence", "paragraph", "recursive"):
    chunk_document("Long text here...", "doc-1", strategy_name)
知识源默认策略
from ai_service.storage.models import create_knowledge_source_record
from ai_service.utils.database import SessionLocal
import json
def create_source_with_default_strategy():
    """Create a knowledge source that carries a default chunking strategy."""
    session = SessionLocal()
    try:
        default_params = json.dumps({
            "target_size": 1024,
            "max_paragraphs_per_chunk": 5
        })
        # Technical documentation reads best with paragraph-level chunking.
        source = create_knowledge_source_record(
            db_session=session,
            name="技术文档库",
            description="产品技术文档,使用段落级分块",
            default_chunking_strategy="paragraph",
            default_chunking_params=default_params
        )
        print(f"Created source: {source.id}")
        print(f"Default strategy: {source.default_chunking_strategy}")
        return source
    finally:
        session.close()
依赖注入
使用自定义服务
from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import ChunkingService
from ai_service.services.embedding import EmbeddingService
# Wire an IngestionService from hand-configured collaborators instead of
# relying on the module-level singletons.
custom_embedding = EmbeddingService(
    model_name="all-MiniLM-L6-v2",
    embedding_dim=384
)
custom_chunking = ChunkingService(
    chunk_size=500,
    chunk_overlap=100
)
ingestion_service = IngestionService(
    chunking_service=custom_chunking,
    embedding_service=custom_embedding
)
chunks, indexed = ingestion_service.process_document(document_id="doc-123")
使用自定义分块策略
from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import SentenceChunkingStrategy
# Sentence-level chunking, tuned for shorter chunks.
custom_strategy = SentenceChunkingStrategy(target_size=512, max_sentences_per_chunk=8)
# NOTE: IngestionService resolves strategies through its internal factory;
# for full customisation pass the chunking_strategy_name parameter instead.
使用 Mock 进行测试
Mock 服务
from unittest.mock import Mock
from ai_service.services.rag_retrieval import RAGRetrievalService, RetrievedChunk
def test_query_handler():
    """Exercise a query handler against a mocked RAG service."""
    expected_chunk = RetrievedChunk(
        chunk_id="chunk-1",
        source_id="source-1",
        document_name="test.pdf",
        chunk_index=0,
        content="Test content",
        score=0.85,
        metadata={}
    )
    # Stub the service so no vector store is needed.
    rag_stub = Mock(spec=RAGRetrievalService)
    rag_stub.retrieve_context.return_value = [expected_chunk]
    retrieved = rag_stub.retrieve_context("test query", "agent-1")
    assert len(retrieved) == 1
    assert retrieved[0].score == 0.85
Mock 分块策略
from unittest.mock import Mock
from ai_service.services.chunking import BaseChunkingStrategy, ChunkResult
def test_chunking_strategy():
    """Exercise chunking through a mocked strategy."""
    fake_chunk = ChunkResult(
        chunk_index=0,
        content="Mock chunk 1",
        start_char=0,
        end_char=100,
        metadata={"chunk_index": 0}
    )
    # Stub the strategy so no real text processing happens.
    strategy_stub = Mock(spec=BaseChunkingStrategy)
    strategy_stub.strategy_name = "mock_strategy"
    strategy_stub.chunk_text.return_value = [fake_chunk]
    produced = strategy_stub.chunk_text("test", "doc-1", "test.txt", "source-1")
    assert len(produced) == 1
    assert produced[0].content == "Mock chunk 1"
端到端集成测试
import pytest
from ai_service.services.ingestion import IngestionService
@pytest.fixture
def ingestion_service():
    """Provide a fresh IngestionService for each test."""
    service = IngestionService()
    return service
def test_document_processing(ingestion_service):
    """End-to-end check of the full ingestion flow."""
    # A test document must already have been uploaded at this point.
    # ... upload logic ...
    chunks, indexed = ingestion_service.process_document(
        document_id="test-doc-123"
    )
    # Every created chunk should also have been indexed.
    assert chunks > 0
    assert indexed == chunks
常见集成场景
场景 1:API 接口
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from ai_service.services.ingestion import get_ingestion_service
router = APIRouter()

class IngestRequest(BaseModel):
    """Request payload for the /ingest endpoint."""
    # ID of the previously uploaded document to process.
    document_id: str
    # Agent that will own the resulting chunks.
    agent_id: str
    # Optional chunking strategy name; validated against the supported set.
    chunking_strategy: Optional[str] = Field(
        None,
        pattern="^(fixed_size|sentence|paragraph|semantic|recursive)$"
    )
    # Strategy-specific parameters, serialized to JSON before ingestion.
    chunking_params: Optional[dict] = None
@router.post("/ingest")
async def ingest_document(request: IngestRequest):
    """Ingest a document, optionally with an explicit chunking strategy."""
    import json
    try:
        service = get_ingestion_service()
        params_json = (
            json.dumps(request.chunking_params) if request.chunking_params else None
        )
        chunks, indexed = service.process_document(
            document_id=request.document_id,
            agent_id=request.agent_id,
            chunking_strategy_name=request.chunking_strategy,
            chunking_params_json=params_json
        )
        return {
            "chunks": chunks,
            "indexed": indexed,
            "strategy": request.chunking_strategy or "default"
        }
    except ValueError as e:
        # ValueError from the service signals an unknown document/strategy.
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
场景 2:后台任务
from celery import Celery
app = Celery('tasks')

@app.task
def ingest_document_task(
    document_id: str,
    agent_id: str,
    chunking_strategy: str = None,
    chunking_params: dict = None
):
    """Background task that ingests a document with an optional strategy."""
    # Imports are local so the worker only pays the cost when the task runs.
    from ai_service.services.ingestion import get_ingestion_service
    import json

    service = get_ingestion_service()
    try:
        params_json = json.dumps(chunking_params) if chunking_params else None
        chunks, _indexed = service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name=chunking_strategy,
            chunking_params_json=params_json
        )
    except Exception as exc:
        # Tasks report failure as a result payload rather than raising.
        return {"status": "failed", "error": str(exc)}
    return {
        "status": "success",
        "chunks": chunks,
        "strategy": chunking_strategy or "default"
    }
场景 3:CLI 工具
import click
import json
from ai_service.services.ingestion import get_ingestion_service
@click.command()
@click.option('--document-id', required=True, help='Document ID to process')
@click.option('--agent-id', required=True, help='Agent ID')
@click.option('--chunking-strategy', default=None,
              type=click.Choice(['fixed_size', 'sentence', 'paragraph', 'semantic', 'recursive']),
              help='Chunking strategy to use')
@click.option('--chunking-params', default=None,
              help='Chunking parameters as JSON string')
def ingest(document_id: str, agent_id: str, chunking_strategy: str, chunking_params: str):
    """通过 CLI 摄取文档。"""
    # NOTE: the docstring above doubles as the click --help text, so it is
    # kept verbatim.
    service = get_ingestion_service()
    try:
        click.echo(f"Processing document {document_id}...")
        click.echo(f"Chunking strategy: {chunking_strategy or 'default'}")
        # Parse early so malformed JSON aborts before any processing.
        parsed = json.loads(chunking_params) if chunking_params else None
        chunks, _indexed = service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name=chunking_strategy,
            chunking_params_json=json.dumps(parsed) if parsed else None
        )
        click.echo(f"Success: {chunks} chunks indexed")
    except Exception as exc:
        click.echo(f"Failed: {exc}", err=True)
        raise click.Abort()

if __name__ == '__main__':
    ingest()
最佳实践
1. 优先使用单例
# Recommended: use the module-level singleton accessor.
from ai_service.services.embedding import get_embedding_service
service = get_embedding_service()
# Avoid: each direct construction reloads the embedding model.
from ai_service.services.embedding import EmbeddingService
service1 = EmbeddingService()
service2 = EmbeddingService()
2. 异常处理要清晰
# Catch specific exception types first so each failure mode gets a targeted
# log message; fall back to a generic handler only at the end.
try:
    result = service.process_document(document_id)
except ValueError as e:
    logger.error(f"Invalid document: {e}")
except TimeoutError as e:
    logger.error(f"Timeout: {e}")
except Exception as e:
    # exc_info=True records the full traceback for unexpected errors.
    logger.error(f"Unexpected error: {e}", exc_info=True)
3. 服务预热
from ai_service.services.embedding import get_embedding_service
def startup():
    """Warm up the embedding service at application start.

    Raises:
        RuntimeError: if the embedding model is not ready to serve.
    """
    embedding_service = get_embedding_service()
    readiness = embedding_service.check_readiness()
    if not readiness.is_ready:
        raise RuntimeError(f"Embedding model not ready: {readiness.error_message}")
4. 选择合适的分块策略
def ingest_based_on_content_type(document_id: str, content_type: str):
    """Pick a chunking strategy from the document's content type and ingest.

    Args:
        document_id: Document to ingest.
        content_type: One of "article", "technical_doc", "code"; any other
            value falls back to fixed-size chunking.

    Returns:
        Tuple of (chunks_created, chunks_indexed) from the ingestion service.
    """
    # Local imports keep this example snippet self-contained — the original
    # relied on imports from unrelated snippets earlier on the page.
    import json
    from ai_service.services.ingestion import get_ingestion_service

    strategy_map = {
        "article": ("sentence", {"target_size": 512}),
        "technical_doc": ("paragraph", {"target_size": 1024}),
        "code": ("recursive", {"chunk_size": 512}),
    }
    strategy, params = strategy_map.get(content_type, ("fixed_size", {"chunk_size": 512}))
    ingestion_service = get_ingestion_service()
    chunks, indexed = ingestion_service.process_document(
        document_id=document_id,
        chunking_strategy_name=strategy,
        chunking_params_json=json.dumps(params)
    )
    return chunks, indexed
反模式
不要在循环中重复创建服务
# Not recommended: resolving the service inside the loop repeats the
# singleton lookup on every iteration.
for doc in documents:
    service = get_ingestion_service()
    service.process_document(document_id=doc.id)
复用单例实例
# Recommended: resolve the service once and reuse it across the loop.
service = get_ingestion_service()
for doc in documents:
    service.process_document(document_id=doc.id)
不要忽略返回值
# Not recommended: the (chunks_created, chunks_indexed) result is discarded.
ingestion_service.process_document(document_id)
检查结果
# Inspect the result and surface suspicious outcomes.
chunks, indexed = ingestion_service.process_document(document_id)
if chunks == 0:
    logger.warning("No chunks created")
不要混淆策略参数
# Wrong: fixed-size strategy parameters passed to the sentence strategy.
strategy = get_chunking_strategy(
    "sentence",
    params={"chunk_size": 512}  # Wrong! The sentence strategy expects target_size
)
# Correct: parameters that the sentence strategy actually understands.
strategy = get_chunking_strategy(
    "sentence",
    params={"target_size": 512, "max_sentences_per_chunk": 10}
)