a2a.utils.helpers module
General utility functions for the A2A Python SDK.
- a2a.utils.helpers.append_artifact_to_task(task: Task, event: TaskArtifactUpdateEvent) → None
Helper method for updating a Task object with new artifact data from an event.
Handles creating the artifacts list if it doesn’t exist, adding new artifacts, and appending parts to existing artifacts based on the append flag in the event.
- Parameters:
task – The Task object to modify.
event – The TaskArtifactUpdateEvent containing the artifact data.
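The merge behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's implementation: `Artifact`, `Task`, and `ArtifactUpdate` are hypothetical dataclass stand-ins for the real types, and the handling of two edge cases (a non-append update for an existing artifact, and an append for an unknown artifact) is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Hypothetical stand-ins for the SDK's Artifact, Task, and
# TaskArtifactUpdateEvent types; the real classes carry more fields.
@dataclass
class Artifact:
    artifact_id: str
    parts: list[str] = field(default_factory=list)


@dataclass
class Task:
    artifacts: list[Artifact] | None = None


@dataclass
class ArtifactUpdate:
    artifact: Artifact
    append: bool = False


def append_artifact(task: Task, event: ArtifactUpdate) -> None:
    """Merge the event's artifact into the task, in place."""
    if task.artifacts is None:
        task.artifacts = []  # create the list on first use

    new = event.artifact
    existing = next(
        (a for a in task.artifacts if a.artifact_id == new.artifact_id),
        None,
    )
    if existing is None:
        if not event.append:
            task.artifacts.append(new)
        # Assumption: an append for an unknown artifact is ignored.
    elif event.append:
        existing.parts.extend(new.parts)
    else:
        # Assumption: a non-append update replaces the stored artifact.
        task.artifacts[task.artifacts.index(existing)] = new


task = Task()
append_artifact(task, ArtifactUpdate(Artifact("a1", ["Hello"])))
append_artifact(task, ArtifactUpdate(Artifact("a1", [", world"]), append=True))
print(task.artifacts[0].parts)
```

The function returns nothing and mutates the task, matching the `None` return type in the signature above.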
- a2a.utils.helpers.are_modalities_compatible(server_output_modes: list[str] | None, client_output_modes: list[str] | None) → bool
Checks if server and client output modalities (MIME types) are compatible.
Modalities are compatible if:
1. The client specifies no preferred output modes (client_output_modes is None or empty).
2. The server specifies no supported output modes (server_output_modes is None or empty).
3. There is at least one common modality between the server’s supported list and the client’s preferred list.
- Parameters:
server_output_modes – A list of MIME types supported by the server/agent for output. Can be None or empty if the server doesn’t specify.
client_output_modes – A list of MIME types preferred by the client for output. Can be None or empty if the client accepts any.
- Returns:
True if the modalities are compatible, False otherwise.
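The three rules can be written out as a small standalone function. This is a sketch of the documented logic only; the name `modalities_compatible` is hypothetical and the code is not taken from the SDK.

```python
from typing import Optional


def modalities_compatible(
    server_modes: Optional[list[str]],
    client_modes: Optional[list[str]],
) -> bool:
    """Apply the three compatibility rules in order."""
    if not client_modes:  # rule 1: client accepts anything
        return True
    if not server_modes:  # rule 2: server does not restrict output
        return True
    # rule 3: at least one MIME type in common
    return any(mode in server_modes for mode in client_modes)


print(modalities_compatible(["text/plain"], None))
print(modalities_compatible(["text/plain", "image/png"], ["image/png"]))
print(modalities_compatible(["text/plain"], ["application/json"]))
```

Only the last call prints `False`, since both sides name modes and none of them overlap.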
- a2a.utils.helpers.build_text_artifact(text: str, artifact_id: str) → Artifact
Helper to create a text artifact.
- Parameters:
text – The text content for the artifact.
artifact_id – The ID for the artifact.
- Returns:
An Artifact object containing a single TextPart.
- a2a.utils.helpers.canonicalize_agent_card(agent_card: AgentCard) → str
Canonicalizes the Agent Card JSON according to RFC 8785 (JCS).
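To give a feel for what canonical JSON looks like, the sketch below approximates JCS with the standard library: keys sorted, no insignificant whitespace. It is not a full RFC 8785 implementation (the RFC also fixes number serialization and orders keys by UTF-16 code units), and it says nothing about how the SDK prepares the card before serializing it. The `card` dictionary is a made-up example.

```python
import json


def canonical_json(data: dict) -> str:
    """Approximate JCS output for simple inputs: sorted keys, compact separators."""
    return json.dumps(
        data,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )


# Hypothetical card data, used only to show the effect on key order and spacing.
card = {
    "version": "1.0",
    "name": "Echo Agent",
    "capabilities": {"streaming": True},
}
print(canonical_json(card))
```

Because the output is deterministic for a given input, two parties can hash or sign the same card and arrive at the same bytes.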
- a2a.utils.helpers.create_task_obj(message_send_params: MessageSendParams) → Task
Create a new task object from message send params.
Generates UUIDs for task and context IDs if they are not already present in the message.
- Parameters:
message_send_params – The MessageSendParams object containing the initial message.
- Returns:
A new Task object initialized with ‘submitted’ status and the input message in history.
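A rough sketch of the behaviour described above, using plain dictionaries in place of the SDK's `MessageSendParams` and `Task` models. The function name and the dictionary keys (`task_id`, `context_id`, `status`, `history`) are assumptions made for illustration; the real field names and defaults may differ.

```python
import uuid


def new_task_from_message(message: dict) -> dict:
    """Build a task-like dict from a message-like dict."""
    # Reuse IDs carried by the message; otherwise generate fresh UUIDs.
    task_id = message.get("task_id") or str(uuid.uuid4())
    context_id = message.get("context_id") or str(uuid.uuid4())
    return {
        "id": task_id,
        "context_id": context_id,
        "status": {"state": "submitted"},
        "history": [message],  # the input message starts the history
    }


task = new_task_from_message({"role": "user", "text": "hi", "context_id": "ctx-1"})
print(task["context_id"], task["status"]["state"])
```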
- async a2a.utils.helpers.maybe_await(value: T | Awaitable[T]) → T
Awaits the value if it is awaitable; otherwise returns it unchanged.
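One common way to write such a helper uses `inspect.isawaitable`. The sketch below is an illustration under that assumption, not the SDK's source; `maybe_await_sketch` and `fetch_answer` are hypothetical names.

```python
import asyncio
import inspect
from typing import Awaitable, TypeVar, Union

T = TypeVar("T")


async def maybe_await_sketch(value: Union[T, Awaitable[T]]) -> T:
    """Await coroutines, futures, and tasks; pass plain values through."""
    if inspect.isawaitable(value):
        return await value
    return value


async def fetch_answer() -> int:
    return 42


async def main() -> list:
    plain = await maybe_await_sketch(7)  # plain value
    awaited = await maybe_await_sketch(fetch_answer())  # coroutine
    return [plain, awaited]


results = asyncio.run(main())
print(results)
```

A helper like this lets calling code accept both sync and async callbacks without branching at every call site.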
- a2a.utils.helpers.validate(expression: Callable[[Any], bool], error_message: str | None = None) → Callable
Decorator that validates if a given expression evaluates to True.
Typically used on class methods to check capabilities or configuration before executing the method’s logic. If the expression is False, a ServerError with an UnsupportedOperationError is raised.
- Parameters:
expression – A callable that takes the instance (self) as its argument and returns a boolean.
error_message – An optional custom error message for the UnsupportedOperationError. If None, the string representation of the expression will be used.
Examples
Demonstrating with an async method:
>>> import asyncio
>>> from a2a.utils.errors import ServerError
>>> from a2a.utils.helpers import validate
>>>
>>> class MyAgent:
...     def __init__(self, streaming_enabled: bool):
...         self.streaming_enabled = streaming_enabled
...
...     @validate(
...         lambda self: self.streaming_enabled,
...         'Streaming is not enabled for this agent',
...     )
...     async def stream_response(self, message: str):
...         return f'Streaming: {message}'
>>>
>>> async def run_async_test():
...     # Successful call
...     agent_ok = MyAgent(streaming_enabled=True)
...     result = await agent_ok.stream_response('hello')
...     print(result)
...
...     # Call that fails validation
...     agent_fail = MyAgent(streaming_enabled=False)
...     try:
...         await agent_fail.stream_response('world')
...     except ServerError as e:
...         print(e.error.message)
>>>
>>> asyncio.run(run_async_test())
Streaming: hello
Streaming is not enabled for this agent
Demonstrating with a sync method:
>>> class SecureAgent:
...     def __init__(self):
...         self.auth_enabled = False
...
...     @validate(
...         lambda self: self.auth_enabled,
...         'Authentication must be enabled for this operation',
...     )
...     def secure_operation(self, data: str):
...         return f'Processing secure data: {data}'
>>>
>>> # Error case example
>>> agent = SecureAgent()
>>> try:
...     agent.secure_operation('secret')
... except ServerError as e:
...     print(e.error.message)
Authentication must be enabled for this operation
Note
This decorator works with both sync and async methods automatically.
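A decorator can support both kinds of method by inspecting the wrapped function once, at decoration time, and returning a matching wrapper. The sketch below shows that general technique; it is not the SDK's source. `validate_sketch` and `UnsupportedOperation` are hypothetical stand-ins (the real decorator raises a `ServerError` wrapping an `UnsupportedOperationError`).

```python
import asyncio
import functools
import inspect


class UnsupportedOperation(Exception):
    """Hypothetical stand-in for the SDK's error types."""


def validate_sketch(expression, error_message=None):
    def decorator(function):
        def check(self):
            if not expression(self):
                raise UnsupportedOperation(error_message or str(expression))

        if inspect.iscoroutinefunction(function):
            # async def method: validate, then await the original
            @functools.wraps(function)
            async def async_wrapper(self, *args, **kwargs):
                check(self)
                return await function(self, *args, **kwargs)

            return async_wrapper

        # plain method: validate, then call the original
        @functools.wraps(function)
        def sync_wrapper(self, *args, **kwargs):
            check(self)
            return function(self, *args, **kwargs)

        return sync_wrapper

    return decorator


class Agent:
    def __init__(self, enabled: bool):
        self.enabled = enabled

    @validate_sketch(lambda self: self.enabled, "Feature is disabled")
    def ping(self):
        return "pong"

    @validate_sketch(lambda self: self.enabled, "Feature is disabled")
    async def aping(self):
        return "async pong"


print(Agent(True).ping())
print(asyncio.run(Agent(True).aping()))
```

In this sketch the async wrapper performs the check only when the coroutine is awaited, so a failed validation surfaces at the `await`, as in the async example above.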
- a2a.utils.helpers.validate_async_generator(expression: Callable[[Any], bool], error_message: str | None = None)
Decorator that validates if a given expression evaluates to True for async generators.
Typically used on class methods to check capabilities or configuration before executing the method’s logic. If the expression is False, a ServerError with an UnsupportedOperationError is raised.
- Parameters:
expression – A callable that takes the instance (self) as its argument and returns a boolean.
error_message – An optional custom error message for the UnsupportedOperationError. If None, the string representation of the expression will be used.
Examples
Streaming capability validation with success case:
>>> import asyncio
>>> from a2a.utils.errors import ServerError
>>> from a2a.utils.helpers import validate_async_generator
>>>
>>> class StreamingAgent:
...     def __init__(self, streaming_enabled: bool):
...         self.streaming_enabled = streaming_enabled
...
...     @validate_async_generator(
...         lambda self: self.streaming_enabled,
...         'Streaming is not supported by this agent',
...     )
...     async def stream_messages(self, count: int):
...         for i in range(count):
...             yield f'Message {i}'
>>>
>>> async def run_streaming_test():
...     # Successful streaming
...     agent = StreamingAgent(streaming_enabled=True)
...     async for msg in agent.stream_messages(2):
...         print(msg)
>>>
>>> asyncio.run(run_streaming_test())
Message 0
Message 1
Error case - validation fails:
>>> class FeatureAgent:
...     def __init__(self):
...         self.features = {'real_time': False}
...
...     @validate_async_generator(
...         lambda self: self.features.get('real_time', False),
...         'Real-time feature must be enabled to stream updates',
...     )
...     async def real_time_updates(self):
...         yield 'This should not be yielded'
>>>
>>> async def run_error_test():
...     agent = FeatureAgent()
...     try:
...         async for _ in agent.real_time_updates():
...             pass
...     except ServerError as e:
...         print(e.error.message)
>>>
>>> asyncio.run(run_error_test())
Real-time feature must be enabled to stream updates
Note
This decorator is specifically for async generator methods (async def with yield). The validation happens before the generator starts yielding values.
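An async generator needs its own wrapper because calling it returns a generator object rather than a coroutine, so the wrapper must itself be an async generator that re-yields the items. The sketch below shows that pattern under stated assumptions: `validate_async_generator_sketch` and `UnsupportedOperation` are hypothetical names, not the SDK's code.

```python
import asyncio
import functools


class UnsupportedOperation(Exception):
    """Hypothetical stand-in for the SDK's error types."""


def validate_async_generator_sketch(expression, error_message=None):
    def decorator(function):
        @functools.wraps(function)
        async def wrapper(self, *args, **kwargs):
            # Runs when iteration starts, before any item is yielded.
            if not expression(self):
                raise UnsupportedOperation(error_message or str(expression))
            async for item in function(self, *args, **kwargs):
                yield item

        return wrapper

    return decorator


class Agent:
    def __init__(self, streaming: bool):
        self.streaming = streaming

    @validate_async_generator_sketch(
        lambda self: self.streaming, "Streaming is off"
    )
    async def numbers(self, count: int):
        for i in range(count):
            yield i


async def collect(agent: Agent) -> list:
    return [n async for n in agent.numbers(3)]


print(asyncio.run(collect(Agent(True))))
```

In this sketch, calling `agent.numbers(3)` on a disabled agent does not raise by itself; the error appears on the first step of the `async for` loop, which is why the error-case example above wraps the loop in `try`.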