risingwave_batch/monitor/
stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::core::{AtomicU64, GenericCounter};
18use prometheus::{
19    Histogram, IntGauge, Registry, histogram_opts, register_histogram_with_registry,
20    register_int_counter_with_registry,
21};
22use risingwave_common::metrics::TrAdderGauge;
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24use risingwave_connector::source::iceberg::IcebergScanMetrics;
25
26/// Metrics for batch executor.
27/// Currently, it contains:
28/// - `exchange_recv_row_number`: Total number of row that have been received for exchange.
29/// - `row_seq_scan_next_duration`: Time spent iterating next rows.
30#[derive(Clone)]
31pub struct BatchExecutorMetrics {
32    pub exchange_recv_row_number: GenericCounter<AtomicU64>,
33    pub row_seq_scan_next_duration: Histogram,
34}
35
36pub static GLOBAL_BATCH_EXECUTOR_METRICS: LazyLock<BatchExecutorMetrics> =
37    LazyLock::new(|| BatchExecutorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl BatchExecutorMetrics {
40    fn new(register: &Registry) -> Self {
41        let exchange_recv_row_number = register_int_counter_with_registry!(
42            "batch_exchange_recv_row_number",
43            "Total number of row that have been received from upstream source",
44            register,
45        )
46        .unwrap();
47
48        let opts = histogram_opts!(
49            "batch_row_seq_scan_next_duration",
50            "Time spent deserializing into a row in cell based table.",
51        );
52
53        let row_seq_scan_next_duration = register_histogram_with_registry!(opts, register).unwrap();
54
55        Self {
56            exchange_recv_row_number,
57            row_seq_scan_next_duration,
58        }
59    }
60
61    /// Create a new `BatchTaskMetrics` instance used in tests or other places.
62    pub fn for_test() -> Arc<Self> {
63        Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone())
64    }
65}
66
67pub type BatchMetrics = Arc<BatchMetricsInner>;
68
69/// A wrapper of `BatchManagerMetrics` and `BatchExecutorMetrics` that contains all metrics for batch.
70pub struct BatchMetricsInner {
71    batch_manager_metrics: Arc<BatchManagerMetrics>,
72    executor_metrics: Arc<BatchExecutorMetrics>,
73    iceberg_scan_metrics: Arc<IcebergScanMetrics>,
74}
75
76impl BatchMetricsInner {
77    pub fn new(
78        batch_manager_metrics: Arc<BatchManagerMetrics>,
79        executor_metrics: Arc<BatchExecutorMetrics>,
80        iceberg_scan_metrics: Arc<IcebergScanMetrics>,
81    ) -> Self {
82        Self {
83            batch_manager_metrics,
84            executor_metrics,
85            iceberg_scan_metrics,
86        }
87    }
88
89    pub fn executor_metrics(&self) -> &BatchExecutorMetrics {
90        &self.executor_metrics
91    }
92
93    pub fn batch_manager_metrics(&self) -> &BatchManagerMetrics {
94        &self.batch_manager_metrics
95    }
96
97    pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
98        self.iceberg_scan_metrics.clone()
99    }
100
101    pub fn for_test() -> BatchMetrics {
102        Arc::new(Self {
103            batch_manager_metrics: BatchManagerMetrics::for_test(),
104            executor_metrics: BatchExecutorMetrics::for_test(),
105            iceberg_scan_metrics: IcebergScanMetrics::for_test(),
106        })
107    }
108}
109
110#[derive(Clone)]
111pub struct BatchManagerMetrics {
112    pub task_num: IntGauge,
113    pub batch_total_mem: TrAdderGauge,
114    pub batch_heartbeat_worker_num: IntGauge,
115}
116
117pub static GLOBAL_BATCH_MANAGER_METRICS: LazyLock<BatchManagerMetrics> =
118    LazyLock::new(|| BatchManagerMetrics::new(&GLOBAL_METRICS_REGISTRY));
119
120impl BatchManagerMetrics {
121    fn new(registry: &Registry) -> Self {
122        let task_num = IntGauge::new("batch_task_num", "Number of batch task in memory").unwrap();
123        let batch_total_mem = TrAdderGauge::new(
124            "compute_batch_total_mem",
125            "Total number of memory usage for batch tasks.",
126        )
127        .unwrap();
128        let batch_heartbeat_worker_num = IntGauge::new(
129            "batch_heartbeat_worker_num",
130            "Total number of heartbeat worker for batch tasks.",
131        )
132        .unwrap();
133
134        registry.register(Box::new(task_num.clone())).unwrap();
135        registry
136            .register(Box::new(batch_total_mem.clone()))
137            .unwrap();
138        registry
139            .register(Box::new(batch_heartbeat_worker_num.clone()))
140            .unwrap();
141        Self {
142            task_num,
143            batch_total_mem,
144            batch_heartbeat_worker_num,
145        }
146    }
147
148    pub fn for_test() -> Arc<Self> {
149        Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone())
150    }
151}
152
153#[derive(Clone)]
154pub struct BatchSpillMetrics {
155    pub batch_spill_read_bytes: GenericCounter<AtomicU64>,
156    pub batch_spill_write_bytes: GenericCounter<AtomicU64>,
157}
158
159pub static GLOBAL_BATCH_SPILL_METRICS: LazyLock<BatchSpillMetrics> =
160    LazyLock::new(|| BatchSpillMetrics::new(&GLOBAL_METRICS_REGISTRY));
161
162impl BatchSpillMetrics {
163    fn new(registry: &Registry) -> Self {
164        let batch_spill_read_bytes = register_int_counter_with_registry!(
165            "batch_spill_read_bytes",
166            "Total bytes of requests read from spill files",
167            registry,
168        )
169        .unwrap();
170        let batch_spill_write_bytes = register_int_counter_with_registry!(
171            "batch_spill_write_bytes",
172            "Total bytes of requests write to spill files",
173            registry,
174        )
175        .unwrap();
176        Self {
177            batch_spill_read_bytes,
178            batch_spill_write_bytes,
179        }
180    }
181
182    pub fn for_test() -> Arc<Self> {
183        Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
184    }
185}