采用TDD方法构建集成Pandas、Milvus与MobX的实时特征探索系统


我们面临的第一个挑战,是如何缩短机器学习中特征工程的反馈周期。传统的流程是分离的:数据科学家在Jupyter Notebook中用Pandas探索、清洗、构建特征,然后将特征向量批量导入一个系统进行评估或检索。这个过程充满了等待和上下文切换。如果能构建一个交互式系统,让工程师能实时定义特征、并立即看到这些特征在向量空间中的“相似性”表现,整个探索效率将得到质的提升。

构想是一个闭环系统:前端提供一个界面,用户可以输入类似Pandas的转换逻辑;后端接收逻辑,在数据集上执行,生成特征向量,并将其存入向量数据库;最后,系统立即基于一个样本向量进行相似性搜索,并将结果返回前端。这个闭环必须是毫秒级的,并且核心的数据处理逻辑必须是绝对可靠的。

要实现这个目标,技术栈的选择至关重要。

  • 数据处理核心: Pandas。这是处理结构化数据无可争议的标准库,其丰富的数据处理能力是我们系统的基石。
  • 实时向量检索: Milvus。作为一个高性能的向量数据库,它能处理我们实时生成并插入的向量,并提供瞬时的相似性搜索。
  • 后端服务框架: FastAPI。其异步特性和依赖注入系统非常适合构建这种IO密集型且需要清晰分层的服务。
  • 前端状态管理: MobX。前端UI的状态会变得极其复杂:原始数据、用户输入的特征公式、计算后的新特征列、向量检索结果、加载状态等等。MobX的响应式模型能以最小的模板代码优雅地处理这种衍生状态的连锁反应。
  • 开发方法论: 测试驱动开发 (TDD)。数据管道的正确性是系统的生命线。任何一个特征计算的微小错误都可能导致模型训练的失败。因此,我们必须采用TDD,为每一个核心转换逻辑编写测试,确保系统的稳定性和可维护性。

整个项目的成败,取决于能否将这四种看似无关的技术无缝地粘合在一起,并通过TDD保证其健壮性。

TDD驱动的核心数据管道开发

我们的起点不是API,也不是UI,而是最核心的数据处理模块:FeatureEngineeringService。它的职责是接收原始数据和用户的特征工程“配方”,使用Pandas执行计算,然后将结果向量化并提交给Milvus。根据TDD的原则,我们先写测试。

步骤1: 定义测试场景与依赖模拟

在真实项目中,直接依赖一个运行中的Milvus实例进行单元测试是不可取的。测试应该快速、隔离且确定。因此,我们需要模拟pymilvus客户端。unittest.mock是我们的首选工具。

我们的第一个测试用例将验证一个完整的流程:接收一个DataFrame和一条特征生成指令,计算新特征,生成向量,并确保这些向量被正确地格式化并传递给Milvus客户端的upsert方法。

# tests/test_feature_service.py
import pytest
import pandas as pd
import numpy as np
from unittest.mock import MagicMock, patch

from app.services.feature_service import FeatureEngineeringService, MilvusConfig

# 使用fixture来创建一个可复用的服务实例和mock
@pytest.fixture
def mock_milvus_client():
    """创建一个模拟的MilvusCollection对象"""
    with patch('app.services.feature_service.Collection') as mock_collection_class:
        mock_collection_instance = MagicMock()
        mock_collection_class.return_value = mock_collection_instance
        yield mock_collection_instance

@pytest.fixture
def feature_service(mock_milvus_client):
    """创建一个注入了模拟Milvus客户端的服务实例"""
    # 在测试中,我们不需要真实的连接信息
    config = MilvusConfig(
        uri="grpc://localhost:19530",
        collection_name="test_collection"
    )
    # 模拟Collection的构造函数返回我们的mock实例
    with patch('app.services.feature_service.Collection', return_value=mock_milvus_client):
        service = FeatureEngineeringService(config)
        # 确保模拟的集合在服务内部被正确设置
        service.collection = mock_milvus_client
        yield service

