a2a.server.tasks package¶
Submodules¶
a2a.server.tasks.base_push_notification_sender module¶
- class a2a.server.tasks.base_push_notification_sender.BasePushNotificationSender(httpx_client: AsyncClient, config_store: PushNotificationConfigStore)¶
Bases:
PushNotificationSender
Base implementation of PushNotificationSender interface.
a2a.server.tasks.database_push_notification_config_store module¶
a2a.server.tasks.database_task_store module¶
a2a.server.tasks.inmemory_push_notification_config_store module¶
- class a2a.server.tasks.inmemory_push_notification_config_store.InMemoryPushNotificationConfigStore¶
Bases:
PushNotificationConfigStore
In-memory implementation of PushNotificationConfigStore interface.
Stores push notification configurations in memory
- async delete_info(task_id: str, config_id: str | None = None) None ¶
Deletes the push notification configuration for a task from memory.
- async get_info(task_id: str) list[PushNotificationConfig] ¶
Retrieves the push notification configuration for a task from memory.
- async set_info(task_id: str, notification_config: PushNotificationConfig) None ¶
Sets or updates the push notification configuration for a task in memory.
a2a.server.tasks.inmemory_task_store module¶
a2a.server.tasks.push_notification_config_store module¶
- class a2a.server.tasks.push_notification_config_store.PushNotificationConfigStore¶
Bases:
ABC
Interface for storing and retrieving push notification configurations for tasks.
- abstractmethod async delete_info(task_id: str, config_id: str | None = None) None ¶
Deletes the push notification configuration for a task.
- abstractmethod async get_info(task_id: str) list[PushNotificationConfig] ¶
Retrieves the push notification configuration for a task.
- abstractmethod async set_info(task_id: str, notification_config: PushNotificationConfig) None ¶
Sets or updates the push notification configuration for a task.
a2a.server.tasks.push_notification_sender module¶
a2a.server.tasks.result_aggregator module¶
- class a2a.server.tasks.result_aggregator.ResultAggregator(task_manager: TaskManager)¶
Bases:
object
ResultAggregator is used to process the event streams from an AgentExecutor.
There are three main ways to use the ResultAggregator: 1) As part of a processing pipe. consume_and_emit will construct the updated
task as the events arrive, and re-emit those events for another consumer
As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object
As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.
- async consume_all(consumer: EventConsumer) Task | Message | None ¶
Processes the entire event stream from the consumer and returns the final result.
Blocks until the event stream ends (queue is closed after final event or exception).
- Parameters:
consumer – The EventConsumer to read events from.
- Returns:
The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True) tuple[Task | Message | None, bool] ¶
Processes the event stream until completion or an interruptable state is encountered.
If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.
- Parameters:
consumer – The EventConsumer to read events from.
blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.
- Returns:
The current aggregated result (Task or Message) at the point of completion or interruption.
A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).
- Return type:
A tuple containing
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_emit(consumer: EventConsumer) AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Processes the event stream from the consumer, updates the task state, and re-emits the same events.
Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.
- Parameters:
consumer – The EventConsumer to read events from.
- Yields:
The Event objects consumed from the EventConsumer.
- property current_result: Task | Message | None¶
Returns the current aggregated result (Task or Message).
This is the latest state processed from the event stream.
- Returns:
The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.
a2a.server.tasks.task_manager module¶
- class a2a.server.tasks.task_manager.TaskManager(task_id: str | None, context_id: str | None, task_store: TaskStore, initial_message: Message | None)¶
Bases:
object
Helps manage a task’s lifecycle during execution of a request.
Responsible for retrieving, saving, and updating the Task object based on events received from the agent.
- async ensure_task(event: TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task ¶
Ensures a Task object exists in memory, loading from store or creating new if needed.
- Parameters:
event – The task-related event triggering the need for a Task object.
- Returns:
An existing or newly created Task object.
- async get_task() Task | None ¶
Retrieves the current task object, either from memory or the store.
If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.
- Returns:
The Task object if found, otherwise None.
- async process(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Processes an event, updates the task state if applicable, stores it, and returns the event.
If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.
- Parameters:
event – The event object received from the agent.
- Returns:
The same event object that was processed.
- async save_task_event(event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task | None ¶
Processes a task-related event (Task, Status, Artifact) and saves the updated task state.
Ensures task and context IDs match or are set from the event.
- Parameters:
event – The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).
- Returns:
The updated Task object after processing the event.
- Raises:
ServerError – If the task ID in the event conflicts with the TaskManager’s ID when the TaskManager’s ID is already set.
- update_with_message(message: Message, task: Task) Task ¶
Updates a task object in memory by adding a new message to its history.
If the task has a message in its current status, that message is moved to the history first.
- Parameters:
message – The new Message to add to the history.
task – The Task object to update.
- Returns:
The updated Task object (updated in-place).
a2a.server.tasks.task_store module¶
a2a.server.tasks.task_updater module¶
- class a2a.server.tasks.task_updater.TaskUpdater(event_queue: EventQueue, task_id: str, context_id: str)¶
Bases:
object
Helper class for agents to publish updates to a task’s event queue.
Simplifies the process of creating and enqueueing standard task events.
- async add_artifact(parts: list[Part], artifact_id: str | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, append: bool | None = None, last_chunk: bool | None = None) None ¶
Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.
- Parameters:
parts – A list of Part objects forming the artifact chunk.
artifact_id – The ID of the artifact. A new UUID is generated if not provided.
name – Optional name for the artifact.
metadata – Optional metadata for the artifact.
append – Optional boolean indicating if this chunk appends to a previous one.
last_chunk – Optional boolean indicating if this is the last chunk.
- async cancel(message: Message | None = None) None ¶
Marks the task as cancelled and publishes a finalstatus update.
- async complete(message: Message | None = None) None ¶
Marks the task as completed and publishes a final status update.
- async failed(message: Message | None = None) None ¶
Marks the task as failed and publishes a final status update.
- new_agent_message(parts: list[Part], metadata: dict[str, Any] | None = None) Message ¶
Creates a new message object sent by the agent for this task/context.
- Note: This method only creates the message object. It does not
automatically enqueue it.
- Parameters:
parts – A list of Part objects for the message content.
metadata – Optional metadata for the message.
- Returns:
A new Message object.
- async reject(message: Message | None = None) None ¶
Marks the task as rejected and publishes a final status update.
- async requires_auth(message: Message | None = None, final: bool = False) None ¶
Marks the task as auth required and publishes a status update.
- async requires_input(message: Message | None = None, final: bool = False) None ¶
Marks the task as input required and publishes a status update.
- async start_work(message: Message | None = None) None ¶
Marks the task as working and publishes a status update.
- async submit(message: Message | None = None) None ¶
Marks the task as submitted and publishes a status update.
- async update_status(state: TaskState, message: Message | None = None, final: bool = False, timestamp: str | None = None, metadata: dict[str, Any] | None = None) None ¶
Updates the status of the task and publishes a TaskStatusUpdateEvent.
- Parameters:
state – The new state of the task.
message – An optional message associated with the status update.
final – If True, indicates this is the final status update for the task.
timestamp – Optional ISO 8601 datetime string. Defaults to current time.
metadata – Optional metadata for extensions.
Module contents¶
Components for managing tasks within the A2A server.
- class a2a.server.tasks.BasePushNotificationSender(httpx_client: AsyncClient, config_store: PushNotificationConfigStore)¶
Bases:
PushNotificationSender
Base implementation of PushNotificationSender interface.
- class a2a.server.tasks.DatabasePushNotificationConfigStore(*args, **kwargs)¶
Bases:
object
Placeholder for DatabasePushNotificationConfigStore when dependencies are not installed.
- class a2a.server.tasks.DatabaseTaskStore(*args, **kwargs)¶
Bases:
object
Placeholder for DatabaseTaskStore when dependencies are not installed.
- class a2a.server.tasks.InMemoryPushNotificationConfigStore¶
Bases:
PushNotificationConfigStore
In-memory implementation of PushNotificationConfigStore interface.
Stores push notification configurations in memory
- async delete_info(task_id: str, config_id: str | None = None) None ¶
Deletes the push notification configuration for a task from memory.
- async get_info(task_id: str) list[PushNotificationConfig] ¶
Retrieves the push notification configuration for a task from memory.
- async set_info(task_id: str, notification_config: PushNotificationConfig) None ¶
Sets or updates the push notification configuration for a task in memory.
- class a2a.server.tasks.InMemoryTaskStore¶
Bases:
TaskStore
In-memory implementation of TaskStore.
Stores task objects in a dictionary in memory. Task data is lost when the server process stops.
- async delete(task_id: str) None ¶
Deletes a task from the in-memory store by ID.
- class a2a.server.tasks.PushNotificationConfigStore¶
Bases:
ABC
Interface for storing and retrieving push notification configurations for tasks.
- abstractmethod async delete_info(task_id: str, config_id: str | None = None) None ¶
Deletes the push notification configuration for a task.
- abstractmethod async get_info(task_id: str) list[PushNotificationConfig] ¶
Retrieves the push notification configuration for a task.
- abstractmethod async set_info(task_id: str, notification_config: PushNotificationConfig) None ¶
Sets or updates the push notification configuration for a task.
- class a2a.server.tasks.PushNotificationSender¶
Bases:
ABC
Interface for sending push notifications for tasks.
- class a2a.server.tasks.ResultAggregator(task_manager: TaskManager)¶
Bases:
object
ResultAggregator is used to process the event streams from an AgentExecutor.
There are three main ways to use the ResultAggregator: 1) As part of a processing pipe. consume_and_emit will construct the updated
task as the events arrive, and re-emit those events for another consumer
As part of a blocking call. consume_all will process the entire stream and return the final Task or Message object
As part of a push solution where the latest Task is emitted after processing an event. consume_and_emit_task will consume the Event stream, process the events to the current Task object and emit that Task object.
- async consume_all(consumer: EventConsumer) Task | Message | None ¶
Processes the entire event stream from the consumer and returns the final result.
Blocks until the event stream ends (queue is closed after final event or exception).
- Parameters:
consumer – The EventConsumer to read events from.
- Returns:
The final Task object or Message object after the stream is exhausted. Returns None if the stream ends without producing a final result.
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_break_on_interrupt(consumer: EventConsumer, blocking: bool = True) tuple[Task | Message | None, bool] ¶
Processes the event stream until completion or an interruptable state is encountered.
If blocking is False, it returns after the first event that creates a Task or Message. If blocking is True, it waits for completion unless an auth_required state is encountered, which is always an interruption. If interrupted, consumption continues in a background task.
- Parameters:
consumer – The EventConsumer to read events from.
blocking – If False, the method returns as soon as a task/message is available. If True, it waits for a terminal state.
- Returns:
The current aggregated result (Task or Message) at the point of completion or interruption.
A boolean indicating whether the consumption was interrupted (True) or completed naturally (False).
- Return type:
A tuple containing
- Raises:
BaseException – If the EventConsumer raises an exception during consumption.
- async consume_and_emit(consumer: EventConsumer) AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent] ¶
Processes the event stream from the consumer, updates the task state, and re-emits the same events.
Useful for streaming scenarios where the server needs to observe and process events (e.g., save task state, send push notifications) while forwarding them to the client.
- Parameters:
consumer – The EventConsumer to read events from.
- Yields:
The Event objects consumed from the EventConsumer.
- property current_result: Task | Message | None¶
Returns the current aggregated result (Task or Message).
This is the latest state processed from the event stream.
- Returns:
The current Task object managed by the TaskManager, or the final Message if one was received, or None if no result has been produced yet.
- class a2a.server.tasks.TaskManager(task_id: str | None, context_id: str | None, task_store: TaskStore, initial_message: Message | None)¶
Bases:
object
Helps manage a task’s lifecycle during execution of a request.
Responsible for retrieving, saving, and updating the Task object based on events received from the agent.
- async ensure_task(event: TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task ¶
Ensures a Task object exists in memory, loading from store or creating new if needed.
- Parameters:
event – The task-related event triggering the need for a Task object.
- Returns:
An existing or newly created Task object.
- async get_task() Task | None ¶
Retrieves the current task object, either from memory or the store.
If task_id is set, it first checks the in-memory _current_task, then attempts to load it from the task_store.
- Returns:
The Task object if found, otherwise None.
- async process(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ¶
Processes an event, updates the task state if applicable, stores it, and returns the event.
If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.
- Parameters:
event – The event object received from the agent.
- Returns:
The same event object that was processed.
- async save_task_event(event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task | None ¶
Processes a task-related event (Task, Status, Artifact) and saves the updated task state.
Ensures task and context IDs match or are set from the event.
- Parameters:
event – The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).
- Returns:
The updated Task object after processing the event.
- Raises:
ServerError – If the task ID in the event conflicts with the TaskManager’s ID when the TaskManager’s ID is already set.
- update_with_message(message: Message, task: Task) Task ¶
Updates a task object in memory by adding a new message to its history.
If the task has a message in its current status, that message is moved to the history first.
- Parameters:
message – The new Message to add to the history.
task – The Task object to update.
- Returns:
The updated Task object (updated in-place).
- class a2a.server.tasks.TaskStore¶
Bases:
ABC
Agent Task Store interface.
Defines the methods for persisting and retrieving Task objects.
- abstractmethod async delete(task_id: str) None ¶
Deletes a task from the store by ID.
- class a2a.server.tasks.TaskUpdater(event_queue: EventQueue, task_id: str, context_id: str)¶
Bases:
object
Helper class for agents to publish updates to a task’s event queue.
Simplifies the process of creating and enqueueing standard task events.
- async add_artifact(parts: list[Part], artifact_id: str | None = None, name: str | None = None, metadata: dict[str, Any] | None = None, append: bool | None = None, last_chunk: bool | None = None) None ¶
Adds an artifact chunk to the task and publishes a TaskArtifactUpdateEvent.
- Parameters:
parts – A list of Part objects forming the artifact chunk.
artifact_id – The ID of the artifact. A new UUID is generated if not provided.
name – Optional name for the artifact.
metadata – Optional metadata for the artifact.
append – Optional boolean indicating if this chunk appends to a previous one.
last_chunk – Optional boolean indicating if this is the last chunk.
- async cancel(message: Message | None = None) None ¶
Marks the task as cancelled and publishes a finalstatus update.
- async complete(message: Message | None = None) None ¶
Marks the task as completed and publishes a final status update.
- async failed(message: Message | None = None) None ¶
Marks the task as failed and publishes a final status update.
- new_agent_message(parts: list[Part], metadata: dict[str, Any] | None = None) Message ¶
Creates a new message object sent by the agent for this task/context.
- Note: This method only creates the message object. It does not
automatically enqueue it.
- Parameters:
parts – A list of Part objects for the message content.
metadata – Optional metadata for the message.
- Returns:
A new Message object.
- async reject(message: Message | None = None) None ¶
Marks the task as rejected and publishes a final status update.
- async requires_auth(message: Message | None = None, final: bool = False) None ¶
Marks the task as auth required and publishes a status update.
- async requires_input(message: Message | None = None, final: bool = False) None ¶
Marks the task as input required and publishes a status update.
- async start_work(message: Message | None = None) None ¶
Marks the task as working and publishes a status update.
- async submit(message: Message | None = None) None ¶
Marks the task as submitted and publishes a status update.
- async update_status(state: TaskState, message: Message | None = None, final: bool = False, timestamp: str | None = None, metadata: dict[str, Any] | None = None) None ¶
Updates the status of the task and publishes a TaskStatusUpdateEvent.
- Parameters:
state – The new state of the task.
message – An optional message associated with the status update.
final – If True, indicates this is the final status update for the task.
timestamp – Optional ISO 8601 datetime string. Defaults to current time.
metadata – Optional metadata for extensions.