a2a.client package

Subpackages

Submodules

a2a.client.base_client module

class a2a.client.base_client.BaseClient(card: AgentCard, config: ClientConfig, transport: ClientTransport, consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]], middleware: list[ClientCallInterceptor])

Bases: Client

Base implementation of the A2A client, containing transport-independent logic.

async cancel_task(request: TaskIdParams, *, context: ClientCallContext | None = None) Task

Requests the agent to cancel a specific task.

Parameters:
  • request – The TaskIdParams object specifying the task ID.

  • context – The client call context.

Returns:

A Task object containing the updated task status.

async close() None

Closes the underlying transport.

async get_card(*, context: ClientCallContext | None = None) AgentCard

Retrieves the agent’s card.

This will fetch the authenticated card if necessary and update the client’s internal state with the new card.

Parameters:

context – The client call context.

Returns:

The AgentCard for the agent.

async get_task(request: TaskQueryParams, *, context: ClientCallContext | None = None) Task

Retrieves the current state and history of a specific task.

Parameters:
  • request – The TaskQueryParams object specifying the task ID.

  • context – The client call context.

Returns:

A Task object representing the current state of the task.

async get_task_callback(request: GetTaskPushNotificationConfigParams, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Retrieves the push notification configuration for a specific task.

Parameters:
  • request – The GetTaskPushNotificationConfigParams object specifying the task.

  • context – The client call context.

Returns:

A TaskPushNotificationConfig object containing the configuration.

async resubscribe(request: TaskIdParams, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None]]

Resubscribes to a task’s event stream.

This is only available if both the client and server support streaming.

Parameters:
  • request – Parameters to identify the task to resubscribe to.

  • context – The client call context.

Yields:

An async iterator of ClientEvent objects.

Raises:

NotImplementedError – If streaming is not supported by the client or server.

async send_message(request: Message, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message]

Sends a message to the agent.

This method handles both streaming and non-streaming (polling) interactions based on the client configuration and agent capabilities. It will yield events as they are received from the agent.

Parameters:
  • request – The message to send to the agent.

  • context – The client call context.

Yields:

An async iterator of ClientEvent or a final Message response.

async set_task_callback(request: TaskPushNotificationConfig, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Sets or updates the push notification configuration for a specific task.

Parameters:
  • request – The TaskPushNotificationConfig object with the new configuration.

  • context – The client call context.

Returns:

The created or updated TaskPushNotificationConfig object.

a2a.client.card_resolver module

class a2a.client.card_resolver.A2ACardResolver(httpx_client: AsyncClient, base_url: str, agent_card_path: str = '/.well-known/agent-card.json')

Bases: object

Agent Card resolver.

async get_agent_card(relative_card_path: str | None = None, http_kwargs: dict[str, Any] | None = None) AgentCard

Fetches an agent card from a specified path relative to the base_url.

If relative_card_path is None, it defaults to the resolver’s configured agent_card_path (for the public agent card).

Parameters:
  • relative_card_path – Optional path to the agent card endpoint, relative to the base URL. If None, uses the default public agent card path.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.get request.

Returns:

An AgentCard object representing the agent’s capabilities.

Raises:
  • A2AClientHTTPError – If an HTTP error occurs during the request.

  • A2AClientJSONError – If the response body cannot be decoded as JSON or validated against the AgentCard schema.

a2a.client.client module

class a2a.client.client.Client(consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None, middleware: list[ClientCallInterceptor] | None = None)

Bases: ABC

Abstract base class defining the interface for an A2A client.

This class provides a standard set of methods for interacting with an A2A agent, regardless of the underlying transport protocol (e.g., gRPC, JSON-RPC). It supports sending messages, managing tasks, and handling event streams.

async add_event_consumer(consumer: Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]) None

Attaches additional consumers to the Client.

async add_request_middleware(middleware: ClientCallInterceptor) None

Attaches additional middleware to the Client.