# 我们的第一个失败测试
def test_process_and_upsert_generates_correct_vectors(feature_service, mock_milvus_client):
    """
    测试核心流程:根据配方生成特征,并调用Milvus upsert。
    这是一个集成测试,但其外部依赖(Milvus)被模拟了。
    """
    # 1. 准备输入数据 (Arrange)
    initial_data = pd.DataFrame({
        'id': [1, 2, 3],
        'price': [10.0, 20.0, 30.0],
        'quantity': [5, 10, 2]
    })
    # 用户输入的特征工程配方
    recipe = "df['total_value'] = df['price'] * df['quantity']"
    # 我们期望用于向量化的列
    vector_columns = ['price', 'quantity', 'total_value']

    # 2. 执行操作 (Act)
    result_df = feature_service.process_and_upsert(initial_data, recipe, vector_columns, 'id')

    # 3. 断言结果 (Assert)
    # 3.1 验证返回的DataFrame是否正确计算了新列
    expected_total_value = pd.Series([50.0, 200.0, 60.0], name='total_value')
    pd.testing.assert_series_equal(result_df['total_value'], expected_total_value, check_names=False)
    assert 'total_value' in result_df.columns

    # 3.2 验证Milvus的upsert方法是否被正确调用
    mock_milvus_client.upsert.assert_called_once()
    
    # 3.3 深入验证传递给upsert的数据结构和内容
    # 获取调用upsert时的实际参数
    args, kwargs = mock_milvus_client.upsert.call_args
    upserted_data = args[0]
    
    # 验证数据结构
    assert isinstance(upserted_data, list)
    assert len(upserted_data) == 2 # 应该包含主键和向量两部分
    
    # 验证主键
    assert upserted_data[0] == [1, 2, 3]

    # 验证向量数据
    expected_vectors = np.array([
        [10.0, 5.0, 50.0],
        [20.0, 10.0, 200.0],
        [30.0, 2.0, 60.0]
    ]).tolist()
    assert np.allclose(upserted_data[1], expected_vectors)

def test_process_with_invalid_recipe_raises_error(feature_service):
    """
    测试当配方包含语法错误或逻辑错误时,系统能优雅地处理异常。
    """
    initial_data = pd.DataFrame({'id': [1], 'col_a': [10]})
    # 一个无效的配方,引用了不存在的列
    invalid_recipe = "df['new_col'] = df['non_existent_col'] + 1"

    with pytest.raises(Exception) as excinfo:
        feature_service.process_and_upsert(initial_data, invalid_recipe, ['new_col'], 'id')
    
    # 我们可以断言异常类型或者消息内容,以确保错误处理符合预期
    assert "non_existent_col" in str(excinfo.value).lower()

现在运行pytesttest_process_and_upsert_generates_correct_vectors会失败,因为FeatureEngineeringService及其方法还不存在。这是TDD的“红灯”阶段。

步骤2: 编写最小化实现使其通过

接下来,我们编写FeatureEngineeringService的代码,目标仅仅是让上面的测试通过。

# app/services/feature_service.py
import pandas as pd
import logging
from pydantic import BaseModel
from pymilvus import Collection, utility, connections

# 配置类,用于解耦连接细节
class MilvusConfig(BaseModel):
    uri: str
    collection_name: str

