Streaming Idioms

All CRAFT agents stream their responses via Server-Sent Events (SSE). This page covers the practical patterns for producing well-formed SSE streams, handling partial results, and implementing cooperative cancellation.
A2A SSE vs. MCP Streamable HTTP: This page covers A2A streaming — the JSON-RPC events that flow between agents via Server-Sent Events (SSE). This is different from the MCP Streamable HTTP transport used to connect to the CRAFT tool gateway. A2A’s message/stream uses SSE; MCP tool connections use the MCP Streamable HTTP transport (e.g., ADK’s StreamableHTTPConnectionParams, FastMCP’s StreamableHttpTransport, the MCP Python SDK’s streamablehttp_client, or LangGraph’s MultiServerMCPClient with transport="http"). Both A2A SSE and MCP Streamable HTTP are current standards — they serve different protocol layers.

The Streaming Contract

The A2A protocol requires every message/stream response to follow a strict event ordering:
  1. First event: Task (state = submitted)
  2. Zero or more: TaskStatusUpdateEvent (state = working) with intermediate progress messages
  3. Zero or more: TaskArtifactUpdateEvent with result artifacts
  4. Final event: TaskStatusUpdateEvent with final: true and a terminal state (completed, failed, canceled)
Every event is a JSON-RPC 2.0 response object sent as a data: line in the SSE stream.
Never close the SSE connection without emitting a final event with final: true. Clients that receive an unexpected connection close will attempt to resubscribe via tasks/resubscribe, and if the task store has no record of the task, they will error. Always emit a final status event before the connection closes, even on error.
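The ordering rules above can be checked mechanically in a test. The following is a minimal sketch, assuming each SSE data: payload has already been parsed into its JSON-RPC result object and that events carry a kind discriminator ("task", "status-update", "artifact-update"). validate_stream is a hypothetical helper, not part of the a2a library.

```python
TERMINAL_STATES = {"completed", "failed", "canceled"}


def validate_stream(events: list) -> list:
    """Return a list of contract violations for one message/stream response."""
    if not events:
        return ["stream is empty: no final event was emitted"]

    problems = []
    first = events[0]
    if first.get("kind") != "task":
        problems.append("first event must be a Task")
    elif first.get("status", {}).get("state") != "submitted":
        problems.append("first Task must be in state 'submitted'")

    last = events[-1]
    last_state = last.get("status", {}).get("state")
    if not (
        last.get("kind") == "status-update"
        and last.get("final") is True
        and last_state in TERMINAL_STATES
    ):
        problems.append("last event must be a final status update with a terminal state")

    # Everything between the first and last event must be a non-final update.
    for index, event in enumerate(events[1:-1], start=1):
        kind = event.get("kind")
        if kind not in ("status-update", "artifact-update"):
            problems.append(f"event {index}: unexpected kind {kind!r}")
        elif kind == "status-update" and event.get("final"):
            problems.append(f"event {index}: final event before end of stream")
    return problems
```

An empty result means the stream satisfied the contract. A stream that was cut off before its final event is reported as a violation, which is the failure mode the warning above describes.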

Streaming Text Output

Pydantic AI — run_stream with stream_text()

The following pattern demonstrates streaming text output from a Pydantic AI agent. Each text chunk is forwarded as a TaskStatusUpdateEvent with state=working:
response_message = updater.new_agent_message(
    parts=[Part(root=TextPart(text=""))]
)

async with self.agent.run_stream(
    user_message,
    deps=agent_deps,
    toolsets=[toolset],
) as run:
    async for text_content in run.stream_text():
        # Update the message in place — each chunk replaces the previous
        response_message.parts[0].root = TextPart(text=text_content)
        await updater.update_status(
            TaskState.working,
            message=response_message,
        )

    # Get the final complete output and emit it as the last working update.
    # final stays False here: only an event with a terminal state may carry final=True.
    result = await run.get_output()
    response_message.parts[0].root = TextPart(text=result)
    await updater.update_status(
        TaskState.working,
        message=response_message,
        final=False,
        metadata={"timestamp": datetime.now(timezone.utc).isoformat()},
    )

# complete() emits the terminal status event (state=completed, final=True)
await updater.complete()
stream_text() returns the accumulated text at each yield, not a delta. Each chunk is the full response so far. This means the client always has a coherent partial response it can display immediately, without needing to concatenate chunks.
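If a client wants only the newly arrived text (for example, to append to a terminal), it can derive the delta from two consecutive accumulated snapshots. This is a minimal sketch; new_text is a hypothetical helper name and the snapshot strings are illustrative.

```python
def new_text(previous: str, current: str) -> str:
    """Return the part of `current` that was not already in `previous`.

    Falls back to the whole snapshot if `current` does not extend
    `previous` (for example, after a retry restarted the response).
    """
    if current.startswith(previous):
        return current[len(previous):]
    return current


snapshots = ["Hel", "Hello", "Hello, wor", "Hello, world"]
previous = ""
deltas = []
for snapshot in snapshots:
    deltas.append(new_text(previous, snapshot))
    previous = snapshot
```

Joining the deltas reproduces the last snapshot, so a client can render either form without losing text.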

Google ADK — SSE via to_a2a()

When using to_a2a(), ADK handles all SSE event emission automatically. Your agent code just runs normally:
from google.adk.agents import Agent
from google.adk.a2a.utils.agent_to_a2a import to_a2a

root_agent = Agent(
    model="gemini-2.0-flash",
    name="my_agent",
    instruction="...",
    tools=[my_tool],
)

# to_a2a() wraps the agent with SSE streaming support.
# Intermediate LLM output appears as TaskStatusUpdateEvent (state=working).
# Final output appears as TaskArtifactUpdateEvent.
a2a_app = to_a2a(root_agent, port=8003)
ADK’s to_a2a() uses TaskStatusUpdateEvent with state=working for streaming intermediate text chunks, then emits a TaskArtifactUpdateEvent for the final text. This differs from the Pydantic AI pattern (which uses only status events). Both are valid A2A; clients must handle both patterns.
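A client that must handle both patterns can prefer artifact text and fall back to the last status message. The sketch below assumes events are already parsed into dicts shaped like the A2A JSON serialization (kind, status.message.parts, artifact.parts); extract_final_text is a hypothetical helper, and concatenating all artifact text is a simplification.

```python
def extract_final_text(events: list) -> str:
    """Collect the final text from a finished stream.

    Prefers artifact text (the ADK-style pattern) and falls back to the
    last non-empty status message (the status-only pattern).
    """

    def text_of(parts: list) -> str:
        return "".join(p.get("text", "") for p in parts if p.get("kind") == "text")

    artifact_text = ""
    last_status_text = ""
    for event in events:
        kind = event.get("kind")
        if kind == "artifact-update":
            artifact_text += text_of(event.get("artifact", {}).get("parts", []))
        elif kind == "status-update":
            message = event.get("status", {}).get("message") or {}
            text = text_of(message.get("parts", []))
            if text:
                last_status_text = text
    return artifact_text or last_status_text
```

Because status updates in the status-only pattern carry accumulated text, keeping just the most recent non-empty message is enough to recover the full response.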

Claude Agent SDK — Streaming via A2A

The Claude Agent SDK doesn’t wrap natively in A2A. Build a thin A2A executor that streams from the Anthropic API and emits the correct A2A events:
import anthropic
from uuid import uuid4
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, Part, TextPart

async def claude_a2a_handler(task_updater: TaskUpdater, user_message: str):
    client = anthropic.AsyncAnthropic()  # AsyncAnthropic for use inside async def

    await task_updater.update_status(
        TaskState.working,
        message=task_updater.new_agent_message([Part(root=TextPart(text="Starting..."))]),
        final=False,
    )

    full_response = ""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text_delta in stream.text_stream:
            full_response += text_delta
            # Throttle: emit a status update each time the response crosses a
            # 200-character boundary. The update carries only a short preview.
            if len(full_response) % 200 < len(text_delta):
                await task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(
                        [Part(root=TextPart(text=full_response[:120] + "..."))]
                    ),
                    final=False,
                )

    await task_updater.add_artifact(
        parts=[Part(root=TextPart(text=full_response))],
        artifact_id=str(uuid4()),
        name="response",
        last_chunk=True,
    )
    await task_updater.update_status(
        TaskState.completed,
        message=task_updater.new_agent_message([Part(root=TextPart(text=full_response))]),
        final=True,
    )
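
The throttling condition in the handler above fires whenever appending a delta moves the accumulated length past a multiple of 200. The sketch below isolates that check so it can be tested on its own; crosses_boundary and emit_points are hypothetical helper names.

```python
def crosses_boundary(total_len: int, delta_len: int, every: int = 200) -> bool:
    """True if appending `delta_len` characters moved the total past a multiple of `every`.

    `total_len` is the accumulated length after the delta was appended.
    """
    return total_len % every < delta_len


def emit_points(deltas: list, every: int = 200) -> list:
    """Return the accumulated lengths at which a status update would be sent."""
    total = 0
    points = []
    for delta in deltas:
        total += len(delta)
        if crosses_boundary(total, len(delta), every):
            points.append(total)
    return points
```

With five 90-character deltas, updates are sent at lengths 270 and 450: the first chunks that carry the total past 200 and 400 respectively.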

LangGraph — Streaming via A2A

Use stream_mode="messages", which yields (chunk, metadata) tuples, for token-level streaming. Use stream_mode="updates" instead when you want node-completion events.
from uuid import uuid4
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, Part, TextPart

async def langgraph_a2a_handler(task_updater: TaskUpdater, graph, user_message: str):
    await task_updater.update_status(
        TaskState.working,
        message=task_updater.new_agent_message([Part(root=TextPart(text="Running graph..."))]),
        final=False,
    )

    full_output = ""
    # stream_mode="messages" yields (AIMessageChunk, metadata) tuples
    async for chunk, metadata in graph.astream(
        {"messages": [{"role": "user", "content": user_message}]},
        stream_mode="messages",
    ):
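        # content can be a list of content blocks for some providers; keep only plain strings
        if isinstance(getattr(chunk, "content", None), str) and chunk.content:
            full_output += chunk.content
            if len(full_output) % 200 < len(chunk.content):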
                await task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(
                        [Part(root=TextPart(text=full_output[:120] + "..."))]
                    ),
                    final=False,
                )

    await task_updater.add_artifact(
        parts=[Part(root=TextPart(text=full_output))],
        artifact_id=str(uuid4()),
        name="result",
        last_chunk=True,
    )
    await task_updater.update_status(
        TaskState.completed,
        message=task_updater.new_agent_message([Part(root=TextPart(text=full_output))]),
        final=True,
    )
For graphs without a messages state key, switch to stream_mode="updates" and iterate for node_name, payload in chunk.items(): to detect completed nodes.
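The updates-mode loop can be sketched without a real graph. The example below assumes each chunk is a dict mapping a node name to the state update that node returned, and uses hand-written chunks in place of graph.astream(...); node_status_messages and the node names are hypothetical.

```python
def node_status_messages(update_chunks: list) -> list:
    """Turn stream_mode="updates" chunks into one progress line per finished node."""
    messages = []
    for chunk in update_chunks:
        for node_name, payload in chunk.items():
            # payload is the state update returned by the node; list its keys if it is a dict
            keys = ", ".join(sorted(payload)) if isinstance(payload, dict) else ""
            suffix = f" (updated: {keys})" if keys else ""
            messages.append(f"Finished {node_name}{suffix}")
    return messages


simulated_chunks = [
    {"plan": {"steps": ["lookup", "answer"]}},
    {"execute": {"sql": "SELECT 1", "rows": 1}},
]
progress = node_status_messages(simulated_chunks)
```

Each resulting line could be sent as a working status update in place of the token-level previews used in the handler above.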

Streaming Intermediate Status Messages

Use intermediate status messages to keep users informed during long-running tasks. Emit an intent acknowledgment before the main agent loop:
# Emit a brief acknowledgment before the agent starts processing.
# This confirms to the user that the request was received.
ack_message = updater.new_agent_message(
    parts=[Part(root=TextPart(text="Analysing your request..."))]
)
await updater.update_status(
    TaskState.working, message=ack_message, final=False
)

# Then run the main agent loop...
For long database queries, emit named step messages:
for step_name, step_fn in pipeline_steps:
    await updater.update_status(
        TaskState.working,
        message=updater.new_agent_message(
            [Part(root=TextPart(text=f"Step: {step_name}"))]
        ),
        final=False,
    )
    await step_fn()
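
The loop above expects pipeline_steps to be a sequence of (name, coroutine function) pairs. Here is a self-contained sketch of that shape, using a recording stand-in for the task updater so it runs without an A2A server; RecordingUpdater, load_schema, and run_query are hypothetical names.

```python
import asyncio


class RecordingUpdater:
    """Stand-in for TaskUpdater that records status text instead of emitting SSE."""

    def __init__(self) -> None:
        self.statuses = []

    async def update_status(self, text: str) -> None:
        self.statuses.append(text)


async def load_schema() -> None:
    await asyncio.sleep(0)  # placeholder for real work


async def run_query() -> None:
    await asyncio.sleep(0)  # placeholder for real work


async def run_pipeline(updater: RecordingUpdater) -> None:
    pipeline_steps = [("Loading schema", load_schema), ("Running query", run_query)]
    for step_name, step_fn in pipeline_steps:
        # Announce the step before starting it so the user sees progress immediately.
        await updater.update_status(f"Step: {step_name}")
        await step_fn()


updater = RecordingUpdater()
asyncio.run(run_pipeline(updater))
```

Announcing each step before awaiting it means a slow step still shows its name to the user while it runs.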

Artifact Streaming

Artifacts (charts, data files, analysis results) are emitted as TaskArtifactUpdateEvent. For multi-artifact agents (like a text-to-SQL agent that produces intermediate SQL plans followed by query results), artifacts are emitted in a specific order:
  1. sql_plan — intermediate SQL planning artifact (TextPart, JSON)
  2. query_summary — human-readable summary of results (TextPart)
  3. sql_query — the generated SQL (DataPart with artifact URI)
  4. query_results — Parquet results file (DataPart with artifact URI, last_chunk=true)
# Emit artifacts in order using the task updater.
# Only query_summary and query_results are shown; the other two follow the same pattern.
await updater.add_artifact(
    parts=[Part(root=TextPart(text=summary_text))],
    artifact_id=str(uuid4()),
    name="query_summary",
)

await updater.add_artifact(
    parts=[Part(root=DataPart(data={
        "type": "artifact",
        "uri": parquet_resource_uri,
        "resource_type": "parquet",
        "metadata": {"row_count": row_count, "columns": column_names},
    }))],
    artifact_id=str(uuid4()),
    name="query_results",
    last_chunk=True,
)
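
To keep the DataPart payload and the emission order consistent across agents, both can be captured in small helpers. This is a sketch under assumptions: artifact_reference and is_in_expected_order are hypothetical names, and the URI in the usage line is a placeholder.

```python
EXPECTED_ORDER = ["sql_plan", "query_summary", "sql_query", "query_results"]


def artifact_reference(uri: str, resource_type: str, **metadata) -> dict:
    """Build the data payload for a DataPart that points at a stored artifact."""
    return {
        "type": "artifact",
        "uri": uri,
        "resource_type": resource_type,
        "metadata": metadata,
    }


def is_in_expected_order(names: list) -> bool:
    """True if the known artifact names appear in the documented order.

    Artifacts may be skipped; names outside EXPECTED_ORDER are ignored.
    """
    positions = [EXPECTED_ORDER.index(n) for n in names if n in EXPECTED_ORDER]
    return positions == sorted(positions)


payload = artifact_reference(
    "file:///tmp/results.parquet", "parquet", row_count=3, columns=["id", "total"]
)
```

The order check is most useful in tests that record the artifact names an executor emits.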

Cooperative Cancellation

CRAFT supports cooperative cancellation: a client sends tasks/cancel, and the agent should stop its current work and emit a canceled terminal event.

Pydantic AI — asyncio.CancelledError

Implement cancellation via asyncio task cancellation. The cancel() method updates the task state in the database before the running asyncio task receives the cancellation signal:
class MyAgentExecutor(AgentExecutor):
    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        task_id = context.task_id

        # Write canceled state to DB immediately so other replicas see it.
        # The active asyncio task will receive CancelledError on its next await.
        await self.database.task_repository.update_state(task_id, "CANCELED")

        updater = TaskUpdater(event_queue, task_id, context.context_id)
        cancel_message = updater.new_agent_message(
            parts=[Part(root=TextPart(text="Response stopped."))],
        )
        await updater.update_status(
            TaskState.canceled,
            message=cancel_message,
            final=True,
        )

    async def execute(self, context, event_queue):
        try:
            ...  # main agent loop
        except asyncio.CancelledError:
            # CancelledError propagates from run_stream() when the task is cancelled.
            # Don't emit any more events: the cancel() method already did.
            logger.info("Agent execution canceled")

Cross-Instance Cancellation

In Kubernetes with multiple agent replicas, the cancel request may hit a different pod than the one running the task. Use a background polling watcher to detect cross-instance cancellation:
async def _cancellation_watcher(
    self, task_id: str, parent_task: asyncio.Task
) -> None:
    """Poll DB every 5s for cancellation from another instance."""
    while True:
        await asyncio.sleep(5)
        state = await self.database.task_repository.get_state(task_id)
        if state == "CANCELED":
            parent_task.cancel()
            return
Start the watcher at the beginning of execute() and cancel it in the finally block:
cancel_watcher = asyncio.create_task(
    self._cancellation_watcher(task_id, asyncio.current_task())
)
try:
    ...  # agent loop
finally:
    cancel_watcher.cancel()
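
The watcher pattern can be exercised end to end without a database. The sketch below is a simulation under assumptions: an in-memory dict stands in for the shared task table, the poll interval is shortened from 5 seconds to 10 milliseconds, and all names are hypothetical.

```python
import asyncio

TASK_STATES = {"task-1": "WORKING"}  # stand-in for the shared database


async def watch_for_cancel(task_id: str, parent: asyncio.Task, interval: float) -> None:
    """Poll the shared state and cancel the parent task once it reads CANCELED."""
    while True:
        await asyncio.sleep(interval)
        if TASK_STATES.get(task_id) == "CANCELED":
            parent.cancel()
            return


async def execute(task_id: str) -> str:
    watcher = asyncio.create_task(
        watch_for_cancel(task_id, asyncio.current_task(), interval=0.01)
    )
    try:
        await asyncio.sleep(5)  # placeholder for the long-running agent loop
        return "completed"
    except asyncio.CancelledError:
        return "canceled"
    finally:
        watcher.cancel()  # always stop polling, whatever the outcome


async def main() -> str:
    run = asyncio.create_task(execute("task-1"))
    await asyncio.sleep(0.03)
    TASK_STATES["task-1"] = "CANCELED"  # another replica handled tasks/cancel
    return await run


outcome = asyncio.run(main())
```

The polling interval bounds how long a cross-instance cancel takes to reach the running task, so the 5-second value in the watcher above is a trade-off between database load and cancel latency.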

Reconnection Handling

If an SSE connection drops mid-stream, the client sends a tasks/resubscribe request. The DefaultRequestHandler from the a2a library handles this automatically when used with a persistent task store.
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore  # for development only

# Use a persistent task store for production: it survives pod restarts.
# PersistentTaskStore is your own database-backed TaskStore implementation.
task_store = PersistentTaskStore(database)
handler = DefaultRequestHandler(
    agent_executor=executor,
    task_store=task_store,
)
InMemoryTaskStore does not survive pod restarts. In production, implement a database-backed TaskStore (e.g., backed by PostgreSQL). If the task store loses state, clients that attempt tasks/resubscribe will receive a TaskNotFoundError.
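As a rough illustration of what a database-backed store has to do, the sketch below persists task snapshots as JSON keyed by task ID. It is synchronous, uses SQLite from the standard library in place of PostgreSQL, and only mirrors a save/get/delete shape; it is not a subclass of the a2a TaskStore, and SqliteTaskStore is a hypothetical name.

```python
import json
import sqlite3
from typing import Optional


class SqliteTaskStore:
    """Illustrative store: persists task snapshots as JSON keyed by task ID."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks (id TEXT PRIMARY KEY, body TEXT NOT NULL)"
        )

    def save(self, task: dict) -> None:
        # Replace any earlier snapshot so the store always holds the latest state.
        self._conn.execute(
            "INSERT OR REPLACE INTO tasks (id, body) VALUES (?, ?)",
            (task["id"], json.dumps(task)),
        )
        self._conn.commit()

    def get(self, task_id: str) -> Optional[dict]:
        row = self._conn.execute(
            "SELECT body FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

    def delete(self, task_id: str) -> None:
        self._conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        self._conn.commit()
```

A production implementation would use an async driver and a connection pool, but the contract is the same: a lookup by task ID must still succeed after the process that wrote the task has restarted.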

LLM Error Retry

Transient LLM errors (HTTP 5xx, 429) should be retried with exponential backoff. A well-behaved executor retries up to 2 times:
import asyncio

from pydantic_ai.exceptions import ModelHTTPError

MAX_LLM_RETRIES = 2
LLM_RETRY_BACKOFF_BASE = 2.0  # seconds: 2, 4

for attempt in range(MAX_LLM_RETRIES + 1):
    try:
        async with self.agent.run_stream(...) as run:
            async for text in run.stream_text():
                ...
        break  # success — exit retry loop
    except ModelHTTPError as exc:
        if exc.status_code == 429 or exc.status_code >= 500:
            if attempt < MAX_LLM_RETRIES:
                delay = LLM_RETRY_BACKOFF_BASE * (2 ** attempt)
                await asyncio.sleep(delay)
            else:
                await self._handle_agent_failure(exc, response_message, updater)
                raise
        else:
            # 4xx non-429: not retryable
            await self._handle_agent_failure(exc, response_message, updater)
            raise
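
The backoff schedule can be verified in isolation by factoring the loop into a helper with an injectable sleep function. This is a minimal sketch: retry_with_backoff and TransientError are hypothetical names, with TransientError standing in for a retryable failure such as HTTP 429 or 5xx.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for a retryable failure such as HTTP 429 or 5xx."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    base_delay: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `call`, retrying transient failures with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            await sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

Passing a fake sleep in tests lets you assert the delays (2 then 4 seconds with the defaults) without actually waiting.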

First-Token Latency

Track time-to-first-token to detect model cold starts and MCP connection overhead:
import time

stream_start = time.monotonic()
first_token_recorded = False

async for text_content in run.stream_text():
    if not first_token_recorded:
        first_token_latency = time.monotonic() - stream_start
        metrics.record_stream_first_token(first_token_latency)
        first_token_recorded = True
    # ... stream the chunk
Typical first-token latencies on CRAFT: 500ms–2s for Gemini Flash, 1s–4s for Gemini Pro with cold MCP connections.
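
The timing logic above can be packaged as a reusable wrapper around any async text stream. The sketch below uses a simulated stream in place of run.stream_text() and a plain list in place of the metrics client; with_first_token_latency and fake_stream are hypothetical names.

```python
import asyncio
import time
from typing import AsyncIterator, Callable


async def with_first_token_latency(
    stream: AsyncIterator[str],
    record: Callable[[float], None],
) -> AsyncIterator[str]:
    """Yield chunks unchanged, calling `record` once with the time to the first chunk."""
    start = time.monotonic()
    recorded = False
    async for chunk in stream:
        if not recorded:
            record(time.monotonic() - start)
            recorded = True
        yield chunk


async def fake_stream() -> AsyncIterator[str]:
    await asyncio.sleep(0.01)  # simulated model latency before the first chunk
    yield "Hello"
    yield "Hello, world"


async def main():
    latencies = []
    chunks = [c async for c in with_first_token_latency(fake_stream(), latencies.append)]
    return chunks, latencies


chunks, latencies = asyncio.run(main())
```

Because the wrapper starts its clock when iteration begins, create it immediately before opening the stream so that connection setup is included in the measurement.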

Next Steps

Debugging Agents

Inspect streaming traces and diagnose SSE issues.

Eval Harness

Set up Langfuse to evaluate streaming agent behaviour.