a2a.server.tasks.result_aggregator module

class a2a.server.tasks.result_aggregator.ResultAggregator(task_manager: TaskManager)

Bases: object

ResultAggregator is used to process the event streams from an AgentExecutor.

There are three main ways to use the ResultAggregator:

  1. As part of a processing pipe. consume_and_emit will construct the updated task as the events arrive, and re-emit those events for another consumer.

  2. As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object.

  3. As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.

async consume_all(consumer: EventConsumer) → Task | Message | None

Processes the entire event stream from the consumer and returns the final result.

Blocks until the event stream ends (queue is closed after final event or exception).

Parameters:

consumer – The EventConsumer to read events from.

Returns:

The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.

Raises:

BaseException – If the EventConsumer raises an exception during consumption.
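The blocking pattern described above can be sketched without the library itself. The following is a minimal illustration, not the actual implementation: StubTask, StubMessage, StubConsumer and consume_all_sketch are hypothetical stand-ins, and the sketch assumes that a Message is treated as a final result as soon as it arrives, while Task events simply replace the current result.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for a Message."""
    text: str


@dataclass
class StubTask:
    """Hypothetical stand-in for a Task."""
    state: str


class StubConsumer:
    """Hypothetical stand-in for an EventConsumer: yields its events, then stops."""

    def __init__(self, events):
        self._events = list(events)

    async def consume_all(self):
        for event in self._events:
            await asyncio.sleep(0)  # yield control, as a real queue read would
            yield event


async def consume_all_sketch(consumer):
    """Drain the stream and return the final result, or None if it was empty."""
    current = None
    async for event in consumer.consume_all():
        if isinstance(event, StubMessage):
            return event  # assumption: a Message ends the aggregation
        current = event  # assumption: the latest Task event is the current result
    return current
```

A caller would await consume_all_sketch(consumer) and branch on whether the result is a task, a message, or None.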

async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True, event_callback: Callable[[], Awaitable[None]] | None = None) → tuple[Task | Message | None, bool]

Processes the event stream until completion or an interruptible state is encountered.

If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.

Parameters:
  • consumer – The EventConsumer to read events from.

  • blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.

  • event_callback – Optional async callback function to be called after each event is processed in the background continuation. Mainly used for push notifications currently.

Returns:

A tuple containing:

  • The current aggregated result (Task or Message) at the point of completion or interruption.

  • A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).

Raises:

BaseException – If the EventConsumer raises an exception during consumption.
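The interrupt-and-continue behaviour can be illustrated with plain asyncio. This is a rough sketch under stated assumptions, not the library's code: event_source, _drain, consume_and_break and demo are hypothetical names, events are reduced to plain state strings, and the sketch assumes the interrupted flag is True both for an auth_required state and for a non-blocking call.

```python
import asyncio

# Keep references to background tasks so they are not garbage collected early.
_background_tasks = set()


async def event_source(states):
    """Hypothetical event stream: yields state strings one at a time."""
    for state in states:
        await asyncio.sleep(0)
        yield state


async def _drain(stream, seen):
    """Background continuation: keep recording events after the early return."""
    async for state in stream:
        seen.append(state)


async def consume_and_break(stream, seen, blocking=True):
    """Return (latest_state, interrupted), handing leftovers to a background task."""
    interrupted = False
    async for state in stream:
        seen.append(state)
        if state == "auth_required" or not blocking:
            # Breaking out of `async for` leaves the generator open,
            # so the background task can resume from the next event.
            task = asyncio.create_task(_drain(stream, seen))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
            interrupted = True
            break
    return (seen[-1] if seen else None), interrupted


async def demo():
    seen = []
    stream = event_source(["working", "auth_required", "completed"])
    result, interrupted = await consume_and_break(stream, seen)
    snapshot = list(seen)  # what had been processed at the moment of return
    await asyncio.gather(*_background_tasks)  # let the continuation finish
    return result, interrupted, snapshot, seen
```

The point of the design is that the caller gets a usable result immediately, while the remaining events are still processed so that the stored state stays current.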

async consume_and_emit(consumer: EventConsumer) → AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Processes the event stream from the consumer, updates the task state, and re-emits the same events.

Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.

Parameters:

consumer – The EventConsumer to read events from.

Yields:

The Event objects consumed from the EventConsumer.
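The observe-then-forward pattern is essentially a pass-through async generator. The sketch below shows the idea with hypothetical names (event_source, observe_and_forward, collect); appending to a list stands in for whatever state update or notification the server performs, and it is an assumption of this sketch that the side effect happens before each event is re-emitted.

```python
import asyncio


async def event_source(events):
    """Hypothetical event stream: yields the given events in order."""
    for event in events:
        await asyncio.sleep(0)
        yield event


async def observe_and_forward(stream, store):
    """Record each event in `store`, then re-emit the same event unchanged."""
    async for event in stream:
        store.append(event)  # stand-in for updating task state
        yield event


async def collect(events):
    """Drive the pipeline and return what was stored and what was forwarded."""
    store = []
    forwarded = [e async for e in observe_and_forward(event_source(events), store)]
    return store, forwarded
```

Because the generator yields the original objects, a downstream consumer sees exactly the events that were observed, in the same order.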

property current_result: Task | Message | None

Returns the current aggregated result (Task or Message).

This is the latest state processed from the event stream.

Returns:

The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.