:material-robot-group: Multi-Agent Collaboration

The system orchestrates multi-agent collaboration with a LangGraph state machine that assembles "intent recognition → routing → agent execution → dialog polish" into a directed graph. When LangGraph is unavailable, it automatically falls back to a synchronous orchestrator that reuses the same node functions, so behavior stays consistent.


LangGraph State Machine

Node Flow Diagram

flowchart TD
    Start([User Request]) --> Intent["intent_node<br/>Intent Recognition"]
    Intent --> Route["route_node<br/>Routing Decision"]
    Route --> Cond1{"Escalate to human?<br/>_route_after_route"}
    Cond1 -- "User active request / sentiment sensitive / consecutive failures ≥ 2" --> Escalate["escalate_node<br/>Escalate to Human"]
    Cond1 -- "No" --> Agent["agent_node<br/>Parallel subtask execution"]
    Agent --> Cond2{"Escalate to human?<br/>_route_after_agent"}
    Cond2 -- "Consecutive failures ≥ 2" --> Escalate
    Cond2 -- "No" --> Dialog["dialog_node<br/>Dialog Polish"]
    Dialog --> End([Return Reply])
    Escalate --> End

    style Intent fill:#e3f2fd,stroke:#2196f3
    style Agent fill:#e8f5e9,stroke:#4caf50
    style Escalate fill:#ffebee,stroke:#f44336
    style Dialog fill:#fff3e0,stroke:#ff9800

Node Definitions

| Node | Function | Responsibility |
|---|---|---|
| intent | intent_node | Intent recognition + sentiment scoring + intent switch detection |
| route | route_node | Pure decision anchor; does not modify state, branches via the conditional edge |
| agent | agent_node | Parallel execution of subtasks; updates the failure count |
| dialog | dialog_node | Result merging + LLM polish (skipped for non-knowledge Q&A) |
| escalate | escalate_node | Marks escalation to human + generates EscalationCard |

Transition Conditions

The system uses two conditional edges to decide the flow:

# Conditional edge 1: _route_after_route (runs after route_node)
# Escalation trigger conditions (by priority)
# 1. User actively requests escalation → highest priority, ignores other conditions
if _is_user_request_human(message):
    return "escalate"
# 2. Sentiment-sensitive intent → escalate directly, to avoid escalating conflict
if intent == "emotion_sensitive":
    return "escalate"
# 3. Consecutive failures reach threshold (≥2) → escalate
if failed_attempts >= ESCALATE_FAILED_THRESHOLD:
    return "escalate"
return "agent"

# Conditional edge 2: _route_after_agent (runs after agent_node)
# If accumulated failures reach threshold, escalate to human
if failed_attempts >= ESCALATE_FAILED_THRESHOLD:
    return "escalate"
# Otherwise proceed to dialog polish
return "dialog"

ESCALATE_FAILED_THRESHOLD = 2

Failure-based escalation to human triggers after 2 consecutive unresolved turns, so the user does not get stuck in a loop of automated replies. The threshold is defined in orchestrator.py and can be adjusted as needed.
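
The two conditional edges can be sketched as standalone functions. This is a minimal illustration, not the project's implementation: the keyword tuple and the un-prefixed function names are assumptions, and the real _is_user_request_human may use different matching rules.

```python
from typing import Any, Dict

ESCALATE_FAILED_THRESHOLD = 2

# Hypothetical keyword list, for illustration only.
_HUMAN_REQUEST_KEYWORDS = ("human agent", "talk to a human")


def is_user_request_human(message: str) -> bool:
    """Return True when the message explicitly asks for a human."""
    text = message.lower()
    return any(keyword in text for keyword in _HUMAN_REQUEST_KEYWORDS)


def route_after_route(state: Dict[str, Any]) -> str:
    """Edge after route_node: checks are ordered by priority."""
    if is_user_request_human(state.get("message", "")):
        return "escalate"
    if state.get("intent") == "emotion_sensitive":
        return "escalate"
    if state.get("failed_attempts", 0) >= ESCALATE_FAILED_THRESHOLD:
        return "escalate"
    return "agent"


def route_after_agent(state: Dict[str, Any]) -> str:
    """Edge after agent_node: only the failure counter is checked."""
    if state.get("failed_attempts", 0) >= ESCALATE_FAILED_THRESHOLD:
        return "escalate"
    return "dialog"
```

Keeping each edge a pure function of the state means the same logic can be called from a graph or from a plain sequential runner.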


AgentState Structure

AgentState is the state object shared between LangGraph nodes. It is declared as TypedDict with total=False, which makes every field optional so that each node only has to touch the fields it owns.

class AgentState(TypedDict, total=False):
    # Session and user identifiers
    session_id: Optional[str]
    user_id: Optional[str]
    # User input for this turn
    message: str
    # Recognized intent (Intent enum string)
    intent: str
    # Decomposed subtask list
    sub_tasks: List[Dict[str, Any]]
    # Sentiment score (0-1, lower is more negative)
    emotion_score: float
    # Orchestration state counters
    turn_count: int
    failed_attempts: int
    # Conversation history, for DialogAgent to keep context
    history: List[Dict[str, Any]]
    # Raw output of each agent, indexed by agent_name
    raw_results: Dict[str, Any]
    # Final reply to the user
    final_reply: str
    # Citation source list
    sources: List[str]
    # Whether to escalate to human
    escalate_to_human: bool
    # Escalation context card (dict form, easy to serialize)
    escalation_card: Optional[Dict[str, Any]]
    # Monitoring trace ID
    trace_id: Optional[str]
    # Layered summary context text (reduces token consumption)
    layered_context_text: Optional[str]
    # Intent switch detection result
    intent_switch: Optional[Dict[str, Any]]

Why total=False?

total=False makes all fields optional, so each node only needs to update the fields it is responsible for, without constructing a full state. The entry run_graph initializes core fields, and nodes supplement as needed. When tests call a single node function directly, they also do not need to construct all fields.
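
A small sketch of what total=False allows in practice. MiniState, fake_intent_node, and apply_update are hypothetical names used only for this illustration; the source does not show whether the real nodes return partial or full state.

```python
from typing import Any, Dict, TypedDict


class MiniState(TypedDict, total=False):
    """Cut-down stand-in for AgentState; every key is optional."""
    message: str
    intent: str
    failed_attempts: int
    raw_results: Dict[str, Any]


def fake_intent_node(state: MiniState) -> MiniState:
    # The node returns only the field it is responsible for.
    return {"intent": "knowledge_qa"}


def apply_update(state: MiniState, update: MiniState) -> MiniState:
    # Merge a partial update into the current state without mutating it.
    merged: MiniState = {**state, **update}
    return merged


# A test can start from a state that carries a single field.
state: MiniState = {"message": "How do I reset my password?"}
state = apply_update(state, fake_intent_node(state))
```

Fields that were never set are read with `state.get("failed_attempts", 0)` rather than direct indexing, which is the same access pattern the excerpts on this page use.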


Three-Level Intent Recognition

The Orchestrator's intent recognition uses a three-level mechanism, degrading level-by-level to ensure availability and performance:

flowchart TD
    Q[User Query] --> L1{"Level 1: Rule Fast Path<br/>keyword hit?"}
    L1 -- "Chitchat/escalation keywords" --> Fast["Return IntentResult directly<br/>skip LLM"]
    L1 -- "No hit" --> Mock{"Mock mode?<br/>LLM_API_KEY empty"}
    Mock -- "Yes" --> Keyword["Keyword rule recognition<br/>sentiment → ticket → business → chitchat → knowledge_qa"]
    Mock -- "No" --> L2{"Level 2: IntentCache<br/>cache hit?"}
    L2 -- "Hit (confidence ≥ 0.7)" --> Cached["Skip LLM<br/>first token ~800ms"]
    L2 -- "No hit" --> L3["Level 3: LLM recognition<br/>ModelRouter routing"]
    L3 --> MR{"Complexity score"}
    MR -- "Simple → small model" --> Small["Doubao/Qwen ~1s"]
    MR -- "Complex → main LLM" --> Main["DeepSeek ~2.7s"]
    Small --> Parse["JSON parse"]
    Main --> Parse
    Parse -- "Parse success" --> Result[IntentResult]
    Parse -- "Parse failure" --> Keyword

    style Fast fill:#c8e6c9,stroke:#4caf50
    style Cached fill:#c8e6c9,stroke:#4caf50
    style Keyword fill:#fff9c4,stroke:#fbc02d

Level 1: Rule Fast Path

High-frequency simple intents match keywords directly, skipping LLM calls:

# Chitchat keywords: hello/thanks/hi/are you there/goodbye/good morning/good night
if any(w in q for w in CHITCHAT_KEYWORDS):
    return IntentResult(intent=Intent.CHITCHAT, confidence=0.95, ...)

# Escalation keywords: escalate to human/human agent/contact human/find human
if any(w in q for w in ("转人工", "人工客服", "联系人工", "找人工")):
    return IntentResult(intent=Intent.TICKET, confidence=0.9, ...)

Level 2: IntentCache

In LLM mode, check IntentCache first; on hit, skip the LLM call:

# Intent is stable (the intent of the same query usually does not change); reuse within 30-min TTL is safe
# Only cache high-confidence results with confidence >= 0.7, to avoid reusing low-quality intents
cached_intent = get_intent_cache().get(query)
if cached_intent is not None:
    return cached_intent  # First token from 2.7s down to ~800ms
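
The caching rules described in the comments above (30-minute TTL, only results with confidence ≥ 0.7 are stored) can be sketched as follows. TTLIntentCache and CachedIntent are hypothetical names; the real IntentCache may normalize keys, bound its size, or evict differently.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class CachedIntent:
    intent: str
    confidence: float


class TTLIntentCache:
    """In-memory intent cache with a TTL and a confidence floor."""

    def __init__(
        self,
        ttl_seconds: float = 30 * 60,
        min_confidence: float = 0.7,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._min_confidence = min_confidence
        self._clock = clock  # injectable so expiry can be tested
        self._entries: Dict[str, Tuple[float, CachedIntent]] = {}

    def put(self, query: str, result: CachedIntent) -> bool:
        # Low-confidence results are never stored.
        if result.confidence < self._min_confidence:
            return False
        self._entries[query.strip()] = (self._clock() + self._ttl, result)
        return True

    def get(self, query: str) -> Optional[CachedIntent]:
        key = query.strip()
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, result = entry
        if self._clock() >= expires_at:
            # Expired entries are dropped lazily on read.
            del self._entries[key]
            return None
        return result
```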

Level 3: LLM Recognition + ModelRouter

On a cache miss, the query goes to the LLM, and ModelRouter picks the model by query complexity:

# Simple query → small model (Doubao/Qwen-turbo), first token ~1s
# Complex query → main LLM (DeepSeek), to preserve quality
if get_small_llm_client() is not None:
    raw = get_model_router().chat_with_routing(
        messages=messages, query=query, temperature=0.0,
        name="intent_recognition", ...
    )
else:
    # When small model is not configured, use main LLM directly, to avoid dual-Provider model name incompatibility
    raw = self.llm_client.chat(messages, ...)

Fallback: Keyword Rules

When the LLM returns non-JSON output or omits required fields, recognition falls back to keyword rules:

# Match by priority: sentiment → ticket → business → chitchat → knowledge_qa
if any(word in query for word in OFFENSIVE_KEYWORDS):
    return IntentResult(intent=Intent.EMOTION_SENSITIVE, ...)
if any(word in query for word in TICKET_KEYWORDS):  # return/refund/complaint
    return IntentResult(intent=Intent.BUSINESS_QUERY, ...)  # also generates a ticket subtask
# ... default to knowledge_qa

Dual subtasks for returns/refunds

When ticket keywords (return/refund/exchange/complaint/after-sales) are hit, the query is also treated as a business query: both business_query and ticket subtasks are generated and run in parallel, so business status is queried while the ticket is being created.
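
The parse-then-fallback flow can be sketched as below. Everything here is illustrative: the keyword tuples, the REQUIRED_FIELDS list, and the dict-based result shape are assumptions, and the sketch omits the business and chitchat steps of the real priority chain.

```python
import json
from typing import Any, Dict

# Hypothetical keyword lists; the real ones live in the orchestrator.
OFFENSIVE_KEYWORDS = ("useless", "terrible")
TICKET_KEYWORDS = ("return", "refund", "exchange", "complaint", "after-sales")

# Assumed minimum set of fields a valid LLM answer must carry.
REQUIRED_FIELDS = ("intent", "confidence")


def keyword_fallback(query: str) -> Dict[str, Any]:
    """Match by priority: sentiment first, then ticket, then the default."""
    q = query.lower()
    if any(word in q for word in OFFENSIVE_KEYWORDS):
        return {"intent": "emotion_sensitive", "sub_tasks": ["emotion_sensitive"]}
    if any(word in q for word in TICKET_KEYWORDS):
        # Ticket keywords produce two subtasks that can run in parallel.
        return {"intent": "business_query", "sub_tasks": ["business_query", "ticket"]}
    return {"intent": "knowledge_qa", "sub_tasks": ["knowledge_qa"]}


def parse_intent(raw: str, query: str) -> Dict[str, Any]:
    """Use the LLM output when it is well-formed JSON, else fall back."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return keyword_fallback(query)
    if not isinstance(data, dict) or any(f not in data for f in REQUIRED_FIELDS):
        return keyword_fallback(query)
    return data
```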


agent_node Parallel Execution

Complex problems (those with multiple subtasks) are executed in parallel via ThreadPoolExecutor, which significantly reduces total latency when the subtasks are I/O-bound:

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict

# Thread pool size for parallel subtask execution on complex problems
# 4 balances concurrency and resource usage; can be increased for I/O-bound workloads
_AGENT_PARALLEL_WORKERS = 4

def agent_node(state: AgentState) -> AgentState:
    sub_tasks = state.get("sub_tasks", [])
    tasks = [(task["agent_name"], task["input"]) for task in sub_tasks]
    # Raw output of each agent, indexed by agent_name
    raw_results: Dict[str, Any] = {}

    if len(tasks) <= 1:
        # Single subtask runs synchronously, avoiding thread pool overhead
        for agent_name, task_input in tasks:
            result = _dispatch_to_agent(agent_name, task_input, state)
            raw_results[agent_name] = result
    else:
        # Multiple subtasks run in parallel
        with ThreadPoolExecutor(max_workers=_AGENT_PARALLEL_WORKERS) as executor:
            futures = {
                executor.submit(_dispatch_to_agent, name, inp, state): name
                for name, inp in tasks
            }
            for future in futures:
                agent_name = futures[future]
                try:
                    result = future.result()
                except Exception:
                    # A single subtask failure does not affect others; overall remains available
                    result = {"result": "This capability is under development and cannot be processed yet.", "sources": []}
                raw_results[agent_name] = result
    # ... failed_attempts update and state return are not shown in this excerpt

Failure count and escalation

After agent_node executes, failed_attempts is updated based on results:

  • Any subtask returns a placeholder message ("under development") or a miss marker ("[knowledge base miss]") → treated as unresolved, failed_attempts += 1
  • All subtasks return normally → treated as resolved, failed_attempts = 0
  • Accumulated failed_attempts >= 2 → next conditional edge triggers escalation
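
The counting rules above can be sketched as a pure function. The helper names are hypothetical, and the sketch assumes each agent result is a dict with a "result" string, as in the agent_node excerpt.

```python
from typing import Any, Dict

PLACEHOLDER_MARKER = "under development"
MISS_MARKER = "[knowledge base miss]"
ESCALATE_FAILED_THRESHOLD = 2


def is_unresolved(result: Dict[str, Any]) -> bool:
    """A result counts as unresolved if it carries either marker."""
    text = str(result.get("result", ""))
    return PLACEHOLDER_MARKER in text or MISS_MARKER in text


def update_failed_attempts(previous: int, raw_results: Dict[str, Dict[str, Any]]) -> int:
    """Increment on any unresolved subtask, otherwise reset to zero."""
    if any(is_unresolved(r) for r in raw_results.values()):
        return previous + 1
    return 0


# Two unresolved turns in a row reach the escalation threshold.
count = update_failed_attempts(0, {"knowledge_qa": {"result": "[knowledge base miss] no answer"}})
count = update_failed_attempts(
    count,
    {"ticket": {"result": "This capability is under development and cannot be processed yet."}},
)
```

Because a fully resolved turn resets the counter to zero, only consecutive failures accumulate toward the threshold.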

Agent Executor Mapping

_AGENT_EXECUTORS = {
    "knowledge_qa": _execute_knowledge,      # KnowledgeAgent hybrid retrieval + RAG
    "business_query": _execute_business,      # BusinessAgent business query
    "emotion_sensitive": _execute_emotion,    # EmotionAgent sentiment soothing
    "ticket": _execute_ticket,                # TicketAgent ticket processing
    "chitchat": _execute_chitchat,            # chitchat response
}

Intent Switch Detection and Context Management

Before intent recognition, the system first does "intent switch detection", to prevent old-topic slots from polluting the new intent:

def intent_node(state: AgentState) -> AgentState:
    session_id = state.get("session_id")
    message = state.get("message", "")
    # Pre-step: intent switch detection
    if session_id:
        session = session_manager.get_session(session_id) or {}
        current_intent = session.get("current_intent")
        if current_intent:  # skip on first turn
            detector = get_intent_detector()
            switch_result = detector.detect_switch(message, session_id, current_intent)
            if switch_result.switched:
                # Switched: reset slots, keep history for backtracking
                session_manager.reset_slots(session_id)
    # ... intent recognition continues here

Layered summary context: at the run_graph entry, ContextManager generates a condensed context text. dialog_node injects it into DialogContext.layered_summary, so the condensed context replaces the full history and reduces token consumption.


Synchronous Fallback Path

When LangGraph is unavailable or its build fails, the system falls back to the _SynchOrchestrator synchronous orchestrator:

flowchart LR
    Entry[run_graph entry] --> Check{"_get_compiled_graph<br/>LangGraph available?"}
    Check -- "Available" --> LG["compiled.invoke(state)<br/>LangGraph state machine"]
    Check -- "Unavailable/build failed" --> Sync["_SynchOrchestrator.run<br/>Synchronous orchestrator"]
    LG -- "Runtime exception" --> Sync
    LG --> Final[Final AgentState]
    Sync --> Final

    style LG fill:#e3f2fd,stroke:#2196f3
    style Sync fill:#fff9c4,stroke:#fbc02d

_SynchOrchestrator Implementation

The synchronous orchestrator reuses the same node functions, calling them in order, with behavior fully consistent with the LangGraph version:

class _SynchOrchestrator:
    """Synchronous orchestrator: fallback when LangGraph is unavailable.

    Directly reuses graph node functions (intent_node / agent_node / dialog_node / escalate_node),
    calling them in order. Behavior is fully consistent with the LangGraph version, just without
    graph-structure scheduling.
    """

    def run(self, initial_state: AgentState) -> AgentState:
        state = initial_state
        state = intent_node(state)          # 1. Intent recognition
        state = route_node(state)           # 2. Routing decision
        next_node = _route_after_route(state)
        if next_node == "escalate":         # 3. Escalate or execute agent
            return escalate_node(state)
        state = agent_node(state)           # 4. Parallel subtask execution
        next_node = _route_after_agent(state)
        if next_node == "escalate":         # 5. Escalate if failures reach threshold
            return escalate_node(state)
        return dialog_node(state)           # 6. Dialog polish

Seamless degradation

The synchronous orchestrator reuses the exact same node functions and conditional logic, so failed_attempts accumulation, escalation triggers, and dialog polish behave the same as in the LangGraph version. Users and callers perceive no difference; the only trace is a log entry: "LangGraph build failed, degrading to synchronous orchestrator".

Fallback Trigger Timing

| Timing | Behavior |
|---|---|
| LangGraph package not installed | _get_compiled_graph() returns None; goes directly to synchronous orchestration |
| LangGraph build exception | Records _graph_init_error, does not retry afterward; goes to synchronous orchestration |
| LangGraph runtime exception | Falls back to synchronous orchestration, so the current request is still served |

Retry mechanism

After LangGraph build fails, it does not retry on every request (to avoid repeated exception overhead). To retry, call reset_graph() to clear the cache, and the next run_graph will rebuild. This is commonly used in tests after switching configs.


| Topic | Link |
|---|---|
| RAG retrieval pipeline (inside KnowledgeAgent) | RAG Retrieval Pipeline |
| Fallback strategy (incl. LangGraph fallback) | Fallback Strategy |
| Architecture | Architecture |