跳转至

Agent Runtime 架构实现指南

本文档基于 Chameleon 项目现有代码基线,详细说明货代智能 Agent 平台的具体架构实现方案。文档将高层设计与实际代码映射,提供可直接落地的实现路径。


1. 现有架构基线分析

1.1 项目结构映射

当前 Chameleon 项目已具备以下核心能力,可直接作为货代 Agent Runtime 的基础:

ai_service/                          # 业务真相源(唯一)
├── api/app.py                       # FastAPI 服务入口
│   ├── Session/Agent/Knowledge CRUD # 资源管理
│   ├── WebSocket/SSE 实时通信       # 人机协同通道
│   ├── 文档上传与摄入任务            # 单证处理入口
│   └── MCP 服务器/凭证/挂载管理      # 系统互联
├── orchestrator/                    # 运行时编排层
│   ├── graph.py                     # ChameleonOrchestrator
│   │   ├── LangGraph 状态机         # AI/Human 切换
│   │   ├── RAG 检索增强             # 知识召回
│   │   └── MCP 工具循环             # 外部系统调用
│   └── state.py                     # 状态模型定义
├── services/                        # 领域服务层
│   ├── chunking.py                  # 5种分块策略 + 工厂
│   ├── ingestion.py                 # 文档摄入管道
│   ├── parsers/                     # PDF/DOCX/TXT 解析
│   ├── rag_retrieval.py             # 向量检索服务
│   ├── mcp_client.py                # MCP 调用客户端
│   ├── mcp_registry.py              # MCP 挂载解析
│   ├── mcp_policy.py                # 调用策略控制
│   └── mcp_audit.py                 # 审计日志
├── storage/                         # 数据层
│   ├── models.py                    # SQLAlchemy 模型
│   │   ├── Session/Message          # 会话消息
│   │   ├── Agent                    # Agent 配置
│   │   ├── KnowledgeSource          # 知识源
│   │   ├── Document/DocumentChunk   # 文档与分块
│   │   ├── IngestionJob             # 摄入任务
│   │   ├── MCPServer/MCPCredential  # MCP 注册与凭证
│   │   └── AgentMCPLink             # Agent-MCP 挂载
│   ├── minio_client.py              # 对象存储
│   └── qdrant_client.py             # 向量数据库
└── utils/                           # 基础设施
    ├── settings.py                  # Pydantic-Settings
    ├── database.py                  # SQLAlchemy 连接
    ├── model_loader.py              # LLM 模型加载
    └── logger.py                    # 日志配置

1.2 核心设计模式确认

模式 实现位置 货代场景应用
Strategy 策略模式 services/chunking.py:37-76 单证分块策略(提单/发票/装箱单)
Factory + Registry services/chunking.py:876-927 货代专用解析器注册
挂载系统 storage/models.py:AgentMCPLink Agent 动态挂载船公司 API
审计追踪 storage/models.py:MCPCallAudit 每次订舱/查询操作留痕
状态机 orchestrator/graph.py:97-119 客服 AI/Human/CoPilot 切换

2. 货代领域模型扩展实现

2.1 核心领域实体

货代平台需要新增以下核心领域模型:

模型 说明 文件位置
FreightShipment 货运主表,记录端到端货物流转 storage/freight_models.py
FreightDocument 单证扩展表,与 Document 一对一关联 storage/freight_models.py
FreightParty 贸易参与方(收发货人等) storage/freight_models.py
TrackingEvent 物流跟踪事件 storage/freight_models.py
MemoryFact 业务记忆事实 storage/memory_models.py

详细模型定义货代领域模型 - 包含完整 SQLAlchemy 代码、字段说明、关联关系

2.2 模型关系概要

