基于ActiveMQ与Chakra UI构建Jib容器化CV处理管道的实时监控系统


一个基于计算机视觉(CV)的图像审核服务最近遇到了瓶颈。任务提交后,业务方无法实时了解处理进度,只能被动等待结果。当任务处理延迟时,我们无法快速定位瓶颈是在消息队列积压,还是在CV模型推理缓慢。缺乏端到端的可观测性,让整个系统成了一个黑盒。直接引入复杂的APM系统如SkyWalking或Pinpoint对于这个特定、独立的系统来说显得过重,我们需要一个轻量级、自包含的解决方案。

初步的构想是构建一个实时仪表盘。这个仪表盘不仅要展示队列的基本状态(如消息积压数),更重要的是,要能追踪每个任务从进入队列到处理完成的全过程。这意味着我们需要一个机制,将前端提交的请求、消息队列中的消息、以及后端处理的CV任务通过一个唯一的标识符关联起来。

技术选型上,我们做出了如下决策:

  1. 消息队列 (ActiveMQ): 团队对JMS规范熟悉,且ActiveMQ在内部已经有应用。它的消息属性(Message Properties)是实现端到端追踪的关键,可以用来携带追踪ID和时间戳。此外,其Advisory Messages机制可以用来监控消费者连接状态,为我们提供了开箱即用的工作节点状态监控能力。
  2. CV工作节点 (Java + DJL): 主体处理逻辑使用Java实现,因为它与ActiveMQ的集成最成熟。CV能力通过Amazon的Deep Java Library (DJL) 实现,它可以方便地加载预训练的PyTorch或TensorFlow模型,比如YOLOv5用于对象检测。
  3. 容器化 (Jib): 对于Java应用,Jib提供了无需编写Dockerfile的容器化方案。它能直接集成到Maven构建流程中,生成优化的分层镜像,极大地简化了CI/CD流程,并且比docker build更快。
  4. 前端仪表盘 (Chakra UI): 需要快速构建一个功能性的内部仪表盘。Chakra UI的组件化和高可组合性非常适合这种场景,使我们能专注于数据展示逻辑,而不是繁琐的CSS样式调整。
  5. 实时通信 (WebSockets): 为了将后端状态实时推送到前端,传统的HTTP轮询效率低下且延迟高。我们选择WebSocket,由一个专门的监控服务(Monitor Service)负责聚合后端状态,并通过WebSocket长连接将数据实时推送到Chakra UI仪表盘。

整个系统的架构如下所示:

graph TD
    subgraph "用户浏览器"
        A[Chakra UI 仪表盘]
    end

    subgraph "后端服务 (Jib 容器化)"
        B(API Gateway) -- HTTP POST --> C{ActiveMQ Broker}
        C -- cv-task-queue --> D1[CV Worker 1]
        C -- cv-task-queue --> D2[CV Worker 2]
        C -- cv-task-queue --> Dn[CV Worker N]
        
        D1 -- cv-status-topic --> E[Monitor Service]
        D2 -- cv-status-topic --> E
        Dn -- cv-status-topic --> E
        
        C -- Advisory Messages --> E
    end
    
    A <-. WebSocket .-> E

    style A fill:#D6EAF8,stroke:#3498DB
    style E fill:#D5F5E3,stroke:#2ECC71
    style C fill:#FCF3CF,stroke:#F1C40F
    style D1 fill:#EBDEF0,stroke:#8E44AD
    style D2 fill:#EBDEF0,stroke:#8E44AD
    style Dn fill:#EBDEF0,stroke:#8E44AD

第一步:构建核心CV工作节点并使用Jib容器化

CV工作节点是整个系统的核心处理单元。它从cv-task-queue队列消费任务,执行模型推理,然后将带有性能指标的结果发送到cv-status-topic主题。

