All CRAFT agents stream their responses via Server-Sent Events (SSE). This page covers the practical patterns for producing well-formed SSE streams, handling partial results, and implementing cooperative cancellation.
A2A SSE vs. MCP Streamable HTTP: This page covers A2A streaming — the JSON-RPC events that flow between agents via Server-Sent Events (SSE). This is different from the MCP Streamable HTTP transport used to connect to the CRAFT tool gateway. A2A’s message/stream uses SSE; MCP tool connections use the MCP Streamable HTTP transport (e.g., ADK’s StreamableHTTPConnectionParams, FastMCP’s StreamableHttpTransport, the MCP Python SDK’s streamablehttp_client, or LangGraph’s MultiServerMCPClient with transport="http"). Both A2A SSE and MCP Streamable HTTP are current standards — they serve different protocol layers.
The A2A protocol requires every message/stream response to follow a strict event ordering:
First event: Task (state = submitted)
Zero or more: TaskStatusUpdateEvent (state = working) with intermediate progress messages
Zero or more: TaskArtifactUpdateEvent with result artifacts
Final event: TaskStatusUpdateEvent with final: true and a terminal state (completed, failed, canceled)
Every event is a JSON-RPC 2.0 response object sent as a data: line in the SSE stream.
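The ordering rules and the data: framing above can be sketched with the standard library alone. This is a minimal illustration, not the a2a library's implementation: the helper names sse_frame and follows_required_order are hypothetical, and the event field names (kind, status.state, final) are an assumption based on the A2A JSON shape.

```python
import json

TERMINAL_STATES = {"completed", "failed", "canceled"}


def sse_frame(event: dict, request_id: str) -> str:
    """Wrap an event in a JSON-RPC 2.0 response and frame it as one SSE message."""
    envelope = {"jsonrpc": "2.0", "id": request_id, "result": event}
    # json.dumps emits no raw newlines by default, so a single data: line is
    # enough; the blank line after it terminates the SSE message.
    return f"data: {json.dumps(envelope)}\n\n"


def follows_required_order(events: list) -> bool:
    """Check the first/last-event rules for a complete message/stream response."""
    if len(events) < 2:
        return False
    first, *middle, last = events
    # First event: the Task itself, in the submitted state.
    if first.get("kind") != "task":
        return False
    if first.get("status", {}).get("state") != "submitted":
        return False
    # Only the last event may carry final: true.
    if any(event.get("final") for event in middle):
        return False
    # Final event: a status update with final: true and a terminal state.
    return (
        last.get("kind") == "status-update"
        and last.get("final") is True
        and last.get("status", {}).get("state") in TERMINAL_STATES
    )
```

A check like this is most useful in tests for an executor, where the recorded events can be validated before they are ever framed and sent.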
Never close the SSE connection without first emitting an event with final: true. A client that sees the connection close unexpectedly will try to resubscribe via tasks/resubscribe, and that request fails if the task store has no record of the task. Always emit a final status event before the connection closes, even on error.
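One way to guarantee the final event is to wrap the agent work so that every exit path emits a terminal status. The sketch below assumes an updater with an update_status(state, final=...) coroutine; RecordingUpdater and run_with_final_event are hypothetical names used only for illustration, and states are plain strings rather than the library's TaskState values.

```python
import asyncio


class RecordingUpdater:
    """Hypothetical stand-in for the real updater; records (state, final) pairs."""

    def __init__(self) -> None:
        self.events = []

    async def update_status(self, state: str, final: bool = False) -> None:
        self.events.append((state, final))


async def run_with_final_event(updater, work) -> None:
    """Run the agent work and always finish with one final status event."""
    try:
        await work()
    except Exception:
        # asyncio.CancelledError derives from BaseException, so it is not
        # caught here; cancellation is expected to emit its own final event.
        await updater.update_status("failed", final=True)
        return
    await updater.update_status("completed", final=True)
```

With this shape, a failing agent loop still produces exactly one event with final set, so the client never sees a bare connection close.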
The following pattern demonstrates streaming text output from a Pydantic AI agent. Each text chunk is forwarded as a TaskStatusUpdateEvent with state=working:
    response_message = updater.new_agent_message(
        parts=[Part(root=TextPart(text=""))]
    )

    async with self.agent.run_stream(
        user_message,
        deps=agent_deps,
        toolsets=[toolset],
    ) as run:
        async for text_content in run.stream_text():
            # Update the message in place; each chunk replaces the previous one
            response_message.parts[0].root = TextPart(text=text_content)
            await updater.update_status(
                TaskState.working,
                message=response_message,
            )

        # Get the final complete output and emit the terminal working event
        result = await run.get_output()
        response_message.parts[0].root = TextPart(text=result)
        await updater.update_status(
            TaskState.working,
            message=response_message,
            final=True,
            metadata={"timestamp": datetime.now(timezone.utc).isoformat()},
        )

    await updater.complete()
stream_text() returns the accumulated text at each yield, not a delta. Each chunk is the full response so far. This means the client always has a coherent partial response it can display immediately, without needing to concatenate chunks.
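A client that prefers to append text rather than redraw it can recover deltas from the accumulated snapshots. This is a small sketch of that conversion; the function name deltas is hypothetical and the input is any iterable of accumulated strings.

```python
def deltas(snapshots):
    """Yield only the newly added text for each accumulated snapshot."""
    previous = ""
    for snapshot in snapshots:
        if snapshot.startswith(previous):
            # Normal case: the new snapshot extends the previous one.
            yield snapshot[len(previous):]
        else:
            # The text was rewritten rather than extended; resend it whole.
            yield snapshot
        previous = snapshot
```

For example, list(deltas(["He", "Hello", "Hello wor", "Hello world"])) gives ["He", "llo", " wor", "ld"].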
When using to_a2a(), ADK handles all SSE event emission automatically. Your agent code just runs normally:
    root_agent = Agent(
        model="gemini-2.0-flash",
        name="my_agent",
        instruction="...",
        tools=[my_tool],
    )

    # to_a2a() wraps the agent with SSE streaming support.
    # Intermediate LLM output appears as TaskStatusUpdateEvent (state=working).
    # Final output appears as TaskArtifactUpdateEvent.
    a2a_app = to_a2a(root_agent, port=8003)
ADK’s to_a2a() uses TaskStatusUpdateEvent with state=working for streaming intermediate text chunks, then emits a TaskArtifactUpdateEvent for the final text. This differs from the Pydantic AI pattern (which uses only status events). Both are valid A2A; clients must handle both patterns.
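A client that has to cope with both patterns can prefer artifact text when it exists and otherwise fall back to the last status message. The sketch below works on already-decoded event dictionaries; the function names are hypothetical, and the field names (kind, artifact.parts, status.message.parts) are an assumption based on the A2A JSON shape. It also ignores chunked or appended artifacts for brevity.

```python
def _text_of(parts) -> str:
    """Join the text of all text parts, skipping any other part kinds."""
    return "".join(p.get("text", "") for p in parts if p.get("kind") == "text")


def final_text(events) -> str:
    """Return the final answer text from a list of decoded stream events."""
    artifact_text = None
    status_text = None
    for event in events:
        kind = event.get("kind")
        if kind == "artifact-update":
            # ADK-style: the final output arrives as an artifact.
            artifact_text = _text_of(event["artifact"]["parts"])
        elif kind == "status-update":
            # Status-only style: remember the last message that carried text.
            message = event.get("status", {}).get("message")
            if message:
                status_text = _text_of(message["parts"])
    if artifact_text is not None:
        return artifact_text
    return status_text or ""
```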
With LangGraph, use stream_mode="messages" for token-level streaming; it yields (chunk, metadata) tuples. Use stream_mode="updates" when you want node-completion events instead.
For graphs without a messages state key, switch to stream_mode="updates" and iterate over chunk.items() as (node_name, payload) pairs to detect completed nodes.
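The two stream modes produce differently shaped items, which the following sketch turns into progress lines. It runs on simulated data rather than a real graph: progress_lines is a hypothetical helper, the chunk objects only need a content attribute, and the langgraph_node metadata key is an assumption about what the metadata dictionary carries.

```python
from types import SimpleNamespace


def progress_lines(stream, stream_mode: str):
    """Turn streamed items into human-readable progress lines."""
    if stream_mode == "messages":
        # Each item is a (chunk, metadata) tuple carrying one token or fragment.
        for chunk, metadata in stream:
            if chunk.content:
                node = metadata.get("langgraph_node", "?")
                yield f"[{node}] {chunk.content}"
    elif stream_mode == "updates":
        # Each item maps the name of a finished node to its state update.
        for chunk in stream:
            for node_name, payload in chunk.items():
                yield f"node finished: {node_name}"
    else:
        raise ValueError(f"unsupported stream_mode: {stream_mode}")


# Simulated data standing in for a real graph stream.
simulated_messages = [
    (SimpleNamespace(content="Hel"), {"langgraph_node": "agent"}),
    (SimpleNamespace(content=""), {"langgraph_node": "agent"}),
    (SimpleNamespace(content="lo"), {"langgraph_node": "agent"}),
]
simulated_updates = [{"plan": {"steps": 2}}, {"execute": {"rows": 10}}]
```

Each yielded line could then be forwarded as a working status update in the same way as the other progress messages on this page.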
Use intermediate status messages to keep users informed during long-running tasks. Emit an intent acknowledgment before the main agent loop:
    # Emit a brief acknowledgment before the agent starts processing.
    # This confirms to the user that the request was received.
    ack_message = updater.new_agent_message(
        parts=[Part(root=TextPart(text="Analysing your request..."))]
    )
    await updater.update_status(
        TaskState.working,
        message=ack_message,
        final=False,
    )

    # Then run the main agent loop...
For long database queries, emit named step messages:
    for step_name, step_fn in pipeline_steps:
        await updater.update_status(
            TaskState.working,
            message=updater.new_agent_message(
                [Part(root=TextPart(text=f"Step: {step_name}"))]
            ),
            final=False,
        )
        await step_fn()
Artifacts (charts, data files, analysis results) are emitted as TaskArtifactUpdateEvent. Multi-artifact agents emit their artifacts in a fixed order. A text-to-SQL agent, for example, emits its intermediate SQL plans first and the query results after them.
Implement cancellation via asyncio task cancellation. The cancel() method updates the task state in the database before the running asyncio task receives the cancellation signal:
    class MyAgentExecutor(AgentExecutor):
        async def cancel(
            self, context: RequestContext, event_queue: EventQueue
        ) -> None:
            task_id = context.task_id

            # Write canceled state to DB immediately so other replicas see it.
            # The active asyncio task will receive CancelledError on its next await.
            await self.database.task_repository.update_state(task_id, "CANCELED")

            updater = TaskUpdater(event_queue, task_id, context.context_id)
            cancel_message = updater.new_agent_message(
                parts=[Part(root=TextPart(text="Response stopped."))],
            )
            await updater.update_status(
                TaskState.canceled,
                message=cancel_message,
                final=True,
            )

        async def execute(self, context, event_queue):
            try:
                ...  # main agent loop
            except asyncio.CancelledError:
                # CancelledError propagates from run_stream() when the task is cancelled.
                # Don't emit any more events; the cancel() method already did.
                logger.info("Agent execution canceled")
In Kubernetes with multiple agent replicas, the cancel request may hit a different pod than the one running the task. Use a background polling watcher to detect cross-instance cancellation:
    async def _cancellation_watcher(
        self, task_id: str, parent_task: asyncio.Task
    ) -> None:
        """Poll DB every 5s for cancellation from another instance."""
        while True:
            await asyncio.sleep(5)
            state = await self.database.task_repository.get_state(task_id)
            if state == "CANCELED":
                parent_task.cancel()
                return
Start the watcher at the beginning of execute() and cancel it in the finally block:
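The watcher lifecycle might look like the following minimal sketch. It is written as free functions with an in-memory repository so that it runs on its own; FakeTaskRepository, the interval parameter, and the string return values are hypothetical and exist only for the demonstration. A real executor would use its own repository and emit events instead of returning strings.

```python
import asyncio


class FakeTaskRepository:
    """Hypothetical in-memory stand-in for the database task repository."""

    def __init__(self) -> None:
        self.states = {}

    async def get_state(self, task_id: str):
        return self.states.get(task_id)


async def cancellation_watcher(repo, task_id, parent_task, interval=5.0) -> None:
    """Poll the repository and cancel the parent task once it is CANCELED."""
    while True:
        await asyncio.sleep(interval)
        if await repo.get_state(task_id) == "CANCELED":
            parent_task.cancel()
            return


async def execute(repo, task_id, agent_loop, interval=5.0) -> str:
    # Start the watcher first so cancellation is observed from the outset.
    watcher = asyncio.create_task(
        cancellation_watcher(repo, task_id, asyncio.current_task(), interval)
    )
    try:
        await agent_loop()
        return "completed"
    except asyncio.CancelledError:
        # The watcher (or a local cancel) interrupted the agent loop.
        return "canceled"
    finally:
        # Stop the watcher on every exit path and wait for it to finish.
        watcher.cancel()
        await asyncio.gather(watcher, return_exceptions=True)
```

Cancelling the watcher in finally matters on the success path too; otherwise it would keep polling the database after the task has finished.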
If an SSE connection drops mid-stream, the client sends a tasks/resubscribe request. The DefaultRequestHandler from the a2a library handles this automatically when used with a persistent task store.
    from a2a.server.request_handlers import DefaultRequestHandler
    from a2a.server.tasks import InMemoryTaskStore  # for development

    # Use a persistent task store for production; it survives pod restarts.
    task_store = PersistentTaskStore(database)  # your database-backed implementation

    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=task_store,
    )
InMemoryTaskStore does not survive pod restarts. In production, implement a database-backed TaskStore (e.g., backed by PostgreSQL). If the task store loses state, clients that attempt tasks/resubscribe will receive a TaskNotFoundError.
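As a rough illustration of a database-backed store, the sketch below persists tasks as JSON rows. It uses SQLite from the standard library in place of PostgreSQL so that it is self-contained, stores plain dictionaries rather than the library's task objects, and assumes a store interface of async save, get, and delete methods. The class name SqliteTaskStore is hypothetical.

```python
import json
import sqlite3
from typing import Optional


class SqliteTaskStore:
    """Illustrative persistent store: one JSON row per task, keyed by task id."""

    def __init__(self, path: str = ":memory:") -> None:
        # Pass a file path in real use; ":memory:" does not survive restarts.
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS tasks (id TEXT PRIMARY KEY, body TEXT NOT NULL)"
        )
        self._db.commit()

    async def save(self, task: dict) -> None:
        # INSERT OR REPLACE overwrites any earlier snapshot of the same task.
        self._db.execute(
            "INSERT OR REPLACE INTO tasks (id, body) VALUES (?, ?)",
            (task["id"], json.dumps(task)),
        )
        self._db.commit()

    async def get(self, task_id: str) -> Optional[dict]:
        row = self._db.execute(
            "SELECT body FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None

    async def delete(self, task_id: str) -> None:
        self._db.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        self._db.commit()
```

The sqlite3 calls here block the event loop, which is acceptable for a sketch; a production store would use an async database driver or run the queries in a worker thread.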