a2a.server.events package¶
Submodules¶
a2a.server.events.event_consumer module¶
- class a2a.server.events.event_consumer.EventConsumer(queue: EventQueue)¶
Bases:
object
Consumer to read events from the agent event queue.
- agent_task_callback(agent_task: Task[None]) None ¶
Callback to handle exceptions from the agent’s execution task.
If the agent’s asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.
- Parameters:
agent_task – The asyncio.Task that completed.
- consume_all() AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Consume all the generated streaming events from the agent.
This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.
- Yields:
Events dequeued from the queue.
- Raises:
BaseException – If an exception was set by the agent_task_callback.
- async consume_one() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Consume one event from the agent event queue non-blocking.
- Returns:
The next event from the queue.
- Raises:
ServerError – If the queue is empty when attempting to dequeue immediately.
a2a.server.events.event_queue module¶
- a2a.server.events.event_queue.Event = a2a.types.Message | a2a.types.Task | a2a.types.TaskStatusUpdateEvent | a2a.types.TaskArtifactUpdateEvent¶
Type alias for events that can be enqueued.
- class a2a.server.events.event_queue.EventQueue(max_queue_size: int = 1024)¶
Bases:
object
Event queue for A2A responses from agent.
Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.
- async clear_events(clear_child_queues: bool = True) None ¶
Clears all events from the current queue and optionally all child queues.
This method removes all pending events from the queue without processing them. Child queues can be optionally cleared based on the clear_child_queues parameter.
- Parameters:
clear_child_queues – If True (default), clear all child queues as well. If False, only clear the current queue, leaving child queues untouched.
- async close(immediate: bool = False) None ¶
Closes the queue for future push events and also closes all child queues.
Once closed, no new events can be enqueued. For Python 3.13+, this will trigger asyncio.QueueShutDown when the queue is empty and a consumer tries to dequeue. For lower versions, the queue will be marked as closed and optionally cleared.
- Parameters:
immediate (bool) –
True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.
- async dequeue_event(no_wait: bool = False) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Dequeues an event from the queue.
This implementation expects that dequeue to raise an exception when the queue has been closed. In python 3.13+ this is naturally provided by the QueueShutDown exception generated when the queue has closed and the user is awaiting the queue.get method. Python<=3.12 this needs to manage this lifecycle itself. The current implementation can lead to blocking if the dequeue_event is called before the EventQueue has been closed but when there are no events on the queue. Two ways to avoid this are to call this with no_wait = True which won’t block, but is the callers responsibility to retry as appropriate. Alternatively, one can use a async Task management solution to cancel the get task if the queue has closed or some other condition is met. The implementation of the EventConsumer uses an async.wait with a timeout to abort the dequeue_event call and retry, when it will return with a closed error.
- Parameters:
no_wait – If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.
- Returns:
The next event from the queue.
- Raises:
asyncio.QueueEmpty – If no_wait is True and the queue is empty.
asyncio.QueueShutDown – If the queue has been closed and is empty.
- async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None ¶
Enqueues an event to this queue and all its children.
- Parameters:
event – The event object to enqueue.
- is_closed() bool ¶
Checks if the queue is closed.
- tap() EventQueue ¶
Taps the event queue to create a new child queue that receives all future events.
- Returns:
A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
- task_done() None ¶
Signals that a formerly enqueued task is complete.
Used in conjunction with dequeue_event to track processed items.
a2a.server.events.in_memory_queue_manager module¶
- class a2a.server.events.in_memory_queue_manager.InMemoryQueueManager¶
Bases:
QueueManager
InMemoryQueueManager is used for a single binary management.
This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.
This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.
- async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue for a task ID.
- Raises:
TaskQueueExists – If a queue for the given task_id already exists.
- async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- Raises:
NoTaskQueue – If no queue exists for the given task_id.
- async create_or_tap(task_id: str) EventQueue ¶
Creates a new event queue for a task ID if one doesn’t exist, otherwise taps the existing one.
- Returns:
A new or child EventQueue instance for the task_id.
- async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- Returns:
The EventQueue instance for the task_id, or None if not found.
- async tap(task_id: str) EventQueue | None ¶
Taps the event queue for a task ID to create a child queue.
- Returns:
A new child EventQueue instance, or None if the task ID is not found.
a2a.server.events.queue_manager module¶
- exception a2a.server.events.queue_manager.NoTaskQueue¶
Bases:
Exception
Exception raised when attempting to access or close a queue for a task ID that does not exist.
- class a2a.server.events.queue_manager.QueueManager¶
Bases:
ABC
Interface for managing the event queue lifecycles per task.
- abstractmethod async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue associated with a task ID.
- abstractmethod async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- abstractmethod async create_or_tap(task_id: str) EventQueue ¶
Creates a queue if one doesn’t exist, otherwise taps the existing one.
- abstractmethod async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- abstractmethod async tap(task_id: str) EventQueue | None ¶
Creates a child event queue (tap) for an existing task ID.
- exception a2a.server.events.queue_manager.TaskQueueExists¶
Bases:
Exception
Exception raised when attempting to add a queue for a task ID that already exists.
Module contents¶
Event handling components for the A2A server.
- class a2a.server.events.EventConsumer(queue: EventQueue)¶
Bases:
object
Consumer to read events from the agent event queue.
- agent_task_callback(agent_task: Task[None]) None ¶
Callback to handle exceptions from the agent’s execution task.
If the agent’s asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.
- Parameters:
agent_task – The asyncio.Task that completed.
- consume_all() AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Consume all the generated streaming events from the agent.
This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.
- Yields:
Events dequeued from the queue.
- Raises:
BaseException – If an exception was set by the agent_task_callback.
- async consume_one() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Consume one event from the agent event queue non-blocking.
- Returns:
The next event from the queue.
- Raises:
ServerError – If the queue is empty when attempting to dequeue immediately.
- class a2a.server.events.EventQueue(max_queue_size: int = 1024)¶
Bases:
object
Event queue for A2A responses from agent.
Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.
- async clear_events(clear_child_queues: bool = True) None ¶
Clears all events from the current queue and optionally all child queues.
This method removes all pending events from the queue without processing them. Child queues can be optionally cleared based on the clear_child_queues parameter.
- Parameters:
clear_child_queues – If True (default), clear all child queues as well. If False, only clear the current queue, leaving child queues untouched.
- async close(immediate: bool = False) None ¶
Closes the queue for future push events and also closes all child queues.
Once closed, no new events can be enqueued. For Python 3.13+, this will trigger asyncio.QueueShutDown when the queue is empty and a consumer tries to dequeue. For lower versions, the queue will be marked as closed and optionally cleared.
- Parameters:
immediate (bool) –
True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.
- async dequeue_event(no_wait: bool = False) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Dequeues an event from the queue.
This implementation expects that dequeue to raise an exception when the queue has been closed. In python 3.13+ this is naturally provided by the QueueShutDown exception generated when the queue has closed and the user is awaiting the queue.get method. Python<=3.12 this needs to manage this lifecycle itself. The current implementation can lead to blocking if the dequeue_event is called before the EventQueue has been closed but when there are no events on the queue. Two ways to avoid this are to call this with no_wait = True which won’t block, but is the callers responsibility to retry as appropriate. Alternatively, one can use a async Task management solution to cancel the get task if the queue has closed or some other condition is met. The implementation of the EventConsumer uses an async.wait with a timeout to abort the dequeue_event call and retry, when it will return with a closed error.
- Parameters:
no_wait – If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.
- Returns:
The next event from the queue.
- Raises:
asyncio.QueueEmpty – If no_wait is True and the queue is empty.
asyncio.QueueShutDown – If the queue has been closed and is empty.
- async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None ¶
Enqueues an event to this queue and all its children.
- Parameters:
event – The event object to enqueue.
- is_closed() bool ¶
Checks if the queue is closed.
- tap() EventQueue ¶
Taps the event queue to create a new child queue that receives all future events.
- Returns:
A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
- task_done() None ¶
Signals that a formerly enqueued task is complete.
Used in conjunction with dequeue_event to track processed items.
- class a2a.server.events.InMemoryQueueManager¶
Bases:
QueueManager
InMemoryQueueManager is used for a single binary management.
This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.
This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.
- async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue for a task ID.
- Raises:
TaskQueueExists – If a queue for the given task_id already exists.
- async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- Raises:
NoTaskQueue – If no queue exists for the given task_id.
- async create_or_tap(task_id: str) EventQueue ¶
Creates a new event queue for a task ID if one doesn’t exist, otherwise taps the existing one.
- Returns:
A new or child EventQueue instance for the task_id.
- async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- Returns:
The EventQueue instance for the task_id, or None if not found.
- async tap(task_id: str) EventQueue | None ¶
Taps the event queue for a task ID to create a child queue.
- Returns:
A new child EventQueue instance, or None if the task ID is not found.
- exception a2a.server.events.NoTaskQueue¶
Bases:
Exception
Exception raised when attempting to access or close a queue for a task ID that does not exist.
- class a2a.server.events.QueueManager¶
Bases:
ABC
Interface for managing the event queue lifecycles per task.
- abstractmethod async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue associated with a task ID.
- abstractmethod async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- abstractmethod async create_or_tap(task_id: str) EventQueue ¶
Creates a queue if one doesn’t exist, otherwise taps the existing one.
- abstractmethod async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- abstractmethod async tap(task_id: str) EventQueue | None ¶
Creates a child event queue (tap) for an existing task ID.
- exception a2a.server.events.TaskQueueExists¶
Bases:
Exception
Exception raised when attempting to add a queue for a task ID that already exists.