构建基于ScyllaDB与OpenTelemetry的高基数指标监控系统的实践复盘


我们的告警系统在周一早上7点15分崩溃了。不是因为它承载的业务系统出了故障,而是因为监控系统本身不堪重负。问题根源在于高基数(high cardinality)指标。随着业务从单体架构迁移到数百个微服务,再加上按租户、地域、实例ID等维度进行切分,我们的Prometheus实例的时间序列数量从百万级激增至数亿级。TSDB的索引变得臃肿不堪,查询性能一落千丈,一个稍微复杂些的仪表盘查询就能让整个系统陷入停滞。

最初的构想是寻找一个能原生处理高基数场景的存储后端。我们评估了多种方案,包括VictoriaMetrics、Thanos,甚至考虑过ClickHouse。但真正的瓶颈不仅在存储,还在于数据采集和处理的灵活性。我们需要的不是一个简单的指标存储,而是一个具备高度可扩展性的可观测性数据平台。最终,我们决定围绕ScyllaDB和OpenTelemetry构建一套全新的系统。

技术选型决策过程充满了权衡。

  • 数据库: ScyllaDB。 我们选择它的理由非常明确:对Cassandra协议的兼容性以及其基于Seastar框架的极致性能。ScyllaDB的无共享、线程绑核(thread-per-core)架构理论上能更好地应对我们这种写密集型、高并发的场景。与传统的JVM系数据库相比,它对硬件资源的压榨更彻底,也意味着更可控的P99延迟。在真实项目中,可预测的延迟比偶尔的峰值性能更重要。

  • 数据采集与传输: OpenTelemetry。 这是整个架构的基石。采用OTel标准,我们彻底将应用层代码的“埋点”与后端的“数据处理”解耦。开发者只需使用标准的OTel SDK,而平台团队可以通过配置OTel Collector来决定这些遥测数据(Traces, Metrics, Logs)被发送到哪里、如何被处理。这种灵活性是自建Agent或锁定特定厂商方案所无法比拟的。

  • Web前端与API: Ruby on Rails。 这是团队最熟悉的技术栈。对于一个内部平台而言,快速迭代和开发效率至关重要。Rails的生态系统,特别是与Sidekiq的无缝集成,为我们处理异步任务(如报表生成)提供了坚实的基础。性能对于一个数据展示仪表盘来说绰绰有余。

  • 样式与报表: Sass/SCSS 与 Matplotlib。 Sass/SCSS是Rails社区的标配,能有效组织复杂UI的样式代码。而Matplotlib则是一个看似“异类”的选择。我们的产品经理需要一些用于周报的、出版级别的复杂数据可视化图表。这些图表逻辑复杂、计算量大,不适合在浏览器端用JavaScript实时生成。我们决定用一个由Rails调度的Python进程来异步处理这些请求,这是一种务实的多语言协作方案。

第一步:为时序数据设计ScyllaDB表结构

在ScyllaDB中,Schema设计决定生死。一个错误的Partition Key设计足以让整个集群瘫痪。我们的核心痛点是高基数,因此必须将高基数的标签(label)从主键中分离,或者通过巧妙的设计来分散写压力。

我们设计的核心指标表metrics_ts如下:

