跳转至

集成指南

本页提供 Services 模块的常见集成模式与实践示例。

基础集成模式

模式 1:单文档摄取

from ai_service.services.ingestion import get_ingestion_service

def ingest_document(document_id: str, agent_id: str) -> bool:
    """Ingest one document for the given agent; return True on success."""
    try:
        service = get_ingestion_service()
        chunks_created, _ = service.process_document(
            document_id=document_id,
            agent_id=agent_id,
        )
    except Exception as e:
        # Best-effort helper: report and signal failure instead of raising.
        print(f"Ingestion failed: {e}")
        return False
    print(f"Success: {chunks_created} chunks indexed")
    return True

模式 2:RAG 增强查询

from ai_service.services.rag_retrieval import get_rag_service

def answer_with_context(query: str, agent_id: str) -> str:
    """Build an LLM prompt, prepending retrieved context when any is found."""
    formatted = get_rag_service().retrieve_and_format(
        query=query,
        agent_id=agent_id,
        top_k=5,
        max_context_length=3000,
    )

    if not formatted:
        # Nothing retrieved: fall back to a bare question prompt.
        return f"Question: {query}\n\nAnswer:"

    return f"""Context:
{formatted}

Question: {query}

Answer based on the context above:"""

模式 3:批量摄取

from ai_service.services.ingestion import get_ingestion_service
from ai_service.storage.models import list_documents_by_status, DocumentStatus
from ai_service.utils.database import SessionLocal

def batch_ingest_source(source_id: str) -> dict:
    """Ingest every uploaded document under a knowledge source.

    Returns a dict with "success" and "failed" counts; per-document
    failures are reported and counted but do not abort the batch.
    """
    # Acquire the service BEFORE opening the session: the original opened
    # the session first, so a failure in get_ingestion_service() would
    # leak an unclosed session (it was outside the try/finally).
    ingestion_service = get_ingestion_service()
    db = SessionLocal()

    try:
        documents = list_documents_by_status(
            db, source_id=source_id, status=DocumentStatus.UPLOADED.value
        )

        results = {"success": 0, "failed": 0}

        for doc in documents:
            try:
                ingestion_service.process_document(document_id=doc.id)
                results["success"] += 1
            except Exception as e:
                print(f"Failed {doc.filename}: {e}")
                results["failed"] += 1

        return results
    finally:
        db.close()

分块策略集成

使用特定分块策略摄取

import json
from ai_service.services.ingestion import get_ingestion_service

def ingest_with_sentence_chunking(document_id: str, agent_id: str) -> bool:
    """Ingest a document with the sentence-level chunking strategy."""
    # Sentence strategy takes target_size rather than chunk_size.
    params = {
        "target_size": 512,
        "max_sentences_per_chunk": 10
    }
    try:
        created, _ = get_ingestion_service().process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name="sentence",
            chunking_params_json=json.dumps(params),
        )
        print(f"Success: {created} chunks indexed using sentence strategy")
        return True
    except Exception as e:
        print(f"Ingestion failed: {e}")
        return False

使用递归分块策略

import json
from ai_service.services.ingestion import get_ingestion_service

def ingest_with_recursive_chunking(document_id: str, agent_id: str) -> bool:
    """Ingest a document with the recursive hierarchical chunking strategy."""
    # Separators are tried in order, from coarsest to finest split.
    params = {
        "chunk_size": 512,
        "chunk_overlap": 50,
        "separators": ["\n\n", "\n", ". ", " ", ""]
    }
    try:
        created, _ = get_ingestion_service().process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name="recursive",
            chunking_params_json=json.dumps(params),
        )
        print(f"Success: {created} chunks indexed using recursive strategy")
        return True
    except Exception as e:
        print(f"Ingestion failed: {e}")
        return False

直接使用分块策略

from ai_service.services.chunking import get_chunking_strategy

def chunk_document(text: str, document_id: str, strategy_name: str = "fixed_size"):
    """Chunk *text* with the named strategy and print a short summary."""
    strategy = get_chunking_strategy(strategy_name)

    chunks = strategy.chunk_text(
        text=text,
        document_id=document_id,
        document_name="example.txt",
        source_id="source-123",
        extra_metadata={"processed_by": "integration-guide"},
    )

    print(f"Strategy: {strategy.strategy_name}")
    print(f"Created {len(chunks)} chunks")
    for piece in chunks:
        print(f"  Chunk {piece.chunk_index}: {len(piece.content)} chars")

    return chunks

# Run the same input through several strategies for comparison.
chunk_document("Long text here...", "doc-1", "sentence")
chunk_document("Long text here...", "doc-1", "paragraph")
chunk_document("Long text here...", "doc-1", "recursive")

知识源默认策略

from ai_service.storage.models import create_knowledge_source_record
from ai_service.utils.database import SessionLocal
import json