首先是Maven依赖配置,包含了ActiveMQ、DJL(及其YOLOv5模型)、Spring Boot和Jib插件。

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot for simplified setup -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    
    <!-- DJL for Computer Vision -->
    <dependency>
        <groupId>ai.djl</groupId>
        <artifactId>api</artifactId>
        <version>0.26.0</version>
    </dependency>
    <dependency>
        <groupId>ai.djl.pytorch</groupId>
        <artifactId>pytorch-engine</artifactId>
        <version>0.26.0</version>
    </dependency>
    <dependency>
        <groupId>ai.djl.pytorch</groupId>
        <artifactId>pytorch-model-zoo</artifactId>
        <version>0.26.0</version>
    </dependency>
    
    <!-- Jackson for JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>3.4.0</version>
            <configuration>
                <from>
                    <!-- Use an optimized base image for Java -->
                    <image>eclipse-temurin:17-jre-jammy</image>
                </from>
                <to>
                    <!-- Target image name -->
                    <image>my-registry/cv-worker:1.0.0</image>
                </to>
                <container>
                    <mainClass>com.example.cvworker.CvWorkerApplication</mainClass>
                    <ports>
                        <!-- Expose any necessary ports, though this worker might not need any -->
                    </ports>
                    <jvmFlags>
                        <jvmFlag>-Xms512m</jvmFlag>
                        <jvmFlag>-Xmx1024m</jvmFlag>
                        <jvmFlag>-Djava.awt.headless=true</jvmFlag>
                    </jvmFlags>
                </container>
            </configuration>
        </plugin>
    </plugins>
</build>

Jib的配置非常直观。我们指定了基础镜像、目标镜像名称、主类和JVM参数。在真实项目中,my-registry应替换为实际的Docker镜像仓库地址。执行mvn compile jib:build即可完成镜像构建和推送,无需本地安装Docker守护进程。

接下来是工作节点的业务逻辑。

// CvTaskListener.java
package com.example.cvworker;

import ai.djl.Application;
import ai.djl.inference.Predictor;
import ai.djl.modality.cv.Image;
import ai.djl.modality.cv.ImageFactory;
import ai.djl.modality.cv.output.DetectedObjects;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ZooModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component
public class CvTaskListener {

    private static final Logger logger = LoggerFactory.getLogger(CvTaskListener.class);
    private final JmsTemplate jmsTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Predictor<Image, DetectedObjects> predictor;

    @Autowired
    public CvTaskListener(JmsTemplate jmsTemplate) throws Exception {
        this.jmsTemplate = jmsTemplate;
        
        // Initialize the DJL model once on startup
        // This is a time-consuming operation and should not be done per message.
        Criteria<Image, DetectedObjects> criteria = Criteria.builder()
                .optApplication(Application.CV.OBJECT_DETECTION)
                .setTypes(Image.class, DetectedObjects.class)
                .optFilter("backbone", "yolov5s")
                .optEngine("PyTorch")
                .build();
        ZooModel<Image, DetectedObjects> model = criteria.loadModel();
        this.predictor = model.newPredictor();
        logger.info("YOLOv5 model loaded successfully.");
    }

    @JmsListener(destination = "cv-task-queue")
    public void receiveMessage(Message message) {
        String traceId = null;
        long queueEntryTimestamp = 0L;

        try {
            // 1. Extract tracing metadata from message properties
            traceId = message.getStringProperty("traceId");
            queueEntryTimestamp = message.getLongProperty("startTimestamp");

            if (traceId == null || queueEntryTimestamp == 0) {
                logger.warn("Received a message without tracing properties. Ignoring.");
                return;
            }
            
            // Notify monitor service that processing has started
            publishStatus(traceId, queueEntryTimestamp, "PROCESSING", null, -1);

            long processingStart = System.currentTimeMillis();
            
            // 2. Process the image data
            if (!(message instanceof BytesMessage)) {
                throw new IllegalArgumentException("Message must be of type BytesMessage");
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] imageBytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(imageBytes);

            Image img = ImageFactory.getInstance().fromInputStream(new ByteArrayInputStream(imageBytes));
            
            DetectedObjects detections = predictor.predict(img);
            
            long processingEnd = System.currentTimeMillis();
            long processingTimeMs = processingEnd - processingStart;

            logger.info("TraceID [{}]: CV processing completed in {} ms. Found {} objects.", 
                traceId, processingTimeMs, detections.getNumberOfObjects());
            
            // 3. Publish completion status to the status topic
            publishStatus(traceId, queueEntryTimestamp, "COMPLETED", detections.toString(), processingTimeMs);

        } catch (Exception e) {
            logger.error("TraceID [{}]: Failed to process CV task.", traceId, e);
            if (traceId != null && queueEntryTimestamp != 0) {
                try {
                    // Publish failure status
                    publishStatus(traceId, queueEntryTimestamp, "FAILED", e.getMessage(), -1);
                } catch (Exception publishEx) {
                    logger.error("TraceID [{}]: Failed to publish failure status.", traceId, publishEx);
                }
            }
            // In a real system, you would likely move the message to a Dead Letter Queue (DLQ).
            // Spring JMS can be configured to do this automatically.
        }
    }