-- ScyllaDB CQL for high-cardinality time series data
CREATE TABLE metrics_ts (
    metric_name text,          -- 指标名称, e.g., 'http.server.duration'
    shard_id int,              -- 人工分片键, a hash of labels
    timestamp bigint,          -- 时间戳 (Unix nanoseconds)
    value double,              -- 指标值
    labels map<text, text>,    -- 完整的标签集合 (高基数来源)
    PRIMARY KEY ((metric_name, shard_id), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
  AND compaction = { 'class' : 'TimeWindowCompactionStrategy', 'compaction_window_unit' : 'DAYS', 'compaction_window_size' : 1 };

这里的关键在于shard_id。我们不再使用所有标签的组合作为Partition Key的一部分,因为这会导致分区数量爆炸。取而代之的是,metric_nameshard_id组成了复合分区键。shard_id由所有标签经过一致性哈希算法计算得来,例如 murmur3(labels_json_string) % 128

  • 为什么这样做?
    1. 控制分区数量: 无论标签组合如何变化,一个metric_name只会被分散到固定的128个分区中。这极大地降低了元数据管理的压力。
    2. 避免热点: 通过哈希,我们将同一指标的写入请求均匀地分散到多个分区,进而分散到ScyllaDB集群的不同节点上。
    3. 查询影响: 这种设计的代价是,查询时如果不知道所有标签,就必须并发查询所有128个分片。但对于一个仪表盘来说,这种并发查询完全可以接受,而且速度远快于在单一巨大分区上进行扫描。

TimeWindowCompactionStrategy (TWCS) 是时序数据存储的不二之V择。它能确保同一时间窗口内的数据被归档到同一个SSTable中,使得过期数据删除操作极为高效,几乎没有IO开销。

第二步:配置OpenTelemetry Collector实现数据路由

OTel Collector是数据处理的瑞士军刀。我们的配置文件otel-collector-config.yaml 定义了一个清晰的管道:从应用接收OTLP协议的数据,在内存中进行批处理,然后输出到日志(用于调试)和后续的ScyllaDB写入服务。

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    # 批处理可以显著降低对下游的写入压力
    # 在真实项目中,超时和发送批次大小需要仔细调优
    timeout: 1s
    send_batch_size: 8192

exporters:
  # 在开发阶段,直接输出到控制台进行调试非常有用
  logging:
    loglevel: debug

  # 生产环境中,我们会有一个自定义的 exporter 或通过 Kafka 连接到写入服务
  # 此处为了演示,我们假设有一个 OTLP gRPC 服务负责写入 ScyllaDB
  otlp:
    endpoint: "scylla-writer-service:4317"
    tls:
      insecure: true

service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [logging, otlp]

在Rails应用中集成OTel SDK同样直接。我们在config/initializers/opentelemetry.rb中进行配置。

# config/initializers/opentelemetry.rb

require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'
require 'opentelemetry/instrumentation/all'

# 确保在 fork 子进程(如 Puma worker)之前配置
OpenTelemetry::SDK.configure do |c|
  # 设置服务名,这是可观测性的基本单元
  c.service_name = 'rails-dashboard-app'
  c.use_all # 自动加载所有已安装的 instrumentation

  # 在真实项目中,Collector 的地址应该来自环境变量
  # OTEL_EXPORTER_OTLP_ENDPOINT
  exporter = OpenTelemetry::Exporter::OTLP::Exporter.new(
    endpoint: ENV.fetch('OTEL_COLLECTOR_ENDPOINT', 'http://localhost:4317')
  )

  # 使用 BatchSpanProcessor 提高性能
  processor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(exporter)
  c.add_span_processor(processor)

  # 添加一个自定义的 processor 来附加通用属性
  c.add_span_processor(
    Class.new do
      def on_start(span, parent_context)
        span.set_attribute('deployment.environment', Rails.env)
      end
      def on_finish(span); end
      def force_flush(timeout: nil); end
      def shutdown(timeout: nil); end
    end.new
  )
end

# 获取一个 MeterProvider 来创建自定义指标
METER = OpenTelemetry.meter_provider.meter('MyAppMetrics', '1.0')

# 创建一个计数器
USER_SIGNUPS_COUNTER = METER.create_counter(
  'user.signups',
  unit: '1',
  description: 'Counts the number of user signups.'
)

在控制器中,我们可以这样记录一个自定义业务指标:

# app/controllers/users_controller.rb
class UsersController < ApplicationController
  def create
    # ... 用户创建逻辑 ...
    if @user.save
      # 这里的 attributes 会成为 ScyllaDB 中 labels map 的一部分
      USER_SIGNUPS_COUNTER.add(1, attributes: { 'signup_source' => 'web_form', 'tenant_id' => current_tenant.id })
      # ...
    end
  end
end

tenant_id就是典型的高基数标签。

第三步:构建Rails仪表盘与数据查询

Rails控制器负责从ScyllaDB中拉取数据。我们使用cassandra-driver gem来连接ScyllaDB。这里的挑战在于,一次图表渲染可能需要查询多个时间序列,并且要覆盖所有128个分片。

# app/services/metrics_query_service.rb
require 'cassandra'
require 'murmurhash3' # 用于计算 shard_id

class MetricsQueryService
  SHARD_COUNT = 128

  def initialize
    # 在生产环境中,连接池、超时、重试策略是必须配置的
    cluster = Cassandra.cluster(hosts: ['scylla1', 'scylla2'])
    @session = cluster.connect('monitoring_keyspace')
    @query_statement = @session.prepare(
      "SELECT timestamp, value FROM metrics_ts WHERE metric_name = ? AND shard_id = ? AND timestamp >= ? AND timestamp <= ?"
    )
  end

  def fetch_series(metric_name, labels, start_time, end_time)
    shard_id = calculate_shard_id(labels)

    # 执行查询并处理结果
    # 注意:真实项目中这里需要添加异常处理和日志
    results = @session.execute(
      @query_statement,
      arguments: [metric_name, shard_id, start_time.to_i * 1_000_000_000, end_time.to_i * 1_000_000_000]
    )

    results.map { |row| [row['timestamp'] / 1_000_000, row['value']] } # 转换为毫秒时间戳
  rescue Cassandra::Errors::IOError => e
    Rails.logger.error("ScyllaDB query failed: #{e.message}")
    [] # 返回空数组,避免前端崩溃
  end

  private

  def calculate_shard_id(labels)
    # 使用稳定的json序列化确保哈希值一致
    sorted_labels_json = labels.sort.to_h.to_json
    MurmurHash3::V32.str_hash(sorted_labels_json) % SHARD_COUNT
  end
end

前端部分,我们用Sass/SCSS来组织样式。BEM命名法结合Sass的嵌套规则,让组件样式清晰且易于维护。

// app/assets/stylesheets/components/_dashboard_panel.scss

.dashboard-panel {
  background-color: #ffffff;
  border: 1px solid #e0e0e0;
  border-radius: 4px;
  margin-bottom: 1.5rem;

  &__header {
    padding: 1rem 1.5rem;
    border-bottom: 1px solid #e0e0e0;
    font-weight: 600;
  }

  &__body {
    padding: 1.5rem;
    min-height: 300px;

    &--loading {
      display: flex;
      justify-content: center;
      align-items: center;
      color: #9e9e9e;
    }
  }

  &__footer {
    padding: 0.75rem 1.5rem;
    border-top: 1px solid #e0e0e0;
    font-size: 0.8rem;
    color: #757575;
    background-color: #f5f5f5;
  }
}

这种结构化CSS的方式,在大型项目中能有效避免样式冲突和污染。

第四步:Sidekiq与Matplotlib的异步报表生成

对于复杂的报表,实时查询是不现实的。我们设计了一个异步流程。

graph TD
    A[Rails Controller] -- 1. Enqueue Job --> B(Sidekiq);
    B -- 2. Process Job --> C{ReportGeneratorWorker};
    C -- 3. shell out --> D(Python Script: generate_report.py);
    D -- 4. Query --> E(ScyllaDB);
    D -- 5. Generate PNG --> F(Shared Storage);
    C -- 6. Save report path to DB --> G(PostgreSQL);
    H[User Browser] -- 7. Polls for completion --> A;
    A -- 8. Serve Image from Storage --> H;

Sidekiq Worker的代码非常简单,它的职责就是参数校验和调用外部脚本。

# app/workers/report_generator_worker.rb
class ReportGeneratorWorker
  include Sidekiq::Worker
  sidekiq_options retry: 3 # 失败自动重试

  def perform(report_id, query_params_json)
    report = Report.find(report_id)
    report.update!(status: 'generating')

    # 将参数安全地传递给外部脚本
    # 使用 Open3 捕获标准输出、标准错误和退出状态,这是生产级代码的必要实践
    cmd = [
      'python3',
      Rails.root.join('scripts', 'generate_report.py'),
      '--params',
      query_params_json,
      '--output',
      report.file_path
    ].join(' ')

    stdout, stderr, status = Open3.capture3(cmd)

    if status.success?
      report.update!(status: 'completed')
    else
      # 记录详细的错误日志
      Rails.logger.error("Report generation failed for report #{report_id}. Stderr: #{stderr}")
      report.update!(status: 'failed', error_message: stderr)
    end
  end
end

核心的Python脚本 scripts/generate_report.py 负责实际的数据处理和绘图。

# scripts/generate_report.py
import argparse
import json
import os
import pandas as pd
import matplotlib.pyplot as plt
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

def generate_report(params, output_path):
    # 此处应从安全的位置(如环境变量)读取凭证
    # 错误处理是关键
    try:
        cluster = Cluster(['scylla1', 'scylla2'])
        session = cluster.connect('monitoring_keyspace')
    except Exception as e:
        print(f"Error connecting to ScyllaDB: {e}")
        exit(1)

    # 实际查询逻辑会更复杂,可能需要并发查询多个分片
    # 这里仅为示意
    query = "SELECT timestamp, value FROM metrics_ts WHERE ... LIMIT 1000"
    rows = session.execute(query)

    df = pd.DataFrame(list(rows))
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ns')
    df.set_index('timestamp', inplace=True)

    # 使用 Matplotlib 进行绘图
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(12, 6))

    ax.plot(df.index, df['value'], label=params.get('metric_name'))
    ax.set_title('Time Series Analysis Report')
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.legend()
    fig.autofmt_xdate()

    # 确保输出目录存在
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"Report saved to {output_path}")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--params', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args()

    generate_report(json.loads(args.params), args.output)

