a2a.server.events package

Module contents

Event handling components for the A2A server.

class a2a.server.events.EventConsumer(queue: EventQueueLegacy)

Bases: object

Consumer to read events from the agent event queue.

agent_task_callback(agent_task: Task[None]) None

Callback to handle exceptions from the agent’s execution task.

If the agent’s asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.

Parameters:

agent_task – The asyncio.Task that completed.
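The callback pattern described above can be sketched with the standard asyncio.Task.add_done_callback hook. This is a minimal illustration, not the library's code: ExceptionCapturingConsumer, failing_agent and the _exception attribute are hypothetical names chosen for the example.

```python
import asyncio
from typing import Optional


class ExceptionCapturingConsumer:
    """Toy stand-in (not the a2a class) that records an agent task's failure."""

    def __init__(self) -> None:
        self._exception: Optional[BaseException] = None

    def agent_task_callback(self, agent_task: asyncio.Task) -> None:
        # Task.exception() raises CancelledError for a cancelled task, so skip those.
        if agent_task.cancelled():
            return
        exc = agent_task.exception()
        if exc is not None:
            # Stored here so a consumer loop could re-raise it later.
            self._exception = exc


async def failing_agent() -> None:
    # Hypothetical agent coroutine that always fails.
    raise ValueError("agent failed")


async def demo() -> Optional[BaseException]:
    consumer = ExceptionCapturingConsumer()
    task = asyncio.create_task(failing_agent())
    task.add_done_callback(consumer.agent_task_callback)
    # asyncio.wait() waits for completion without re-raising the task's exception.
    await asyncio.wait([task])
    return consumer._exception


stored = asyncio.run(demo())
print(type(stored).__name__, stored)
```

Storing the exception instead of raising it inside the callback matters because exceptions raised in a done callback are only logged by the event loop; they never reach the code that is iterating over events.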

consume_all() AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Consume all the generated streaming events from the agent.

This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.

Yields:

Events dequeued from the queue.

Raises:

BaseException – If an exception was set by the agent_task_callback.
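A consumer loop with this shape might look like the sketch below. It is a simplified model built on a plain asyncio.Queue: ToyConsumer, the FINAL sentinel and the poll_interval parameter are assumptions made for the example, and closed-queue handling is left out for brevity.

```python
import asyncio
from typing import AsyncGenerator, List, Optional

FINAL = "final"  # hypothetical sentinel standing in for a "final" event


class ToyConsumer:
    """Toy model of a consumer loop; not the a2a EventConsumer."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self.queue = queue
        self._exception: Optional[BaseException] = None

    async def consume_all(
        self, poll_interval: float = 0.05
    ) -> AsyncGenerator[str, None]:
        while True:
            # Re-raise a failure recorded by the agent task callback, if any.
            if self._exception is not None:
                raise self._exception
            try:
                event = await asyncio.wait_for(
                    self.queue.get(), timeout=poll_interval
                )
            except asyncio.TimeoutError:
                # Nothing arrived yet; loop so the exception check runs again.
                continue
            self.queue.task_done()
            yield event
            if event == FINAL:
                break


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for event in ("working", "artifact", FINAL):
        queue.put_nowait(event)
    consumer = ToyConsumer(queue)
    return [event async for event in consumer.consume_all()]


async def demo_failure() -> str:
    consumer = ToyConsumer(asyncio.Queue())
    consumer._exception = RuntimeError("agent crashed")
    try:
        async for _ in consumer.consume_all():
            pass
    except RuntimeError as exc:
        return str(exc)
    return "no error"


events = asyncio.run(demo())
failure = asyncio.run(demo_failure())
```

Polling with a timeout is what lets the loop notice a stored exception even when the queue stays empty.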

class a2a.server.events.EventQueue(*args: Any, **kwargs: Any)

Bases: ABC

Base class and factory for EventQueueSource.

EventQueue provides an abstraction for a queue of events that can be tapped by multiple consumers. EventQueue maintains the main queue and the source, and keeps child queues in sync. GUARANTEE: All sinks (including the default one) will receive events in the exact same order.

WARNING (Concurrency): All events from all sinks (both the default queue and any tapped child queues) must be regularly consumed and marked as done. If any single consumer stops processing and its queue reaches capacity, it can block the event dispatcher and stall the entire system, causing a widespread deadlock.

WARNING (Memory Leak): Event queues spawn background tasks. To prevent memory and task leaks, all queue objects (both source and sinks) MUST be explicitly closed via await queue.close() or by using the async context manager (async with queue:). Child queues are closed automatically when the parent queue is closed, but you should still close them explicitly so that they do not fill up with unconsumed events.

Typical usage:

    queue = EventQueue()
    child_queue1 = await queue.tap()
    child_queue2 = await queue.tap()

    async for event in child_queue1:
        do_some_work(event)
        child_queue1.task_done()

abstractmethod async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None

Pushes an event into the queue.

Only the main queue can enqueue events. Child queues can only dequeue events.
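The contract described for EventQueue (one source, several sinks, identical ordering, and back-pressure when a sink fills up) can be modelled with plain asyncio.Queue objects. The sketch below is a toy under stated assumptions: ToyFanOutQueue, drain and the demo functions are hypothetical, and events are copied to each sink inline rather than by a background dispatcher.

```python
import asyncio
from typing import List, Tuple


class ToyFanOutQueue:
    """Toy fan-out model: one source, a default sink, and tapped child sinks."""

    def __init__(self, max_queue_size: int = 1024) -> None:
        self.default_sink: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._children: List[asyncio.Queue] = []

    async def tap(self, max_queue_size: int = 1024) -> asyncio.Queue:
        # A child only receives events enqueued after it was tapped.
        child: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._children.append(child)
        return child

    async def enqueue_event(self, event: str) -> None:
        # Each sink receives events in the same order. put() blocks while a
        # sink is full, which is how one stalled consumer can stall everyone.
        for sink in [self.default_sink, *self._children]:
            await sink.put(event)


async def drain(sink: asyncio.Queue, count: int) -> List[str]:
    seen = []
    for _ in range(count):
        seen.append(await sink.get())
        sink.task_done()
    return seen


async def demo_order() -> Tuple[List[str], List[str], List[str]]:
    source = ToyFanOutQueue()
    child1 = await source.tap()
    child2 = await source.tap()
    for event in ("a", "b", "c"):
        await source.enqueue_event(event)
    return (
        await drain(source.default_sink, 3),
        await drain(child1, 3),
        await drain(child2, 3),
    )


async def demo_stall() -> bool:
    source = ToyFanOutQueue()
    await source.tap(max_queue_size=1)  # this child is never consumed
    await source.enqueue_event("a")  # fills the child queue
    try:
        await asyncio.wait_for(source.enqueue_event("b"), timeout=0.05)
    except asyncio.TimeoutError:
        return True  # the producer was blocked by the full child queue
    return False


ordered = asyncio.run(demo_order())
stalled = asyncio.run(demo_stall())
```

demo_stall shows why every sink has to be consumed: a single full child queue is enough to block the producer.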

class a2a.server.events.EventQueueLegacy(*args: Any, **kwargs: Any)

Bases: EventQueue

Event queue for A2A responses from agent.

Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.

async close(immediate: bool = False) None

Closes the queue for future push events and also closes all child queues.

Parameters:

immediate – If True, immediately flushes the queue, discarding all pending events, and causes any currently blocked dequeue_event calls to raise QueueShutDown. If False (default), the queue is marked as closed to new events, but existing events can still be dequeued and processed until the queue is fully drained.
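The difference between the two close modes can be illustrated with a small toy queue. This is only a sketch: ToyClosableQueue and its methods are hypothetical, child queues are omitted, and rejecting late events by returning False is an assumption of the example rather than documented library behaviour.

```python
import asyncio
from typing import List, Tuple


class ToyClosableQueue:
    """Toy model of close(immediate=...); not the library implementation."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    def enqueue(self, event: str) -> bool:
        # Assumption: the toy rejects events after close and reports it.
        if self._closed:
            return False
        self._queue.put_nowait(event)
        return True

    def close(self, immediate: bool = False) -> None:
        self._closed = True
        if immediate:
            # Immediate close: discard everything that is still pending.
            while not self._queue.empty():
                self._queue.get_nowait()
                self._queue.task_done()

    def drain(self) -> List[str]:
        # After a graceful close, pending events can still be read out.
        events = []
        while not self._queue.empty():
            events.append(self._queue.get_nowait())
            self._queue.task_done()
        return events


async def demo(immediate: bool) -> Tuple[List[str], bool]:
    queue = ToyClosableQueue()
    queue.enqueue("a")
    queue.enqueue("b")
    queue.close(immediate=immediate)
    accepted_after_close = queue.enqueue("c")
    return queue.drain(), accepted_after_close


graceful = asyncio.run(demo(immediate=False))
forced = asyncio.run(demo(immediate=True))
```

With immediate=False the two pending events survive the close; with immediate=True they are dropped.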

async dequeue_event() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

Dequeues an event from the queue.

This implementation relies on the dequeue operation raising an exception once the queue has been closed. On Python 3.13+, this is provided naturally by the QueueShutDown exception, which is raised when the queue has been closed and the caller is awaiting queue.get. On Python 3.12 and earlier, the implementation has to manage this lifecycle itself. The current implementation can block if dequeue_event is called before the EventQueue has been closed and there are no events on the queue. One way to avoid this is to use an async task management solution that cancels the get task when the queue closes or some other condition is met. EventConsumer uses an asyncio wait with a timeout to abort the dequeue_event call and retry; once the queue has been closed, the retried call returns with a closed error.

Returns:

The next event from the queue.

Raises:

asyncio.QueueShutDown – If the queue has been closed and is empty.
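The timeout-and-retry workaround for Python 3.12 and earlier can be sketched as follows. The code is a toy that behaves the same way on any Python version: QueueClosed is a hypothetical stand-in for asyncio.QueueShutDown, and ToyClosableQueue and poll_interval are names invented for the example.

```python
import asyncio
from typing import Tuple


class QueueClosed(Exception):
    """Hypothetical stand-in for asyncio.QueueShutDown (Python 3.13+)."""


class ToyClosableQueue:
    """Toy queue that emulates shutdown detection by polling with a timeout."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    def close(self) -> None:
        self._closed = True

    async def enqueue_event(self, event: str) -> None:
        await self._queue.put(event)

    async def dequeue_event(self, poll_interval: float = 0.02) -> str:
        while True:
            # Closed and drained: nothing will ever arrive, so stop waiting.
            if self._closed and self._queue.empty():
                raise QueueClosed("queue is closed and empty")
            try:
                return await asyncio.wait_for(
                    self._queue.get(), timeout=poll_interval
                )
            except asyncio.TimeoutError:
                # Timed out on an empty queue; retry and re-check the flag.
                continue


async def demo() -> Tuple[str, str]:
    queue = ToyClosableQueue()
    await queue.enqueue_event("x")
    # Close the queue shortly after the second dequeue has started waiting.
    asyncio.get_running_loop().call_later(0.05, queue.close)
    first = await queue.dequeue_event()
    try:
        await queue.dequeue_event()
    except QueueClosed:
        return first, "closed"
    return first, "still open"


result = asyncio.run(demo())
```

Without the timeout, the second dequeue_event call would wait on an empty queue indefinitely, which is the blocking case described above.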

async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None

Enqueues an event to this queue and all its children.

Parameters:

event – The event object to enqueue.

is_closed() bool

Checks if the queue is closed.

property queue: Queue[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

[DEPRECATED] Returns the underlying asyncio.Queue.

async tap(max_queue_size: int = 1024) EventQueueLegacy

Taps the event queue to create a new child queue that receives future events.

Returns:

A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.

task_done() None

Signals that a formerly enqueued task is complete.

Used in conjunction with dequeue_event to track processed items.
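Since the underlying queue is an asyncio.Queue, the task_done bookkeeping presumably follows the standard get / task_done / join pattern. A minimal sketch of that pattern using asyncio.Queue directly (worker and demo are hypothetical names):

```python
import asyncio
from typing import List


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    processed: List[str] = []

    async def worker() -> None:
        while True:
            event = await queue.get()
            processed.append(event)
            # Mark the item as handled; join() counts these calls.
            queue.task_done()

    for event in ("a", "b"):
        queue.put_nowait(event)

    task = asyncio.create_task(worker())
    # join() returns once task_done() has been called for every queued item.
    await queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return processed


processed = asyncio.run(demo())
```

If the worker skipped task_done(), the join() call would never return, even though every item had been dequeued.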

class a2a.server.events.InMemoryQueueManager

Bases: QueueManager

InMemoryQueueManager manages event queues within a single binary instance.

This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.

This implementation is suitable for single-instance deployments; scalable, multi-instance deployments need a distributed approach instead.

async add(task_id: str, queue: EventQueueLegacy) None

Adds a new event queue for a task ID.

Raises:

TaskQueueExists – If a queue for the given task_id already exists.

async close(task_id: str) None

Closes and removes the event queue for a task ID.

Raises:

NoTaskQueue – If no queue exists for the given task_id.

async create_or_tap(task_id: str) EventQueueLegacy

Creates a new event queue for a task ID if one doesn’t exist, otherwise taps the existing one.

Returns:

A new or child EventQueueLegacy instance for the task_id.

async get(task_id: str) EventQueueLegacy | None

Retrieves the event queue for a task ID.

Returns:

The EventQueueLegacy instance for the task_id, or None if not found.

async tap(task_id: str) EventQueueLegacy | None

Taps the event queue for a task ID to create a child queue.

Returns:

A new child EventQueueLegacy instance, or None if the task ID is not found.
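The method contracts above amount to a dictionary of queues keyed by task ID. The sketch below models them with toy classes: ToyEventQueue and ToyQueueManager are hypothetical, the two exception classes are local stand-ins named after the documented ones, and guarding the dictionary with an asyncio.Lock is an assumption of the example.

```python
import asyncio
from typing import Dict, List, Optional


class TaskQueueExists(Exception):
    """Local stand-in for the documented exception."""


class NoTaskQueue(Exception):
    """Local stand-in for the documented exception."""


class ToyEventQueue:
    """Toy queue that only tracks its children and its closed state."""

    def __init__(self) -> None:
        self.children: List["ToyEventQueue"] = []
        self.closed = False

    async def tap(self) -> "ToyEventQueue":
        child = ToyEventQueue()
        self.children.append(child)
        return child

    async def close(self) -> None:
        self.closed = True
        for child in self.children:
            await child.close()


class ToyQueueManager:
    """Toy in-memory manager: one parent queue per task ID."""

    def __init__(self) -> None:
        self._queues: Dict[str, ToyEventQueue] = {}
        self._lock = asyncio.Lock()

    async def add(self, task_id: str, queue: ToyEventQueue) -> None:
        async with self._lock:
            if task_id in self._queues:
                raise TaskQueueExists(task_id)
            self._queues[task_id] = queue

    async def get(self, task_id: str) -> Optional[ToyEventQueue]:
        async with self._lock:
            return self._queues.get(task_id)

    async def tap(self, task_id: str) -> Optional[ToyEventQueue]:
        async with self._lock:
            queue = self._queues.get(task_id)
            return await queue.tap() if queue is not None else None

    async def close(self, task_id: str) -> None:
        async with self._lock:
            if task_id not in self._queues:
                raise NoTaskQueue(task_id)
            queue = self._queues.pop(task_id)
        await queue.close()

    async def create_or_tap(self, task_id: str) -> ToyEventQueue:
        async with self._lock:
            if task_id not in self._queues:
                queue = ToyEventQueue()
                self._queues[task_id] = queue
                return queue
            return await self._queues[task_id].tap()


async def demo() -> dict:
    manager = ToyQueueManager()
    parent = await manager.create_or_tap("task-1")  # first call creates
    child = await manager.create_or_tap("task-1")  # second call taps
    results = {
        "child_is_tapped": child in parent.children,
        "get_returns_parent": await manager.get("task-1") is parent,
        "missing_tap": await manager.tap("unknown"),
    }
    try:
        await manager.add("task-1", ToyEventQueue())
    except TaskQueueExists:
        results["duplicate_add"] = "TaskQueueExists"
    await manager.close("task-1")
    try:
        await manager.close("task-1")
    except NoTaskQueue:
        results["double_close"] = "NoTaskQueue"
    results["all_closed"] = parent.closed and child.closed
    return results


results = asyncio.run(demo())
```

Because the dictionary lives in process memory, a second server instance would have no view of these queues, which is the single-instance limitation noted in the class description.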

exception a2a.server.events.NoTaskQueue

Bases: Exception

Exception raised when attempting to access or close a queue for a task ID that does not exist.

class a2a.server.events.QueueManager

Bases: ABC

Interface for managing event queue lifecycles per task.

abstractmethod async add(task_id: str, queue: EventQueueLegacy) None

Adds a new event queue associated with a task ID.

abstractmethod async close(task_id: str) None

Closes and removes the event queue for a task ID.

abstractmethod async create_or_tap(task_id: str) EventQueueLegacy

Creates a queue if one doesn’t exist, otherwise taps the existing one.

abstractmethod async get(task_id: str) EventQueueLegacy | None

Retrieves the event queue for a task ID.

abstractmethod async tap(task_id: str) EventQueueLegacy | None

Creates a child event queue (tap) for an existing task ID.

exception a2a.server.events.TaskQueueExists

Bases: Exception

Exception raised when attempting to add a queue for a task ID that already exists.