a2a.server.events.event_queue module¶
- a2a.server.events.event_queue.Event = a2a_pb2.Message | a2a_pb2.Task | a2a_pb2.TaskStatusUpdateEvent | a2a_pb2.TaskArtifactUpdateEvent¶
Type alias for events that can be enqueued.
- class a2a.server.events.event_queue.EventQueue(*args: Any, **kwargs: Any)¶
Bases:
ABCProducer-side interface passed to AgentExecutor.execute/cancel.
Exposes only enqueue_event. The consumer is framework-managed and not part of the public surface.
Default request handlers construct the queue and pass it in; executors should accept it and not construct one. To run an executor outside the framework, write a custom subclass or use EventQueueLegacy (deprecated, will be removed in a future release).
- abstractmethod async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None¶
Pushes an event into the queue.
Only main queue can enqueue events. Child queues can only dequeue events.
- class a2a.server.events.event_queue.EventQueueLegacy(*args: Any, **kwargs: Any)¶
Bases:
EventQueueEvent queue for A2A responses from agent.
Acts as a buffer between the agent’s asynchronous execution and the server’s response handling (e.g., streaming via SSE). Supports tapping to create child queues that receive the same events.
- async close(immediate: bool = False) None¶
Closes the queue for future push events and also closes all child queues.
- Parameters:
immediate – If True, immediately flushes the queue, discarding all pending events, and causes any currently blocked dequeue_event calls to raise QueueShutDown. If False (default), the queue is marked as closed to new events, but existing events can still be dequeued and processed until the queue is fully drained.
- async dequeue_event() Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent¶
Dequeues an event from the queue.
This implementation expects that dequeue to raise an exception when the queue has been closed. In python 3.13+ this is naturally provided by the QueueShutDown exception generated when the queue has closed and the user is awaiting the queue.get method. Python<=3.12 this needs to manage this lifecycle itself. The current implementation can lead to blocking if the dequeue_event is called before the EventQueue has been closed but when there are no events on the queue. One way to avoid this is to use an async Task management solution to cancel the get task if the queue has closed or some other condition is met. The implementation of the EventConsumer uses an async.wait with a timeout to abort the dequeue_event call and retry, when it will return with a closed error.
- Returns:
The next event from the queue.
- Raises:
asyncio.QueueShutDown – If the queue has been closed and is empty.
- async enqueue_event(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) None¶
Enqueues an event to this queue and all its children.
- Parameters:
event – The event object to enqueue.
- is_closed() bool¶
Checks if the queue is closed.
- property queue: Queue[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]¶
[DEPRECATED] Returns the underlying asyncio.Queue.
- async tap(max_queue_size: int = 1024) EventQueueLegacy¶
Taps the event queue to create a new child queue that receives future events.
- Returns:
A new EventQueue instance that will receive all events enqueued to this parent queue from this point forward.
- task_done() None¶
Signals that a formerly enqueued task is complete.
Used in conjunction with dequeue_event to track processed items.