4. The Agent Executor¶
The core logic of how an A2A agent processes requests and generates responses/events is handled by an Agent Executor. The A2A Python SDK provides an abstract base class a2a.server.agent_execution.AgentExecutor that you implement.
AgentExecutor Interface¶
The AgentExecutor class defines two primary methods:
async def execute(self, context: RequestContext, event_queue: EventQueue): Handles incoming requests that expect a response or a stream of events. It processes the user's input (available viacontext) and uses theevent_queueto send backMessage,Task,TaskStatusUpdateEvent, orTaskArtifactUpdateEventobjects.async def cancel(self, context: RequestContext, event_queue: EventQueue): Handles requests to cancel an ongoing task.
The RequestContext provides information about the incoming request, such as the user's message and any existing task details. The EventQueue is used by the executor to send events back to the client.
Helloworld Agent Executor¶
Let's look at agent_executor.py. It defines HelloWorldAgentExecutor.
-
The Agent (
HelloWorldAgent): This is a simple helper class that encapsulates the actual "business logic".class HelloWorldAgent: """Hello World Agent.""" async def invoke(self, user_request: str) -> str: """Invoke the Hello World agent to generate a response.""" return f'Hello, World! I have received your request ({user_request})'It has a simple
invokemethod that returns the string "Hello, World!". -
The Executor (
HelloWorldAgentExecutor): This class implements theAgentExecutorinterface.-
__init__:class HelloWorldAgentExecutor(AgentExecutor): """Test AgentProxy Implementation.""" def __init__(self) -> None: self.agent = HelloWorldAgent()It instantiates the
HelloWorldAgent. -
execute:async def execute( self, context: RequestContext, event_queue: EventQueue, ) -> None: """Process user request.""" # 1. Collect a task from request context if context.current_task: task = context.current_task else: # 1.1 If there is no task, create one and add it event queue task = new_task_from_user_message(context.message) await event_queue.enqueue_event(task) # 2. Update task status in EventQueue using TaskUpdater class object task_updater = TaskUpdater( event_queue=event_queue, task_id=task.id, context_id=task.context_id ) await task_updater.update_status( state=TaskState.TASK_STATE_WORKING, message=new_text_message('Processing request...'), ) # 3. Collect user request from request content and invoke LLM agent to generate content query = get_message_text(context.message) if query: result = await self.agent.invoke(user_request=query) else: result = 'No text input is provided!' # 4. Add generated response as an artifact to EventQueue await task_updater.add_artifact(parts=[new_text_part(text=result, media_type='text/plain')]) print('Result: ', result) # 5. Update task status to completed await task_updater.update_status( state=TaskState.TASK_STATE_COMPLETED, message=new_text_message('Request is completed!'), )When a
Send MessageorSend Streaming Messagerequest comes in (both are handled byexecutein this simplified executor), the following steps occur:Step 1. The
A2A instance(server) retrieves the current task from the context. If there is no task in context, then it creates a new task and adds it to theEventQueue.Step 2. It enqueues a
TaskStatusUpdateEventwith a state ofTASK_STATE_WORKINGto indicate the agent has begun processing.Step 3. It calls
self.agent.invoke()to execute the actual business logic (which simply returns "Hello, World!").Step 4. It enqueues a
TaskArtifactUpdateEventcontaining the result text from the agent.Step 5. Finally, it enqueues a
TaskStatusUpdateEventwith a state ofTASK_STATE_COMPLETEDto conclude the task.
-
The AgentExecutor acts as the bridge between the A2A protocol (managed by the request handler and server application) and your agent's specific logic. It receives context about the request and uses an event queue to communicate results or updates back.