在 Sanic 中构建解决读写分离会话一致性问题的异步数据库路由


我们的一个核心服务,基于 Sanic 构建,随着用户量的增长,数据库的读取压力成为了主要的性能瓶颈。团队在一次 Sprint 规划会中决定引入数据库读写分离架构。这个决策本身并不复杂,但在实施的第一个迭代周期结束后,测试环境就暴露了一个典型却棘手的问题:用户更新个人资料后,立即刷新页面,看到的依旧是旧数据。这是一个典型的读写分离架构下的“主从延迟”导致的数据不一致问题,具体来说,是破坏了“读己之写”(Read-Your-Writes)的会话一致性保证。

问题的根源很清晰:写操作发生在主库,而紧随其后的读操作被路由到了尚未完成数据同步的从库。在同步的 Web 框架中,我们通常会用 threading.local 来在单个请求的生命周期内“标记”这个请求已经执行了写操作,从而将后续的读请求强制路由到主库。但在 Sanic 这样的异步框架里,threading.local 会失效,因为一个线程会处理多个不同的请求上下文,协程切换会让状态混淆。

Sprint 1: 初步尝试与异步陷阱

在第一个 Sprint,我们的目标是快速验证一个基础的读写分离路由器。团队的初步构想是实现一个 SQLAlchemy 的 Session 代理,根据 SQL 语句的类型来决定使用哪个数据库引擎。

# db_routing/session.py

import logging
from sqlalchemy.orm import sessionmaker, Session as BaseSession
from sqlalchemy import create_engine
from typing import Dict, Any

# 假设的配置
DB_CONFIG = {
    'primary': 'postgresql+asyncpg://user:pass@primary-db:5432/main',
    'replicas': [
        'postgresql+asyncpg://user:pass@replica-db-1:5432/main',
        'postgresql+asyncpg://user:pass@replica-db-2:5432/main',
    ]
}

# 日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class RoutingSession(BaseSession):
    """
    一个简单的、有缺陷的路由会话,用于演示初步构想。
    它无法处理异步上下文中的会话一致性。
    """
    def get_bind(self, mapper=None, clause=None, **kw):
        # 这是一个非常简化的判断逻辑
        # 生产环境中需要更复杂的SQL解析
        is_write = kw.get('_flushing', False)
        
        # 简单通过 clause 的字符串形式判断是否为读操作
        # 这里的判断非常脆弱,仅作示例
        sql_text = str(clause).strip().upper()
        is_read_query = sql_text.startswith('SELECT')

        if is_write or not is_read_query:
            logging.info("Routing to PRIMARY for write operation.")
            return self.engines['primary']
        else:
            # 简单的轮询策略
            import random
            replica = random.choice(self.engines['replicas'])
            logging.info(f"Routing to REPLICA ({replica.url.host}) for read operation.")
            return replica

# 创建引擎
engines = {
    'primary': create_engine(DB_CONFIG['primary'], future=True, echo=False),
    'replicas': [create_engine(url, future=True, echo=False) for url in DB_CONFIG['replicas']]
}

# 将引擎绑定到自定义 Session 类
RoutingSession.engines = engines

# 创建 Session 工厂
Session = sessionmaker(class_=RoutingSession)

# 使用示例
# async def main():
#     async with Session() as session:
#         # ...

这个实现在单元测试中表现良好,但在集成到 Sanic 应用并进行并发测试时,问题立刻浮现。当一个写请求(await user.save())完成后,同一个用户的下一个读请求(await get_user())被路由到从库,数据不一致的现象频繁发生。

团队在 Sprint 的评审会议上复盘了这个问题。核心症结在于,数据库路由逻辑是无状态的。它只知道当前这一条SQL是读还是写,但它不知道这个请求、这个用户、或者这个会话在不久前是否执行过写操作。我们需要一种机制来传递“这个会话需要强一致性读”的信号,而且这个机制必须是异步安全的。

Sprint 2: 引入 contextvars 实现请求级一致性

在第二个 Sprint,我们明确了技术目标:必须找到一个能在 Sanic 的异步环境中可靠传递请求上下文的方案。contextvars 就是为此而生的标准库。它提供了协程安全的上下文变量,可以确保在 async/await 调用链中,变量值与正确的协程上下文绑定。