erDiagram
    FREIGHT_SHIPMENT ||--o{ FREIGHT_DOCUMENT : contains
    FREIGHT_SHIPMENT ||--o{ TRACKING_EVENT : tracks
    FREIGHT_SHIPMENT ||--o{ FREIGHT_PARTY : involves
    FREIGHT_DOCUMENT ||--|| DOCUMENT : extends
    FREIGHT_SHIPMENT }o--|| AGENT : belongs_to
__tablename__ = "freight_party"

id: Mapped[str] = mapped_column(String(32), primary_key=True)
party_type: Mapped[str] = mapped_column(String(32))  # shipper/consignee/notify
name: Mapped[str] = mapped_column(String(256), index=True)
normalized_name: Mapped[str] = mapped_column(String(256), index=True)  # 标准化名称
address: Mapped[Optional[str]] = mapped_column(String(512))
country_code: Mapped[Optional[str]] = mapped_column(String(4))

# 去重标识
entity_key: Mapped[str] = mapped_column(String(64), index=True)  # 名称+国别哈希

# 关联
shipment_id: Mapped[str] = mapped_column(ForeignKey("freight_shipment.id"))

class TrackingEvent(Base): """物流跟踪事件.

聚合多源跟踪数据(船公司 API、港口、GPS).
"""

__tablename__ = "tracking_event"

id: Mapped[str] = mapped_column(String(32), primary_key=True)
shipment_id: Mapped[str] = mapped_column(ForeignKey("freight_shipment.id"), index=True)

# 事件信息
event_code: Mapped[str] = mapped_column(String(32))  # 标准化事件代码
event_description: Mapped[str] = mapped_column(String(256))
event_datetime: Mapped[datetime] = mapped_column(DateTime(timezone=True))

# 数据源
source_type: Mapped[str] = mapped_column(String(32))  # carrier/port/gps/customs
source_reference: Mapped[Optional[str]] = mapped_column(String(128))
raw_payload_json: Mapped[Optional[dict]] = mapped_column(JSON)

# 地点
location_code: Mapped[Optional[str]] = mapped_column(String(16))
location_name: Mapped[Optional[str]] = mapped_column(String(128))

created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True))

__table_args__ = (
    Index("ix_tracking_shipment_time", "shipment_id", "event_datetime"),
)