class FeatureEngineeringService:
    def __init__(self, config: MilvusConfig):
        self.config = config
        self.collection = None
        self._connect_to_milvus()
        self._load_collection()

    def _connect_to_milvus(self):
        try:
            # 使用别名 'default',pymilvus的很多操作都默认使用它
            connections.connect("default", uri=self.config.uri)
            logging.info("Successfully connected to Milvus.")
        except Exception as e:
            logging.error(f"Failed to connect to Milvus: {e}")
            raise

    def _load_collection(self):
        try:
            if not utility.has_collection(self.config.collection_name):
                # 在真实应用中,集合的创建应该由独立的部署脚本管理
                # 这里为了简单起见,我们假设它已存在
                raise ValueError(f"Collection '{self.config.collection_name}' does not exist.")
            self.collection = Collection(self.config.collection_name)
            self.collection.load()
            logging.info(f"Successfully loaded collection: {self.config.collection_name}")
        except Exception as e:
            logging.error(f"Failed to load Milvus collection: {e}")
            raise

    def process_and_upsert(self, df: pd.DataFrame, recipe: str, vector_columns: list[str], pk_field: str) -> pd.DataFrame:
        """
        执行特征工程并更新到Milvus。
        这里的'recipe'执行是一个安全风险点,在生产环境中需要沙箱化。
        """
        if not isinstance(df, pd.DataFrame):
            raise TypeError("Input 'df' must be a pandas DataFrame.")

        # 复制DataFrame以避免副作用
        processed_df = df.copy()
        
        # 执行用户定义的配方
        # 警告: exec在生产环境中是不安全的。后续需要替换为安全的DSL解析器。
        try:
            exec_globals = {'df': processed_df}
            exec(recipe, exec_globals)
            processed_df = exec_globals['df']
        except Exception as e:
            logging.error(f"Error executing user recipe: {recipe}. Error: {e}")
            raise # 将原始异常抛出,让上层处理

        # 确保所有向量列都存在
        for col in vector_columns:
            if col not in processed_df.columns:
                raise ValueError(f"Vector column '{col}' not found after processing recipe.")

        # 提取主键和向量
        pks = processed_df[pk_field].tolist()
        vectors = processed_df[vector_columns].to_numpy().tolist()

        # 准备要upsert的数据
        data_to_upsert = [pks, vectors]

        # 调用Milvus客户端
        try:
            self.collection.upsert(data_to_upsert)
            # Milvus的upsert是异步的,在需要强一致性的场景下可能需要flush
            # self.collection.flush() 
            logging.info(f"Upserted {len(pks)} vectors into collection '{self.config.collection_name}'.")
        except Exception as e:
            logging.error(f"Failed to upsert data to Milvus: {e}")
            raise

        return processed_df

    def search_similar(self, vector: list[float], limit: int = 10) -> list[dict]:
        """
        在Milvus中执行向量相似性搜索
        """
        search_params = {
            "metric_type": "L2",
            "params": {"nprobe": 10},
        }
        
        try:
            results = self.collection.search(
                data=[vector],
                anns_field="vector", # 假设向量字段名为'vector'
                param=search_params,
                limit=limit,
                output_fields=["id", "price"] # 示例:同时返回一些元数据字段
            )
            
            # 解析结果
            hits = []
            for hit in results[0]:
                entity = hit.entity
                hits.append({
                    "id": hit.id,
                    "distance": hit.distance,
                    "entity": {
                        "id": entity.get("id"),
                        "price": entity.get("price")
                    }
                })
            return hits
        except Exception as e:
            logging.error(f"Failed to search in Milvus: {e}")
            raise

再次运行pytest,现在所有测试都应该通过了。我们有了一个经过测试、行为确定的核心服务。这就是TDD带来的信心。

构建FastAPI服务层

有了核心服务,我们可以轻松地将其包装成一个Web API。FastAPI的依赖注入系统让这件事变得非常简单。

# app/main.py
import pandas as pd
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel, Field
from typing import List, Dict, Any

from app.services.feature_service import FeatureEngineeringService, MilvusConfig

# ---- API 数据模型 ----
class FeatureRequest(BaseModel):
    # Base64编码的CSV字符串或其他序列化格式
    csv_data: str = Field(..., description="Base64 encoded CSV data of the dataframe.")
    recipe: str = Field(..., description="A python expression to be executed on the dataframe named 'df'.")
    vector_columns: List[str] = Field(..., description="Columns to be used for the vector embedding.")
    pk_field: str = Field("id", description="The primary key field in the data.")

class SearchRequest(BaseModel):
    vector: List[float]
    limit: int = 10

# ---- 依赖注入 ----
# 在应用启动时创建单例
milvus_config = MilvusConfig(uri="grpc://milvus-standalone:19530", collection_name="feature_explore")
feature_service_instance = FeatureEngineeringService(milvus_config)

def get_feature_service():
    return feature_service_instance

# ---- FastAPI 应用 ----
app = FastAPI(title="Real-time Feature Exploration API")

@app.post("/process-features", response_model=Dict[str, Any])
def process_features_endpoint(
    request: FeatureRequest,
    service: FeatureEngineeringService = Depends(get_feature_service)
):
    try:
        # 在真实应用中,数据量大时需要流式处理
        import base64
        import io
        decoded_csv = base64.b64decode(request.csv_data)
        df = pd.read_csv(io.StringIO(decoded_csv.decode('utf-8')))

        processed_df = service.process_and_upsert(
            df, request.recipe, request.vector_columns, request.pk_field
        )
        
        return {"status": "success", "rows_processed": len(processed_df)}

    except Exception as e:
        # 捕获服务层抛出的所有异常,并以HTTP错误形式返回
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/search", response_model=List[Dict[str, Any]])
def search_endpoint(
    request: SearchRequest,
    service: FeatureEngineeringService = Depends(get_feature_service)
):
    try:
        results = service.search_similar(request.vector, request.limit)
        return results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