    private void publishStatus(String traceId, long queueEntryTimestamp, String status, String result, long processingTimeMs) throws Exception {
        Map<String, Object> statusUpdate = new HashMap<>();
        statusUpdate.put("traceId", traceId);
        statusUpdate.put("status", status);
        statusUpdate.put("result", result);
        statusUpdate.put("processingTimeMs", processingTimeMs);
        statusUpdate.put("queueEntryTimestamp", queueEntryTimestamp);
        statusUpdate.put("eventTimestamp", System.currentTimeMillis());
        
        String jsonPayload = objectMapper.writeValueAsString(statusUpdate);
        
        jmsTemplate.convertAndSend("cv-status-topic", jsonPayload);
    }
}

这段代码的核心在于:

  • 模型预加载: DJL模型在构造函数中加载,避免了每次处理消息时都重复加载模型的巨大开销。
  • 元数据提取: JmsListener方法首先从消息属性中提取traceIdstartTimestamp,这是实现追踪的关键。
  • 状态发布: 在处理开始、处理完成或处理失败时,都会调用publishStatus方法,向cv-status-topic发送一个JSON消息。这个消息包含了追踪ID、状态、处理耗时等所有前端需要的信息。这是一个典型的事件驱动更新模式。
  • 错误处理: 包含了基础的异常捕获,并在失败时发布FAILED状态。

第二步:API网关与监控服务

我们需要两个简单的Spring Boot应用:一个是接收HTTP请求并向队列发送任务的API网关,另一个是消费状态更新并将其通过WebSocket推送的监控服务。

API Gateway (api-gateway service):

// TaskController.java
package com.example.apigateway;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import jakarta.jms.BytesMessage;
import java.util.UUID;

@RestController
@RequestMapping("/api/tasks")
public class TaskController {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping(consumes = "multipart/form-data")
    public Map<String, String> submitTask(@RequestParam("image") MultipartFile imageFile) throws Exception {
        if (imageFile.isEmpty()) {
            throw new IllegalArgumentException("Image file is empty.");
        }

        final String traceId = UUID.randomUUID().toString();
        final long startTimestamp = System.currentTimeMillis();

        jmsTemplate.send("cv-task-queue", session -> {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(imageFile.getBytes());
            
            // Set tracing properties
            message.setStringProperty("traceId", traceId);
            message.setLongProperty("startTimestamp", startTimestamp);
            
            return message;
        });

        return Map.of("traceId", traceId, "status", "QUEUED");
    }
}

该控制器接收一个图片文件,生成traceIdstartTimestamp,将它们设置到JMS消息的属性中,然后将图片字节流作为消息体发送到cv-task-queue

Monitor Service (monitor-service):

这个服务是连接后端和前端的桥梁。它需要spring-boot-starter-websocketspring-boot-starter-activemq依赖。

// WebSocketConfig.java
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private StatusUpdateHandler statusUpdateHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(statusUpdateHandler, "/ws/status").setAllowedOrigins("*");
    }
}

