Agent Runtime 架构实现指南
本文档基于 Chameleon 项目现有代码基线,详细说明货代智能 Agent 平台的具体架构实现方案。文档将高层设计与实际代码映射,提供可直接落地的实现路径。
1. 现有架构基线分析
1.1 项目结构映射
当前 Chameleon 项目已具备以下核心能力,可直接作为货代 Agent Runtime 的基础:
ai_service/ # 业务真相源(唯一)
├── api/app.py # FastAPI 服务入口
│ ├── Session/Agent/Knowledge CRUD # 资源管理
│ ├── WebSocket/SSE 实时通信 # 人机协同通道
│ ├── 文档上传与摄入任务 # 单证处理入口
│ └── MCP 服务器/凭证/挂载管理 # 系统互联
├── orchestrator/ # 运行时编排层
│ ├── graph.py # ChameleonOrchestrator
│ │ ├── LangGraph 状态机 # AI/Human 切换
│ │ ├── RAG 检索增强 # 知识召回
│ │ └── MCP 工具循环 # 外部系统调用
│ └── state.py # 状态模型定义
├── services/ # 领域服务层
│ ├── chunking.py # 5种分块策略 + 工厂
│ ├── ingestion.py # 文档摄入管道
│ ├── parsers/ # PDF/DOCX/TXT 解析
│ ├── rag_retrieval.py # 向量检索服务
│ ├── mcp_client.py # MCP 调用客户端
│ ├── mcp_registry.py # MCP 挂载解析
│ ├── mcp_policy.py # 调用策略控制
│ └── mcp_audit.py # 审计日志
├── storage/ # 数据层
│ ├── models.py # SQLAlchemy 模型
│ │ ├── Session/Message # 会话消息
│ │ ├── Agent # Agent 配置
│ │ ├── KnowledgeSource # 知识源
│ │ ├── Document/DocumentChunk # 文档与分块
│ │ ├── IngestionJob # 摄入任务
│ │ ├── MCPServer/MCPCredential # MCP 注册与凭证
│ │ └── AgentMCPLink # Agent-MCP 挂载
│ ├── minio_client.py # 对象存储
│ └── qdrant_client.py # 向量数据库
└── utils/ # 基础设施
├── settings.py # Pydantic-Settings
├── database.py # SQLAlchemy 连接
├── model_loader.py # LLM 模型加载
└── logger.py # 日志配置
1.2 核心设计模式确认
| 模式 | 实现位置 | 货代场景应用 |
|---|---|---|
| Strategy 策略模式 | services/chunking.py:37-76 |
单证分块策略(提单/发票/装箱单) |
| Factory + Registry | services/chunking.py:876-927 |
货代专用解析器注册 |
| 挂载系统 | storage/models.py:AgentMCPLink |
Agent 动态挂载船公司 API |
| 审计追踪 | storage/models.py:MCPCallAudit |
每次订舱/查询操作留痕 |
| 状态机 | orchestrator/graph.py:97-119 |
客服 AI/Human/CoPilot 切换 |
2. 货代领域模型扩展实现
2.1 核心领域实体
货代平台需要新增以下核心领域模型:
| 模型 | 说明 | 文件位置 |
|---|---|---|
FreightShipment |
货运主表,记录端到端货物流转 | storage/freight_models.py |
FreightDocument |
单证扩展表,与 Document 一对一关联 |
storage/freight_models.py |
FreightParty |
贸易参与方(收发货人等) | storage/freight_models.py |
TrackingEvent |
物流跟踪事件 | storage/freight_models.py |
MemoryFact |
业务记忆事实 | storage/memory_models.py |
详细模型定义:货代领域模型 - 包含完整 SQLAlchemy 代码、字段说明、关联关系
2.2 模型关系概要
erDiagram
FREIGHT_SHIPMENT ||--o{ FREIGHT_DOCUMENT : contains
FREIGHT_SHIPMENT ||--o{ TRACKING_EVENT : tracks
FREIGHT_SHIPMENT ||--o{ FREIGHT_PARTY : involves
FREIGHT_DOCUMENT ||--|| DOCUMENT : extends
FREIGHT_SHIPMENT }o--|| AGENT : belongs_to
__tablename__ = "freight_party"
id: Mapped[str] = mapped_column(String(32), primary_key=True)
party_type: Mapped[str] = mapped_column(String(32)) # shipper/consignee/notify
name: Mapped[str] = mapped_column(String(256), index=True)
normalized_name: Mapped[str] = mapped_column(String(256), index=True) # 标准化名称
address: Mapped[Optional[str]] = mapped_column(String(512))
country_code: Mapped[Optional[str]] = mapped_column(String(4))
# 去重标识
entity_key: Mapped[str] = mapped_column(String(64), index=True) # 名称+国别哈希
# 关联
shipment_id: Mapped[str] = mapped_column(ForeignKey("freight_shipment.id"))
class TrackingEvent(Base): """物流跟踪事件.
聚合多源跟踪数据(船公司 API、港口、GPS).
"""
__tablename__ = "tracking_event"
id: Mapped[str] = mapped_column(String(32), primary_key=True)
shipment_id: Mapped[str] = mapped_column(ForeignKey("freight_shipment.id"), index=True)
# 事件信息
event_code: Mapped[str] = mapped_column(String(32)) # 标准化事件代码
event_description: Mapped[str] = mapped_column(String(256))
event_datetime: Mapped[datetime] = mapped_column(DateTime(timezone=True))
# 数据源
source_type: Mapped[str] = mapped_column(String(32)) # carrier/port/gps/customs
source_reference: Mapped[Optional[str]] = mapped_column(String(128))
raw_payload_json: Mapped[Optional[dict]] = mapped_column(JSON)
# 地点
location_code: Mapped[Optional[str]] = mapped_column(String(16))
location_name: Mapped[Optional[str]] = mapped_column(String(128))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True))
__table_args__ = (
Index("ix_tracking_shipment_time", "shipment_id", "event_datetime"),
)
```
2.2 记忆系统实现
业务记忆系统复用核心平台设计,包含以下模型:
| 模型 | 说明 | 文件位置 |
|------|------|----------|
| MemoryFact | 记忆事实主表 | storage/memory_models.py |
| MemoryEventLink | 记忆与会话事件关联 | storage/memory_models.py |
| MemoryFeedback | 用户反馈(用于记忆强化/衰减) | storage/memory_models.py |
详细模型定义:参见 货代领域模型 - 业务记忆
2.3 记忆检索流程
mermaid
flowchart LR
A[用户输入] --> B[提取实体ID]
B --> C[结构化过滤]
C --> D[向量相似度检索]
D --> E[重排序]
E --> F[Top-5 记忆]
- 结构化过滤:
entity_id+tenant_id+ 时间窗口 - 向量检索: Qdrant 相似度检索 Top-20
- 重排序: 综合相似度、置信度、重要度、时效性
3. 编排引擎详细实现
3.1 现有 LangGraph 状态机增强
基于 orchestrator/graph.py 扩展货代专用节点:
# ai_service/orchestrator/freight_graph.py
"""货代专用编排图扩展.
在 ChameleonOrchestrator 基础上增加货代业务节点.
"""
from typing import Any, Optional
from langgraph.graph import StateGraph, END, START
from ai_service.orchestrator.graph import ChameleonOrchestrator
from ai_service.orchestrator.state import OrchestratorState, SessionStatus
class FreightOrchestrator(ChameleonOrchestrator):
"""货代专用编排器,扩展基础 AI/Human 切换能力.
新增节点:
- intent_classifier: 货代意图识别 (订舱/追踪/单证/咨询)
- document_processor: 单证处理节点
- shipment_tracker: 货物跟踪节点
- memory_injector: 业务记忆注入节点
"""
def _build_graph(self):
"""构建货代专用状态图.
继承基础 router/ai_node/human_node,增加货代节点.
"""
workflow_builder = StateGraph(OrchestratorState)
# 基础节点(继承)
workflow_builder.add_node("router", self._router_node)
workflow_builder.add_node("ai_node", self._ai_node)
workflow_builder.add_node("human_node", self._human_node)
# 货代专用节点
workflow_builder.add_node("intent_classifier", self._intent_classifier_node)
workflow_builder.add_node("memory_injector", self._memory_injector_node)
workflow_builder.add_node("document_processor", self._document_processor_node)
# 边
workflow_builder.add_edge(START, "intent_classifier")
workflow_builder.add_edge("intent_classifier", "memory_injector")
workflow_builder.add_edge("memory_injector", "router")
workflow_builder.add_conditional_edges(
"router",
self._route_next,
{
"ai_node": "ai_node",
"human_node": "human_node",
"document_processor": "document_processor",
},
)
workflow_builder.add_edge("document_processor", "ai_node")
workflow_builder.add_edge("ai_node", END)
workflow_builder.add_edge("human_node", END)
return workflow_builder.compile()
def _intent_classifier_node(
self, raw_state: OrchestratorState | dict[str, Any]
) -> dict[str, Any]:
"""意图分类节点.
识别货代专用意图类型,用于后续路由.
意图类型: booking/tracking/document/inquiry/exception
"""
state = self._coerce_state(raw_state)
# 基于关键词 + LLM 的意图分类
intent = self._classify_freight_intent(state.incoming_content)
return {"freight_intent": intent}
def _memory_injector_node(
self, raw_state: OrchestratorState | dict[str, Any]
) -> dict[str, Any]:
"""业务记忆注入节点.
在回复前检索相关记忆并注入上下文.
"""
state = self._coerce_state(raw_state)
if not state.agent_id:
return {}
# 检索货代相关记忆
memories = self._retrieve_freight_memories(
agent_id=state.agent_id,
user_input=state.incoming_content,
intent=getattr(state, "freight_intent", None),
)
if memories:
memory_context = self._format_memory_context(memories)
return {"memory_context": memory_context}
return {}
def _document_processor_node(
self, raw_state: OrchestratorState | dict[str, Any]
) -> dict[str, Any]:
"""单证处理节点.
处理上传的单证文件,执行 OCR + 字段提取.
"""
state = self._coerce_state(raw_state)
# 检查是否有附件
attachments = getattr(state, "attachments", [])
if not attachments:
return {"response_content": "请上传需要处理的单证文件。"}
# 调用文档处理服务
doc_result = self._process_freight_document(
attachments[0],
agent_id=state.agent_id,
)
# 生成回复
response = self._format_document_response(doc_result)
return {
"response_role": "assistant",
"response_content": response,
"document_result": doc_result,
}
def _route_next(self, raw_state: OrchestratorState | dict[str, Any]) -> str:
"""扩展路由逻辑.
基于意图和状态选择下一节点.
"""
state = self._coerce_state(raw_state)
# 人工接管优先
if state.active_status == SessionStatus.HUMAN_ACTIVE:
return "human_node"
# 单证处理意图
intent = getattr(state, "freight_intent", None)
if intent == "document" and getattr(state, "attachments", []):
return "document_processor"
return "ai_node"
def _classify_freight_intent(self, user_input: str) -> str:
"""分类货代意图.
简单规则 + LLM 组合分类.
"""
user_input_lower = user_input.lower()
# 规则快速匹配
if any(kw in user_input_lower for kw in ["提单", "发票", "装箱单", "上传"]):
return "document"
if any(kw in user_input_lower for kw in ["跟踪", "到哪里", "状态", "eta"]):
return "tracking"
if any(kw in user_input_lower for kw in ["订舱", "booking", "预约"]):
return "booking"
# 默认咨询
return "inquiry"
def _retrieve_freight_memories(
self,
agent_id: str,
user_input: str,
intent: Optional[str],
) -> list[dict]:
"""检索货代相关记忆.
混合检索:关键词过滤 + 向量相似度.
"""
from ai_service.services.memory_service import get_memory_service
memory_service = get_memory_service()
# 提取可能的发货单号、提单号
entity_ids = self._extract_entity_ids(user_input)
return memory_service.search_memories(
agent_id=agent_id,
query=user_input,
entity_ids=entity_ids,
fact_types=["shipment", "party", "preference"],
top_k=5,
)
def _extract_entity_ids(self, text: str) -> list[str]:
"""从文本提取货代实体标识.
提取 BL No, Container No, Shipment No 等.
"""
import re
patterns = [
r"[A-Z]{4}\d{7}", # 集装箱号格式
r"[A-Z]{3}[A-Z]?\d{8,11}", # 提单号常见格式
]
entity_ids = []
for pattern in patterns:
matches = re.findall(pattern, text.upper())
entity_ids.extend(matches)
return entity_ids
def _format_memory_context(self, memories: list[dict]) -> str:
"""格式化记忆为上下文."""
context_parts = ["### 相关历史记录"]
for mem in memories:
context_parts.append(f"- {mem['fact_summary']} (置信度: {mem['confidence_score']:.2f})")
return "\n".join(context_parts)
def _process_freight_document(self, attachment: dict, agent_id: str) -> dict:
"""处理货代单证.
协调 OCR、分类、字段提取.
"""
from ai_service.services.freight_document_service import get_document_service
doc_service = get_document_service()
return doc_service.process_document(
file_url=attachment.get("url"),
file_type=attachment.get("type"),
agent_id=agent_id,
)
def _format_document_response(self, doc_result: dict) -> str:
"""格式化单证处理结果为回复."""
if doc_result.get("status") != "success":
return f"单证处理失败: {doc_result.get('error', '未知错误')}"
fields = doc_result.get("extracted_fields", {})
doc_type = doc_result.get("document_type", "未知类型")
response = f"已识别单证类型: **{doc_type}**\n\n提取的关键信息:\n"
for key, value in fields.items():
response += f"- **{key}**: {value}\n"
return response
3.2 MCP 工具扩展(货代专用)
# ai_service/services/freight_mcp_tools.py
"""货代专用 MCP 工具定义.
注册到 MCP Server,供 Agent 调用.
"""
from typing import Any
class FreightMCPTools:
"""货代领域 MCP 工具集合.
这些工具通过 MCPServer 注册,Agent 通过挂载配置获得调用权限.
"""
@staticmethod
def track_shipment(
bl_no: str,
carrier_code: str | None = None,
) -> dict[str, Any]:
"""跟踪货物状态.
Args:
bl_no: 提单号
carrier_code: 承运人代码(可选,用于指定查询渠道)
Returns:
跟踪信息,包含当前状态、位置、时间线
"""
# 实现:调用船公司 API 或聚合跟踪服务
pass
@staticmethod
def validate_freight_document(
document_type: str,
extracted_fields: dict,
) -> dict[str, Any]:
"""验证单证字段的合规性.
Args:
document_type: 单证类型 (HBL/MBL/AWB/等)
extracted_fields: AI 提取的字段
Returns:
验证结果,包含错误列表和建议
"""
# 实现:规则引擎验证
pass
@staticmethod
def query_shipping_schedule(
origin_port: str,
destination_port: str,
etd_start: str | None = None,
etd_end: str | None = None,
) -> dict[str, Any]:
"""查询船期.
Args:
origin_port: 起运港代码
destination_port: 目的港代码
etd_start: 最早离港日期 (ISO 格式)
etd_end: 最晚离港日期 (ISO 格式)
Returns:
可用船期列表
"""
# 实现:调用船公司 API 或船期数据库
pass
@staticmethod
def check_customs_status(
declaration_no: str,
customs_code: str | None = None,
) -> dict[str, Any]:
"""查询报关状态.
Args:
declaration_no: 报关单号
customs_code: 海关代码(可选)
Returns:
报关状态详情
"""
# 实现:调用海关 API
pass
@staticmethod
def send_notification(
recipient: str,
template_code: str,
context: dict,
channels: list[str] | None = None,
) -> dict[str, Any]:
"""发送业务通知.
Args:
recipient: 接收方(邮箱/手机号/企业微信ID)
template_code: 通知模板代码
context: 模板变量
channels: 发送渠道 ["email", "sms", "wechat"]
Returns:
发送结果
"""
# 实现:调用通知服务
pass
4. 数据流详细设计
4.1 单证处理流程
sequenceDiagram
participant User as 业务用户
participant API as API Service<br/>(app.py)
participant DocSvc as DocumentService
participant Parser as DocumentParser<br/>(parsers/)
participant Chunker as ChunkingStrategy
participant Embed as EmbeddingService
participant LLM as LLM Model
participant DB as PostgreSQL
participant Vector as Qdrant
participant Minio as MinIO
User->>API: POST /upload (单证PDF)
API->>Minio: 上传原始文件
Minio-->>API: object_key
API->>DB: create_document_record
API-->>User: document_id
User->>API: POST /ingest
API->>DocSvc: process_document()
DocSvc->>Minio: download_file()
Minio-->>DocSvc: file_bytes
DocSvc->>Parser: parse(file_bytes)
Parser-->>DocSvc: ParseResult(text, metadata)
DocSvc->>LLM: classify_document(text)
LLM-->>DocSvc: doc_type (HBL/MBL/Invoice)
DocSvc->>LLM: extract_fields(text, doc_type)
LLM-->>DocSvc: extracted_fields
alt 需要向量检索增强
DocSvc->>Chunker: chunk_text(text)
Chunker-->>DocSvc: chunks[]
DocSvc->>Embed: embed_batch(chunks)
Embed-->>DocSvc: embeddings[]
DocSvc->>Vector: upsert_vectors(embeddings)
DocSvc->>DB: batch_create_document_chunks()
end
DocSvc->>DB: create_freight_document()
DocSvc->>DB: create_memory_fact() // 提取的事实写入记忆
API-->>User: 提取结果
4.2 客服对话流程(AI/Human 切换)
sequenceDiagram
participant User as 客户/客服
participant WS as WebSocket<br/>(/ws/sessions/{id})
participant Graph as FreightOrchestrator
participant Intent as IntentClassifier
participant Memory as MemoryService
participant RAG as RAGRetrieval
participant MCP as MCPClient
participant LLM as LLM Model
participant Audit as AuditLog
User->>WS: 发送消息
WS->>Graph: run(OrchestratorInput)
Graph->>Intent: classify_intent()
Intent-->>Graph: intent_type
Graph->>Memory: search_memories()
Memory-->>Graph: relevant_memories[]
alt intent == "tracking"
Graph->>MCP: invoke_tool("track_shipment")
MCP-->>Graph: tracking_result
else intent == "document"
Graph->>RAG: retrieve_context()
RAG-->>Graph: doc_context
end
Graph->>LLM: generate_reply(context)
LLM-->>Graph: reply_text
Graph->>Audit: log_mcp_call()
Graph->>Memory: create_fact() // 重要事实写入记忆
Graph-->>WS: OrchestratorResult
WS-->>User: 流式返回回复
alt 需要人工接管
User->>WS: agent_takeover 事件
WS->>Graph: event=AGENT_TAKEOVER
Graph->>DB: update_session_status(HUMAN_ACTIVE)
Note over User,WS: 后续消息路由到人工客服
end
5. 扩展实现路线图
5.1 Phase 0: 基础验证(2周)
目标: 验证货代场景在现有架构上的可行性
任务清单:
| 任务 | 文件位置 | 工作量 |
|---|---|---|
| 扩展 Document 模型增加货代字段 | storage/models.py |
1d |
| 实现 HBL 解析模板 | services/parsers/freight/ |
3d |
| 增加 intent_classifier 节点 | orchestrator/freight_graph.py |
2d |
| 端到端测试 (上传→解析→回复) | tests/integration/ |
4d |
验收标准: - [ ] 上传 HBL PDF → 正确识别单证类型 → 提取 Shipper/Consignee/Port 等关键字段 - [ ] 询问 "跟踪 ABCD1234567" → 正确识别 intent → 调用跟踪工具
5.2 Phase 1: 单点突破(6周)
目标: 完成单证 Agent v1 和客服 Agent v1
新增模块:
ai_service/
├── services/
│ ├── freight/ # 货代专用服务包
│ │ ├── __init__.py
│ │ ├── document_classifier.py # 单证分类器
│ │ ├── field_extractor.py # 字段提取器 (基于 LLM)
│ │ ├── validation_engine.py # 规则验证引擎
│ │ └── tracking_aggregator.py # 跟踪数据聚合
│ ├── memory_service.py # 记忆读写服务
│ └── notification_service.py # 通知服务
├── orchestrator/
│ ├── freight_graph.py # 货代编排图
│ └── nodes/ # 编排节点包
│ ├── __init__.py
│ ├── intent_node.py
│ ├── memory_node.py
│ ├── document_node.py
│ └── tracking_node.py
└── mcp_servers/ # 货代专用 MCP Server 实现
├── carrier_apis/ # 船公司 API 适配
│ ├── maersk_mcp.py
│ ├── msc_mcp.py
│ └── cosco_mcp.py
├── customs_api.py # 海关接口
└── tracking_hub.py # 跟踪聚合服务
5.3 Phase 2: 平台化(12周)
目标: 多 Agent 编排、低代码控制面、记忆系统
核心实现:
-
Multi-Agent Router
# ai_service/orchestrator/multi_agent_router.py class MultiAgentRouter: """多 Agent 路由决策器.""" def route(self, intent: str, context: dict) -> str: """根据意图和上下文选择目标 Agent.""" routing_map = { "booking": "BookingAgent", "tracking": "TrackingAgent", "document": "DocumentAgent", "customs": "CustomsAgent", "billing": "BillingAgent", } return routing_map.get(intent, "FrontDeskAgent") -
记忆服务实现
# ai_service/services/memory_service.py class MemoryService: """业务记忆读写服务.""" def create_fact_from_document( self, shipment_id: str, extracted_fields: dict, confidence: float, ) -> MemoryFact: """从单证提取结果生成记忆事实.""" pass def search_memories( self, query: str, entity_ids: list[str], top_k: int = 5, ) -> list[MemoryFact]: """混合检索记忆.""" # 1. 结构化过滤 (entity_ids) # 2. 向量相似度检索 # 3. 重排序 (confidence × importance × recency) pass -
任务调度系统
# ai_service/services/task_scheduler.py class TaskScheduler: """自动任务调度器.""" def schedule_etd_reminder(self, shipment_id: str, etd: datetime): """订舱后自动创建 ETD 提醒任务.""" pass def schedule_arrival_alert(self, shipment_id: str, eta: datetime): """预计到港前自动发送提醒.""" pass
6. 关键实现决策
6.1 单证解析策略选择
| 策略 | 实现方式 | 适用场景 | 在项目中的位置 |
|---|---|---|---|
| 模板匹配 | 正则 + 规则引擎 | 格式固定的单证 | services/freight/template_parser.py |
| LLM 提取 | Claude/GPT-4 + Prompt | 格式多变、字段语义复杂 | services/freight/field_extractor.py |
| 混合模式 | 先分类 → 再选策略 | 生产环境推荐 | services/freight/document_processor.py |
推荐: 混合模式 1. 用轻量模型/规则快速分类单证类型 2. 根据类型选择专用提取器 3. 低置信度结果走人工复核队列
6.2 记忆存储策略
记忆检索流程:
用户输入 → 提取实体ID (BL No, Shipment No)
│
▼
┌─────────────────────────────────────────┐
│ 1. 结构化过滤 │
│ WHERE entity_id IN (提取的ID) │
│ AND tenant_id = 当前租户 │
│ AND created_at > NOW() - INTERVAL │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 2. 向量相似度检索 (Qdrant) │
│ 查询向量 = embed(用户输入) │
│ 返回 Top-20 候选 │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 3. 重排序 (Cross-Encoder) │
│ Score = α*相似度 + β*置信度 + γ*时效性 │
│ 返回 Top-5 │
└─────────────────────────────────────────┘
6.3 MCP 工具安全控制
复用现有 mcp_policy.py 的安全机制:
# ai_service/services/freight_mcp_policy.py
from ai_service.services.mcp_policy import MCPPolicyService
class FreightMCPPolicy(MCPPolicyService):
"""货代专用 MCP 安全策略."""
ALLOWED_CARRIER_APIS = ["maersk", "msc", "cosco", "one", "evergreen"]
def evaluate_carrier_api_call(
self,
carrier_code: str,
action: str,
agent_id: str,
) -> PolicyDecision:
"""评估船公司 API 调用权限."""
if carrier_code not in self.ALLOWED_CARRIER_APIS:
return PolicyDecision(
allowed=False,
reason=f"未授权的船公司: {carrier_code}",
)
# 敏感操作(如订舱)需要额外审批
if action in ["create_booking", "amend_booking"]:
return self._require_human_approval(agent_id)
return PolicyDecision(allowed=True)
7. 与现有代码的集成点
7.1 复用清单
| 组件 | 现有实现 | 货代扩展 |
|---|---|---|
| 配置管理 | utils/settings.py |
增加 [freight] section |
| 数据库连接 | utils/database.py |
直接使用 |
| 模型加载 | utils/model_loader.py |
直接使用 |
| 文档存储 | storage/minio_client.py |
直接使用 |
| 向量检索 | storage/qdrant_client.py |
直接使用 |
| 分块策略 | services/chunking.py |
直接使用 |
| 嵌入服务 | services/embedding.py |
直接使用 |
| MCP 客户端 | services/mcp_client.py |
直接使用 |
| MCP 审计 | services/mcp_audit.py |
直接使用 |
7.2 扩展清单
| 组件 | 扩展方式 |
|---|---|
| 数据库模型 | 新增 freight_models.py, memory_models.py |
| 编排图 | 继承 ChameleonOrchestrator 创建 FreightOrchestrator |
| API 端点 | 在 app.py 新增 /freight/* 路由组 |
| MCP Server | 新增 mcp_servers/freight/ 目录 |
| 前端页面 | 在 admin-frontend 新增货代管理页面 |
8. 性能与扩展性设计
8.1 关键指标目标
| 指标 | 目标值 | 实现策略 |
|---|---|---|
| 单证处理延迟 (P95) | < 5s | 异步任务 + 流式返回 |
| 客服回复延迟 (P95) | < 3s | 缓存记忆 + 预加载 |
| 跟踪查询延迟 (P95) | < 2s | MCP 结果缓存 5min |
| 并发会话 | 1000+ | 水平扩展 AI Service |
| 向量检索召回率 | > 90% | Fine-tuned Embedding |
8.2 水平扩展策略
部署架构:
┌─────────────────┐
│ API Gateway │
│ (Nginx/ALB) │
└────────┬────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AI Service #1 │ │ AI Service #2 │ │ AI Service #N │
│ (FastAPI) │ │ (FastAPI) │ │ (FastAPI) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ PostgreSQL │ │ Qdrant │ │ MinIO │
│ (Primary) │ │ (Vector) │ │ (Object Store) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
9. 风险缓解策略
| 风险 | 缓解措施 |
|---|---|
| LLM 幻觉导致错误提取 | 人机回环 (HITL) + 置信度阈值 + 规则校验 |
| 单证格式极度不标准 | 主动学习 + 小样本微调 + 人工标注反馈 |
| 船公司 API 不稳定 | 熔断降级 + 多源聚合 + 缓存策略 |
| 敏感数据泄露 | 数据脱敏 + 字段级加密 + 审计日志 |
| 并发量突增 | 限流 + 队列 + 自动扩缩容 |
10. 下一步行动清单
立即开始(本周)
- [ ] 评审本架构文档,确认技术方案
- [ ] 创建
feature/freight-v1分支 - [ ] 实现
freight_models.py基础模型 - [ ] 配置货代开发环境 (添加测试船公司 API)
短期(2周内)
- [ ] 完成 HBL 解析端到端流程
- [ ] 集成第一个船公司 MCP Server (Maersk)
- [ ] 实现基础记忆写入/读取
- [ ] 编写首批集成测试
中期(6周内)
- [ ] 上线单证 Agent v1
- [ ] 上线客服 Agent v1
- [ ] 完成 AI/Human 协同审计视图
- [ ] 建立评测数据集与门禁
附录: 关键文件索引
| 文件 | 说明 | 当前状态 |
|---|---|---|
ai_service/storage/models.py |
核心数据模型 | ✅ 已存在 |
ai_service/orchestrator/graph.py |
编排状态机 | ✅ 已存在 |
ai_service/services/chunking.py |
分块策略 | ✅ 已存在 |
ai_service/services/ingestion.py |
文档摄入 | ✅ 已存在 |
ai_service/utils/settings.py |
配置管理 | ✅ 已存在 |
ai_service/storage/freight_models.py |
货代模型 | 📝 待创建 |
ai_service/orchestrator/freight_graph.py |
货代编排 | 📝 待创建 |
ai_service/services/memory_service.py |
记忆服务 | 📝 待创建 |
ai_service/mcp_servers/freight/ |
货代 MCP | 📝 待创建 |