a2a.server.events package¶
Submodules¶
a2a.server.events.event_consumer module¶
- class a2a.server.events.event_consumer.EventConsumer(queue: EventQueue)¶
Bases:
object
Consumer to read events from the agent event queue.
- agent_task_callback(agent_task: Task[None]) None ¶
Callback to handle exceptions from the agent’s execution task.
If the agent’s asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.
- Parameters:
agent_task – The asyncio.Task that completed.
- consume_all() AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Consume all the generated streaming events from the agent.
This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.
- Yields:
Events dequeued from the queue.
- Raises:
BaseException – If an exception was set by the agent_task_callback.
- async consume_one() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Consume one event from the agent event queue non-blocking.
- Returns:
The next event from the queue.
- Raises:
ServerError – If the queue is empty when attempting to dequeue immediately.
a2a.server.events.event_queue module¶
- a2a.server.events.event_queue.Event = a2a.types.Message | a2a.types.Task | a2a.types.TaskStatusUpdateEvent | a2a.types.TaskArtifactUpdateEvent¶
Type alias for events that can be enqueued.
- class a2a.server.events.event_queue.EventQueue(max_queue_size: int = 1024)¶
Bases:
object
Event queue for A2A responses from agent.
Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.
- async close() None ¶
Closes the queue for future push events.
Once closed, dequeue_event will eventually raise asyncio.QueueShutDown when the queue is empty. Also closes all child queues.
- async dequeue_event(no_wait: bool = False) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Dequeues an event from the queue.
This implementation expects that dequeue to raise an exception when the queue has been closed. In python 3.13+ this is naturally provided by the QueueShutDown exception generated when the queue has closed and the user is awaiting the queue.get method. Python<=3.12 this needs to manage this lifecycle itself. The current implementation can lead to blocking if the dequeue_event is called before the EventQueue has been closed but when there are no events on the queue. Two ways to avoid this are to call this with no_wait = True which won’t block, but is the callers responsibility to retry as appropriate. Alternatively, one can use a async Task management solution to cancel the get task if the queue has closed or some other condition is met. The implementation of the EventConsumer uses an async.wait with a timeout to abort the dequeue_event call and retry, when it will return with a closed error.
- Parameters:
no_wait – If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.
- Returns:
The next event from the queue.
- Raises:
asyncio.QueueEmpty – If no_wait is True and the queue is empty.
asyncio.QueueShutDown – If the queue has been closed and is empty.
- async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None ¶
Enqueues an event to this queue and all its children.
- Parameters:
event – The event object to enqueue.
- is_closed() bool ¶
Checks if the queue is closed.
- tap() EventQueue ¶
Taps the event queue to create a new child queue that receives all future events.
- Returns:
A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
- task_done() None ¶
Signals that a formerly enqueued task is complete.
Used in conjunction with dequeue_event to track processed items.
a2a.server.events.in_memory_queue_manager module¶
- class a2a.server.events.in_memory_queue_manager.InMemoryQueueManager¶
Bases:
QueueManager
InMemoryQueueManager is used for a single binary management.
This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.
This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.
- async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue for a task ID.
- Raises:
TaskQueueExists – If a queue for the given task_id already exists.
- async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- Raises:
NoTaskQueue – If no queue exists for the given task_id.
- async create_or_tap(task_id: str) EventQueue ¶
Creates a new event queue for a task ID if one doesn’t exist, otherwise taps the existing one.
- Returns:
A new or child EventQueue instance for the task_id.
- async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- Returns:
The EventQueue instance for the task_id, or None if not found.
- async tap(task_id: str) EventQueue | None ¶
Taps the event queue for a task ID to create a child queue.
- Returns:
A new child EventQueue instance, or None if the task ID is not found.
a2a.server.events.queue_manager module¶
- exception a2a.server.events.queue_manager.NoTaskQueue¶
Bases:
Exception
Exception raised when attempting to access or close a queue for a task ID that does not exist.
- class a2a.server.events.queue_manager.QueueManager¶
Bases:
ABC
Interface for managing the event queue lifecycles per task.
- abstractmethod async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue associated with a task ID.
- abstractmethod async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- abstractmethod async create_or_tap(task_id: str) EventQueue ¶
Creates a queue if one doesn’t exist, otherwise taps the existing one.
- abstractmethod async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- abstractmethod async tap(task_id: str) EventQueue | None ¶
Creates a child event queue (tap) for an existing task ID.
- exception a2a.server.events.queue_manager.TaskQueueExists¶
Bases:
Exception
Exception raised when attempting to add a queue for a task ID that already exists.
Module contents¶
Event handling components for the A2A server.
- class a2a.server.events.EventConsumer(queue: EventQueue)¶
Bases:
object
Consumer to read events from the agent event queue.
- agent_task_callback(agent_task: Task[None]) None ¶
Callback to handle exceptions from the agent’s execution task.
If the agent’s asyncio task raises an exception, this callback is invoked, and the exception is stored to be re-raised by the consumer loop.
- Parameters:
agent_task – The asyncio.Task that completed.
- consume_all() AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Consume all the generated streaming events from the agent.
This method yields events as they become available from the queue until a final event is received or the queue is closed. It also monitors for exceptions set by the agent_task_callback.
- Yields:
Events dequeued from the queue.
- Raises:
BaseException – If an exception was set by the agent_task_callback.
- async consume_one() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Consume one event from the agent event queue non-blocking.
- Returns:
The next event from the queue.
- Raises:
ServerError – If the queue is empty when attempting to dequeue immediately.
- class a2a.server.events.EventQueue(max_queue_size: int = 1024)¶
Bases:
object
Event queue for A2A responses from agent.
Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.
- async close() None ¶
Closes the queue for future push events.
Once closed, dequeue_event will eventually raise asyncio.QueueShutDown when the queue is empty. Also closes all child queues.
- async dequeue_event(no_wait: bool = False) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Dequeues an event from the queue.
This implementation expects that dequeue to raise an exception when the queue has been closed. In python 3.13+ this is naturally provided by the QueueShutDown exception generated when the queue has closed and the user is awaiting the queue.get method. Python<=3.12 this needs to manage this lifecycle itself. The current implementation can lead to blocking if the dequeue_event is called before the EventQueue has been closed but when there are no events on the queue. Two ways to avoid this are to call this with no_wait = True which won’t block, but is the callers responsibility to retry as appropriate. Alternatively, one can use a async Task management solution to cancel the get task if the queue has closed or some other condition is met. The implementation of the EventConsumer uses an async.wait with a timeout to abort the dequeue_event call and retry, when it will return with a closed error.
- Parameters:
no_wait – If True, retrieve an event immediately or raise asyncio.QueueEmpty. If False (default), wait until an event is available.
- Returns:
The next event from the queue.
- Raises:
asyncio.QueueEmpty – If no_wait is True and the queue is empty.
asyncio.QueueShutDown – If the queue has been closed and is empty.
- async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None ¶
Enqueues an event to this queue and all its children.
- Parameters:
event – The event object to enqueue.
- is_closed() bool ¶
Checks if the queue is closed.
- tap() EventQueue ¶
Taps the event queue to create a new child queue that receives all future events.
- Returns:
A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
- task_done() None ¶
Signals that a formerly enqueued task is complete.
Used in conjunction with dequeue_event to track processed items.
- class a2a.server.events.InMemoryQueueManager¶
Bases:
QueueManager
InMemoryQueueManager is used for a single binary management.
This implements the QueueManager interface using in-memory storage for event queues. It requires all incoming interactions for a given task ID to hit the same binary instance.
This implementation is suitable for single-instance deployments but needs a distributed approach for scalable deployments.
- async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue for a task ID.
- Raises:
TaskQueueExists – If a queue for the given task_id already exists.
- async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- Raises:
NoTaskQueue – If no queue exists for the given task_id.
- async create_or_tap(task_id: str) EventQueue ¶
Creates a new event queue for a task ID if one doesn’t exist, otherwise taps the existing one.
- Returns:
A new or child EventQueue instance for the task_id.
- async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- Returns:
The EventQueue instance for the task_id, or None if not found.
- async tap(task_id: str) EventQueue | None ¶
Taps the event queue for a task ID to create a child queue.
- Returns:
A new child EventQueue instance, or None if the task ID is not found.
- exception a2a.server.events.NoTaskQueue¶
Bases:
Exception
Exception raised when attempting to access or close a queue for a task ID that does not exist.
- class a2a.server.events.QueueManager¶
Bases:
ABC
Interface for managing the event queue lifecycles per task.
- abstractmethod async add(task_id: str, queue: EventQueue) None ¶
Adds a new event queue associated with a task ID.
- abstractmethod async close(task_id: str) None ¶
Closes and removes the event queue for a task ID.
- abstractmethod async create_or_tap(task_id: str) EventQueue ¶
Creates a queue if one doesn’t exist, otherwise taps the existing one.
- abstractmethod async get(task_id: str) EventQueue | None ¶
Retrieves the event queue for a task ID.
- abstractmethod async tap(task_id: str) EventQueue | None ¶
Creates a child event queue (tap) for an existing task ID.
- exception a2a.server.events.TaskQueueExists¶
Bases:
Exception
Exception raised when attempting to add a queue for a task ID that already exists.