// StatusUpdateHandler.java
@Component
public class StatusUpdateHandler extends TextWebSocketHandler {
    private static final Logger logger = LoggerFactory.getLogger(StatusUpdateHandler.class);
    // Use ConcurrentHashMap for thread-safe session management
    private final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        logger.info("New WebSocket connection established: {}", session.getId());
    }

    public void broadcast(String message) {
        for (WebSocketSession session : sessions) {
            try {
                if (session.isOpen()) {
                    session.sendMessage(new TextMessage(message));
                }
            } catch (IOException e) {
                logger.error("Failed to send message to session {}", session.getId(), e);
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        logger.info("WebSocket connection closed: {}. Status: {}", session.getId(), status);
    }
}

// StatusTopicListener.java
@Component
public class StatusTopicListener {
    @Autowired
    private StatusUpdateHandler statusUpdateHandler;

    // Listens to the status topic from CV workers
    @JmsListener(destination = "cv-status-topic")
    public void receiveStatusUpdate(String jsonPayload) {
        // Simply forward the raw JSON payload to all connected WebSocket clients
        statusUpdateHandler.broadcast(jsonPayload);
    }
    
    // Listens to ActiveMQ Advisory Topics for consumer changes
    @JmsListener(destination = "ActiveMQ.Advisory.Consumer.Queue.cv-task-queue")
    public void handleConsumerAdvisory(ActiveMQMessage message) throws Exception {
        DataStructure ds = message.getDataStructure();
        if (ds instanceof ConsumerInfo) {
            ConsumerInfo consumerInfo = (ConsumerInfo) ds;
            int consumerCount = consumerInfo.getConsumerId().getConnectionId() != null ? 1 : 0; // Simplified logic
            String eventType = message.getJMSXGroupSeq() > 0 ? "CONSUMER_STARTED" : "CONSUMER_STOPPED";
            
            // In a real system, you'd need more robust logic to count active consumers.
            // This is a simplified example to show the concept.
            // We'll create a synthetic JSON message to broadcast
            String advisoryPayload = String.format(
                "{\"type\":\"ADVISORY\",\"event\":\"%s\",\"consumerCount\":%d}",
                eventType, 
                consumerCount
            );
             statusUpdateHandler.broadcast(advisoryPayload);
        }
    }
}

这里的关键点是:

  • WebSocket处理器: StatusUpdateHandler管理所有连接的WebSocket会话,并提供一个broadcast方法向所有客户端发送消息。
  • JMS监听器: StatusTopicListener监听两个目的地:
    • cv-status-topic: 接收来自CV工作节点的状态更新,并直接广播给前端。
    • ActiveMQ.Advisory.Consumer.Queue.cv-task-queue: 这是ActiveMQ的Advisory Topic。当有新的消费者连接或断开cv-task-queue时,Broker会向这个主题发送消息。我们监听此消息来实时更新“活跃工作节点”的数量。这是一种非常轻量级的服务发现机制。

第三步:Chakra UI 实时仪表盘

前端使用React和Chakra UI构建。核心是使用WebSocket API接收来自monitor-service的实时数据流,并更新React状态。

// Dashboard.js
import React, { useState, useEffect, useRef } from 'react';
import {
  ChakraProvider, Box, Heading, Table, Thead, Tbody, Tr, Th, Td,
  Tag, Stat, StatLabel, StatNumber, StatGroup, Code, useToast
} from '@chakra-ui/react';
import { theme } from '@chakra-ui/react';

const TaskStatusTag = ({ status }) => {
  const colorScheme = {
    QUEUED: 'gray',
    PROCESSING: 'blue',
    COMPLETED: 'green',
    FAILED: 'red',
  }[status];
  return <Tag colorScheme={colorScheme}>{status}</Tag>;
};

function App() {
  const [tasks, setTasks] = useState({});
  const [activeWorkers, setActiveWorkers] = useState(0); // This part is a simplification
  const ws = useRef(null);
  const toast = useToast();

  useEffect(() => {
    // The WebSocket URL should point to your Monitor Service
    const WEBSOCKET_URL = 'ws://localhost:8082/ws/status'; 
    ws.current = new WebSocket(WEBSOCKET_URL);

    ws.current.onopen = () => {
      console.log('WebSocket Connected');
      toast({ title: 'Connected to Monitor Service', status: 'success', duration: 2000, isClosable: true });
    };

    ws.current.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);

        // Handle advisory messages for worker count
        if (message.type === 'ADVISORY') {
            // This is a simplified logic. A real implementation would need a more robust way to count.
            if (message.event === 'CONSUMER_STARTED') {
                setActiveWorkers(prev => prev + 1);
            } else if (message.event === 'CONSUMER_STOPPED') {
                setActiveWorkers(prev => Math.max(0, prev - 1));
            }
            return;
        }

        // Handle regular task status updates
        const { traceId, status, queueEntryTimestamp, processingTimeMs, eventTimestamp } = message;
        
        setTasks(prevTasks => {
          const existingTask = prevTasks[traceId] || { traceId, status: 'QUEUED', startTime: queueEntryTimestamp };
          const endToEndLatency = status === 'COMPLETED' ? eventTimestamp - existingTask.startTime : null;
          
          const updatedTask = {
            ...existingTask,
            status,
            processingTimeMs: processingTimeMs > 0 ? processingTimeMs : existingTask.processingTimeMs,
            endToEndLatency,
            lastUpdated: eventTimestamp,
          };

          return { ...prevTasks, [traceId]: updatedTask };
        });
      } catch (error) {
        console.error('Failed to parse WebSocket message:', event.data, error);
      }
    };

    ws.current.onclose = () => {
      console.log('WebSocket Disconnected');
      toast({ title: 'Disconnected from Monitor Service', description: 'Attempting to reconnect...', status: 'error', duration: 5000, isClosable: true });
      // You might want to implement a reconnection logic here
    };

    return () => {
      ws.current.close();
    };
  }, [toast]);

  const sortedTasks = Object.values(tasks).sort((a, b) => b.lastUpdated - a.lastUpdated);

  return (
    <ChakraProvider theme={theme}>
      <Box p={8}>
        <Heading mb={6}>CV Processing Pipeline - Real-time Dashboard</Heading>
        
        <StatGroup mb={8}>
          <Stat>
            <StatLabel>Total Tasks Tracked</StatLabel>
            <StatNumber>{sortedTasks.length}</StatNumber>
          </Stat>
          <Stat>
            <StatLabel>Active CV Workers</StatLabel>
            <StatNumber>{activeWorkers}</StatNumber>
          </Stat>
        </StatGroup>

        <Table variant="simple">
          <Thead>
            <Tr>
              <Th>Trace ID</Th>
              <Th>Status</Th>
              <Th isNumeric>Model Inference Time (ms)</Th>
              <Th isNumeric>End-to-End Latency (ms)</Th>
            </Tr>
          </Thead>
          <Tbody>
            {sortedTasks.slice(0, 20).map(task => ( // Display latest 20 tasks
              <Tr key={task.traceId}>
                <Td><Code>{task.traceId}</Code></Td>
                <Td><TaskStatusTag status={task.status} /></Td>
                <Td isNumeric>{task.processingTimeMs || 'N/A'}</Td>
                <Td isNumeric>{task.endToEndLatency || 'In Progress'}</Td>
              </Tr>
            ))}
          </Tbody>
        </Table>
      </Box>
    </ChakraProvider>
  );
}