def create_source_with_default_strategy():
    """Create a knowledge source that carries a default chunking setup."""
    # Serialize the defaults up front; json.dumps on a literal cannot fail,
    # so doing it before the session opens changes nothing observable.
    default_params = json.dumps({
        "target_size": 1024,
        "max_paragraphs_per_chunk": 5
    })

    db = SessionLocal()
    try:
        # Technical documentation reads best with paragraph-level chunks.
        source = create_knowledge_source_record(
            db_session=db,
            name="技术文档库",
            description="产品技术文档,使用段落级分块",
            default_chunking_strategy="paragraph",
            default_chunking_params=default_params,
        )
        print(f"Created source: {source.id}")
        print(f"Default strategy: {source.default_chunking_strategy}")
        return source
    finally:
        db.close()

依赖注入

使用自定义服务

from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import ChunkingService
from ai_service.services.embedding import EmbeddingService

# Wire an IngestionService from explicitly configured sub-services instead
# of the process-wide singletons (useful for tests or special pipelines).
custom_chunking = ChunkingService(
    chunk_size=500,
    chunk_overlap=100
)

# NOTE(review): embedding_dim must match the model's actual output size —
# confirm 384 against the all-MiniLM-L6-v2 model card before changing either.
custom_embedding = EmbeddingService(
    model_name="all-MiniLM-L6-v2",
    embedding_dim=384
)

ingestion_service = IngestionService(
    chunking_service=custom_chunking,
    embedding_service=custom_embedding
)

chunks, indexed = ingestion_service.process_document(document_id="doc-123")

使用自定义分块策略

from ai_service.services.ingestion import IngestionService
from ai_service.services.chunking import SentenceChunkingStrategy

# Instantiate the sentence-level chunking strategy directly.
custom_strategy = SentenceChunkingStrategy(
    target_size=512,
    max_sentences_per_chunk=8
)

# Note: IngestionService resolves strategies through its internal factory;
# for full customization, pass the chunking_strategy_name parameter instead.

使用 Mock 进行测试

Mock 服务

from unittest.mock import Mock
from ai_service.services.rag_retrieval import RAGRetrievalService, RetrievedChunk

def test_query_handler():
    """Exercise the retrieval call against a mocked RAG service."""
    rag = Mock(spec=RAGRetrievalService)
    stub_chunk = RetrievedChunk(
        chunk_id="chunk-1",
        source_id="source-1",
        document_name="test.pdf",
        chunk_index=0,
        content="Test content",
        score=0.85,
        metadata={},
    )
    rag.retrieve_context.return_value = [stub_chunk]

    retrieved = rag.retrieve_context("test query", "agent-1")
    assert len(retrieved) == 1
    assert retrieved[0].score == 0.85

Mock 分块策略

from unittest.mock import Mock
from ai_service.services.chunking import BaseChunkingStrategy, ChunkResult

def test_chunking_strategy():
    """Exercise chunking through a mocked strategy object."""
    strategy = Mock(spec=BaseChunkingStrategy)
    strategy.strategy_name = "mock_strategy"
    strategy.chunk_text.return_value = [
        ChunkResult(
            chunk_index=0,
            content="Mock chunk 1",
            start_char=0,
            end_char=100,
            metadata={"chunk_index": 0},
        )
    ]

    produced = strategy.chunk_text("test", "doc-1", "test.txt", "source-1")
    assert len(produced) == 1
    assert produced[0].content == "Mock chunk 1"

端到端集成测试

import pytest
from ai_service.services.ingestion import IngestionService

@pytest.fixture
def ingestion_service():
    """Provide a fresh IngestionService instance for each test."""
    return IngestionService()

def test_document_processing(ingestion_service):
    """End-to-end check of the full ingestion flow."""
    # A test document must be uploaded before this can pass.
    # ... upload logic ...

    chunks, indexed = ingestion_service.process_document(
        document_id="test-doc-123"
    )

    # Every chunk that was created should also have been indexed.
    assert chunks > 0
    assert indexed == chunks

常见集成场景

场景 1:API 接口

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
from ai_service.services.ingestion import get_ingestion_service

router = APIRouter()

class IngestRequest(BaseModel):
    """Request body for the /ingest endpoint."""
    document_id: str
    agent_id: str
    # Restricted to the strategy names the chunking factory recognizes.
    chunking_strategy: Optional[str] = Field(
        None,
        pattern="^(fixed_size|sentence|paragraph|semantic|recursive)$"
    )
    chunking_params: Optional[dict] = None

@router.post("/ingest")
async def ingest_document(request: IngestRequest):
    """Ingest a document, optionally with an explicit chunking strategy."""
    try:
        import json

        service = get_ingestion_service()
        params_json = (
            json.dumps(request.chunking_params)
            if request.chunking_params
            else None
        )
        chunks, indexed = service.process_document(
            document_id=request.document_id,
            agent_id=request.agent_id,
            chunking_strategy_name=request.chunking_strategy,
            chunking_params_json=params_json,
        )
        return {
            "chunks": chunks,
            "indexed": indexed,
            "strategy": request.chunking_strategy or "default"
        }
    except ValueError as e:
        # Unknown document / strategy is reported as "not found".
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

场景 2:后台任务

from celery import Celery

app = Celery('tasks')