abstractmethod async cancel_task(request: TaskIdParams, *, context: ClientCallContext | None = None) Task

Requests the agent to cancel a specific task.

async consume(event: tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message | None, card: AgentCard) None

Processes the event via all the registered `Consumer`s.

abstractmethod async get_card(*, context: ClientCallContext | None = None) AgentCard

Retrieves the agent’s card.

abstractmethod async get_task(request: TaskQueryParams, *, context: ClientCallContext | None = None) Task

Retrieves the current state and history of a specific task.

abstractmethod async get_task_callback(request: GetTaskPushNotificationConfigParams, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Retrieves the push notification configuration for a specific task.

abstractmethod async resubscribe(request: TaskIdParams, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None]]

Resubscribes to a task’s event stream.

abstractmethod async send_message(request: Message, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message]

Sends a message to the server.

This will automatically use the streaming or non-streaming approach as supported by the server and the client config. Client will aggregate update events and return an iterator of (Task,`Update`) pairs, or a Message. Client will also send these values to any configured `Consumer`s in the client.

abstractmethod async set_task_callback(request: TaskPushNotificationConfig, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Sets or updates the push notification configuration for a specific task.

class a2a.client.client.ClientConfig(streaming: bool = True, polling: bool = False, httpx_client: ~httpx.AsyncClient | None = None, grpc_channel_factory: ~collections.abc.Callable[[str], None] | None = None, supported_transports: list[~a2a.types.TransportProtocol | str] = <factory>, use_client_preference: bool = False, accepted_output_modes: list[str] = <factory>, push_notification_configs: list[~a2a.types.PushNotificationConfig] = <factory>)

Bases: object

Configuration class for the A2AClient Factory.

accepted_output_modes: list[str]

The set of accepted output modes for the client.

grpc_channel_factory: Callable[[str], None] | None = None

Generates a grpc connection channel for a given url.

httpx_client: AsyncClient | None = None

Http client to use to connect to agent.

polling: bool = False

send. It is the callers job to check if the response is completed and if not run a polling loop.

Type:

Whether client prefers to poll for updates from message

push_notification_configs: list[PushNotificationConfig]

Push notification callbacks to use for every request.

streaming: bool = True

Whether client supports streaming

supported_transports: list[TransportProtocol | str]

Ordered list of transports for connecting to agent (in order of preference). Empty implies JSONRPC only.

This is a string type to allow custom transports to exist in closed ecosystems.

use_client_preference: bool = False

Whether to use client transport preferences over server preferences. Recommended to use server preferences in most situations.

a2a.client.client_factory module

class a2a.client.client_factory.ClientFactory(config: ClientConfig, consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None)

Bases: object

ClientFactory is used to generate the appropriate client for the agent.

The factory is configured with a ClientConfig and optionally a list of `Consumer`s to use for all generated `Client`s. The expected use is:

factory = ClientFactory(config, consumers) # Optionally register custom client implementations factory.register(‘my_customer_transport’, NewCustomTransportClient) # Then with an agent card make a client with additional consumers and # interceptors client = factory.create(card, additional_consumers, interceptors) # Now the client can be used the same regardless of transport and # aligns client config with server capabilities.

create(card: AgentCard, consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None, interceptors: list[ClientCallInterceptor] | None = None) Client

Create a new Client for the provided AgentCard.

Parameters:
  • card – An AgentCard defining the characteristics of the agent.

  • consumers – A list of Consumer methods to pass responses to.

  • interceptors – A list of interceptors to use for each request. These are used for things like attaching credentials or http headers to all outbound requests.

Returns:

A Client object.

Raises:
  • If there is no valid matching of the client configuration with the

  • server configuration, a ValueError is raised.

register(label: str, generator: Callable[[AgentCard, str, ClientConfig, list[ClientCallInterceptor]], ClientTransport]) None

Register a new transport producer for a given transport label.

a2a.client.client_factory.minimal_agent_card(url: str, transports: list[str] | None = None) AgentCard

Generates a minimal card to simplify bootstrapping client creation.

This minimal card is not viable itself to interact with the remote agent. Instead this is a short hand way to take a known url and transport option and interact with the get card endpoint of the agent server to get the correct agent card. This pattern is necessary for gRPC based card access as typically these servers won’t expose a well known path card.

a2a.client.client_task_manager module

class a2a.client.client_task_manager.ClientTaskManager

Bases: object

Helps manage a task’s lifecycle during execution of a request.

Responsible for retrieving, saving, and updating the Task object based on events received from the agent.

get_task() Task | None

Retrieves the current task object, either from memory.

If task_id is set, it returns _current_task otherwise None.

Returns:

The Task object if found, otherwise None.

get_task_or_raise() Task

Retrieves the current task object.

Returns:

The Task object.

Raises:

A2AClientInvalidStateError – If there is no current known Task.

async process(event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent

Processes an event, updates the task state if applicable, stores it, and returns the event.

If the event is task-related (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent), the internal task state is updated and persisted.

Parameters:

event – The event object received from the agent.

Returns:

The same event object that was processed.

async save_task_event(event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent) Task | None

Processes a task-related event (Task, Status, Artifact) and saves the updated task state.

Ensures task and context IDs match or are set from the event.

Parameters:

event – The task-related event (Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent).

Returns:

The updated Task object after processing the event.

Raises:

ClientError – If the task ID in the event conflicts with the TaskManager’s ID when the TaskManager’s ID is already set.

update_with_message(message: Message, task: Task) Task

Updates a task object adding a new message to its history.

If the task has a message in its current status, that message is moved to the history first.

Parameters:
  • message – The new Message to add to the history.

  • task – The Task object to update.

Returns:

The updated Task object (updated in-place).

a2a.client.errors module

Custom exceptions for the A2A client.

exception a2a.client.errors.A2AClientError

Bases: Exception

Base exception for A2A Client errors.

exception a2a.client.errors.A2AClientHTTPError(status_code: int, message: str)

Bases: A2AClientError

Client exception for HTTP errors received from the server.

exception a2a.client.errors.A2AClientInvalidArgsError(message: str)

Bases: A2AClientError

Client exception for invalid arguments passed to a method.

exception a2a.client.errors.A2AClientInvalidStateError(message: str)

Bases: A2AClientError

Client exception for an invalid client state.

exception a2a.client.errors.A2AClientJSONError(message: str)

Bases: A2AClientError

Client exception for JSON errors during response parsing or validation.

exception a2a.client.errors.A2AClientJSONRPCError(error: JSONRPCErrorResponse)

Bases: A2AClientError

Client exception for JSON-RPC errors returned by the server.

exception a2a.client.errors.A2AClientTimeoutError(message: str)

Bases: A2AClientError

Client exception for timeout errors during a request.

a2a.client.helpers module

Helper functions for the A2A client.

a2a.client.helpers.create_text_message_object(role: Role = Role.user, content: str = '') Message

Create a Message object containing a single TextPart.

Parameters:
  • role – The role of the message sender (user or agent). Defaults to Role.user.

  • content – The text content of the message. Defaults to an empty string.

Returns:

A Message object with a new UUID message_id.

a2a.client.legacy module

Backwards compatibility layer for legacy A2A clients.

class a2a.client.legacy.A2AClient(httpx_client: AsyncClient, agent_card: AgentCard | None = None, url: str | None = None, interceptors: list[ClientCallInterceptor] | None = None)

Bases: object

[DEPRECATED] Backwards compatibility wrapper for the JSON-RPC client.

async cancel_task(request: CancelTaskRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) CancelTaskResponse

Requests the agent to cancel a specific task.

Parameters:
  • request – The CancelTaskRequest object specifying the task ID.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A CancelTaskResponse object containing the updated Task with canceled status or an error.

Raises:
async get_card(*, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AgentCard

Retrieves the authenticated card (if necessary) or the public one.

Parameters:
  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A AgentCard object containing the card or an error.

Raises:
async get_task(request: GetTaskRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) GetTaskResponse

Retrieves the current state and history of a specific task.

Parameters:
  • request – The GetTaskRequest object specifying the task ID and history length.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A GetTaskResponse object containing the Task or an error.

Raises:
async get_task_callback(request: GetTaskPushNotificationConfigRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) GetTaskPushNotificationConfigResponse

Retrieves the push notification configuration for a specific task.

Parameters:
  • request – The GetTaskPushNotificationConfigRequest object specifying the task ID.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A GetTaskPushNotificationConfigResponse object containing the configuration or an error.

Raises:
async resubscribe(request: TaskResubscriptionRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AsyncGenerator[SendStreamingMessageResponse, None]

Reconnects to get task updates.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:
  • request – The TaskResubscriptionRequest object containing the task information to reconnect to.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

  • context – The client call context.

Yields:

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:
async send_message(request: SendMessageRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) SendMessageResponse

Sends a non-streaming message request to the agent.

Parameters:
  • request – The SendMessageRequest object containing the message and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A SendMessageResponse object containing the agent’s response (Task or Message) or an error.

Raises:
async send_message_streaming(request: SendStreamingMessageRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AsyncGenerator[SendStreamingMessageResponse, None]

Sends a streaming message request to the agent and yields responses as they arrive.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:
  • request – The SendStreamingMessageRequest object containing the message and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

  • context – The client call context.

Yields:

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:
async set_task_callback(request: SetTaskPushNotificationConfigRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) SetTaskPushNotificationConfigResponse

Sets or updates the push notification configuration for a specific task.

Parameters:
  • request – The SetTaskPushNotificationConfigRequest object specifying the task ID and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A SetTaskPushNotificationConfigResponse object containing the confirmation or an error.

Raises:

a2a.client.legacy_grpc module

a2a.client.middleware module

class a2a.client.middleware.ClientCallContext(*, state: ~collections.abc.MutableMapping[str, ~typing.Any] = <factory>)

Bases: BaseModel

A context passed with each client call, allowing for call-specific.

configuration and data passing. Such as authentication details or request deadlines.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

state: MutableMapping[str, Any]
class a2a.client.middleware.ClientCallInterceptor

Bases: ABC

An abstract base class for client-side call interceptors.

Interceptors can inspect and modify requests before they are sent, which is ideal for concerns like authentication, logging, or tracing.

abstractmethod async intercept(method_name: str, request_payload: dict[str, Any], http_kwargs: dict[str, Any], agent_card: AgentCard | None, context: ClientCallContext | None) tuple[dict[str, Any], dict[str, Any]]

Intercepts a client call before the request is sent.

Parameters:
  • method_name – The name of the RPC method (e.g., ‘message/send’).

  • request_payload – The JSON RPC request payload dictionary.

  • http_kwargs – The keyword arguments for the httpx request.

  • agent_card – The AgentCard associated with the client.

  • context – The ClientCallContext for this specific call.

Returns:

A tuple containing the (potentially modified) request_payload and http_kwargs.

a2a.client.optionals module

Module contents

Client-side components for interacting with an A2A agent.

class a2a.client.A2ACardResolver(httpx_client: AsyncClient, base_url: str, agent_card_path: str = '/.well-known/agent-card.json')

Bases: object

Agent Card resolver.

async get_agent_card(relative_card_path: str | None = None, http_kwargs: dict[str, Any] | None = None) AgentCard

Fetches an agent card from a specified path relative to the base_url.

If relative_card_path is None, it defaults to the resolver’s configured agent_card_path (for the public agent card).

Parameters:
  • relative_card_path – Optional path to the agent card endpoint, relative to the base URL. If None, uses the default public agent card path.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.get request.

Returns:

An AgentCard object representing the agent’s capabilities.

Raises:
  • A2AClientHTTPError – If an HTTP error occurs during the request.

  • A2AClientJSONError – If the response body cannot be decoded as JSON or validated against the AgentCard schema.

class a2a.client.A2AClient(httpx_client: AsyncClient, agent_card: AgentCard | None = None, url: str | None = None, interceptors: list[ClientCallInterceptor] | None = None)

Bases: object

[DEPRECATED] Backwards compatibility wrapper for the JSON-RPC client.

async cancel_task(request: CancelTaskRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) CancelTaskResponse

Requests the agent to cancel a specific task.

Parameters:
  • request – The CancelTaskRequest object specifying the task ID.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A CancelTaskResponse object containing the updated Task with canceled status or an error.

Raises:
async get_card(*, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AgentCard

Retrieves the authenticated card (if necessary) or the public one.

Parameters:
  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A AgentCard object containing the card or an error.

Raises:
async get_task(request: GetTaskRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) GetTaskResponse

Retrieves the current state and history of a specific task.

Parameters:
  • request – The GetTaskRequest object specifying the task ID and history length.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A GetTaskResponse object containing the Task or an error.

Raises:
async get_task_callback(request: GetTaskPushNotificationConfigRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) GetTaskPushNotificationConfigResponse

Retrieves the push notification configuration for a specific task.

Parameters:
  • request – The GetTaskPushNotificationConfigRequest object specifying the task ID.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A GetTaskPushNotificationConfigResponse object containing the configuration or an error.

Raises:
async resubscribe(request: TaskResubscriptionRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AsyncGenerator[SendStreamingMessageResponse, None]

Reconnects to get task updates.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:
  • request – The TaskResubscriptionRequest object containing the task information to reconnect to.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

  • context – The client call context.

Yields:

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:
async send_message(request: SendMessageRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) SendMessageResponse

Sends a non-streaming message request to the agent.

Parameters:
  • request – The SendMessageRequest object containing the message and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A SendMessageResponse object containing the agent’s response (Task or Message) or an error.

Raises:
async send_message_streaming(request: SendStreamingMessageRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) AsyncGenerator[SendStreamingMessageResponse, None]

Sends a streaming message request to the agent and yields responses as they arrive.

This method uses Server-Sent Events (SSE) to receive a stream of updates from the agent.

Parameters:
  • request – The SendStreamingMessageRequest object containing the message and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request. A default timeout=None is set but can be overridden.

  • context – The client call context.

Yields:

SendStreamingMessageResponse objects as they are received in the SSE stream. These can be Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.

Raises:
async set_task_callback(request: SetTaskPushNotificationConfigRequest, *, http_kwargs: dict[str, Any] | None = None, context: ClientCallContext | None = None) SetTaskPushNotificationConfigResponse

Sets or updates the push notification configuration for a specific task.

Parameters:
  • request – The SetTaskPushNotificationConfigRequest object specifying the task ID and configuration.

  • http_kwargs – Optional dictionary of keyword arguments to pass to the underlying httpx.post request.

  • context – The client call context.

Returns:

A SetTaskPushNotificationConfigResponse object containing the confirmation or an error.

Raises:
exception a2a.client.A2AClientError

Bases: Exception

Base exception for A2A Client errors.

exception a2a.client.A2AClientHTTPError(status_code: int, message: str)

Bases: A2AClientError

Client exception for HTTP errors received from the server.

exception a2a.client.A2AClientJSONError(message: str)

Bases: A2AClientError

Client exception for JSON errors during response parsing or validation.

exception a2a.client.A2AClientTimeoutError(message: str)

Bases: A2AClientError

Client exception for timeout errors during a request.

class a2a.client.A2AGrpcClient(*args, **kwargs)

Bases: object

Placeholder for A2AGrpcClient when dependencies are not installed.

class a2a.client.AuthInterceptor(credential_service: CredentialService)

Bases: ClientCallInterceptor

An interceptor that automatically adds authentication details to requests.

Based on the agent’s security schemes.

async intercept(method_name: str, request_payload: dict[str, Any], http_kwargs: dict[str, Any], agent_card: AgentCard | None, context: ClientCallContext | None) tuple[dict[str, Any], dict[str, Any]]

Applies authentication headers to the request if credentials are available.

class a2a.client.Client(consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None, middleware: list[ClientCallInterceptor] | None = None)

Bases: ABC

Abstract base class defining the interface for an A2A client.

This class provides a standard set of methods for interacting with an A2A agent, regardless of the underlying transport protocol (e.g., gRPC, JSON-RPC). It supports sending messages, managing tasks, and handling event streams.

async add_event_consumer(consumer: Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]) None

Attaches additional consumers to the Client.

async add_request_middleware(middleware: ClientCallInterceptor) None

Attaches additional middleware to the Client.

abstractmethod async cancel_task(request: TaskIdParams, *, context: ClientCallContext | None = None) Task

Requests the agent to cancel a specific task.

async consume(event: tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message | None, card: AgentCard) None

Processes the event via all the registered `Consumer`s.

abstractmethod async get_card(*, context: ClientCallContext | None = None) AgentCard

Retrieves the agent’s card.

abstractmethod async get_task(request: TaskQueryParams, *, context: ClientCallContext | None = None) Task

Retrieves the current state and history of a specific task.

abstractmethod async get_task_callback(request: GetTaskPushNotificationConfigParams, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Retrieves the push notification configuration for a specific task.

abstractmethod async resubscribe(request: TaskIdParams, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None]]

Resubscribes to a task’s event stream.

abstractmethod async send_message(request: Message, *, context: ClientCallContext | None = None) AsyncIterator[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message]

Sends a message to the server.

This will automatically use the streaming or non-streaming approach as supported by the server and the client config. Client will aggregate update events and return an iterator of (Task,`Update`) pairs, or a Message. Client will also send these values to any configured `Consumer`s in the client.

abstractmethod async set_task_callback(request: TaskPushNotificationConfig, *, context: ClientCallContext | None = None) TaskPushNotificationConfig

Sets or updates the push notification configuration for a specific task.

class a2a.client.ClientCallContext(*, state: ~collections.abc.MutableMapping[str, ~typing.Any] = <factory>)

Bases: BaseModel

A context passed with each client call, allowing for call-specific.

configuration and data passing. Such as authentication details or request deadlines.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

state: MutableMapping[str, Any]
class a2a.client.ClientCallInterceptor

Bases: ABC

An abstract base class for client-side call interceptors.

Interceptors can inspect and modify requests before they are sent, which is ideal for concerns like authentication, logging, or tracing.

abstractmethod async intercept(method_name: str, request_payload: dict[str, Any], http_kwargs: dict[str, Any], agent_card: AgentCard | None, context: ClientCallContext | None) tuple[dict[str, Any], dict[str, Any]]

Intercepts a client call before the request is sent.

Parameters:
  • method_name – The name of the RPC method (e.g., ‘message/send’).

  • request_payload – The JSON RPC request payload dictionary.

  • http_kwargs – The keyword arguments for the httpx request.

  • agent_card – The AgentCard associated with the client.

  • context – The ClientCallContext for this specific call.

Returns:

A tuple containing the (potentially modified) request_payload and http_kwargs.

class a2a.client.ClientConfig(streaming: bool = True, polling: bool = False, httpx_client: ~httpx.AsyncClient | None = None, grpc_channel_factory: ~collections.abc.Callable[[str], None] | None = None, supported_transports: list[~a2a.types.TransportProtocol | str] = <factory>, use_client_preference: bool = False, accepted_output_modes: list[str] = <factory>, push_notification_configs: list[~a2a.types.PushNotificationConfig] = <factory>)

Bases: object

Configuration class for the A2AClient Factory.

accepted_output_modes: list[str]

The set of accepted output modes for the client.

grpc_channel_factory: Callable[[str], None] | None = None

Generates a grpc connection channel for a given url.

httpx_client: AsyncClient | None = None

Http client to use to connect to agent.

polling: bool = False

send. It is the callers job to check if the response is completed and if not run a polling loop.

Type:

Whether client prefers to poll for updates from message

push_notification_configs: list[PushNotificationConfig]

Push notification callbacks to use for every request.

streaming: bool = True

Whether client supports streaming

supported_transports: list[TransportProtocol | str]

Ordered list of transports for connecting to agent (in order of preference). Empty implies JSONRPC only.

This is a string type to allow custom transports to exist in closed ecosystems.

use_client_preference: bool = False

Whether to use client transport preferences over server preferences. Recommended to use server preferences in most situations.

class a2a.client.ClientFactory(config: ClientConfig, consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None)

Bases: object

ClientFactory is used to generate the appropriate client for the agent.

The factory is configured with a ClientConfig and optionally a list of `Consumer`s to use for all generated `Client`s. The expected use is:

factory = ClientFactory(config, consumers) # Optionally register custom client implementations factory.register(‘my_customer_transport’, NewCustomTransportClient) # Then with an agent card make a client with additional consumers and # interceptors client = factory.create(card, additional_consumers, interceptors) # Now the client can be used the same regardless of transport and # aligns client config with server capabilities.

create(card: AgentCard, consumers: list[Callable[[tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message, AgentCard], Coroutine[None, Any, Any]]] | None = None, interceptors: list[ClientCallInterceptor] | None = None) Client

Create a new Client for the provided AgentCard.

Parameters:
  • card – An AgentCard defining the characteristics of the agent.

  • consumers – A list of Consumer methods to pass responses to.

  • interceptors – A list of interceptors to use for each request. These are used for things like attaching credentials or http headers to all outbound requests.

Returns:

A Client object.

Raises:
  • If there is no valid matching of the client configuration with the

  • server configuration, a ValueError is raised.

register(label: str, generator: Callable[[AgentCard, str, ClientConfig, list[ClientCallInterceptor]], ClientTransport]) None

Register a new transport producer for a given transport label.

class a2a.client.CredentialService

Bases: ABC

An abstract service for retrieving credentials.

abstractmethod async get_credentials(security_scheme_name: str, context: ClientCallContext | None) str | None

Retrieves a credential (e.g., token) for a security scheme.

class a2a.client.InMemoryContextCredentialStore

Bases: CredentialService

A simple in-memory store for session-keyed credentials.

This class uses the ‘sessionId’ from the ClientCallContext state to store and retrieve credentials…

async get_credentials(security_scheme_name: str, context: ClientCallContext | None) str | None

Retrieves credentials from the in-memory store.

Parameters:
  • security_scheme_name – The name of the security scheme.

  • context – The client call context.

Returns:

The credential string, or None if not found.

async set_credentials(session_id: str, security_scheme_name: str, credential: str) None

Method to populate the store.

a2a.client.create_text_message_object(role: Role = Role.user, content: str = '') Message

Create a Message object containing a single TextPart.

Parameters:
  • role – The role of the message sender (user or agent). Defaults to Role.user.

  • content – The text content of the message. Defaults to an empty string.

Returns:

A Message object with a new UUID message_id.

a2a.client.minimal_agent_card(url: str, transports: list[str] | None = None) AgentCard

Generates a minimal card to simplify bootstrapping client creation.

This minimal card is not viable itself to interact with the remote agent. Instead this is a short hand way to take a known url and transport option and interact with the get card endpoint of the agent server to get the correct agent card. This pattern is necessary for gRPC based card access as typically these servers won’t expose a well known path card.