export default App;

前端代码的逻辑很清晰:

  • 状态管理: 使用useState维护一个以traceId为键的任务对象tasks。这使得更新特定任务的状态变得高效。
  • WebSocket连接: useEffect钩子负责建立和清理WebSocket连接。useRef用于在多次渲染之间保持对WebSocket实例的引用。
  • 消息处理: onmessage事件处理器解析收到的JSON。如果是任务状态更新,它会计算端到端延迟(eventTimestamp - queueEntryTimestamp),并更新tasks状态。如果是Advisory消息,则更新activeWorkers状态。
  • 组件渲染: 使用Chakra UI的Table, Tag, Stat等组件来清晰地展示数据。TaskStatusTag是一个简单的组件,根据任务状态显示不同颜色的标签。

当前方案的局限性与未来迭代

这套自建的轻量级监控系统有效地解决了最初的“黑盒”问题,但它并非一个完备的生产级可观测性平台。存在一些明显的局限:

  1. Monitor Service的单点问题: monitor-service目前是单体的,并且其WebSocket会话状态是保存在内存中的。如果该服务重启,所有前端连接都会断开,并且不会有历史状态。在生产环境中,可以考虑将其部署为多个实例,并使用Redis Pub/Sub来广播消息,以解决单点故障和状态共享问题。
  2. 追踪范围有限: 当前的追踪始于消息进入ActiveMQ,止于CV Worker处理完毕。它没有覆盖从用户浏览器到API Gateway的HTTP请求阶段。要实现真正的全链路追踪,需要引入遵循OpenTelemetry标准的追踪库,在HTTP头和JMS消息属性中传播统一的Trace Context。
  3. 指标聚合与持久化不足: 系统只展示了瞬时状态。无法进行历史趋势分析、告警或SLO计算。一个自然的演进方向是将monitor-service收集到的状态事件(如处理耗时、任务成功/失败率)转换为Prometheus指标,通过/metrics端点暴露出去,然后使用Grafana进行持久化存储和更丰富的数据可视化。
  4. Advisory Message的复杂性: 通过Advisory Message来统计消费者数量在集群和复杂网络环境下可能不完全准确。更可靠的方式是通过JMX接口定期查询Broker的队列信息,但这会从“事件驱动”变为“轮询”,需要权衡。

尽管存在这些局限,但该方案作为一个起点,成功地利用ActiveMQ的消息属性、Jib的便捷容器化以及Chakra UI的快速开发能力,以较低的成本实现了一个高度定制化的实时监控仪表盘,为后续构建更完善的可观测性体系奠定了基础。


  目录