构建混合数据模型的实时特征平台 结合 Dask Dgraph Firestore 与 Valtio 的架构权衡


项目启动时,我们面临一个棘手的技术需求:构建一个用户画像系统,它不仅需要向前端实时推送用户的最新特征(如“最近活跃度”、“购买倾向指数”),还需要支持对用户之间深层、复杂的关系网络进行即时分析(如“探索与该用户有三度社交关联且具有相似购物偏好的群体”)。前者要求极低的读取延迟和实时更新能力,后者则需要强大的图计算和遍历能力。

单一数据库方案很快被否决。如果只用 Firestore 这样的文档数据库,实时推送和简单的键值查询性能优异,但执行多层关系查询几乎是不可能的,需要在应用层做大量的循环和拼接,性能会随着关系深度的增加呈指数级下降。反之,如果只用 Dgraph 这样的图数据库,虽然能完美处理复杂关系查询,但将其用于高并发、低延迟的用户单点特征读取场景,不仅是功能上的冗余,其事务模型和数据结构也不如文档数据库那样为前端应用优化得好。

这导向了一个清晰的结论:采用多模数据库(Polyglot Persistence)架构。让每种数据存储只做它最擅长的事情。我们的最终选型是 Dask 用于离线数据处理,Dgraph 存储关系图谱,Firestore 存储实时特征,而前端则使用 Valtio 来聚合这两个异构数据源的状态。

架构决策:为何是 Dask + Dgraph + Firestore?

这个决策的核心是数据流和读写模式的分离。

  1. Dask:海量数据处理的引擎
    我们的原始数据源是存储在对象存储中的数十 TB 的用户行为日志(Parquet 格式)。我们需要一个能够并行处理这些数据、计算用户特征并提取实体间关系的工具。Dask 是一个自然的选择,它能用熟悉的 Pandas 和 NumPy API 来操作无法装入单机内存的数据集,并能轻松扩展到多机集群。在真实项目中,这意味着我们可以用较低的成本完成大规模的 ETL 和特征工程。

  2. Dgraph:复杂关系网络的归宿
    用户之间的社交关系、用户与产品的交互关系、产品之间的关联关系……这些构成了一个巨大的图。Dgraph 原生支持 GraphQL,并且其底层设计就是为图遍历优化的。将这些关系数据存入 Dgraph,意味着我们可以用一条简单的 GraphQL 查询替代传统数据库中需要多次 JOIN 甚至递归查询的复杂操作。

  3. Firestore:面向前端的实时特征缓存层
    Dask 计算出的用户特征,例如 last_active_timestamp, propensity_score,是扁平的键值对结构。这类数据需要被前端以极低的延迟获取,并且当特征更新时,UI 应当实时响应。Firestore 的实时监听能力和为客户端 SDK 优化的数据结构使其成为这个场景下的不二之选。我们把它看作一个持久化的、实时的、面向最终用户的“物化视图”。

  4. Valtio:弥合前端状态的裂缝
    前端需要同时消费来自 Firestore 的实时数据和来自 Dgraph 的按需图查询数据。这两种数据的更新频率和获取方式完全不同。使用 Valtio 这种基于 Proxy 的状态管理器,我们可以将这两个数据源的逻辑封装在统一的状态对象中,而 UI 组件只需订阅这个状态,无需关心数据具体来自哪里,从而极大简化了前端的复杂度。

整体数据流如下所示:

graph TD
    subgraph "离线处理层 (Batch Processing)"
        A[S3/GCS: Raw Logs in Parquet] --> B(Dask Cluster);
    end

    subgraph "数据持久化层 (Persistence Layer)"
        B --> C{Dgraph: Graph Data};
        B --> D[Firestore: Real-time Features];
    end

    subgraph "服务与展现层 (Serving & Presentation)"
        E[Backend API Gateway] --> C;
        E --> D;
        F[React Frontend] --> E;
        F -- Real-time Updates --> D;
    end

    subgraph "前端状态管理"
        G(Valtio Store)
    end

    F -- Manages State --> G;