```

2.2 记忆系统实现

业务记忆系统复用核心平台设计,包含以下模型: | 模型 | 说明 | 文件位置 | |------|------|----------| | MemoryFact | 记忆事实主表 | storage/memory_models.py | | MemoryEventLink | 记忆与会话事件关联 | storage/memory_models.py | | MemoryFeedback | 用户反馈(用于记忆强化/衰减) | storage/memory_models.py |

详细模型定义:参见 货代领域模型 - 业务记忆

2.3 记忆检索流程

mermaid flowchart LR A[用户输入] --> B[提取实体ID] B --> C[结构化过滤] C --> D[向量相似度检索] D --> E[重排序] E --> F[Top-5 记忆]

  1. 结构化过滤: entity_id + tenant_id + 时间窗口
  2. 向量检索: Qdrant 相似度检索 Top-20
  3. 重排序: 综合相似度、置信度、重要度、时效性

3. 编排引擎详细实现

3.1 现有 LangGraph 状态机增强

基于 orchestrator/graph.py 扩展货代专用节点:

# ai_service/orchestrator/freight_graph.py
"""货代专用编排图扩展.

在 ChameleonOrchestrator 基础上增加货代业务节点.
"""

from typing import Any, Optional
from langgraph.graph import StateGraph, END, START

from ai_service.orchestrator.graph import ChameleonOrchestrator
from ai_service.orchestrator.state import OrchestratorState, SessionStatus

class FreightOrchestrator(ChameleonOrchestrator):
    """货代专用编排器,扩展基础 AI/Human 切换能力.

    新增节点:
    - intent_classifier: 货代意图识别 (订舱/追踪/单证/咨询)
    - document_processor: 单证处理节点
    - shipment_tracker: 货物跟踪节点
    - memory_injector: 业务记忆注入节点
    """

    def _build_graph(self):
        """构建货代专用状态图.

        继承基础 router/ai_node/human_node,增加货代节点.
        """
        workflow_builder = StateGraph(OrchestratorState)

        # 基础节点(继承)
        workflow_builder.add_node("router", self._router_node)
        workflow_builder.add_node("ai_node", self._ai_node)
        workflow_builder.add_node("human_node", self._human_node)

        # 货代专用节点
        workflow_builder.add_node("intent_classifier", self._intent_classifier_node)
        workflow_builder.add_node("memory_injector", self._memory_injector_node)
        workflow_builder.add_node("document_processor", self._document_processor_node)

        # 边
        workflow_builder.add_edge(START, "intent_classifier")
        workflow_builder.add_edge("intent_classifier", "memory_injector")
        workflow_builder.add_edge("memory_injector", "router")

        workflow_builder.add_conditional_edges(
            "router",
            self._route_next,
            {
                "ai_node": "ai_node",
                "human_node": "human_node",
                "document_processor": "document_processor",
            },
        )

        workflow_builder.add_edge("document_processor", "ai_node")
        workflow_builder.add_edge("ai_node", END)
        workflow_builder.add_edge("human_node", END)

        return workflow_builder.compile()

    def _intent_classifier_node(
        self, raw_state: OrchestratorState | dict[str, Any]
    ) -> dict[str, Any]:
        """意图分类节点.

        识别货代专用意图类型,用于后续路由.
        意图类型: booking/tracking/document/inquiry/exception
        """
        state = self._coerce_state(raw_state)

        # 基于关键词 + LLM 的意图分类
        intent = self._classify_freight_intent(state.incoming_content)

        return {"freight_intent": intent}

    def _memory_injector_node(
        self, raw_state: OrchestratorState | dict[str, Any]
    ) -> dict[str, Any]:
        """业务记忆注入节点.

        在回复前检索相关记忆并注入上下文.
        """
        state = self._coerce_state(raw_state)

        if not state.agent_id:
            return {}

        # 检索货代相关记忆
        memories = self._retrieve_freight_memories(
            agent_id=state.agent_id,
            user_input=state.incoming_content,
            intent=getattr(state, "freight_intent", None),
        )

        if memories:
            memory_context = self._format_memory_context(memories)
            return {"memory_context": memory_context}

        return {}

    def _document_processor_node(
        self, raw_state: OrchestratorState | dict[str, Any]
    ) -> dict[str, Any]:
        """单证处理节点.

        处理上传的单证文件,执行 OCR + 字段提取.
        """
        state = self._coerce_state(raw_state)

        # 检查是否有附件
        attachments = getattr(state, "attachments", [])
        if not attachments:
            return {"response_content": "请上传需要处理的单证文件。"}

        # 调用文档处理服务
        doc_result = self._process_freight_document(
            attachments[0],
            agent_id=state.agent_id,
        )

        # 生成回复
        response = self._format_document_response(doc_result)

        return {
            "response_role": "assistant",
            "response_content": response,
            "document_result": doc_result,
        }

    def _route_next(self, raw_state: OrchestratorState | dict[str, Any]) -> str:
        """扩展路由逻辑.

        基于意图和状态选择下一节点.
        """
        state = self._coerce_state(raw_state)

        # 人工接管优先
        if state.active_status == SessionStatus.HUMAN_ACTIVE:
            return "human_node"

        # 单证处理意图
        intent = getattr(state, "freight_intent", None)
        if intent == "document" and getattr(state, "attachments", []):
            return "document_processor"

        return "ai_node"

    def _classify_freight_intent(self, user_input: str) -> str:
        """分类货代意图.

        简单规则 + LLM 组合分类.
        """
        user_input_lower = user_input.lower()

        # 规则快速匹配
        if any(kw in user_input_lower for kw in ["提单", "发票", "装箱单", "上传"]):
            return "document"
        if any(kw in user_input_lower for kw in ["跟踪", "到哪里", "状态", "eta"]):
            return "tracking"
        if any(kw in user_input_lower for kw in ["订舱", "booking", "预约"]):
            return "booking"

        # 默认咨询
        return "inquiry"

    def _retrieve_freight_memories(
        self,
        agent_id: str,
        user_input: str,
        intent: Optional[str],
    ) -> list[dict]:
        """检索货代相关记忆.

        混合检索:关键词过滤 + 向量相似度.
        """
        from ai_service.services.memory_service import get_memory_service

        memory_service = get_memory_service()

        # 提取可能的发货单号、提单号
        entity_ids = self._extract_entity_ids(user_input)

        return memory_service.search_memories(
            agent_id=agent_id,
            query=user_input,
            entity_ids=entity_ids,
            fact_types=["shipment", "party", "preference"],
            top_k=5,
        )

    def _extract_entity_ids(self, text: str) -> list[str]:
        """从文本提取货代实体标识.

        提取 BL No, Container No, Shipment No 等.
        """
        import re

        patterns = [
            r"[A-Z]{4}\d{7}",  # 集装箱号格式
            r"[A-Z]{3}[A-Z]?\d{8,11}",  # 提单号常见格式
        ]

        entity_ids = []
        for pattern in patterns:
            matches = re.findall(pattern, text.upper())
            entity_ids.extend(matches)

        return entity_ids

    def _format_memory_context(self, memories: list[dict]) -> str:
        """格式化记忆为上下文."""
        context_parts = ["### 相关历史记录"]
        for mem in memories:
            context_parts.append(f"- {mem['fact_summary']} (置信度: {mem['confidence_score']:.2f})")
        return "\n".join(context_parts)

    def _process_freight_document(self, attachment: dict, agent_id: str) -> dict:
        """处理货代单证.

        协调 OCR、分类、字段提取.
        """
        from ai_service.services.freight_document_service import get_document_service

        doc_service = get_document_service()
        return doc_service.process_document(
            file_url=attachment.get("url"),
            file_type=attachment.get("type"),
            agent_id=agent_id,
        )

    def _format_document_response(self, doc_result: dict) -> str:
        """格式化单证处理结果为回复."""
        if doc_result.get("status") != "success":
            return f"单证处理失败: {doc_result.get('error', '未知错误')}"

        fields = doc_result.get("extracted_fields", {})
        doc_type = doc_result.get("document_type", "未知类型")

        response = f"已识别单证类型: **{doc_type}**\n\n提取的关键信息:\n"
        for key, value in fields.items():
            response += f"- **{key}**: {value}\n"

        return response

3.2 MCP 工具扩展(货代专用)

# ai_service/services/freight_mcp_tools.py
"""货代专用 MCP 工具定义.

注册到 MCP Server,供 Agent 调用.
"""

from typing import Any

class FreightMCPTools:
    """货代领域 MCP 工具集合.

    这些工具通过 MCPServer 注册,Agent 通过挂载配置获得调用权限.
    """

    @staticmethod
    def track_shipment(
        bl_no: str,
        carrier_code: str | None = None,
    ) -> dict[str, Any]:
        """跟踪货物状态.

        Args:
            bl_no: 提单号
            carrier_code: 承运人代码(可选,用于指定查询渠道)

        Returns:
            跟踪信息,包含当前状态、位置、时间线
        """
        # 实现:调用船公司 API 或聚合跟踪服务
        pass

    @staticmethod
    def validate_freight_document(
        document_type: str,
        extracted_fields: dict,
    ) -> dict[str, Any]:
        """验证单证字段的合规性.

        Args:
            document_type: 单证类型 (HBL/MBL/AWB/等)
            extracted_fields: AI 提取的字段

        Returns:
            验证结果,包含错误列表和建议
        """
        # 实现:规则引擎验证
        pass

    @staticmethod
    def query_shipping_schedule(
        origin_port: str,
        destination_port: str,
        etd_start: str | None = None,
        etd_end: str | None = None,
    ) -> dict[str, Any]:
        """查询船期.

        Args:
            origin_port: 起运港代码
            destination_port: 目的港代码
            etd_start: 最早离港日期 (ISO 格式)
            etd_end: 最晚离港日期 (ISO 格式)

        Returns:
            可用船期列表
        """
        # 实现:调用船公司 API 或船期数据库
        pass

    @staticmethod
    def check_customs_status(
        declaration_no: str,
        customs_code: str | None = None,
    ) -> dict[str, Any]:
        """查询报关状态.

        Args:
            declaration_no: 报关单号
            customs_code: 海关代码(可选)

        Returns:
            报关状态详情
        """
        # 实现:调用海关 API
        pass

    @staticmethod
    def send_notification(
        recipient: str,
        template_code: str,
        context: dict,
        channels: list[str] | None = None,
    ) -> dict[str, Any]:
        """发送业务通知.

        Args:
            recipient: 接收方(邮箱/手机号/企业微信ID)
            template_code: 通知模板代码
            context: 模板变量
            channels: 发送渠道 ["email", "sms", "wechat"]

        Returns:
            发送结果
        """
        # 实现:调用通知服务
        pass

4. 数据流详细设计

4.1 单证处理流程

sequenceDiagram
    participant User as 业务用户
    participant API as API Service<br/>(app.py)
    participant DocSvc as DocumentService
    participant Parser as DocumentParser<br/>(parsers/)
    participant Chunker as ChunkingStrategy
    participant Embed as EmbeddingService
    participant LLM as LLM Model
    participant DB as PostgreSQL
    participant Vector as Qdrant
    participant Minio as MinIO

    User->>API: POST /upload (单证PDF)
    API->>Minio: 上传原始文件
    Minio-->>API: object_key
    API->>DB: create_document_record
    API-->>User: document_id

    User->>API: POST /ingest
    API->>DocSvc: process_document()

    DocSvc->>Minio: download_file()
    Minio-->>DocSvc: file_bytes

    DocSvc->>Parser: parse(file_bytes)
    Parser-->>DocSvc: ParseResult(text, metadata)

    DocSvc->>LLM: classify_document(text)
    LLM-->>DocSvc: doc_type (HBL/MBL/Invoice)

    DocSvc->>LLM: extract_fields(text, doc_type)
    LLM-->>DocSvc: extracted_fields

    alt 需要向量检索增强
        DocSvc->>Chunker: chunk_text(text)
        Chunker-->>DocSvc: chunks[]

        DocSvc->>Embed: embed_batch(chunks)
        Embed-->>DocSvc: embeddings[]

        DocSvc->>Vector: upsert_vectors(embeddings)
        DocSvc->>DB: batch_create_document_chunks()
    end

    DocSvc->>DB: create_freight_document()
    DocSvc->>DB: create_memory_fact()  // 提取的事实写入记忆
    API-->>User: 提取结果

4.2 客服对话流程(AI/Human 切换)

sequenceDiagram
    participant User as 客户/客服
    participant WS as WebSocket<br/>(/ws/sessions/{id})
    participant Graph as FreightOrchestrator
    participant Intent as IntentClassifier
    participant Memory as MemoryService
    participant RAG as RAGRetrieval
    participant MCP as MCPClient
    participant LLM as LLM Model
    participant Audit as AuditLog

    User->>WS: 发送消息
    WS->>Graph: run(OrchestratorInput)

    Graph->>Intent: classify_intent()
    Intent-->>Graph: intent_type

    Graph->>Memory: search_memories()
    Memory-->>Graph: relevant_memories[]

    alt intent == "tracking"
        Graph->>MCP: invoke_tool("track_shipment")
        MCP-->>Graph: tracking_result
    else intent == "document"
        Graph->>RAG: retrieve_context()
        RAG-->>Graph: doc_context
    end

    Graph->>LLM: generate_reply(context)
    LLM-->>Graph: reply_text

    Graph->>Audit: log_mcp_call()
    Graph->>Memory: create_fact()  // 重要事实写入记忆

    Graph-->>WS: OrchestratorResult
    WS-->>User: 流式返回回复

    alt 需要人工接管
        User->>WS: agent_takeover 事件
        WS->>Graph: event=AGENT_TAKEOVER
        Graph->>DB: update_session_status(HUMAN_ACTIVE)
        Note over User,WS: 后续消息路由到人工客服
    end

5. 扩展实现路线图

5.1 Phase 0: 基础验证(2周)

目标: 验证货代场景在现有架构上的可行性

任务清单:

任务 文件位置 工作量
扩展 Document 模型增加货代字段 storage/models.py 1d
实现 HBL 解析模板 services/parsers/freight/ 3d
增加 intent_classifier 节点 orchestrator/freight_graph.py 2d
端到端测试 (上传→解析→回复) tests/integration/ 4d

验收标准: - [ ] 上传 HBL PDF → 正确识别单证类型 → 提取 Shipper/Consignee/Port 等关键字段 - [ ] 询问 "跟踪 ABCD1234567" → 正确识别 intent → 调用跟踪工具

5.2 Phase 1: 单点突破(6周)

目标: 完成单证 Agent v1 和客服 Agent v1

新增模块:

ai_service/
├── services/
│   ├── freight/                     # 货代专用服务包
│   │   ├── __init__.py
│   │   ├── document_classifier.py   # 单证分类器
│   │   ├── field_extractor.py       # 字段提取器 (基于 LLM)
│   │   ├── validation_engine.py     # 规则验证引擎
│   │   └── tracking_aggregator.py   # 跟踪数据聚合
│   ├── memory_service.py            # 记忆读写服务
│   └── notification_service.py      # 通知服务
├── orchestrator/
│   ├── freight_graph.py             # 货代编排图
│   └── nodes/                       # 编排节点包
│       ├── __init__.py
│       ├── intent_node.py
│       ├── memory_node.py
│       ├── document_node.py
│       └── tracking_node.py
└── mcp_servers/                     # 货代专用 MCP Server 实现
    ├── carrier_apis/                # 船公司 API 适配
    │   ├── maersk_mcp.py
    │   ├── msc_mcp.py
    │   └── cosco_mcp.py
    ├── customs_api.py               # 海关接口
    └── tracking_hub.py              # 跟踪聚合服务

5.3 Phase 2: 平台化(12周)

目标: 多 Agent 编排、低代码控制面、记忆系统

核心实现:

  1. Multi-Agent Router

    # ai_service/orchestrator/multi_agent_router.py
    class MultiAgentRouter:
        """多 Agent 路由决策器."""
    
        def route(self, intent: str, context: dict) -> str:
            """根据意图和上下文选择目标 Agent."""
            routing_map = {
                "booking": "BookingAgent",
                "tracking": "TrackingAgent",
                "document": "DocumentAgent",
                "customs": "CustomsAgent",
                "billing": "BillingAgent",
            }
            return routing_map.get(intent, "FrontDeskAgent")
    

  2. 记忆服务实现

    # ai_service/services/memory_service.py
    class MemoryService:
        """业务记忆读写服务."""
    
        def create_fact_from_document(
            self,
            shipment_id: str,
            extracted_fields: dict,
            confidence: float,
        ) -> MemoryFact:
            """从单证提取结果生成记忆事实."""
            pass
    
        def search_memories(
            self,
            query: str,
            entity_ids: list[str],
            top_k: int = 5,
        ) -> list[MemoryFact]:
            """混合检索记忆."""
            # 1. 结构化过滤 (entity_ids)
            # 2. 向量相似度检索
            # 3. 重排序 (confidence × importance × recency)
            pass
    

  3. 任务调度系统

    # ai_service/services/task_scheduler.py
    class TaskScheduler:
        """自动任务调度器."""
    
        def schedule_etd_reminder(self, shipment_id: str, etd: datetime):
            """订舱后自动创建 ETD 提醒任务."""
            pass
    
        def schedule_arrival_alert(self, shipment_id: str, eta: datetime):
            """预计到港前自动发送提醒."""
            pass
    


6. 关键实现决策

6.1 单证解析策略选择

策略 实现方式 适用场景 在项目中的位置
模板匹配 正则 + 规则引擎 格式固定的单证 services/freight/template_parser.py
LLM 提取 Claude/GPT-4 + Prompt 格式多变、字段语义复杂 services/freight/field_extractor.py
混合模式 先分类 → 再选策略 生产环境推荐 services/freight/document_processor.py

推荐: 混合模式 1. 用轻量模型/规则快速分类单证类型 2. 根据类型选择专用提取器 3. 低置信度结果走人工复核队列

6.2 记忆存储策略

记忆检索流程:

用户输入 → 提取实体ID (BL No, Shipment No)
    │
    ▼
┌─────────────────────────────────────────┐
│ 1. 结构化过滤                           │
│    WHERE entity_id IN (提取的ID)         │
│      AND tenant_id = 当前租户            │
│      AND created_at > NOW() - INTERVAL   │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│ 2. 向量相似度检索 (Qdrant)               │
│    查询向量 = embed(用户输入)            │
│    返回 Top-20 候选                      │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│ 3. 重排序 (Cross-Encoder)               │
│    Score = α*相似度 + β*置信度 + γ*时效性 │
│    返回 Top-5                            │
└─────────────────────────────────────────┘

6.3 MCP 工具安全控制

复用现有 mcp_policy.py 的安全机制:

# ai_service/services/freight_mcp_policy.py
from ai_service.services.mcp_policy import MCPPolicyService

class FreightMCPPolicy(MCPPolicyService):
    """货代专用 MCP 安全策略."""

    ALLOWED_CARRIER_APIS = ["maersk", "msc", "cosco", "one", "evergreen"]

    def evaluate_carrier_api_call(
        self,
        carrier_code: str,
        action: str,
        agent_id: str,
    ) -> PolicyDecision:
        """评估船公司 API 调用权限."""
        if carrier_code not in self.ALLOWED_CARRIER_APIS:
            return PolicyDecision(
                allowed=False,
                reason=f"未授权的船公司: {carrier_code}",
            )

        # 敏感操作(如订舱)需要额外审批
        if action in ["create_booking", "amend_booking"]:
            return self._require_human_approval(agent_id)

        return PolicyDecision(allowed=True)

7. 与现有代码的集成点

7.1 复用清单

组件 现有实现 货代扩展
配置管理 utils/settings.py 增加 [freight] section
数据库连接 utils/database.py 直接使用
模型加载 utils/model_loader.py 直接使用
文档存储 storage/minio_client.py 直接使用
向量检索 storage/qdrant_client.py 直接使用
分块策略 services/chunking.py 直接使用
嵌入服务 services/embedding.py 直接使用
MCP 客户端 services/mcp_client.py 直接使用
MCP 审计 services/mcp_audit.py 直接使用

7.2 扩展清单

组件 扩展方式
数据库模型 新增 freight_models.py, memory_models.py
编排图 继承 ChameleonOrchestrator 创建 FreightOrchestrator
API 端点 app.py 新增 /freight/* 路由组
MCP Server 新增 mcp_servers/freight/ 目录
前端页面 admin-frontend 新增货代管理页面

8. 性能与扩展性设计

8.1 关键指标目标

指标 目标值 实现策略
单证处理延迟 (P95) < 5s 异步任务 + 流式返回
客服回复延迟 (P95) < 3s 缓存记忆 + 预加载
跟踪查询延迟 (P95) < 2s MCP 结果缓存 5min
并发会话 1000+ 水平扩展 AI Service
向量检索召回率 > 90% Fine-tuned Embedding

8.2 水平扩展策略

部署架构:

                    ┌─────────────────┐
                    │   API Gateway   │
                    │   (Nginx/ALB)   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│  AI Service #1  │ │  AI Service #2  │ │  AI Service #N  │
│  (FastAPI)      │ │  (FastAPI)      │ │  (FastAPI)      │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   PostgreSQL    │ │    Qdrant       │ │    MinIO        │
│   (Primary)     │ │   (Vector)      │ │  (Object Store) │
└─────────────────┘ └─────────────────┘ └─────────────────┘

9. 风险缓解策略

风险 缓解措施
LLM 幻觉导致错误提取 人机回环 (HITL) + 置信度阈值 + 规则校验
单证格式极度不标准 主动学习 + 小样本微调 + 人工标注反馈
船公司 API 不稳定 熔断降级 + 多源聚合 + 缓存策略
敏感数据泄露 数据脱敏 + 字段级加密 + 审计日志
并发量突增 限流 + 队列 + 自动扩缩容

10. 下一步行动清单

立即开始(本周)

  • [ ] 评审本架构文档,确认技术方案
  • [ ] 创建 feature/freight-v1 分支
  • [ ] 实现 freight_models.py 基础模型
  • [ ] 配置货代开发环境 (添加测试船公司 API)

短期(2周内)

  • [ ] 完成 HBL 解析端到端流程
  • [ ] 集成第一个船公司 MCP Server (Maersk)
  • [ ] 实现基础记忆写入/读取
  • [ ] 编写首批集成测试

中期(6周内)

  • [ ] 上线单证 Agent v1
  • [ ] 上线客服 Agent v1
  • [ ] 完成 AI/Human 协同审计视图
  • [ ] 建立评测数据集与门禁

附录: 关键文件索引

文件 说明 当前状态
ai_service/storage/models.py 核心数据模型 ✅ 已存在
ai_service/orchestrator/graph.py 编排状态机 ✅ 已存在
ai_service/services/chunking.py 分块策略 ✅ 已存在
ai_service/services/ingestion.py 文档摄入 ✅ 已存在
ai_service/utils/settings.py 配置管理 ✅ 已存在
ai_service/storage/freight_models.py 货代模型 📝 待创建
ai_service/orchestrator/freight_graph.py 货代编排 📝 待创建
ai_service/services/memory_service.py 记忆服务 📝 待创建
ai_service/mcp_servers/freight/ 货代 MCP 📝 待创建