Multi-Agent Patterns

CRAFT’s multi-agent coordination happens at the protocol layer through A2A. Agents discover each other via the registry, delegate work via JSON-RPC, and stream results via SSE — without sharing process memory or tight framework coupling. This page covers the three main coordination patterns used in production CRAFT deployments.

The CRAFT Multi-Agent Architecture

CRAFT’s analytics solution is a good reference for multi-agent patterns. A typical layout chains three specialized agents:
  1. Orchestrator — receives user requests, plans the workflow, delegates to sub-agents
  2. Text2SQL — converts natural language to SQL, executes against the target database, returns query results as Parquet artifacts
  3. Insights — analyses query results, generates charts, produces natural-language insights and visualizations
Each agent is an independently deployed microservice. They communicate exclusively through the A2A protocol.

Pattern 1: Sequential Delegation

The orchestrator delegates tasks to sub-agents one at a time, using the output of one as the input to the next. This is the most common pattern on CRAFT.

Google ADK — RemoteA2aAgent Sub-Agents

ADK makes sequential delegation transparent: RemoteA2aAgent instances look like local sub-agents. The orchestrator LLM routes to them via the transfer_to_agent tool. Resolve sub-agent URLs from the CRAFT Assets API rather than hardcoding hostnames.
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.agents.llm_agent import Agent

# Resolve agent card URLs from the Assets API at startup
text2sql = RemoteA2aAgent(
    name="text2sql_agent",
    description="Converts natural language to SQL queries and executes them.",
    agent_card="https://<text2sql-agent-host>/.well-known/agent-card.json",
)

insights = RemoteA2aAgent(
    name="insights_agent",
    description="Analyses query results, generates charts, and produces insights.",
    agent_card="https://<insights-agent-host>/.well-known/agent-card.json",
)

orchestrator = Agent(
    model="gemini-2.5-flash",
    name="orchestrator",
    instruction="""
    You are a data analysis orchestrator.

    When the user asks a data question:
    1. Delegate to text2sql_agent to retrieve the data
    2. Delegate to insights_agent to analyse and visualise it
    3. Synthesize the results for the user

    Always pass the datasource context (resource_uri, schema) to text2sql_agent.
    """,
    sub_agents=[text2sql, insights],
)
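The snippet above uses placeholder hosts. As a minimal sketch of keeping hostnames out of the code, the helper below builds the agent-card URL from a base URL read at startup. The environment variable names are hypothetical, and a CRAFT deployment would presumably obtain the base URL from an Assets API lookup instead, which is not shown here because this page does not document that call.

```python
import os
from urllib.parse import urlsplit

# Well-known path used by the agent_card URLs in the example above
WELL_KNOWN_CARD_PATH = "/.well-known/agent-card.json"


def agent_card_url(base_url: str) -> str:
    """Return the agent-card URL for an agent's base URL."""
    parts = urlsplit(base_url)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"not an absolute http(s) URL: {base_url!r}")
    # Drop any trailing slash so the path is not doubled
    prefix = f"{parts.scheme}://{parts.netloc}{parts.path.rstrip('/')}"
    return prefix + WELL_KNOWN_CARD_PATH


def agent_card_url_from_env(var_name: str) -> str:
    """Read a base URL from an environment variable (hypothetical name)."""
    base_url = os.environ.get(var_name)
    if not base_url:
        raise RuntimeError(f"{var_name} is not set")
    return agent_card_url(base_url)
```

The result can be passed as the agent_card argument, for example agent_card=agent_card_url_from_env("TEXT2SQL_AGENT_URL"), where the variable name is an assumption.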

Claude Agent SDK — Sequential Delegation

import httpx
import anthropic
from a2a.client import A2AClient
from a2a.types import SendMessageRequest, MessageSendParams, Message, Part, TextPart, Role
from uuid import uuid4

claude = anthropic.AsyncAnthropic()  # async client, so the call below does not block the event loop

async def claude_orchestrator(user_question: str, sub_agent_url: str, context_id: str) -> str:
    async with httpx.AsyncClient() as http:
        a2a = A2AClient(http, url=sub_agent_url)
        # Delegate to sub-agent via A2A
        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message=Message(
                    role=Role.user,
                    message_id=str(uuid4()),
                    context_id=context_id,
                    parts=[Part(root=TextPart(text=user_question))]
                )
            )
        )
        result = await a2a.send_message(request)

    # Collect text parts from the returned artifacts. getattr guards against
    # JSON-RPC error responses and results that carry no artifacts.
    task = getattr(result.root, "result", None)
    sub_result = ""
    for artifact in getattr(task, "artifacts", None) or []:
        for part in artifact.parts:
            if hasattr(part.root, "text"):
                sub_result += part.root.text

    # Claude synthesises the final response
    response = await claude.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Original question: {user_question}\nSub-agent result: {sub_result}\n\nSynthesize a final answer."
        }],
    )
    return response.content[0].text

LangGraph — Sequential Delegation

from langgraph.graph import StateGraph, END
from typing import TypedDict
import httpx
from a2a.client import A2AClient
from a2a.types import SendMessageRequest, MessageSendParams, Message, Part, TextPart, Role
from uuid import uuid4

class OrchestratorState(TypedDict):
    question: str
    context_id: str
    sub_agent_result: str
    final_answer: str

async def delegate_to_sub_agent(state: OrchestratorState) -> OrchestratorState:
    async with httpx.AsyncClient() as http:
        a2a = A2AClient(http, url="https://<agent-host>")
        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message=Message(
                    role=Role.user,
                    message_id=str(uuid4()),
                    context_id=state["context_id"],
                    parts=[Part(root=TextPart(text=state["question"]))]
                )
            )
        )
        result = await a2a.send_message(request)
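    # Collect text parts. getattr guards against JSON-RPC error responses
    # and results that carry no artifacts.
    task = getattr(result.root, "result", None)
    sub_text = ""
    for artifact in getattr(task, "artifacts", None) or []:
        for part in artifact.parts:
            if hasattr(part.root, "text"):
                sub_text += part.root.text
    # Return only the key this node updates; LangGraph merges it into the state
    return {"sub_agent_result": sub_text}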

builder = StateGraph(OrchestratorState)
builder.add_node("delegate", delegate_to_sub_agent)
builder.set_entry_point("delegate")
builder.add_edge("delegate", END)
graph = builder.compile()

How ADK Delegation Works

When the orchestrator LLM decides to delegate:
  1. LLM emits a transfer_to_agent tool call with the target agent name and message
  2. ADK resolves the remote agent’s card (lazy, cached after first fetch)
  3. ADK constructs an A2A message/stream JSON-RPC request from the session context
  4. Sends via httpx.AsyncClient to the sub-agent’s URL
  5. Processes the SSE response stream back into ADK internal events
  6. Returns the final artifact to the orchestrator as if it were a local sub-agent result
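Steps 3 to 5 can be sketched without ADK to show what travels over the wire. This is an illustration under stated assumptions, not ADK's implementation: the helper names are hypothetical, the field names (messageId, contextId) follow the A2A JSON wire format as commonly published, and the SSE parser handles only data: lines.

```python
import json
from typing import Iterable, Iterator
from uuid import uuid4


def build_stream_request(text: str, context_id: str) -> dict:
    """Build a message/stream JSON-RPC request body (step 3)."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "message/stream",
        "params": {
            "message": {
                "role": "user",
                "messageId": str(uuid4()),
                "contextId": context_id,
                "parts": [{"kind": "text", "text": text}],
            }
        },
    }


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each SSE event (step 5)."""
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            # Per the SSE format, one leading space after the colon is dropped
            buffer.append(line[len("data:"):].removeprefix(" "))
        elif line == "" and buffer:
            # A blank line terminates the event
            yield json.loads("\n".join(buffer))
            buffer = []
    if buffer:
        # Tolerate a stream that ends without a trailing blank line
        yield json.loads("\n".join(buffer))
```

In a real client the lines would come from the HTTP response stream (step 4), and each decoded payload would be mapped to a status or artifact update.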

Passing Context Between Agents

The Text2SQL agent requires a DataPart with the datasource context. The orchestrator must forward this from the original user message:
{
  "method": "message/stream",
  "params": {
    "message": {
      "parts": [
        { "kind": "text", "text": "Show me top 10 customers by revenue" },
        {
          "kind": "data",
          "data": {
            "type": "datasource",
            "resource_uri": "data:acme-corp:ml-project:analytics-db",
            "datasource_type": "database",
            "datasource_name": "Analytics Database",
            "selected_schemas": [
              {
                "schema_name": "public",
                "schema_fqn": "analytics-db.analytics_db.public"
              }
            ]
          }
        }
      ]
    }
  }
}
The resource_uri must be in the full four-segment format: data:{org_id}:{project_id}:{name}. The simplified format (data:my-db) will fail — the Assets API requires all four segments for connection resolution.
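A malformed resource_uri is cheaper to reject in the orchestrator than after a round trip to the sub-agent. The validator below is a minimal sketch of the four-segment rule; the function and type names are hypothetical, and it assumes that segment values never contain a colon, which this page does not state.

```python
from typing import NamedTuple


class ResourceUri(NamedTuple):
    org_id: str
    project_id: str
    name: str


def parse_resource_uri(uri: str) -> ResourceUri:
    """Split data:{org_id}:{project_id}:{name} into its parts."""
    segments = uri.split(":")
    # Require exactly four non-empty segments, the first being "data"
    if len(segments) != 4 or segments[0] != "data" or not all(segments[1:]):
        raise ValueError(
            f"expected data:{{org_id}}:{{project_id}}:{{name}}, got {uri!r}"
        )
    return ResourceUri(*segments[1:])
```

With this check, parse_resource_uri("data:my-db") raises ValueError, while the example URI from the request above parses into its organisation, project, and name.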

Pattern 2: Parallel Fan-Out

Multiple sub-agents are invoked concurrently and their results are merged. Use this when sub-tasks are independent and latency matters.

Manual Fan-Out with asyncio

For frameworks other than ADK (or for orchestrators that need explicit concurrency control):
import asyncio
import httpx
from uuid import uuid4
from a2a.client import A2AClient
from a2a.types import SendMessageRequest, MessageSendParams, Message, Part, TextPart, Role


async def fan_out_query(
    context_id: str,
    user_message: str,
    agent_urls: list[str],
) -> list:
    """Invoke multiple agents in parallel and collect their results."""

    async def invoke_agent(url: str):
        async with httpx.AsyncClient() as http:
            a2a = A2AClient(http, url=url)
            # Same request shape as the sequential examples; see
            # a2a-protocol-primer.mdx for the full Message/Part structure
            request = SendMessageRequest(
                id=str(uuid4()),
                params=MessageSendParams(
                    message=Message(
                        role=Role.user,
                        message_id=str(uuid4()),
                        context_id=context_id,  # forward the parent context
                        parts=[Part(root=TextPart(text=user_message))],
                    )
                ),
            )
            return await a2a.send_message(request)

    tasks = [invoke_agent(url) for url in agent_urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out failures: fan-out should be resilient to partial failures
    return [r for r in results if not isinstance(r, Exception)]
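Silently dropping failed calls makes it hard to tell which agent was missing from the merged answer. One possible variation, sketched below with only the standard library, adds a per-agent timeout and returns failures keyed by URL. The invoke callable stands in for the A2A call and the 30-second default is an arbitrary assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def fan_out(
    agent_urls: list,
    invoke: Callable[[str], Awaitable[Any]],
    timeout_s: float = 30.0,
) -> tuple:
    """Call every agent concurrently; return (successes, failures) by URL."""

    async def guarded(url: str) -> Any:
        # Bound each call so one slow agent cannot stall the whole turn
        return await asyncio.wait_for(invoke(url), timeout=timeout_s)

    outcomes = await asyncio.gather(
        *(guarded(url) for url in agent_urls), return_exceptions=True
    )

    successes, failures = {}, {}
    for url, outcome in zip(agent_urls, outcomes):
        if isinstance(outcome, BaseException):
            failures[url] = outcome  # keep the error for logging or retry
        else:
            successes[url] = outcome
    return successes, failures
```

The orchestrator can then synthesize from successes and mention in its answer which agents in failures did not respond.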

ADK Fan-Out via Parallel Sub-Agents

Google ADK supports parallel sub-agent invocation when the orchestrator instruction explicitly requests concurrent delegation:
# Assumes text2sql and context_agent are RemoteA2aAgent instances
# defined the same way as the sub-agents in Pattern 1
orchestrator = Agent(
    model="gemini-2.0-flash",
    name="orchestrator",
    instruction="""
    You can delegate to multiple agents simultaneously when tasks are independent.
    For data analysis requests that need both SQL results and market context,
    invoke text2sql_agent and context_agent in parallel, then synthesize results.
    """,
    sub_agents=[text2sql, context_agent],
)
ADK’s RemoteA2aAgent A2A support is currently experimental. Parallel invocation of multiple RemoteA2aAgent instances in the same turn may produce duplicate events (tracked in ADK issue #3207). Test thoroughly before using in production.

Pattern 3: Supervision and Retry

An orchestrator supervises sub-agent results and retries failed tasks with corrected inputs.
orchestrator = Agent(
    model="gemini-2.0-flash",
    name="orchestrator",
    instruction="""
    When text2sql_agent returns a failed task:
    1. Inspect the error message
    2. If it's a schema error, call text2sql_agent again with corrected schema context
    3. If it's a permission error, ask the user to check their database access
    4. Maximum 2 retries before reporting failure to the user

    Never retry more than twice — report the issue instead of looping indefinitely.
    """,
    sub_agents=[text2sql],
)

Detecting Sub-Agent Failures

A sub-agent that fails emits a TaskStatusUpdateEvent with state=failed and an error message. ADK surfaces this as an error event in the orchestrator’s event stream. The orchestrator LLM sees the failure message in its context and can decide to retry or escalate.
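The retry policy above lives in the orchestrator instruction, so the LLM is trusted to count its own retries. Where a hard cap is needed, the same policy can be enforced in code. The sketch below is an alternative to the prompt-driven approach, not how ADK does it: TaskOutcome, classify_error, and the keyword matching are all hypothetical stand-ins for the real task status and error payloads.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

MAX_RETRIES = 2  # matches the "maximum 2 retries" rule in the instruction


@dataclass
class TaskOutcome:
    state: str  # "completed" or "failed"
    text: str = ""
    error: str = ""


def classify_error(error: str) -> str:
    """Crude keyword classification; real error payloads may differ."""
    lowered = error.lower()
    if "permission" in lowered or "access denied" in lowered:
        return "permission"
    if "schema" in lowered or "column" in lowered:
        return "schema"
    return "unknown"


async def supervise(
    call_agent: Callable[[str], Awaitable[TaskOutcome]],
    question: str,
    fix_schema_context: Callable[[str, str], str],
) -> tuple:
    """Retry schema failures up to MAX_RETRIES; return (outcome, retries)."""
    current = question
    outcome = await call_agent(current)
    retries = 0
    while (
        outcome.state == "failed"
        and classify_error(outcome.error) == "schema"
        and retries < MAX_RETRIES
    ):
        # Only schema errors are retried; anything else is escalated as-is
        current = fix_schema_context(current, outcome.error)
        outcome = await call_agent(current)
        retries += 1
    return outcome, retries
```

A failed outcome returned from supervise means either a non-retryable error, such as a permission problem to raise with the user, or an exhausted retry budget.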

Registering Sub-Agents with the Orchestrator

When adding a new agent to the platform, you must:
  1. Register it with the Assets API — gives it a resource_uri and makes it discoverable
  2. Add it as a RemoteA2aAgent to the orchestrator (if using Google ADK) — and redeploy
  3. Update the orchestrator instruction — explain what the new agent does and when to use it
# Add to orchestrator's sub_agents list
new_agent = RemoteA2aAgent(
    name="my_new_agent",
    description="Describe what this agent does and when the orchestrator should use it.",
    agent_card="https://<my-new-agent-host>/.well-known/agent-card.json",
)

orchestrator = Agent(
    # ... existing arguments unchanged ...
    sub_agents=[text2sql, insights, new_agent],  # add here
)
The description field on RemoteA2aAgent is what the orchestrator LLM reads to decide routing. Write it as a capability statement: “Use this agent when the user needs X, Y, or Z.”

Carrying Artifacts Across Agent Boundaries

Artifacts (Parquet files, charts, SQL queries) produced by sub-agents are stored in the Assets API and referenced by resource_uri. When the orchestrator passes results from one agent to another, it passes the resource_uri — not the raw data.
# Text2SQL produces query_results as an ArtifactFilePart
{
    "type": "artifact",
    "uri": "data:acme-corp:ml-project:query-results-uuid",
    "resource_type": "parquet",
    "metadata": {"row_count": 100, "columns": ["customer", "revenue"]}
}

# Orchestrator passes this URI to Insights Agent as a DataPart
{
    "kind": "data",
    "data": {
        "type": "artifact_context",
        "uri": "data:acme-corp:ml-project:query-results-uuid",
        "resource_type": "parquet"
    }
}
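The hand-off between the two payloads above can be sketched as a small conversion helper. The function name is hypothetical and the field names are taken from the two examples; only the reference is forwarded, never the rows or the metadata.

```python
def artifact_to_data_part(artifact: dict) -> dict:
    """Turn a sub-agent artifact descriptor into a DataPart for the next agent."""
    uri = artifact["uri"]
    # Cheap sanity check on the four-segment data:{org}:{project}:{name} form
    if not uri.startswith("data:") or len(uri.split(":")) != 4:
        raise ValueError(f"unexpected artifact uri: {uri!r}")
    return {
        "kind": "data",
        "data": {
            "type": "artifact_context",
            "uri": uri,
            "resource_type": artifact["resource_type"],
        },
    }
```

Passing the Text2SQL descriptor through this helper yields the DataPart shown above, which the orchestrator can append to the parts list of the message it sends to the Insights agent.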

Context Propagation

Every A2A message must include a context_id — the conversation session identifier. Sub-agents use this to look up conversation history from the task store. In a multi-agent architecture, the orchestrator’s context_id flows through to each sub-agent call. This ensures that sub-agents can retrieve prior turns when needed for multi-step workflows.
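# ADK handles context_id propagation automatically via RemoteA2aAgent.
# For manual A2A calls, always forward the parent context_id
# (same imports as the sequential delegation examples):
message = Message(
    role=Role.user,
    message_id=str(uuid4()),
    context_id=parent_context_id,  # always forward
    parts=[Part(root=TextPart(text=user_message))],
)
request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(message=message))
response = await a2a_client.send_message(request)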

Next Steps

Streaming Idioms

SSE streaming, partial results, and cooperative cancellation.

A2A Protocol Primer

Review the A2A wire format and task lifecycle.