a2a.server.tasks.result_aggregator module¶
- class a2a.server.tasks.result_aggregator.ResultAggregator(task_manager: TaskManager)¶
Bases:
objectResultAggregator is used to process the event streams from an AgentExecutor.
There are three main ways to use the ResultAggregator: 1) As part of a processing pipe. consume_and_emit will construct the updated
task as the events arrive, and re-emit those events for another consumer
As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object
As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.
- async consume_all(consumer: EventConsumer) Task | Message | None¶
Processes the entire event stream from the consumer and returns the final result.
Blocks until the event stream ends (queue is closed after final event or exception).
- Parameters:
consumer – The EventConsumer to read events from.
- Returns:
The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True, event_callback: Callable[[], Awaitable[None]] | None = None) tuple[Task | Message | None, bool]¶
Processes the event stream until completion or an interruptable state is encountered.
If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.
- Parameters:
consumer – The EventConsumer to read events from.
blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.
event_callback – Optional async callback function to be called after each event is processed in the background continuation. Mainly used for push notifications currently.
- Returns:
The current aggregated result (Task or Message) at the point of completion or interruption.
A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).
- Return type:
A tuple containing
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_emit(consumer: EventConsumer) AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]¶
Processes the event stream from the consumer, updates the task state, and re-emits the same events.
Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.
- Parameters:
consumer – The EventConsumer to read events from.
- Yields:
The Event objects consumed from the EventConsumer.
- property current_result: Task | Message | None¶
Returns the current aggregated result (Task or Message).
This is the latest state processed from the event stream.
- Returns:
The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.