a2a.server.tasks package

Submodules

a2a.server.tasks.base_push_notification_sender module

class a2a.server.tasks.base_push_notification_sender.BasePushNotificationSender(httpx_client: AsyncClient, config_store: PushNotificationConfigStore)

Bases: PushNotificationSender

Base implementation of PushNotificationSender interface.

async send_notification(task: Task) None

Sends a push notification for a task if configuration exists.
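The "if configuration exists" behaviour can be illustrated with a minimal sketch. Everything here is a stand-in and not the library's implementation: SketchSender, the post callable (in place of an HTTP client) and the dictionary of URLs (in place of a config store) are hypothetical names chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable


class SketchSender:
    """Hypothetical sender; not the library's BasePushNotificationSender."""

    def __init__(self, post: Callable[[str, dict], Awaitable[None]], configs: dict) -> None:
        self._post = post        # stand-in for an HTTP client's POST call
        self._configs = configs  # stand-in store: task ID -> list of URLs

    async def send_notification(self, task: dict) -> None:
        urls = self._configs.get(task["id"], [])
        if not urls:
            return  # no configuration for this task, nothing to send
        # Deliver the task payload to every configured URL concurrently.
        await asyncio.gather(*(self._post(url, task) for url in urls))


async def demo_sender() -> list:
    sent: list = []

    async def fake_post(url: str, payload: dict) -> None:
        sent.append((url, payload["id"]))

    sender = SketchSender(
        fake_post, {"t1": ["https://example.com/a", "https://example.com/b"]}
    )
    await sender.send_notification({"id": "t1", "state": "completed"})
    await sender.send_notification({"id": "t2", "state": "completed"})  # no config
    return sent
```

Only the task with a stored configuration produces any calls to the POST stand-in.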

a2a.server.tasks.database_push_notification_config_store module

a2a.server.tasks.database_task_store module

a2a.server.tasks.inmemory_push_notification_config_store module

class a2a.server.tasks.inmemory_push_notification_config_store.InMemoryPushNotificationConfigStore

Bases: PushNotificationConfigStore

In-memory implementation of PushNotificationConfigStore interface.

Stores push notification configurations in memory.

async delete_info(task_id: str, config_id: str | None = None) None

Deletes the push notification configuration for a task from memory.

async get_info(task_id: str) list[PushNotificationConfig]

Retrieves the push notification configuration for a task from memory.

async set_info(task_id: str, notification_config: PushNotificationConfig) None

Sets or updates the push notification configuration for a task in memory.
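As a rough illustration of the three methods, the sketch below keeps a list of configurations per task. PushConfig and ListConfigStore are hypothetical stand-ins, and two behaviours are assumptions rather than documented facts: a configuration without an ID is keyed by the task ID, and delete_info without a config_id targets that same entry.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class PushConfig:
    """Stand-in for PushNotificationConfig; field names are assumptions."""
    url: str
    id: Optional[str] = None


class ListConfigStore:
    """Hypothetical in-memory store; one list of configs per task ID."""

    def __init__(self) -> None:
        self._configs: dict = {}

    async def set_info(self, task_id: str, notification_config: PushConfig) -> None:
        # Assumption: a config without an ID is keyed by the task ID.
        if notification_config.id is None:
            notification_config.id = task_id
        configs = self._configs.setdefault(task_id, [])
        # Replace an entry with the same ID, otherwise append a new one.
        for i, existing in enumerate(configs):
            if existing.id == notification_config.id:
                configs[i] = notification_config
                return
        configs.append(notification_config)

    async def get_info(self, task_id: str) -> list:
        return list(self._configs.get(task_id, []))

    async def delete_info(self, task_id: str, config_id: Optional[str] = None) -> None:
        # Assumption: omitting config_id targets the config keyed by the task ID.
        target = config_id if config_id is not None else task_id
        remaining = [c for c in self._configs.get(task_id, []) if c.id != target]
        if remaining:
            self._configs[task_id] = remaining
        else:
            self._configs.pop(task_id, None)


async def demo_config_store() -> list:
    store = ListConfigStore()
    await store.set_info("t1", PushConfig(url="https://example.com/a"))
    await store.set_info("t1", PushConfig(url="https://example.com/b", id="extra"))
    await store.set_info("t1", PushConfig(url="https://example.com/c"))  # updates the first
    await store.delete_info("t1", "extra")
    return [c.url for c in await store.get_info("t1")]
```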

a2a.server.tasks.inmemory_task_store module

class a2a.server.tasks.inmemory_task_store.InMemoryTaskStore

Bases: TaskStore

In-memory implementation of TaskStore.

Stores task objects in a dictionary in memory. Task data is lost when the server process stops.

async delete(task_id: str) None

Deletes a task from the in-memory store by ID.

async get(task_id: str) Task | None

Retrieves a task from the in-memory store by ID.

async save(task: Task) None

Saves or updates a task in the in-memory store.
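A dictionary-backed store of this kind might look like the sketch below. It is not the library's code: Task is a stand-in dataclass, DictTaskStore is a hypothetical name, and treating the deletion of an unknown ID as a no-op is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    """Stand-in for the library's Task type; only `id` matters here."""
    id: str
    state: str = "submitted"


class DictTaskStore:
    """Hypothetical dictionary-backed store with the same three methods."""

    def __init__(self) -> None:
        self._tasks: dict = {}

    async def save(self, task: Task) -> None:
        # Saving under an existing ID overwrites the previous entry.
        self._tasks[task.id] = task

    async def get(self, task_id: str) -> Optional[Task]:
        return self._tasks.get(task_id)

    async def delete(self, task_id: str) -> None:
        # Assumption: deleting an unknown ID is a no-op.
        self._tasks.pop(task_id, None)


async def demo_task_store() -> tuple:
    store = DictTaskStore()
    await store.save(Task(id="t1"))
    await store.save(Task(id="t1", state="working"))  # update in place
    found = await store.get("t1")
    await store.delete("t1")
    return found, await store.get("t1")
```

Because the dictionary lives in the process, nothing survives a restart, which matches the data-loss caveat above.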

a2a.server.tasks.push_notification_config_store module

class a2a.server.tasks.push_notification_config_store.PushNotificationConfigStore

Bases: ABC

Interface for storing and retrieving push notification configurations for tasks.

abstractmethod async delete_info(task_id: str, config_id: str | None = None) None

Deletes the push notification configuration for a task.

abstractmethod async get_info(task_id: str) list[PushNotificationConfig]

Retrieves the push notification configuration for a task.

abstractmethod async set_info(task_id: str, notification_config: PushNotificationConfig) None

Sets or updates the push notification configuration for a task.

a2a.server.tasks.push_notification_sender module

class a2a.server.tasks.push_notification_sender.PushNotificationSender

Bases: ABC

Interface for sending push notifications for tasks.

abstractmethod async send_notification(task: Task) None

Sends a push notification containing the latest task state.

a2a.server.tasks.result_aggregator module

class a2a.server.tasks.result_aggregator.ResultAggregator(task_manager: TaskManager)

Bases: object

ResultAggregator is used to process the event streams from an AgentExecutor.

There are three main ways to use the ResultAggregator:

  1. As part of a processing pipe. consume_and_emit will construct the updated task as the events arrive, and re-emit those events for another consumer.

  2. As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object.

  3. As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.
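The difference between the pipe mode and the blocking mode can be sketched with plain async generators. TinyAggregator and event_stream are hypothetical stand-ins, and events are simplified to strings; the real class works with an EventConsumer and a TaskManager.

```python
import asyncio
from typing import AsyncIterator, Optional


async def event_stream() -> AsyncIterator[str]:
    """Stand-in for an EventConsumer's stream; events are plain strings."""
    for event in ("submitted", "working", "completed"):
        yield event


class TinyAggregator:
    """Hypothetical aggregator that keeps the latest event as its result."""

    def __init__(self) -> None:
        self.current_result: Optional[str] = None

    async def consume_and_emit(self, events: AsyncIterator[str]) -> AsyncIterator[str]:
        # Pipe mode: update state, then forward each event unchanged.
        async for event in events:
            self.current_result = event
            yield event

    async def consume_all(self, events: AsyncIterator[str]) -> Optional[str]:
        # Blocking mode: drain the stream and return the final state.
        async for event in events:
            self.current_result = event
        return self.current_result


async def demo_modes() -> tuple:
    forwarded = [e async for e in TinyAggregator().consume_and_emit(event_stream())]
    final = await TinyAggregator().consume_all(event_stream())
    return forwarded, final
```

In pipe mode the caller sees every event; in blocking mode it sees only the final result.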

async consume_all(consumer: EventConsumer) Task | Message | None

Processes the entire event stream from the consumer and returns the final result.

Blocks until the event stream ends (queue is closed after final event or exception).

Parameters:

consumer – The EventConsumer to read events from.

Returns:

The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.

Raises:

BaseException – If the EventConsumer raises an exception during consumption.

async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True) tuple[Task | Message | None, bool]

Processes the event stream until completion or an interruptible state is encountered.

If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.

Parameters:
  • consumer – The EventConsumer to read events from.

  • blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.

Returns:

A tuple containing:

  • The current aggregated result (Task or Message) at the point of completion or interruption.

  • A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).

Raises:

BaseException – If the EventConsumer raises an exception during consumption.
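The interrupt-and-continue behaviour can be sketched with an asyncio.Queue. This is a simplified illustration, not the library's code: events are strings, None is a hypothetical end-of-stream sentinel, and the function returns the background task as a third element so that the demo can wait for it.

```python
import asyncio
from typing import Optional

END = None  # hypothetical sentinel marking the end of the stream


async def _drain(queue: asyncio.Queue, seen: list) -> None:
    # Keeps consuming after the caller has already been given a result.
    while (event := await queue.get()) is not END:
        seen.append(event)


async def consume_and_break(queue: asyncio.Queue, seen: list, blocking: bool = True):
    """Returns (latest_event, interrupted, background_task_or_None)."""
    latest: Optional[str] = None
    while True:
        event = await queue.get()
        if event is END:
            return latest, False, None  # completed naturally
        seen.append(event)
        latest = event
        if event == "auth_required" or not blocking:
            # Hand the rest of the stream to a background task and return early.
            background = asyncio.create_task(_drain(queue, seen))
            return latest, True, background


async def demo_interrupt() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for event in ("working", "auth_required", "completed", END):
        queue.put_nowait(event)
    seen: list = []
    latest, interrupted, background = await consume_and_break(queue, seen)
    seen_at_return = list(seen)
    await background  # only so the demo can inspect the remaining events
    return latest, interrupted, seen_at_return, seen
```

Holding a reference to the background task matters in practice, since an unreferenced asyncio task may be garbage collected before it finishes.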

async consume_and_emit(consumer: EventConsumer) AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Processes the event stream from the consumer, updates the task state, and re-emits the same events.

Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.

Parameters:

consumer – The EventConsumer to read events from.

Yields:

The Event objects consumed from the EventConsumer.

property current_result: Task | Message | None

Returns the current aggregated result (Task or Message).

This is the latest state processed from the event stream.

Returns:

The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.

a2a.server.tasks.task_manager module

class a2a.server.tasks.task_manager.TaskManager(task_id: str | None, context_id: str | None, task_store: TaskStore, initial_message: Message | None)

Bases: object

Helps manage a task’s lifecycle during execution of a request.

Responsible for retrieving, saving, and updating the Task object based on events received from the agent.

async ensure_task(event: TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task

Ensures a Task object exists in memory, loading from store or creating new if needed.

Parameters:

event – The task-related event triggering the need for a Task object.

Returns:

An existing or newly created Task object.

async get_task() Task | None

Retrieves the current task object, either from memory or the store.

If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.

Returns:

The Task object if found, otherwise None.
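The memory-first lookup described above might be sketched as follows. CachedLookup is a hypothetical stand-in that uses a plain dictionary as the store, and returning None when no task_id is set is an assumption drawn from the wording of the description.

```python
import asyncio
from typing import Optional


class CachedLookup:
    """Hypothetical stand-in showing a memory-first, store-second lookup."""

    def __init__(self, task_id: Optional[str], store: dict) -> None:
        self.task_id = task_id
        self._store = store          # stand-in for a TaskStore
        self._current_task = None    # in-memory copy of the task
        self.store_reads = 0         # counts how often the store is consulted

    async def get_task(self):
        if self.task_id is None:
            return None  # assumption: nothing to look up without an ID
        if self._current_task is not None:
            return self._current_task
        self.store_reads += 1
        self._current_task = self._store.get(self.task_id)
        return self._current_task


async def demo_lookup() -> tuple:
    lookup = CachedLookup("t1", {"t1": {"id": "t1"}})
    first = await lookup.get_task()
    second = await lookup.get_task()  # served from memory
    unset = await CachedLookup(None, {}).get_task()
    return first, second, lookup.store_reads, unset
```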

async process(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

Processes an event, updates the task state if applicable, stores it, and returns the event.

If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.

Parameters:

event – The event object received from the agent.

Returns:

The same event object that was processed.

async save_task_event(event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task | None

Processes a task-related event (Task, Status, Artifact) and saves the updated task state.

Ensures task and context IDs match or are set from the event.

Parameters:

event – The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).

Returns:

The updated Task object after processing the event.

Raises:

ServerError – If the task ID in the event conflicts with the TaskManager’s ID when the TaskManager’s ID is already set.
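The ID check can be illustrated with a small sketch. IdTracker, Event and TaskIdMismatch are hypothetical names; TaskIdMismatch stands in for the ServerError mentioned above, and how a conflicting context ID is handled is not covered here.

```python
from dataclasses import dataclass
from typing import Optional


class TaskIdMismatch(Exception):
    """Stand-in for the ServerError raised on conflicting task IDs."""


@dataclass
class Event:
    """Stand-in for a task-related event carrying both IDs."""
    task_id: str
    context_id: str


class IdTracker:
    """Hypothetical helper that adopts or verifies IDs from events."""

    def __init__(self, task_id: Optional[str] = None, context_id: Optional[str] = None) -> None:
        self.task_id = task_id
        self.context_id = context_id

    def check(self, event: Event) -> None:
        # A task ID that is already set must match the event's task ID.
        if self.task_id is not None and self.task_id != event.task_id:
            raise TaskIdMismatch(f"expected {self.task_id}, got {event.task_id}")
        # Unset IDs are adopted from the first event seen.
        if self.task_id is None:
            self.task_id = event.task_id
        if self.context_id is None:
            self.context_id = event.context_id
```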

update_with_message(message: Message, task: Task) Task

Updates a task object in memory by adding a new message to its history.

If the task has a message in its current status, that message is moved to the history first.

Parameters:
  • message – The new Message to add to the history.

  • task – The Task object to update.

Returns:

The updated Task object (updated in-place).
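The history handling can be shown with stand-in types. Status and Task below are simplified dataclasses, not the library's models, and messages are reduced to strings.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Status:
    """Stand-in for a task status with an optional message."""
    state: str
    message: Optional[str] = None


@dataclass
class Task:
    """Stand-in for Task; only status and history are modelled."""
    status: Status
    history: list = field(default_factory=list)


def update_with_message(message: str, task: Task) -> Task:
    # Move the status message, if any, into history before the new message.
    if task.status.message is not None:
        task.history.append(task.status.message)
        task.status.message = None
    task.history.append(message)
    return task  # the same object, modified in place
```

For example, a task whose status carries "need more info" ends up with a history of ["need more info", "here it is"] after update_with_message("here it is", task).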

a2a.server.tasks.task_store module

class a2a.server.tasks.task_store.TaskStore

Bases: ABC

Agent Task Store interface.

Defines the methods for persisting and retrieving Task objects.

abstractmethod async delete(task_id: str) None

Deletes a task from the store by ID.

abstractmethod async get(task_id: str) Task | None

Retrieves a task from the store by ID.

abstractmethod async save(task: Task) None

Saves or updates a task in the store.

a2a.server.tasks.task_updater module

class a2a.server.tasks.task_updater.TaskUpdater(event_queue: EventQueue, task_id: str, context_id: str)

Bases: object

Helper class for agents to publish updates to a task’s event queue.

Simplifies the process of creating and enqueueing standard task events.

async add_artifact(parts: list[Part], artifact_id: str | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, append: bool | None = None, last_chunk: bool | None = None) None

Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.

Parameters:
  • parts – A list of Part objects forming the artifact chunk.

  • artifact_id – The ID of the artifact. A new UUID is generated if not provided.

  • name – Optional name for the artifact.

  • metadata – Optional metadata for the artifact.

  • append – Optional boolean indicating if this chunk appends to a previous one.

  • last_chunk – Optional boolean indicating if this is the last chunk.
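One way a receiver might interpret the append flag is sketched below. This is an assumption about the consuming side, not documented behaviour: apply_chunk is a hypothetical function, parts are plain strings, and a chunk with append set for an unknown artifact simply creates it.

```python
from typing import Optional


def apply_chunk(artifacts: dict, artifact_id: str, parts: list,
                append: Optional[bool] = None) -> None:
    """Merges one artifact chunk into a dict of artifact ID -> list of parts."""
    if append and artifact_id in artifacts:
        # Append mode: extend the parts collected so far.
        artifacts[artifact_id].extend(parts)
    else:
        # Otherwise the chunk starts (or replaces) the artifact.
        artifacts[artifact_id] = list(parts)


def demo_chunks() -> dict:
    artifacts: dict = {}
    apply_chunk(artifacts, "report", ["Hello, "])
    apply_chunk(artifacts, "report", ["world"], append=True)
    apply_chunk(artifacts, "summary", ["v1"])
    apply_chunk(artifacts, "summary", ["v2"])  # no append: replaces v1
    return artifacts
```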

async cancel(message: Message | None = None) None

Marks the task as cancelled and publishes a final status update.

async complete(message: Message | None = None) None

Marks the task as completed and publishes a final status update.

async failed(message: Message | None = None) None

Marks the task as failed and publishes a final status update.

new_agent_message(parts: list[Part], metadata: dict[str, Any] | None = None) Message

Creates a new message object sent by the agent for this task/context.

Note: This method only creates the message object. It does not automatically enqueue it.

Parameters:
  • parts – A list of Part objects for the message content.

  • metadata – Optional metadata for the message.

Returns:

A new Message object.

async reject(message: Message | None = None) None

Marks the task as rejected and publishes a final status update.

async requires_auth(message: Message | None = None, final: bool = False) None

Marks the task as auth required and publishes a status update.

async requires_input(message: Message | None = None, final: bool = False) None

Marks the task as input required and publishes a status update.

async start_work(message: Message | None = None) None

Marks the task as working and publishes a status update.

async submit(message: Message | None = None) None

Marks the task as submitted and publishes a status update.

async update_status(state: TaskState, message: Message | None = None, final: bool = False, timestamp: str | None = None, metadata: dict[str, Any] | None = None) None

Updates the status of the task and publishes a TaskStatusUpdateEvent.

Parameters:
  • state – The new state of the task.

  • message – An optional message associated with the status update.

  • final – If True, indicates this is the final status update for the task.

  • timestamp – Optional ISO 8601 datetime string. Defaults to current time.

  • metadata – Optional metadata for extensions.
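The relationship between update_status and the convenience methods can be sketched with an asyncio.Queue. QueueUpdater is a hypothetical stand-in that enqueues dictionaries instead of TaskStatusUpdateEvent objects, and the state strings are illustrative.

```python
import asyncio
from datetime import datetime, timezone
from typing import Optional


class QueueUpdater:
    """Hypothetical helper that enqueues status events as dictionaries."""

    def __init__(self, event_queue: asyncio.Queue, task_id: str, context_id: str) -> None:
        self.event_queue = event_queue
        self.task_id = task_id
        self.context_id = context_id

    async def update_status(self, state: str, message: Optional[str] = None,
                            final: bool = False, timestamp: Optional[str] = None) -> None:
        await self.event_queue.put({
            "task_id": self.task_id,
            "context_id": self.context_id,
            "state": state,
            "message": message,
            "final": final,
            # ISO 8601 string; defaults to the current UTC time.
            "timestamp": timestamp or datetime.now(timezone.utc).isoformat(),
        })

    async def start_work(self, message: Optional[str] = None) -> None:
        await self.update_status("working", message)

    async def complete(self, message: Optional[str] = None) -> None:
        # Terminal states are published as the final update.
        await self.update_status("completed", message, final=True)


async def demo_updater() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    updater = QueueUpdater(queue, task_id="t1", context_id="c1")
    await updater.start_work()
    await updater.complete("done")
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

Each convenience method is a thin wrapper that fixes the state and the final flag, so agents do not have to build events by hand.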

Module contents

Components for managing tasks within the A2A server.

class a2a.server.tasks.BasePushNotificationSender(httpx_client: AsyncClient, config_store: PushNotificationConfigStore)

Bases: PushNotificationSender

Base implementation of PushNotificationSender interface.

async send_notification(task: Task) None

Sends a push notification for a task if configuration exists.

class a2a.server.tasks.DatabasePushNotificationConfigStore(*args, **kwargs)

Bases: object

Placeholder for DatabasePushNotificationConfigStore when dependencies are not installed.

class a2a.server.tasks.DatabaseTaskStore(*args, **kwargs)

Bases: object

Placeholder for DatabaseTaskStore when dependencies are not installed.

class a2a.server.tasks.InMemoryPushNotificationConfigStore

Bases: PushNotificationConfigStore

In-memory implementation of PushNotificationConfigStore interface.

Stores push notification configurations in memory.

async delete_info(task_id: str, config_id: str | None = None) None

Deletes the push notification configuration for a task from memory.

async get_info(task_id: str) list[PushNotificationConfig]

Retrieves the push notification configuration for a task from memory.

async set_info(task_id: str, notification_config: PushNotificationConfig) None

Sets or updates the push notification configuration for a task in memory.

class a2a.server.tasks.InMemoryTaskStore

Bases: TaskStore

In-memory implementation of TaskStore.

Stores task objects in a dictionary in memory. Task data is lost when the server process stops.

async delete(task_id: str) None

Deletes a task from the in-memory store by ID.

async get(task_id: str) Task | None

Retrieves a task from the in-memory store by ID.

async save(task: Task) None

Saves or updates a task in the in-memory store.

class a2a.server.tasks.PushNotificationConfigStore

Bases: ABC

Interface for storing and retrieving push notification configurations for tasks.

abstractmethod async delete_info(task_id: str, config_id: str | None = None) None

Deletes the push notification configuration for a task.

abstractmethod async get_info(task_id: str) list[PushNotificationConfig]

Retrieves the push notification configuration for a task.

abstractmethod async set_info(task_id: str, notification_config: PushNotificationConfig) None

Sets or updates the push notification configuration for a task.

class a2a.server.tasks.PushNotificationSender

Bases: ABC

Interface for sending push notifications for tasks.

abstractmethod async send_notification(task: Task) None

Sends a push notification containing the latest task state.

class a2a.server.tasks.ResultAggregator(task_manager: TaskManager)

Bases: object

ResultAggregator is used to process the event streams from an AgentExecutor.

There are three main ways to use the ResultAggregator:

  1. As part of a processing pipe. consume_and_emit will construct the updated task as the events arrive, and re-emit those events for another consumer.

  2. As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object.

  3. As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.

async consume_all(consumer: EventConsumer) Task | Message | None

Processes the entire event stream from the consumer and returns the final result.

Blocks until the event stream ends (queue is closed after final event or exception).

Parameters:

consumer – The EventConsumer to read events from.

Returns:

The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.

Raises:

BaseException – If the EventConsumer raises an exception during consumption.

async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True) tuple[Task | Message | None, bool]

Processes the event stream until completion or an interruptible state is encountered.

If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.

Parameters:
  • consumer – The EventConsumer to read events from.

  • blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.

Returns:

A tuple containing:

  • The current aggregated result (Task or Message) at the point of completion or interruption.

  • A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).

Raises:

BaseException – If the EventConsumer raises an exception during consumption.

async consume_and_emit(consumer: EventConsumer) AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]

Processes the event stream from the consumer, updates the task state, and re-emits the same events.

Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.

Parameters:

consumer – The EventConsumer to read events from.

Yields:

The Event objects consumed from the EventConsumer.

property current_result: Task | Message | None

Returns the current aggregated result (Task or Message).

This is the latest state processed from the event stream.

Returns:

The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.

class a2a.server.tasks.TaskManager(task_id: str | None, context_id: str | None, task_store: TaskStore, initial_message: Message | None)

Bases: object

Helps manage a task’s lifecycle during execution of a request.

Responsible for retrieving, saving, and updating the Task object based on events received from the agent.

async ensure_task(event: TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task

Ensures a Task object exists in memory, loading from store or creating new if needed.

Parameters:

event – The task-related event triggering the need for a Task object.

Returns:

An existing or newly created Task object.

async get_task() Task | None

Retrieves the current task object, either from memory or the store.

If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.

Returns:

The Task object if found, otherwise None.

async process(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

Processes an event, updates the task state if applicable, stores it, and returns the event.

If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.

Parameters:

event – The event object received from the agent.

Returns:

The same event object that was processed.

async save_task_event(event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task | None

Processes a task-related event (Task, Status, Artifact) and saves the updated task state.

Ensures task and context IDs match or are set from the event.

Parameters:

event – The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).

Returns:

The updated Task object after processing the event.

Raises:

ServerError – If the task ID in the event conflicts with the TaskManager’s ID when the TaskManager’s ID is already set.

update_with_message(message: Message, task: Task) Task

Updates a task object in memory by adding a new message to its history.

If the task has a message in its current status, that message is moved to the history first.

Parameters:
  • message – The new Message to add to the history.

  • task – The Task object to update.

Returns:

The updated Task object (updated in-place).

class a2a.server.tasks.TaskStore

Bases: ABC

Agent Task Store interface.

Defines the methods for persisting and retrieving Task objects.

abstractmethod async delete(task_id: str) None

Deletes a task from the store by ID.

abstractmethod async get(task_id: str) Task | None

Retrieves a task from the store by ID.

abstractmethod async save(task: Task) None

Saves or updates a task in the store.

class a2a.server.tasks.TaskUpdater(event_queue: EventQueue, task_id: str, context_id: str)

Bases: object

Helper class for agents to publish updates to a task’s event queue.

Simplifies the process of creating and enqueueing standard task events.

async add_artifact(parts: list[Part], artifact_id: str | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, append: bool | None = None, last_chunk: bool | None = None) None

Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.

Parameters:
  • parts – A list of Part objects forming the artifact chunk.

  • artifact_id – The ID of the artifact. A new UUID is generated if not provided.

  • name – Optional name for the artifact.

  • metadata – Optional metadata for the artifact.

  • append – Optional boolean indicating if this chunk appends to a previous one.

  • last_chunk – Optional boolean indicating if this is the last chunk.

async cancel(message: Message | None = None) None

Marks the task as cancelled and publishes a final status update.

async complete(message: Message | None = None) None

Marks the task as completed and publishes a final status update.

async failed(message: Message | None = None) None

Marks the task as failed and publishes a final status update.

new_agent_message(parts: list[Part], metadata: dict[str, Any] | None = None) Message

Creates a new message object sent by the agent for this task/context.

Note: This method only creates the message object. It does not automatically enqueue it.

Parameters:
  • parts – A list of Part objects for the message content.

  • metadata – Optional metadata for the message.

Returns:

A new Message object.

async reject(message: Message | None = None) None

Marks the task as rejected and publishes a final status update.

async requires_auth(message: Message | None = None, final: bool = False) None

Marks the task as auth required and publishes a status update.

async requires_input(message: Message | None = None, final: bool = False) None

Marks the task as input required and publishes a status update.

async start_work(message: Message | None = None) None

Marks the task as working and publishes a status update.

async submit(message: Message | None = None) None

Marks the task as submitted and publishes a status update.

async update_status(state: TaskState, message: Message | None = None, final: bool = False, timestamp: str | None = None, metadata: dict[str, Any] | None = None) None

Updates the status of the task and publishes a TaskStatusUpdateEvent.

Parameters:
  • state – The new state of the task.

  • message – An optional message associated with the status update.

  • final – If True, indicates this is the final status update for the task.

  • timestamp – Optional ISO 8601 datetime string. Defaults to current time.

  • metadata – Optional metadata for extensions.