这个API层很薄,它的主要职责是序列化/反序列化、HTTP协议处理和异常转换。所有的业务逻辑都被封装在经过我们严格测试的FeatureEngineeringService中。

MobX驱动的响应式前端

现在转向前端。这里的核心是FeatureStore,它将管理我们所有的UI状态。

// src/stores/FeatureStore.ts
import { makeAutoObservable, runInAction, computed } from "mobx";
import axios from "axios";
import { parse, unparse } from "papaparse"; // 用于处理CSV

// 定义API客户端
const apiClient = axios.create({
  baseURL: "/api", // 假设我们用代理转发到FastAPI
});

export class FeatureStore {
  // --- Observables (核心状态) ---
  rawCsvData: string = `id,price,quantity\n1,10.0,5\n2,20.0,10\n3,30.0,2`;
  featureRecipe: string = "df['total_value'] = df['price'] * df['quantity']";
  vectorColumns: string[] = ["price", "quantity", "total_value"];
  pkField: string = "id";
  
  // --- State for async operations ---
  isLoading: boolean = false;
  error: string | null = null;

  // --- Derived state (衍生状态) ---
  searchResults: any[] = [];
  
  constructor() {
    makeAutoObservable(this, {
      parsedData: computed, // 显式标记计算属性
    });
    // 首次加载时自动处理
    this.processFeatures();
  }

  // --- Actions (修改状态的方法) ---
  setRawCsvData(data: string) {
    this.rawCsvData = data;
  }

  setFeatureRecipe(recipe: string) {
    this.featureRecipe = recipe;
  }

  // 核心业务动作:处理特征并上传
  async processFeatures() {
    if (this.isLoading) return;
    
    this.isLoading = true;
    this.error = null;

    try {
      const b64Csv = btoa(this.rawCsvData); // Base64编码
      
      await apiClient.post("/process-features", {
        csv_data: b64Csv,
        recipe: this.featureRecipe,
        vector_columns: this.vectorColumns,
        pk_field: this.pkField,
      });

      // 成功后清除状态
      runInAction(() => {
        this.isLoading = false;
      });

    } catch (err: any) {
      runInAction(() => {
        this.isLoading = false;
        this.error = err.response?.data?.detail || "An unknown error occurred.";
      });
    }
  }
  
  // 另一个业务动作:执行搜索
  async searchSimilar(vector: number[]) {
    if (this.isLoading) return;

    this.isLoading = true;
    this.error = null;
    
    try {
      const response = await apiClient.post("/search", { vector });
      runInAction(() => {
        this.searchResults = response.data;
        this.isLoading = false;
      });
    } catch (err: any) {
       runInAction(() => {
        this.isLoading = false;
        this.error = err.response?.data?.detail || "Search failed.";
        this.searchResults = [];
      });
    }
  }

  // --- Computed (从核心状态自动计算出的衍生值) ---
  get parsedData(): any[] {
    // 使用PapaParse安全地解析CSV字符串
    const result = parse(this.rawCsvData, { header: true, dynamicTyping: true });
    if (result.errors.length > 0) {
        console.error("CSV parsing errors:", result.errors);
        // 在真实应用中,这里应该更新一个error state
        return [];
    }
    return result.data.filter((row: any) => row[this.pkField] != null); // 过滤空行
  }
}

// 创建一个全局单例
export const featureStore = new FeatureStore();

这个Store的设计体现了MobX的精髓:

  1. @observable(通过makeAutoObservable自动应用)的rawCsvDatafeatureRecipe是“状态源”。
  2. @computedparsedData是衍生状态,它依赖于rawCsvData。当rawCsvData改变时,parsedData会自动重新计算并触发UI更新,无需手动干预。
  3. @actionprocessFeaturessearchSimilar是唯一可以修改状态的地方,它们封装了异步逻辑,并通过runInAction安全地更新状态。

在React组件中消费这个Store会非常直观:

// src/components/Workbench.tsx
import React from 'react';
import { observer } from 'mobx-react-lite';
import { featureStore } from '../stores/FeatureStore';
import { reaction } from 'mobx';

