a2a.server.agent_execution.active_task module

Active Task execution and data flow architecture.

This module manages the lifecycle, execution, and data flow of an active A2A task.

High-Level Architecture:

  • AgentExecutor: An interface with two main methods, execute(…) and cancel(…), responsible for running the core agent logic.

  • ActiveTask: Coordinates the execution. It runs a main agent loop in ActiveTask._run_producer().
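As a rough illustration of the two-method interface described above, the following sketch defines a stand-in abstract class and a toy implementation. It does not import a2a: the class names `SketchAgentExecutor` and `EchoExecutor`, the string `request` argument, and the use of a plain `asyncio.Queue` as the event channel are all assumptions made for this example, and the real method parameters are not shown in this page.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List


class SketchAgentExecutor(ABC):
    """Hypothetical stand-in for the AgentExecutor interface (not the a2a class)."""

    @abstractmethod
    async def execute(self, request: str, event_queue: asyncio.Queue) -> None:
        """Run the core agent logic, publishing events to event_queue."""

    @abstractmethod
    async def cancel(self, request: str, event_queue: asyncio.Queue) -> None:
        """Stop work for this request and publish a cancellation event."""


class EchoExecutor(SketchAgentExecutor):
    """Toy agent: echoes the request back as a single event."""

    async def execute(self, request: str, event_queue: asyncio.Queue) -> None:
        await event_queue.put(f"echo: {request}")

    async def cancel(self, request: str, event_queue: asyncio.Queue) -> None:
        await event_queue.put(f"canceled: {request}")


async def demo() -> List[str]:
    # The queue plays the role of the agent event queue in this sketch.
    queue: asyncio.Queue = asyncio.Queue()
    executor = EchoExecutor()
    await executor.execute("hello", queue)
    await executor.cancel("hello", queue)
    return [queue.get_nowait(), queue.get_nowait()]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the shape is that the executor never returns results directly; everything it produces travels through the queue it is handed.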

Data Flow and Event Handling:

  1. Producer (ActiveTask._run_producer):

    • Listens for incoming requests from ActiveTask._request_queue.

    • Acquires ActiveTask._request_lock to ensure requests are processed sequentially.

    • Calls AgentExecutor.execute() passing ActiveTask._event_queue_agent as the primary communication channel.

    • Enqueues internal lifecycle events (e.g., _RequestStarted, _RequestCompleted) and exceptions to the same ActiveTask._event_queue_agent.

  2. Consumer (EventConsumer):

    • Consumes events from ActiveTask._event_queue_agent.

    • Processes all events, updating task state and the database (TaskManager).

    • Upon receiving _RequestCompleted, it releases ActiveTask._request_lock, making the producer ready to process the next queued request.

    • Propagates relevant agent updates and state changes to ActiveTask._event_queue_subscribers.

    • Exception Handling: Re-raises exceptions dequeued from the agent queue, or raises its own validation errors (e.g., ‘Received Message object in task mode.’). In case of failure, it updates the task state to failed and propagates the exception to ActiveTask._event_queue_subscribers.

  3. Subscribers (ActiveTask.subscribe):

    • ActiveTask._event_queue_subscribers is not consumed directly. Instead, it is consumed by tapped queues created within the ActiveTask.subscribe() method.

    • The ActiveTask.subscribe() method is also used to actively run requests, creating a temporary subscription that yields events and automatically finishes once the request is completed.
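The data flow above can be sketched with plain asyncio primitives. This is a simplified model under stated assumptions, not the module's implementation: lifecycle events are represented as strings, `_STOP` is a hypothetical sentinel used only to end the demo, the subscriber side is a single queue standing in for a tap, and the consumer records a failure instead of re-raising it. The detail worth noticing is that the producer acquires the request lock and the consumer releases it.

```python
import asyncio
from typing import Any, Dict, List, Tuple

_STARTED = "_RequestStarted"      # string stand-ins for the lifecycle events
_COMPLETED = "_RequestCompleted"
_STOP = None                      # hypothetical sentinel that ends this sketch


async def agent_execute(request: str, agent_queue: asyncio.Queue) -> None:
    """Toy agent logic: emit one update, or fail for the request 'boom'."""
    if request == "boom":
        raise ValueError("agent failed")
    await agent_queue.put(f"update:{request}")


async def run_producer(request_queue: asyncio.Queue,
                       request_lock: asyncio.Lock,
                       agent_queue: asyncio.Queue) -> None:
    while True:
        request = await request_queue.get()
        if request is _STOP:
            await agent_queue.put(_STOP)
            return
        # Held until the consumer has seen _COMPLETED for this request.
        await request_lock.acquire()
        await agent_queue.put(_STARTED)
        try:
            await agent_execute(request, agent_queue)
        except Exception as exc:
            await agent_queue.put(exc)  # exceptions share the same channel
        finally:
            await agent_queue.put(_COMPLETED)


async def run_consumer(agent_queue: asyncio.Queue,
                       request_lock: asyncio.Lock,
                       taps: List[asyncio.Queue],
                       state: Dict[str, Any]) -> None:
    while True:
        event = await agent_queue.get()
        if event is _STOP:
            for tap in taps:
                await tap.put(_STOP)
            return
        if isinstance(event, Exception):
            state["status"] = "failed"
            for tap in taps:
                await tap.put(event)    # propagate the failure
        elif event == _STARTED:
            state["status"] = "working"
        elif event == _COMPLETED:
            request_lock.release()      # producer may take the next request
        else:
            state["history"].append(event)
            for tap in taps:
                await tap.put(event)    # forward agent updates


async def main() -> Tuple[Dict[str, Any], List[Any]]:
    request_queue: asyncio.Queue = asyncio.Queue()
    agent_queue: asyncio.Queue = asyncio.Queue()
    request_lock = asyncio.Lock()
    tap: asyncio.Queue = asyncio.Queue()
    state: Dict[str, Any] = {"status": "submitted", "history": []}

    for request in ("a", "boom", _STOP):
        request_queue.put_nowait(request)

    await asyncio.gather(
        run_producer(request_queue, request_lock, agent_queue),
        run_consumer(agent_queue, request_lock, [tap], state),
    )

    received = []
    while (event := tap.get_nowait()) is not _STOP:
        received.append(event)
    return state, received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because `asyncio.Lock` has no owner check, releasing it from the consumer task is legal, and that is what serializes requests end to end: the producer cannot begin the second request until every event of the first has been processed.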

class a2a.server.agent_execution.active_task.ActiveTask(agent_executor: AgentExecutor, task_id: str, task_manager: TaskManager, push_sender: PushNotificationSender | None = None, on_cleanup: Callable[[ActiveTask], None] | None = None)

Bases: object

Manages the lifecycle and execution of an active A2A task.

It coordinates between the agent’s execution (the producer), the persistence and state management (the TaskManager), and the event distribution to subscribers (the consumer).

Concurrency Guarantees:

  • This class is designed to be highly concurrent. It manages an internal producer-consumer model using `asyncio.Task`s.

  • self._lock (asyncio.Lock) ensures mutually exclusive access for critical lifecycle state changes, such as starting the task, subscribing, and determining if cleanup is safe to trigger.

  • self._is_finished (asyncio.Event) provides a non-blocking way for external observers and internal loops on the same event loop to check if the ActiveTask has permanently ceased execution and closed its queues. (asyncio.Event is safe to share between coroutines but is not thread-safe.)

async cancel(call_context: ServerCallContext) → Task

Cancels the running active task.

Concurrency Guarantee: Uses _lock to ensure we don’t attempt to cancel a producer that is already winding down or hasn’t started. It fires the cancellation signal and blocks until the consumer processes the cancellation events.

async enqueue_request(request_context: RequestContext) → UUID

Enqueues a request for the active task to process.

async get_task() → Task

Gets the task from the database.

async start(call_context: ServerCallContext, create_task_if_missing: bool = False) → None

Starts the active task background processes.

Concurrency Guarantee: Uses self._lock to ensure the producer and consumer tasks are strictly singleton instances for the lifetime of this ActiveTask.
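A minimal sketch of the singleton guarantee described for start(), assuming a simplified stand-in class. `SketchTask`, its `starts` counter, and its `finish()` helper are hypothetical and exist only for this example; the background coroutines here do nothing but wait on the finished event.

```python
import asyncio
from typing import Optional


class SketchTask:
    """Hypothetical stand-in showing a lock-guarded, idempotent start()."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._is_finished = asyncio.Event()
        self._producer: Optional[asyncio.Task] = None
        self._consumer: Optional[asyncio.Task] = None
        self.starts = 0  # counts how often background tasks were created

    async def _run(self) -> None:
        # Placeholder for the producer/consumer loops.
        await self._is_finished.wait()

    async def start(self) -> None:
        async with self._lock:
            if self._producer is not None:
                return  # already started: later calls are no-ops
            self.starts += 1
            self._producer = asyncio.create_task(self._run())
            self._consumer = asyncio.create_task(self._run())

    async def finish(self) -> None:
        # Signal completion and wait for both background tasks to exit.
        self._is_finished.set()
        await asyncio.gather(self._producer, self._consumer)


async def demo() -> int:
    task = SketchTask()
    # Five concurrent callers race to start the same task.
    await asyncio.gather(*(task.start() for _ in range(5)))
    await task.finish()
    return task.starts


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Checking and assigning `_producer` inside the same `async with` block is what makes the check-then-create step atomic with respect to other coroutines.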

async subscribe(*, request: RequestContext | None = None, include_initial_task: bool = False, replace_status_update_with_task: bool = False) → AsyncGenerator[Event, None]

Creates a queue tap and yields events as they are produced.

Concurrency Guarantee: Uses _lock to safely increment and decrement _reference_count. Safely detaches its queue tap when the client disconnects or the task finishes, triggering _maybe_cleanup() to potentially garbage collect the ActiveTask.
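The reference-counting pattern described for subscribe() might look roughly like the sketch below. It is an illustration under assumptions: `SketchFanout`, `publish()`, the `_DONE` marker, and the `cleaned_up` flag are hypothetical names, and a list of per-subscriber queues stands in for the real queue-tap mechanism.

```python
import asyncio
from typing import Any, AsyncGenerator, List, Tuple

_DONE = object()  # hypothetical end-of-stream marker for this sketch


class SketchFanout:
    """Hypothetical stand-in: one tap queue per subscriber, ref-counted."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._taps: List[asyncio.Queue] = []
        self._reference_count = 0
        self.cleaned_up = False

    async def publish(self, event: Any) -> None:
        # Copy each event to every currently attached tap.
        for tap in list(self._taps):
            await tap.put(event)

    def _maybe_cleanup(self) -> None:
        # Called with the lock held; cleans up once nobody is subscribed.
        if self._reference_count == 0:
            self.cleaned_up = True

    async def subscribe(self) -> AsyncGenerator[Any, None]:
        tap: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._reference_count += 1
            self._taps.append(tap)
        try:
            while (event := await tap.get()) is not _DONE:
                yield event
        finally:
            # Runs on normal completion and when the caller stops early.
            async with self._lock:
                self._taps.remove(tap)
                self._reference_count -= 1
                self._maybe_cleanup()


async def demo() -> Tuple[List[List[Any]], int, bool]:
    fanout = SketchFanout()

    async def collect() -> List[Any]:
        return [event async for event in fanout.subscribe()]

    readers = [asyncio.create_task(collect()) for _ in range(2)]
    while fanout._reference_count < 2:
        await asyncio.sleep(0)  # wait until both taps are attached

    for event in ("a", "b", _DONE):
        await fanout.publish(event)

    results = await asyncio.gather(*readers)
    return list(results), fanout._reference_count, fanout.cleaned_up


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Placing the detach logic in `finally` means the count is decremented whether the stream ends on its own or the subscriber goes away first, so cleanup cannot be skipped by an early exit.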

property task_id: str

The ID of the task.

class a2a.server.agent_execution.active_task.EventConsumer(active_task: ActiveTask)

Bases: object

Consumes events from the agent and updates system state.

async run() → None

Consumes events from the agent and updates system state.