4. The Agent Executor¶
The core logic of how an A2A agent processes requests and generates responses/events is handled by an Agent Executor. The A2A Python SDK provides an abstract base class a2a.server.agent_execution.AgentExecutor that you implement.
AgentExecutor Interface¶
The AgentExecutor class defines two primary methods:
async def execute(self, context: RequestContext, event_queue: EventQueue): Handles incoming requests that expect a response or a stream of events. It processes the user's input (available viacontext) and uses theevent_queueto send backMessage,Task,TaskStatusUpdateEvent, orTaskArtifactUpdateEventobjects.async def cancel(self, context: RequestContext, event_queue: EventQueue): Handles requests to cancel an ongoing task.
The RequestContext provides information about the incoming request, such as the user's message and any existing task details. The EventQueue is used by the executor to send events back to the client.
Helloworld Agent Executor¶
Let's look at agent_executor.py. It defines HelloWorldAgentExecutor.
-
The Agent (
HelloWorldAgent): This is a simple helper class that encapsulates the actual "business logic".class HelloWorldAgent: """Hello World Agent.""" async def invoke(self) -> str: """Invoke the Hello World agent to generate a response.""" return 'Hello, World!'It has a simple
invokemethod that returns the string "Hello, World!". -
The Executor (
HelloWorldAgentExecutor): This class implements theAgentExecutorinterface.-
__init__:class HelloWorldAgentExecutor(AgentExecutor): """Test AgentProxy Implementation.""" def __init__(self) -> None: self.agent = HelloWorldAgent()It instantiates the
HelloWorldAgent. -
execute:async def execute( self, context: RequestContext, event_queue: EventQueue, ) -> None: """Execute the agent process and enqueue the final response.""" task = context.current_task or new_task(context.message) await event_queue.enqueue_event(task) await event_queue.enqueue_event( TaskStatusUpdateEvent( task_id=context.task_id, context_id=context.context_id, status=TaskStatus( state=TaskState.TASK_STATE_WORKING, message=new_agent_text_message('Processing request...'), ), ) ) result = await self.agent.invoke() await event_queue.enqueue_event( TaskArtifactUpdateEvent( task_id=context.task_id, context_id=context.context_id, artifact=new_text_artifact(name='result', text=result), ) ) await event_queue.enqueue_event( TaskStatusUpdateEvent( task_id=context.task_id, context_id=context.context_id, status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED), ) )When a
message/sendormessage/streamrequest comes in (both are handled byexecutein this simplified executor):- It retrieves the current task from the context or creates a new one, enqueueing it as the first event.
- It enqueues a
TaskStatusUpdateEventwith a state ofTASK_STATE_WORKINGto indicate the agent has begun processing. - It calls
self.agent.invoke()to execute the actual business logic (which simply returns "Hello, World!"). - It enqueues a
TaskArtifactUpdateEventcontaining the result text. - Finally, it enqueues a
TaskStatusUpdateEventwith a state ofTASK_STATE_COMPLETEDto conclude the task.
-
cancel: The Hello World example'scancelmethod simply raises an exception, indicating that cancellation is not supported for this basic agent.
-
The AgentExecutor acts as the bridge between the A2A protocol (managed by the request handler and server application) and your agent's specific logic. It receives context about the request and uses an event queue to communicate results or updates back.