const Workbench: React.FC = observer(() => {
  // 使用React的useEffect来设置响应式逻辑
  React.useEffect(() => {
    // 当配方改变时,自动触发后端处理
    const disposer = reaction(
      () => featureStore.featureRecipe,
      () => {
        featureStore.processFeatures();
      },
      { delay: 500 } // 添加防抖,避免用户输入时频繁请求
    );
    return disposer; // 组件卸载时清理reaction
  }, []);

  const handleRowClick = (rowData: any) => {
    // 当用户点击一行时,用该行数据作为查询向量进行搜索
    const vector = featureStore.vectorColumns.map(col => {
        // 尝试从动态计算的列或原始列中获取值
        const tempDf = { ...rowData };
        // 这是一个简化的模拟,实际应在后端完成
        if (featureStore.featureRecipe.includes("total_value")) {
            tempDf['total_value'] = tempDf['price'] * tempDf['quantity'];
        }
        return tempDf[col] || 0;
    });
    featureStore.searchSimilar(vector);
  };

  return (
    <div>
      {/* ...UI元素,如文本框、表格等,直接绑定featureStore的属性... */}
      <textarea
        value={featureStore.featureRecipe}
        onChange={(e) => featureStore.setFeatureRecipe(e.target.value)}
      />
      {/* ... 表格组件展示 featureStore.parsedData ... */}
      {/* ... 列表组件展示 featureStore.searchResults ... */}
    </div>
  );
});

export default Workbench;

通过reaction,我们建立了一个从UI输入(featureRecipe)到后端处理的响应式数据流,这正是我们闭环系统的核心。

系统的协同工作流

整个系统的交互流程可以用下面的图来描述:

sequenceDiagram
    participant User
    participant ReactUI
    participant MobX_Store
    participant FastAPI_Backend
    participant Pandas_Engine
    participant Milvus_DB

    User->>ReactUI: 修改特征配方 (featureRecipe)
    ReactUI->>MobX_Store: 调用 setFeatureRecipe(newRecipe)
    MobX_Store-->>ReactUI: 状态更新,UI可能显示"loading"
    
    Note over MobX_Store,FastAPI_Backend: MobX reaction 监听到变化,触发异步action

    MobX_Store->>FastAPI_Backend: POST /process-features (recipe, data)
    FastAPI_Backend->>Pandas_Engine: service.process_and_upsert()
    Pandas_Engine->>Pandas_Engine: 执行配方,计算新特征向量
    Pandas_Engine->>Milvus_DB: collection.upsert(vectors)
    Milvus_DB-->>Pandas_Engine: Upsert Acknowledged
    Pandas_Engine-->>FastAPI_Backend: 返回处理成功
    FastAPI_Backend-->>MobX_Store: HTTP 200 OK
    MobX_Store->>MobX_Store: 更新 isLoading = false
    MobX_Store-->>ReactUI: UI状态恢复

    User->>ReactUI: 点击某一行数据
    ReactUI->>MobX_Store: 调用 searchSimilar(vector)
    MobX_Store->>FastAPI_Backend: POST /search (vector)
    FastAPI_Backend->>Milvus_DB: collection.search(vector)
    Milvus_DB-->>FastAPI_Backend: 返回相似向量结果
    FastAPI_Backend-->>MobX_Store: 返回搜索结果JSON
    MobX_Store->>MobX_Store: 更新 searchResults 数组
    MobX_Store-->>ReactUI: 状态更新,UI渲染搜索结果列表

局限性与未来迭代路径

这个系统虽然实现了核心的实时反馈闭环,但在生产环境中仍有几个关键问题需要解决。

首先,exec(recipe) 是一个巨大的安全隐患。在多租户环境下,它允许任意代码执行。一个稳健的替代方案是设计一个安全的领域特定语言(DSL)或使用AST(抽象语法树)解析用户的输入,将其转换为安全的Pandas操作序列。

其次,当前实现将整个数据集加载到内存中进行处理。这对于GB级别以下的数据是可行的,但无法扩展到大数据场景。未来的架构演进可以考虑使用DaskSpark替换Pandas作为后端计算引擎,以支持分布式、内存外计算。

再者,前端的错误反馈机制比较粗糙。当用户的recipe语法错误时,后端会抛出异常,前端只是简单显示错误信息。一个更优的用户体验应该是在前端提供一个编辑器,能够对recipe进行实时语法校验,甚至提供列名自动补全功能。

最后,Milvus的集合(Collection)Schema是预先定义的。一个更灵活的系统应该允许用户动态创建和管理不同的特征集合,对应不同的特征工程实验,这需要对后端的元数据管理和Milvus的交互进行更复杂的设计。这些都是当前方案的技术边界,也是未来迭代的明确方向。


  目录