核心实现:从数据处理到前端呈现

下面我们将通过代码来展示这个架构的关键环节。

1. Dask 并行处理与双写数据库

这是整个流程的起点。我们假设有一个 Dask 作业,定期运行,处理新的日志数据。

process_user_data.py

import dask.dataframe as dd
import pandas as pd
import logging
from typing import Dict, Any
from config import DaskConfig, FirestoreConfig, DgraphConfig

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 模拟数据库客户端(在生产中应使用真实的SDK)
class MockFirestoreClient:
    def collection(self, name):
        logging.info(f"[Firestore] Accessing collection: {name}")
        return self
    def document(self, doc_id):
        logging.info(f"[Firestore] Accessing document: {doc_id}")
        return self
    def set(self, data, merge=False):
        logging.info(f"[Firestore] Setting data (merge={merge}): {data}")
        # 在真实实现中,这里会有网络调用
        pass

class MockDgraphClient:
    def txn(self):
        return self
    def mutate(self, set_nquads):
        logging.info(f"[Dgraph] Mutating nquads:\n{set_nquads}")
        # 在真实实现中,这里会有网络调用
        pass
    def commit(self):
        logging.info("[Dgraph] Committing transaction.")
        pass
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.commit()
        else:
            logging.error(f"Transaction failed: {exc_val}")


# 实际项目中,这里会是真实的客户端初始化
# from google.cloud import firestore
# import pydgraph
# firestore_client = firestore.Client(project=FirestoreConfig.PROJECT_ID)
# dgraph_client_stub = pydgraph.DgraphClientStub(f"{DgraphConfig.HOST}:{DgraphConfig.PORT}")
# dgraph_client = pydgraph.DgraphClient(dgraph_client_stub)

firestore_client = MockFirestoreClient()
dgraph_client = MockDgraphClient()

def process_partition(df: pd.DataFrame) -> int:
    """
    处理单个 Pandas DataFrame 分区。
    这个函数会被 Dask 并行调用。
    """
    processed_count = 0
    try:
        user_features = {}
        mutations = []

        for row in df.itertuples():
            user_id = str(row.user_id)
            
            # 1. 计算扁平化的用户特征 (用于 Firestore)
            if user_id not in user_features:
                user_features[user_id] = {'interaction_count': 0, 'last_seen': row.timestamp}
            user_features[user_id]['interaction_count'] += 1
            if row.timestamp > user_features[user_id]['last_seen']:
                user_features[user_id]['last_seen'] = str(row.timestamp)

            # 2. 提取图关系 (用于 Dgraph)
            # 这里的 N-Quads 格式是 Dgraph 的标准输入格式
            mutations.append(f'_:user_{user_id} <dgraph.type> "User" .')
            mutations.append(f'_:user_{user_id} <user_id> "{user_id}" .')
            
            if hasattr(row, 'friend_id') and pd.notna(row.friend_id):
                friend_id = str(int(row.friend_id))
                mutations.append(f'_:user_{user_id} <knows> _:user_{friend_id} .')
                mutations.append(f'_:user_{friend_id} <dgraph.type> "User" .')
                mutations.append(f'_:user_{friend_id} <user_id> "{friend_id}" .')

            processed_count += 1

        # 批量写入 Firestore
        # 在真实项目中,这里应该使用 Firestore 的 BatchWriter 来提高性能
        for user_id, features in user_features.items():
            doc_ref = firestore_client.collection('user_features').document(user_id)
            doc_ref.set(features, merge=True)

        # 批量写入 Dgraph
        nquads_payload = "\n".join(mutations)
        if nquads_payload:
            with dgraph_client.txn() as txn:
                txn.mutate(set_nquads=nquads_payload)
        
        logging.info(f"Processed partition with {len(df)} rows. Found {len(user_features)} users.")
        return processed_count

    except Exception as e:
        logging.error(f"Error processing partition: {e}", exc_info=True)
        # 这里的错误处理很关键,是保证作业稳定性的基础
        # 可以加入重试机制或者将失败的分区信息记录到死信队列
        return 0

