a2a.server.tasks.database_task_store module

class a2a.server.tasks.database_task_store.DatabaseTaskStore(engine: ~sqlalchemy.ext.asyncio.engine.AsyncEngine, create_table: bool = True, table_name: str = 'tasks', owner_resolver: ~collections.abc.Callable[[~a2a.server.context.ServerCallContext], str] = <function resolve_user_scope>, core_to_model_conversion: ~collections.abc.Callable[[~a2a_pb2.Task, str], ~a2a.server.models.TaskModel] | None = None, model_to_core_conversion: ~collections.abc.Callable[[~a2a.server.models.TaskModel], ~a2a_pb2.Task] | None = None)

Bases: TaskStore

SQLAlchemy-based implementation of TaskStore.

Stores task objects in a database supported by SQLAlchemy.

async_session_maker: async_sessionmaker[AsyncSession]
core_to_model_conversion: Callable[[Task, str], TaskModel] | None = None
create_table: bool
async delete(task_id: str, context: ServerCallContext) None

Deletes a task from the database by ID, for the given owner.

engine: AsyncEngine
async get(task_id: str, context: ServerCallContext) Task | None

Retrieves a task from the database by ID, for the given owner.

async initialize() None

Initialize the database and create the table if needed.

async list(params: ListTasksRequest, context: ServerCallContext) ListTasksResponse

Retrieves tasks from the database based on provided parameters, for the given owner.

model_to_core_conversion: Callable[[TaskModel], Task] | None = None
owner_resolver: Callable[[ServerCallContext], str]
async save(task: Task, context: ServerCallContext) None

Saves or updates a task in the database for the resolved owner.

task_model: type[TaskModel]