这个架构的优势在于隔离。Python进程的内存泄漏或CPU飙升不会影响到主Rails应用的稳定性。缺点是增加了部署和运维的复杂性。

当前方案的局限性与未来展望

这套系统成功解决了我们面临的高基数写入和基本查询问题,但它并非银弹。当前的实现存在几个明显的局限性:

  1. 查询灵活性不足: 基于分片键的查询虽然高效,但对于任意标签组合的Ad-hoc查询支持非常弱。我们无法像PromQL那样进行灵活的聚合和函数计算。所有的聚合逻辑都需要在应用层手动实现。

  2. 数据降采样缺失: 系统存储的是原始数据。随着时间推移,数据量会线性增长。我们没有内置的降采样(downsampling)或数据汇总(rollups)机制来归档旧数据,这会是未来的存储成本和查询性能的一大挑战。

  3. 运维复杂性: 维护一个ScyllaDB集群、OTel Collector集群,以及一个混合语言的应用栈,对团队的技能要求更高。

未来的优化路径是清晰的。首先,我们会引入一个流处理引擎(如Apache Flink)在数据写入ScyllaDB之前进行实时的预聚合和降采样。其次,可以考虑在ScyllaDB之上构建一个专用的查询层,该层能理解指标查询语义,将类似PromQL的查询语言翻译成针对我们Schema的并发CQL查询。最后,将Python报表服务容器化,并由Kubernetes管理,可以更好地隔离资源并实现弹性伸缩,而不是简单的shell out


  目录