def run_feature_engineering_job(source_path: str):
    """
    主作业函数,使用 Dask 读取数据并分发处理任务。
    """
    logging.info(f"Starting job. Reading data from {source_path}")
    
    # 使用 Dask 读取大型 Parquet 文件集
    # blocksize 控制了每个分区的大小,这是性能调优的关键参数
    ddf = dd.read_parquet(
        source_path,
        engine='pyarrow',
        blocksize=DaskConfig.BLOCK_SIZE
    )

    # 在 Dask 中,一个常见的错误是直接在 ddf 上进行迭代或操作。
    # 正确的方式是使用 map_partitions,它将函数应用到每个底层的 Pandas DataFrame。
    # meta 参数定义了输出的结构,有助于 Dask 优化执行计划。
    result = ddf.map_partitions(process_partition, meta=(None, 'int64')).compute()
    
    total_processed = result.sum()
    logging.info(f"Job finished. Total rows processed: {total_processed}")


if __name__ == "__main__":
    # 模拟一个 Parquet 数据源目录
    # 在生产环境中,这会指向 S3 或 GCS 上的真实路径
    # e.g., "s3://my-data-bucket/user-logs/date=2023-10-27/"
    mock_source_path = "./mock_data.parquet"
    
    # 创建模拟数据
    mock_df = pd.DataFrame({
        'user_id': [101, 102, 101, 103, 102],
        'timestamp': pd.to_datetime(['2023-10-27 10:00', '2023-10-27 10:05', '2023-10-27 10:10', '2023-10-27 10:15', '2023-10-27 10:20']),
        'action': ['login', 'view_item', 'add_to_cart', 'login', 'knows_friend'],
        'friend_id': [None, None, None, None, 101]
    })
    mock_df.to_parquet(mock_source_path)

    run_feature_engineering_job(mock_source_path)

这里的关键在于 process_partition 函数。它在一个原子操作单元(一个 Pandas DataFrame)内,同时准备了写入 Firestore 的数据和写入 Dgraph 的 N-Quads。这个双写操作的原子性是一个挑战。当前实现不是严格事务性的,Dask 作业的重试可能导致数据不一致。在生产环境中,一个更健壮的方案是 Dask 先将处理结果写入一个可靠的消息队列(如 Kafka),然后由两个独立的消费者分别写入 Dgraph 和 Firestore,通过消费者组的位移来保证至少一次或精确一次的处理。但对于可接受最终一致性的特征计算场景,当前方案因其简单性而具备优势。

2. 后端 API 网关:聚合异构数据源

后端需要提供一个接口,能够根据请求,智能地从 Dgraph 或组合两个数据源的数据返回给前端。我们使用 FastAPI 来演示。

main.py

from fastapi import FastAPI, HTTPException
import httpx
import logging
from typing import Dict, Any

# 配置
DGRAPH_GQL_ENDPOINT = "http://localhost:8080/graphql" # Dgraph GraphQL endpoint

app = FastAPI()
logger = logging.getLogger("api")

# Dgraph 查询模板
# 使用参数化查询防止注入
FIND_FRIENDS_OF_FRIENDS_QUERY = """
query GetRelatedUsers($userId: string, $depth: int) {
  queryUser(filter: { user_id: { eq: $userId } }) {
    user_id
    knows @cascade(depth: $depth) {
      user_id
    }
  }
}
"""

