SQLAlchemy 2.0 async infrastructure pattern reference. Use when: DB model definition, Base model setup, table mapping, relationship config, session management, AsyncSession, sessionmaker, connection pool, query patterns (select, join, subquery, pagination), Mixin (Timestamp, SoftDelete), N+1 prevention (selectinload, joinedload), transaction management, nested transaction, savepoint, generic repository pattern, BaseRepository[ModelType]. NOT for: domain entity design (domain-layer skill), Alembic migrations.
Install
npx skillscat add nomik94/claude-code-preset/sqlalchemy Install via the SkillsCat registry.
SQLAlchemy 2.0 Async Skill
Base Model
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, registry
class Base(DeclarativeBase):
registry = registry(
type_annotation_map={str: String(255)}
)Mixins
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, func
class IdMixin:
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False,
)
class SoftDeleteMixin:
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), default=None)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, index=True)
def soft_delete(self) -> None:
self.deleted_at = func.now()
self.is_active = False
# Usage: class UserModel(IdMixin, TimestampMixin, SoftDeleteMixin, Base): ...Session Management
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
engine = create_async_engine(
settings.database.url, # "postgresql+asyncpg://..."
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo=settings.debug,
)
async_session_factory = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raiseConnection Pool Settings
| Parameter | Value | Purpose |
|---|---|---|
pool_size |
20 | steady-state connections |
max_overflow |
10 | burst capacity (total max = 30) |
pool_pre_ping |
True | detect stale connections |
pool_recycle |
3600 | avoid DB timeout |
pool_timeout |
30 | max wait for available connection |
Relationship Patterns
MUST use lazy="raise" on ALL relationships. This prevents N+1 queries at attribute-access time by raising InvalidRequestError if a relationship is accessed without explicit eager loading. Unlike lazy="noload" (which silently returns empty), lazy="raise" fails loudly -- forcing developers to declare loading strategy in every query.
Checklist
- Every
relationship()MUST havelazy="raise" - Every query accessing related data MUST use explicit
selectinload/joinedload - Never rely on implicit lazy loading in async context
One-to-Many
class UserModel(IdMixin, TimestampMixin, Base):
__tablename__ = "users"
email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
name: Mapped[str] = mapped_column(String(100))
posts: Mapped[list["PostModel"]] = relationship(back_populates="author", lazy="raise")
class PostModel(IdMixin, TimestampMixin, Base):
__tablename__ = "posts"
title: Mapped[str] = mapped_column(String(200))
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
author: Mapped["UserModel"] = relationship(back_populates="posts", lazy="raise")Many-to-Many (Association Table)
post_tags = Table(
"post_tags", Base.metadata,
Column("post_id", ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class PostModel(IdMixin, Base):
__tablename__ = "posts"
tags: Mapped[list["TagModel"]] = relationship(
secondary=post_tags, back_populates="posts", lazy="raise",
)
class TagModel(IdMixin, Base):
__tablename__ = "tags"
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["PostModel"]] = relationship(
secondary=post_tags, back_populates="tags", lazy="raise",
)Query Patterns
Basic Select
stmt = select(UserModel).where(UserModel.email == email)
user = (await db.execute(stmt)).scalar_one_or_none()Join Query
stmt = (
select(PostModel, UserModel.name)
.join(UserModel, PostModel.author_id == UserModel.id)
.where(UserModel.is_active == True)
.order_by(PostModel.created_at.desc())
)
rows = (await db.execute(stmt)).all()Offset/Limit Pagination
stmt = (
select(PostModel)
.where(PostModel.is_active == True)
.order_by(PostModel.created_at.desc())
.offset(offset)
.limit(limit)
)
items = (await db.execute(stmt)).scalars().all()
count_stmt = select(func.count()).select_from(PostModel).where(PostModel.is_active == True)
total = (await db.execute(count_stmt)).scalar_one()Eager Loading (N+1 Prevention)
With lazy="raise", accessing an unloaded relationship raises an error. MUST use explicit loading:
from sqlalchemy.orm import selectinload, joinedload
# selectinload: separate IN query -- preferred for collections
stmt = select(UserModel).options(selectinload(UserModel.posts)).where(UserModel.id == user_id)
# joinedload: single JOIN -- preferred for single/scalar relations
stmt = select(PostModel).options(joinedload(PostModel.author)).where(PostModel.id == post_id)
# nested eager loading
stmt = select(UserModel).options(
selectinload(UserModel.posts).selectinload(PostModel.tags),
)Exists Subquery
has_posts = exists().where(PostModel.author_id == UserModel.id, PostModel.is_active == True)
stmt = select(UserModel).where(has_posts)
active_authors = (await db.execute(stmt)).scalars().all()BaseRepository[ModelType] Generic
Generic CRUD repository that eliminates boilerplate. Concrete repositories inherit and add domain-specific queries.
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
ModelType = TypeVar("ModelType", bound=Base)
class BaseRepository(Generic[ModelType]):
"""Generic async CRUD repository."""
model_class: type[ModelType]
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def get_by_id(self, id: int) -> ModelType | None:
return await self.db.get(self.model_class, id)
async def get_many(
self, *, offset: int = 0, limit: int = 100,
) -> list[ModelType]:
stmt = select(self.model_class).offset(offset).limit(limit)
return list((await self.db.execute(stmt)).scalars().all())
async def create(self, model: ModelType) -> ModelType:
self.db.add(model)
await self.db.flush()
return model
async def update(self, model: ModelType, **attrs: object) -> ModelType:
for key, value in attrs.items():
setattr(model, key, value)
await self.db.flush()
return model
async def delete(self, model: ModelType) -> None:
await self.db.delete(model)
await self.db.flush()Concrete Repository Example
Domain layer defines the Protocol; infrastructure implements via BaseRepository.
# domain/repositories.py (Protocol -- no SQLAlchemy imports)
from typing import Protocol
class UserRepository(Protocol):
async def find_by_id(self, user_id: int) -> UserEntity | None: ...
async def find_by_email(self, email: str) -> UserEntity | None: ...
async def save(self, entity: UserEntity) -> UserEntity: ...
# infrastructure/repositories/user_repository.py
class SqlAlchemyUserRepository(BaseRepository[UserModel]):
model_class = UserModel
async def find_by_id(self, user_id: int) -> UserEntity | None:
model = await self.get_by_id(user_id)
return self._to_entity(model) if model else None
async def find_by_email(self, email: str) -> UserEntity | None:
stmt = select(UserModel).where(UserModel.email == email)
model = (await self.db.execute(stmt)).scalar_one_or_none()
return self._to_entity(model) if model else None
async def save(self, entity: UserEntity) -> UserEntity:
if entity.id is None:
model = await self.create(self._to_model(entity))
else:
model = await self.get_by_id(entity.id)
assert model is not None
self._apply_changes(model, entity)
await self.db.flush()
return self._to_entity(model)
def _to_entity(self, model: UserModel) -> UserEntity:
return UserEntity(id=model.id, email=Email(model.email), name=model.name)
def _to_model(self, entity: UserEntity) -> UserModel:
return UserModel(email=entity.email.value, name=entity.name)
def _apply_changes(self, model: UserModel, entity: UserEntity) -> None:
model.email = entity.email.value
model.name = entity.nameBaseRepository Checklist
-
model_classMUST be set on every concrete repository - Domain Protocol in
domain/-- zero SQLAlchemy imports - Concrete repository in
infrastructure/-- implements Protocol - Entity-Model mapping methods (
_to_entity,_to_model) MUST be in repository - Use
flush()notcommit()-- commit at service/use-case boundary
Transaction Patterns
Unit of Work (Application Service)
class OrderApplicationService:
def __init__(self, order_repo: OrderRepository, db: AsyncSession) -> None:
self.order_repo = order_repo
self.db = db
async def place_order(self, command: PlaceOrderCommand) -> int:
order = Order.create(...)
saved = await self.order_repo.save(order)
await self.db.commit() # single commit at use-case boundary
return saved.idNested Transaction (Savepoint)
async def transfer(self, from_id: int, to_id: int, amount: Money) -> None:
async with self.db.begin_nested(): # SAVEPOINT
from_acc = await self.account_repo.find_by_id(from_id)
to_acc = await self.account_repo.find_by_id(to_id)
from_acc.withdraw(amount)
to_acc.deposit(amount)
await self.account_repo.save(from_acc)
await self.account_repo.save(to_acc)
await self.db.commit() # outer commitQuick Reference
| Rule | Detail |
|---|---|
lazy="raise" |
MUST on all relationships |
expire_on_commit=False |
MUST on async sessionmaker |
selectinload |
collections (one-to-many, many-to-many) |
joinedload |
scalar relations (many-to-one) |
flush() in repository |
commit() in application service |
BaseRepository[ModelType] |
inherit for CRUD, extend for domain queries |
pool_pre_ping=True |
MUST for production |