我们的新策略是:

  1. 创建一个 contextvar 变量,用于标记当前请求是否需要强制从主库读取。
  2. 创建一个 Sanic 中间件,在每个请求开始时重置这个上下文变量。
  3. 重构数据库路由逻辑,使其能够感知这个 contextvar
  4. 在执行写操作的数据库接口层,主动设置这个 contextvar

第一步:定义上下文变量和新的数据库路由器

# db_routing/context.py

import contextvars
from typing import Optional

# 定义一个协程安全的上下文变量
# 它将携带一个标志,指示当前上下文是否需要强制从主库读取
# Optional[bool] 意味着它可以是 True, False, 或者 None (未设置)
force_primary_read_context: contextvars.ContextVar[Optional[bool]] = \
    contextvars.ContextVar('force_primary_read', default=None)
# db_routing/async_session.py (重构)

import logging
import random
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from typing import Dict, Any

from .context import force_primary_read_context

# 假设的配置
DB_CONFIG = {
    'primary': 'postgresql+asyncpg://user:pass@primary-db:5432/main',
    'replicas': [
        'postgresql+asyncpg://user:pass@replica-db-1:5432/main',
    ]
}

logging.basicConfig(level=logging.INFO, format='%(asctime)s - [%(name)s] - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class AsyncRoutingSession(AsyncSession):
    """
    一个感知异步上下文的路由会话。
    它使用 contextvars 来决定路由策略。
    """
    def get_bind(self, mapper=None, clause=None, **kw):
        # 1. 检查 contextvars 中是否有强制主库读取的标志
        if force_primary_read_context.get():
            logger.info("Routing to PRIMARY due to context flag.")
            return self.engines['primary']

        # 2. 检查操作本身是否为写操作
        if self._flushing:
            logger.info("Routing to PRIMARY for write operation (flushing).")
            return self.engines['primary']
        
        # 3. 默认将读操作路由到从库
        # 在真实项目中,这里需要更健壮的负载均衡和健康检查策略
        replica = random.choice(self.engines['replicas'])
        logger.info(f"Routing to REPLICA ({replica.url.host}) for read operation.")
        return replica

# 创建异步引擎
engines = {
    'primary': create_async_engine(DB_CONFIG['primary'], future=True, pool_size=10, max_overflow=20),
    'replicas': [create_async_engine(url, future=True, pool_size=10, max_overflow=20) for url in DB_CONFIG['replicas']]
}

# 将引擎绑定到自定义 Session 类
AsyncRoutingSession.engines = engines

# 创建异步 Session 工厂
AsyncSessionMaker = sessionmaker(
    class_=AsyncRoutingSession,
    expire_on_commit=False,
)

第二步:创建 Sanic 应用和中间件

# app.py

import asyncio
from sanic import Sanic, response
from sanic.request import Request
from sqlalchemy.future import select

from db_routing.async_session import AsyncSessionMaker
from db_routing.context import force_primary_read_context
# 假设我们有一个 User 模型
# from models import User 

# 模拟模型
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

app = Sanic("ReadWriteSplitApp")

@app.middleware('request')
async def reset_db_context(request: Request):
    """
    在每个请求开始时,重置数据库路由的上下文,确保请求之间不串扰。
    """
    force_primary_read_context.set(False)

# 封装数据库会话管理
async def get_db_session():
    async with AsyncSessionMaker() as session:
        yield session

@app.post("/users")
async def create_user(request: Request):
    data = request.json
    # 在这个 with 块中,所有的数据库操作都会通过同一个 session 实例
    async with AsyncSessionMaker() as session:
        async with session.begin():
            # 这里是写操作,get_bind 会自动路由到主库
            new_user = User(name=data['name'], email=data['email'])
            session.add(new_user)
            
            # 关键一步:在写操作提交前,设置上下文标志
            # 这样,如果这个请求后续还有读操作,它们也会被强制路由到主库
            force_primary_read_context.set(True)
            
            # session.commit() 会在 async with session.begin() 结束时自动调用
            # flush 会在 commit 前发生,触发 get_bind 的 _flushing=True 分支
        
        # 模拟写后立即读的场景
        result = await session.execute(select(User).where(User.email == data['email']))
        user = result.scalar_one()

        return response.json({"id": user.id, "name": user.name, "message": "User created and read back successfully from primary."})

@app.get("/users/<user_id:int>")
async def get_user(request: Request, user_id: int):
    async with AsyncSessionMaker() as session:
        # 这个读操作默认会路由到从库
        result = await session.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()

        if user:
            return response.json({"id": user.id, "name": user.name})
        return response.json({"error": "User not found"}, status=404)

这个方案解决了请求内部的“读己之写”问题。在 create_user 路由中,因为我们设置了 force_primary_read_context.set(True),随后的 select 查询被正确地路由到了主库。

然而,在 Sprint 评审时,产品经理提出了一个场景:用户更新了用户名,然后 点击另一个链接 跳转到个人资料页。这是两个独立的 HTTP 请求。我们的方案无法覆盖这种情况,第二个 GET 请求的上下文是全新的,force_primary_readFalse,查询依然可能被路由到有延迟的从库。

Sprint 3: 引入 Redis 实现跨请求的会话级一致性

问题的边界已经很清晰了:我们需要一种能够跨越单个 HTTP 请求、但又有生命周期限制的状态。这个状态应该与用户会话关联。在第三个 Sprint,团队决定引入 Redis 来实现一个“粘性会话”(Sticky Session)策略。

策略如下:

  1. 当一个用户的请求中包含写操作时,在操作成功后,向 Redis 中写入一个有时效性的键,例如 user_read_primary:<user_id>,TTL 设置为5秒(这个值需要根据业务的主从延迟来调整)。
  2. 在每个请求的中间件中,检查当前用户是否存在于 Redis 的粘性会话标记中。
  3. 如果存在,则为当前请求的上下文设置 force_primary_read_context.set(True)

架构流程图

sequenceDiagram
    participant Client
    participant SanicApp as Sanic App
    participant Middleware
    participant Redis
    participant DBRrouter as DB Router
    participant PrimaryDB as Primary DB
    participant ReplicaDB as Replica DB

    Client->>SanicApp: POST /users/{id}/update (Write Request)
    SanicApp->>Middleware: Request start
    Middleware->>SanicApp: Check Redis (no key), context.set(False)
    SanicApp->>DBRrouter: Write operation
    DBRrouter->>PrimaryDB: Route to Primary
    PrimaryDB-->>DBRrouter: Success
    DBRrouter-->>SanicApp: Success
    SanicApp->>Redis: SETEX user_read_primary:123 5
    Redis-->>SanicApp: OK
    SanicApp-->>Client: 200 OK

    %% A few moments later %%

    Client->>SanicApp: GET /users/123 (Read Request)
    SanicApp->>Middleware: Request start
    Middleware->>Redis: GET user_read_primary:123
    Redis-->>Middleware: Key exists
    Middleware->>SanicApp: Set context.set(True)
    SanicApp->>DBRrouter: Read operation
    Note right of DBRrouter: force_primary_read is True
    DBRrouter->>PrimaryDB: Route to Primary
    PrimaryDB-->>DBRrouter: Fresh data
    DBRrouter-->>SanicApp: User data
    SanicApp-->>Client: 200 OK (Consistent Data)

代码实现

我们需要一个异步 Redis 客户端,aioredis 是一个很好的选择。

# utils/redis_client.py
import aioredis
from sanic import Sanic

class RedisClient:
    def __init__(self):
        self.client = None

    async def init_app(self, app: Sanic, config: dict):
        self.client = aioredis.from_url(
            config['REDIS_URL'], 
            encoding="utf-8", 
            decode_responses=True
        )
        app.ctx.redis = self.client
        app.before_server_stop(self.close_connection)
    
    async def close_connection(self, app, loop):
        await self.client.close()

redis_client = RedisClient()

然后,我们需要更新 Sanic 应用和中间件来集成这个逻辑。

# app.py (扩展)

import aioredis
from sanic import Sanic, response
from sanic.request import Request
from sqlalchemy.future import select

from db_routing.async_session import AsyncSessionMaker
from db_routing.context import force_primary_read_context
from utils.redis_client import redis_client

# ... User model ...

# 假设的配置
APP_CONFIG = {
    "REDIS_URL": "redis://localhost:6379/0",
    "STICKY_SESSION_TTL_SECONDS": 5
}

app = Sanic("ReadWriteSplitApp")
app.config.update(APP_CONFIG)

@app.before_server_start
async def setup_db(app, loop):
    # 初始化 Redis 连接
    await redis_client.init_app(app, app.config)

# 粘性会话的 Key
def get_sticky_key(user_id: int) -> str:
    return f"user_read_primary:{user_id}"

@app.middleware('request')
async def sticky_session_middleware(request: Request):
    """
    在每个请求开始时,检查并设置数据库路由上下文。
    """
    # 默认重置
    force_primary_read_context.set(False)

    # 假设用户认证信息在 request.ctx.user 中
    user = getattr(request.ctx, 'user', None)
    if user and hasattr(user, 'id'):
        redis: aioredis.Redis = request.app.ctx.redis
        sticky_key = get_sticky_key(user.id)
        if await redis.exists(sticky_key):
            # 如果存在粘性标记,则强制本次请求读主库
            force_primary_read_context.set(True)

# 模拟认证中间件
@app.middleware('request')
async def authenticate(request: Request):
    # 在真实项目中,这里会从 token 或 session 中解析用户
    # 为了演示,我们硬编码一个用户对象
    from types import SimpleNamespace
    user_id_header = request.headers.get('X-User-ID')
    if user_id_header:
        request.ctx.user = SimpleNamespace(id=int(user_id_header))


async def mark_session_sticky(redis: aioredis.Redis, user_id: int):
    """
    在写操作后,标记用户会话为粘性。
    """
    sticky_key = get_sticky_key(user_id)
    await redis.setex(sticky_key, app.config.STICKY_SESSION_TTL_SECONDS, "1")

@app.put("/users/<user_id:int>")
async def update_user(request: Request, user_id: int):
    # 模拟认证检查
    if not hasattr(request.ctx, 'user') or request.ctx.user.id != user_id:
        return response.json({"error": "Forbidden"}, status=403)

    data = request.json
    redis: aioredis.Redis = request.app.ctx.redis

    async with AsyncSessionMaker() as session:
        async with session.begin():
            result = await session.execute(select(User).where(User.id == user_id))
            user_to_update = result.scalar_one_or_none()
            if not user_to_update:
                return response.json({"error": "User not found"}, status=404)
            
            user_to_update.name = data.get('name', user_to_update.name)
            
            # 写操作会自动路由到主库
            session.add(user_to_update)

        # 写操作成功后,标记会话
        await mark_session_sticky(redis, user_id)

    return response.json({"id": user_to_update.id, "name": user_to_update.name})

@app.get("/users/<user_id:int>")
async def get_user_with_sticky_session(request: Request, user_id: int):
    # 这个GET请求的路由行为现在取决于Redis中是否存在粘性标记
    async with AsyncSessionMaker() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()

        if user:
            return response.json({"id": user.id, "name": user.name})
        return response.json({"error": "User not found"}, status=404)

这个最终方案在团队内得到了认可。它通过 contextvars 解决了异步上下文传递的问题,又通过 Redis 实现了跨请求的、有时间窗口的会话一致性。在 Scrum 的迭代过程中,我们从一个有缺陷的简单实现,逐步识别问题边界,最终演进到一个在工程上更完备、更能应对真实业务场景的解决方案。

当前方案的局限性与未来展望

这套基于 Redis 粘性会话的方案并非银弹。它的一个明显缺点是引入了对 Redis 的强依赖。如果 Redis 服务出现故障,我们的中间件逻辑需要有明确的降级策略。是选择“安全倒向”(fail-safe),即全部请求读主库,牺牲性能保证一致性;还是选择“可用性倒向”(fail-open),退化到可能出现数据不一致的状态,保证服务可用性?这需要根据业务的关键性来决策。

另一个问题是 TTL 的设定。5秒 是一个经验值,它需要在主从延迟的监控数据和业务对一致性的容忍度之间找到平衡。过长会给主库带来不必要的读压力,过短则可能无法完全覆盖主从延迟。

未来的迭代方向可以探索更精细化的控制。例如,能否不标记整个用户,而是标记用户修改过的某个具体资源?比如 user_profile_read_primary:<user_id>。这可以避免用户在一个模块的写操作影响到另一个完全不相关模块的读性能。更进一步,可以探索基于 CDC(Change Data Capture)的方案,通过订阅数据库的 binlog 或 WAL 日志来主动失效缓存或更新状态,而不是依赖一个固定的 TTL,但这会极大地增加系统复杂度,需要仔细权衡投入产出比。


  目录