@app.get("/user/{user_id}/graph")
async def get_user_graph_data(user_id: str, depth: int = 2):
    """
    从 Dgraph 获取用户的图关系数据。
    这里的坑在于超时和错误处理。对图数据库的深度查询可能非常耗时。
    """
    if depth > 3:
        # 保护性编程:限制查询深度,防止恶意请求拖垮数据库
        raise HTTPException(status_code=400, detail="Query depth cannot exceed 3.")

    variables = {"userId": user_id, "depth": depth}
    
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.post(
                DGRAPH_GQL_ENDPOINT,
                json={"query": FIND_FRIENDS_OF_FRIENDS_QUERY, "variables": variables},
            )
            response.raise_for_status() # 如果状态码是 4xx 或 5xx,则抛出异常
            
            data = response.json()
            if data.get("errors"):
                logger.error(f"Dgraph query failed: {data['errors']}")
                raise HTTPException(status_code=500, detail="Error querying graph database.")
            
            return data.get("data", {})

        except httpx.ReadTimeout:
            logger.warning(f"Dgraph query timed out for user {user_id} with depth {depth}")
            raise HTTPException(status_code=504, detail="Graph query timed out.")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail="Internal server error.")

这个 API 端点只查询 Dgraph。在更复杂的场景中,一个端点可能会先从 Firestore 获取用户的基本特征,再根据某些特征决定是否需要去 Dgraph 进行深度图查询,最后将结果合并返回。这种模式将复杂性隔离在了后端,为前端提供了清晰、统一的接口。

3. 前端状态管理:Valtio 的优雅之道

前端的挑战在于如何优雅地管理两个来源的数据:一个是来自 Firestore 的实时推送,另一个是用户手动触发的、对后端 API 的异步请求(查询 Dgraph)。

store.ts

import { proxy, subscribe } from 'valtio';
import { onSnapshot, doc, DocumentData } from 'firebase/firestore';
import { firestore } from './firebaseConfig'; // 假设这是你的 Firebase 初始化文件

interface UserGraph {
  loading: boolean;
  error: string | null;
  data: any; // 实际项目中应定义更精确的类型
}

interface AppState {
  currentUser: {
    id: string | null;
    features: DocumentData | null;
    unsubscribe: (() => void) | null;
  };
  userGraph: UserGraph;
}

// Valtio 的核心:一个可变的代理对象
export const state = proxy<AppState>({
  currentUser: {
    id: null,
    features: null,
    unsubscribe: null,
  },
  userGraph: {
    loading: false,
    error: null,
    data: null,
  },
});

// --- Actions ---

// 监听指定用户的实时特征
export function watchUserFeatures(userId: string) {
  if (state.currentUser.unsubscribe) {
    state.currentUser.unsubscribe(); // 取消上一个监听
  }
  
  if (!userId) {
    state.currentUser.id = null;
    state.currentUser.features = null;
    state.currentUser.unsubscribe = null;
    return;
  }

  state.currentUser.id = userId;

  const docRef = doc(firestore, 'user_features', userId);
  
  // onSnapshot 会在数据首次加载和后续每次变更时触发
  const unsubscribe = onSnapshot(docRef, (docSnap) => {
    if (docSnap.exists()) {
      // 直接修改 proxy 对象,Valtio 会自动通知相关组件
      state.currentUser.features = docSnap.data();
    } else {
      console.warn(`User features for ${userId} not found.`);
      state.currentUser.features = null;
    }
  }, (error) => {
    console.error("Firestore subscription error:", error);
    state.currentUser.features = null; // 清理状态
  });

  state.currentUser.unsubscribe = unsubscribe;
}

// 从我们的后端 API 获取图数据
export async function fetchUserGraph(userId: string, depth: number) {
  if (!userId) return;

  state.userGraph.loading = true;
  state.userGraph.error = null;

  try {
    const response = await fetch(`/api/user/${userId}/graph?depth=${depth}`);
    if (!response.ok) {
      throw new Error(`API request failed with status ${response.status}`);
    }
    const data = await response.json();
    state.userGraph.data = data;
  } catch (error: any) {
    state.userGraph.error = error.message;
    state.userGraph.data = null; // 出错时清空数据
  } finally {
    state.userGraph.loading = false;
  }
}

