数据库操作与 ORM
Python 操作数据库有三个层次——原生 SQL、SQLAlchemy Core、SQLAlchemy ORM:越往后抽象程度越高、开发越便捷,但对底层 SQL 的直接控制力也越弱。
数据库操作层次
graph TD
DB[Python 数据库操作] --> RAW[原生 SQL]
DB --> CORE[SQLAlchemy Core]
DB --> ORM[SQLAlchemy ORM]
RAW --> SQLITE[sqlite3 标准库]
RAW --> PSYCOPG[psycopg2 / asyncpg]
CORE --> TABLE[Table 定义]
CORE --> EXPR[SQL 表达式]
ORM --> MODEL[Model 映射]
ORM --> SESSION[Session 管理]
ORM --> QUERY[查询构建器]
style DB fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style ORM fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
SQLite 原生操作
"""
sqlite3:标准库自带,零配置
"""
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
@dataclass
class User:
    """Plain record holding the ``id``, ``name`` and ``email`` of one user."""

    id: int  # numeric identifier
    name: str  # user name
    email: str  # e-mail address
@contextmanager
def get_db(path: str = ":memory:"):
    """Yield a SQLite connection and finish its transaction on exit.

    The connection is opened on ``path`` with ``sqlite3.Row`` as row factory,
    so result rows can be indexed by column name. When the ``with`` body ends
    normally the connection is committed. If the body or the commit raises an
    ``Exception``, the connection is rolled back and the exception is
    re-raised. The connection is closed in every case.
    """
    connection = sqlite3.connect(path)
    connection.row_factory = sqlite3.Row  # rows support access by column name
    try:
        try:
            yield connection
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
with get_db() as db:
    # Schema: create the table only when it does not exist yet.
    db.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    """)

    # One parameterized statement serves both the single and the batch
    # insert; values are bound through "?" placeholders rather than being
    # formatted into the SQL text, which guards against SQL injection.
    insert_user = "INSERT INTO users (name, email) VALUES (?, ?)"

    # Single insert.
    db.execute(insert_user, ("Alice", "alice@example.com"))

    # Batch insert.
    users = [
        ("Bob", "bob@example.com"),
        ("Carol", "carol@example.com"),
    ]
    db.executemany(insert_user, users)

    # Query: print every user whose name contains the letter "o".
    cursor = db.execute("SELECT * FROM users WHERE name LIKE ?", ("%o%",))
    for row in cursor:
        print(f"{row['id']}: {row['name']} <{row['email']}>")
SQLAlchemy ORM
"""
SQLAlchemy 2.0 ORM:现代 Python 数据库最佳实践
"""
from sqlalchemy import create_engine, String, ForeignKey, select
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship,
Session,
)
# === Declarative base ===
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this file inherit from."""
# === Models ===
class Author(Base):
    """ORM model mapped to the ``authors`` table.

    An author owns a list of ``Book`` objects through ``books``; the other
    side of the relationship is ``Book.author``.
    """

    __tablename__ = "authors"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    # Book.author_id is declared as Mapped[int], i.e. NOT NULL. Without a
    # delete cascade, deleting an author makes the ORM set the foreign key of
    # its books to NULL, which fails with an IntegrityError. Deleting the
    # books together with their author avoids that.
    books: Mapped[list["Book"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )

    def __repr__(self) -> str:
        """Return a short debug representation with id and name."""
        return f"Author(id={self.id}, name='{self.name}')"
class Book(Base):
    """ORM model mapped to the ``books`` table.

    Every book references its author through the ``author_id`` foreign key;
    ``author`` is the object-level counterpart of ``Author.books``.
    """

    __tablename__ = "books"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    price: Mapped[float] = mapped_column()
    author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
    author: Mapped["Author"] = relationship(back_populates="books")

    def __repr__(self) -> str:
        """Return a short debug representation with id and title."""
        return f"Book(id={self.id}, title='{self.title}')"
# === Engine and schema ===
engine = create_engine("sqlite:///bookstore.db", echo=False)
Base.metadata.create_all(engine)

# === CRUD walkthrough ===
with Session(engine) as session:
    # Create: build the author with two books attached; only the author is
    # added to the session explicitly.
    author = Author(
        name="太宰治",
        books=[
            Book(title="人间失格", price=29.9),
            Book(title="斜阳", price=25.0),
        ],
    )
    session.add(author)
    session.commit()

    # Read: books cheaper than 30, ordered by title.
    stmt = select(Book).where(Book.price < 30).order_by(Book.title)
    books = session.execute(stmt).scalars().all()
    for book in books:
        print(f"{book.title} - ¥{book.price} by {book.author.name}")

    # Update: look the book up by primary key and change its price.
    book = session.get(Book, 1)
    if book is not None:
        book.price = 35.0
        session.commit()

    # Delete: remove the author found by name, if there is one.
    stmt = select(Author).where(Author.name == "太宰治")
    author = session.scalars(stmt).first()
    if author is not None:
        session.delete(author)
        session.commit()
异步数据库
"""
异步数据库操作(SQLAlchemy 2.0 + asyncio)
"""
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
from sqlalchemy import select
# Async engine; the "sqlite+aiosqlite" URL scheme selects the aiosqlite driver.
async_engine = create_async_engine("sqlite+aiosqlite:///async_app.db", echo=False)

# Factory producing AsyncSession objects bound to the engine above.
AsyncSessionLocal = async_sessionmaker(bind=async_engine, class_=AsyncSession)
async def async_crud():
    """Create the tables, insert one author and read it back asynchronously.

    Uses the module-level ``async_engine`` and ``AsyncSessionLocal``. The
    author that was just inserted is printed to standard output.
    """
    # create_all is a synchronous API, so it is invoked through run_sync on a
    # connection obtained from the async engine.
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSessionLocal() as session:
        # Create
        author = Author(name="川端康成")
        session.add(author)
        await session.commit()

        # Read: async_app.db persists between runs, so every run adds one
        # more author with this name. Select only the newest matching row;
        # an unrestricted query would make scalar_one() raise
        # MultipleResultsFound from the second run onwards.
        stmt = (
            select(Author)
            .where(Author.name == "川端康成")
            .order_by(Author.id.desc())
            .limit(1)
        )
        result = await session.execute(stmt)
        author = result.scalar_one()
        print(f"找到: {author}")
# asyncio.run(async_crud())
数据库迁移
"""
Alembic 数据库迁移管理
"""
# Alembic command cheat sheet, kept as plain text (nothing here is executed):
# install, initialise, configure the database URL, autogenerate a revision,
# upgrade to head, downgrade one step, and show the history.
ALEMBIC_SETUP = """
# 安装
pip install alembic
# 初始化
alembic init migrations
# 配置 alembic.ini 中的数据库 URL
# sqlalchemy.url = sqlite:///app.db
# 生成迁移脚本
alembic revision --autogenerate -m "add users table"
# 执行迁移
alembic upgrade head
# 回滚
alembic downgrade -1
# 查看历史
alembic history
"""
# Example migration script, kept as text for illustration. The function
# bodies are indented so that the text parses as Python.
MIGRATION_EXAMPLE = """
# migrations/versions/xxx_add_users_table.py
def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('email', sa.String(200), unique=True),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
    )

def downgrade():
    op.drop_table('users')
"""
ORM 选择指南
| 特性 | sqlite3 | SQLAlchemy Core | SQLAlchemy ORM |
|---|---|---|---|
| 学习难度 | 低 | 中 | 高 |
| 控制力 | 最高 | 高 | 中 |
| 开发效率 | 低 | 中 | 最高 |
| 关系映射 | 手动 | 手动 | 自动 |
| 迁移支持 | 无 | Alembic | Alembic |
| 异步支持 | 需第三方库 aiosqlite | 原生(需异步驱动) | 原生(需异步驱动) |
| 适用场景 | 小脚本 | 复杂查询 | Web 应用 |
本章小结
| 知识点 | 要点 |
|---|---|
| sqlite3 | 标准库、参数化查询防注入 |
| SQLAlchemy | Mapped + mapped_column 声明模型 |
| Session | 上下文管理、CRUD 操作 |
| 异步 | AsyncSession + aiosqlite |
| 迁移 | Alembic 管理 schema 变更 |
下一章:数据分析——NumPy 与 Pandas 实战。