@app.task
def ingest_document_task(
    document_id: str,
    agent_id: str,
    chunking_strategy: str = None,
    chunking_params: dict = None
):
    """Background task: ingest a document, optionally with a chunking strategy.

    Returns a status dict in both the success and failure case (never
    raises), so the outcome is recorded as the task result.
    """
    # Imports are deferred to the task body so they run in the worker
    # process at execution time.
    from ai_service.services.ingestion import get_ingestion_service
    import json

    ingestion_service = get_ingestion_service()
    try:
        chunks, indexed = ingestion_service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name=chunking_strategy,
            chunking_params_json=json.dumps(chunking_params) if chunking_params else None
        )
        return {
            "status": "success",
            "chunks": chunks,
            "strategy": chunking_strategy or "default"
        }
    except Exception as e:
        return {"status": "failed", "error": str(e)}

场景 3:CLI 工具

import click
import json
from ai_service.services.ingestion import get_ingestion_service

@click.command()
@click.option('--document-id', required=True, help='Document ID to process')
@click.option('--agent-id', required=True, help='Agent ID')
@click.option('--chunking-strategy', default=None,
              type=click.Choice(['fixed_size', 'sentence', 'paragraph', 'semantic', 'recursive']),
              help='Chunking strategy to use')
@click.option('--chunking-params', default=None,
              help='Chunking parameters as JSON string')
def ingest(document_id: str, agent_id: str, chunking_strategy: str, chunking_params: str):
    """Ingest a document from the command line."""
    service = get_ingestion_service()

    try:
        click.echo(f"Processing document {document_id}...")
        click.echo(f"Chunking strategy: {chunking_strategy or 'default'}")

        # Parse the raw JSON up front so malformed input fails before
        # any processing starts.
        parsed = json.loads(chunking_params) if chunking_params else None
        chunks, _ = service.process_document(
            document_id=document_id,
            agent_id=agent_id,
            chunking_strategy_name=chunking_strategy,
            chunking_params_json=json.dumps(parsed) if parsed else None,
        )
        click.echo(f"Success: {chunks} chunks indexed")
    except Exception as e:
        click.echo(f"Failed: {e}", err=True)
        raise click.Abort()

if __name__ == '__main__':
    ingest()

最佳实践

1. 优先使用单例

# Recommended: reuse the process-wide singleton.
from ai_service.services.embedding import get_embedding_service
service = get_embedding_service()

# Avoid: each constructor call builds a separate service (reloading the model).
from ai_service.services.embedding import EmbeddingService
service1 = EmbeddingService()
service2 = EmbeddingService()

2. 异常处理要清晰

# Handle the specific, expected failures first; keep one catch-all that
# logs the full stack trace for anything unanticipated.
try:
    result = service.process_document(document_id)
except ValueError as e:
    logger.error(f"Invalid document: {e}")
except TimeoutError as e:
    logger.error(f"Timeout: {e}")
except Exception as e:
    logger.error(f"Unexpected error: {e}", exc_info=True)

3. 服务预热

from ai_service.services.embedding import get_embedding_service

def startup():
    """应用启动预热。"""
    embedding_service = get_embedding_service()
    result = embedding_service.check_readiness()
    if not result.is_ready:
        raise RuntimeError(f"Embedding model not ready: {result.error_message}")

4. 选择合适的分块策略

def ingest_based_on_content_type(document_id: str, content_type: str):
    """Select a chunking strategy by content type, then run ingestion."""
    # Unknown content types fall back to fixed-size chunking.
    fallback = ("fixed_size", {"chunk_size": 512})
    per_type = {
        "article": ("sentence", {"target_size": 512}),
        "technical_doc": ("paragraph", {"target_size": 1024}),
        "code": ("recursive", {"chunk_size": 512}),
    }
    strategy_name, strategy_params = per_type.get(content_type, fallback)

    service = get_ingestion_service()
    chunks, indexed = service.process_document(
        document_id=document_id,
        chunking_strategy_name=strategy_name,
        chunking_params_json=json.dumps(strategy_params),
    )
    return chunks, indexed

反模式

不要在循环中重复创建服务

# Not recommended: the service is looked up once per document.
for doc in documents:
    service = get_ingestion_service()
    service.process_document(document_id=doc.id)

复用单例实例

# Recommended: resolve the singleton once, outside the loop.
service = get_ingestion_service()
for doc in documents:
    service.process_document(document_id=doc.id)

不要忽略返回值

# Not recommended: the (chunks, indexed) result is silently discarded.
ingestion_service.process_document(document_id)

检查结果

# Inspect the returned counts so empty results are noticed.
chunks, indexed = ingestion_service.process_document(document_id)
if chunks == 0:
    logger.warning("No chunks created")

不要混淆策略参数

# Wrong: fixed-size strategy parameters passed to the sentence strategy.
strategy = get_chunking_strategy(
    "sentence",
    params={"chunk_size": 512}  # Wrong! The sentence strategy expects target_size
)

# Correct
strategy = get_chunking_strategy(
    "sentence",
    params={"target_size": 512, "max_sentences_per_chunk": 10}
)

关联文档