// 这是一个单元测试的思路
// 我们可以模拟 firestore 和 fetch,然后调用 action,检查 state 的变化
// e.g. using vitest
// test('watchUserFeatures should update state', () => {
//   // mock onSnapshot
//   watchUserFeatures('user123');
//   // assert that state.currentUser.features is updated
// });

UserProfile.tsx (React Component)

import React, { useEffect } from 'react';
import { useSnapshot } from 'valtio';
import { state, watchUserFeatures, fetchUserGraph } from './store';

const UserProfile: React.FC<{ userId: string }> = ({ userId }) => {
  // useSnapshot 创建了对 state 的一个不可变快照
  // 当 state 变化时,组件会自动重渲染
  const snap = useSnapshot(state);

  useEffect(() => {
    watchUserFeatures(userId);
    // 组件卸载时,清理 Firestore 监听器
    return () => {
      if (snap.currentUser.unsubscribe) {
        snap.currentUser.unsubscribe();
      }
    };
  }, [userId]); // 当 userId 变化时,重新设置监听

  const handleFetchGraph = () => {
    fetchUserGraph(userId, 2);
  };

  return (
    <div>
      <h1>User Profile: {userId}</h1>
      
      <div style={{ border: '1px solid #ccc', padding: '10px', marginBottom: '20px' }}>
        <h2>Real-time Features (from Firestore)</h2>
        {snap.currentUser.features ? (
          <ul>
            {Object.entries(snap.currentUser.features).map(([key, value]) => (
              <li key={key}><strong>{key}:</strong> {JSON.stringify(value)}</li>
            ))}
          </ul>
        ) : (
          <p>Loading real-time features...</p>
        )}
      </div>

      <div style={{ border: '1px solid #ccc', padding: '10px' }}>
        <h2>Graph Relationships (from Dgraph)</h2>
        <button onClick={handleFetchGraph} disabled={snap.userGraph.loading}>
          {snap.userGraph.loading ? 'Loading...' : 'Fetch 2-degree Connections'}
        </button>
        {snap.userGraph.error && <p style={{ color: 'red' }}>Error: {snap.userGraph.error}</p>}
        {snap.userGraph.data && (
          <pre>{JSON.stringify(snap.userGraph.data, null, 2)}</pre>
        )}
      </div>
    </div>
  );
};

export default UserProfile;

Valtio 的美妙之处在于其极简的 API。我们只需要在一个 proxy 对象上执行异步操作来修改数据,任何使用了 useSnapshot 的组件都会自动、高效地更新。这里,UserProfile 组件完全不知道 currentUser.features 来自 Firestore 的实时流,而 userGraph.data 来自一次性的 fetch 请求。这种关注点分离使得组件逻辑保持纯粹,而数据获取的复杂性则被封装在 store.ts 的 action 中。

架构的局限性与未来迭代

此架构并非没有代价。最主要的挑战是维护两个数据库之间的数据一致性。Dask 作业的双写操作在失败和重试时可能导致 Firestore 和 Dgraph 之间的数据出现偏差。一个长期的优化路径是引入变更数据捕获(CDC)机制。例如,Dask 可以只写入一个主数据库(如 PostgreSQL 或 TiDB),然后使用 Debezium 等工具捕获变更,再通过 Kafka 将这些变更流式传输到 Dgraph 和 Firestore 的物化视图构建器中。这将以增加系统复杂性为代价,换取更强的数据一致性保证。

另一个考量是运维成本。管理一个 Dask 集群、一个 Dgraph 集群和一个 Firestore 实例,比管理单一数据库要复杂得多,需要团队在监控、备份、容灾方面投入更多精力。

最后,前端状态聚合也存在优化空间。当图数据变得非常庞大时,一次性将其全部加载到前端状态中是不明智的。可以考虑在前端实现虚拟化渲染,或者后端 API 支持对图数据进行分页,让 Valtio store 只持有当前视图所需的数据。


  目录