risingwave_batch/monitor/
stats.rs
1use std::sync::{Arc, LazyLock};
16
17use prometheus::core::{AtomicU64, GenericCounter};
18use prometheus::{
19 Histogram, IntGauge, Registry, histogram_opts, register_histogram_with_registry,
20 register_int_counter_with_registry,
21};
22use risingwave_common::metrics::TrAdderGauge;
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24use risingwave_connector::source::iceberg::IcebergScanMetrics;
25
26#[derive(Clone)]
31pub struct BatchExecutorMetrics {
32 pub exchange_recv_row_number: GenericCounter<AtomicU64>,
33 pub row_seq_scan_next_duration: Histogram,
34}
35
36pub static GLOBAL_BATCH_EXECUTOR_METRICS: LazyLock<BatchExecutorMetrics> =
37 LazyLock::new(|| BatchExecutorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl BatchExecutorMetrics {
40 fn new(register: &Registry) -> Self {
41 let exchange_recv_row_number = register_int_counter_with_registry!(
42 "batch_exchange_recv_row_number",
43 "Total number of row that have been received from upstream source",
44 register,
45 )
46 .unwrap();
47
48 let opts = histogram_opts!(
49 "batch_row_seq_scan_next_duration",
50 "Time spent deserializing into a row in cell based table.",
51 );
52
53 let row_seq_scan_next_duration = register_histogram_with_registry!(opts, register).unwrap();
54
55 Self {
56 exchange_recv_row_number,
57 row_seq_scan_next_duration,
58 }
59 }
60
61 pub fn for_test() -> Arc<Self> {
63 Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone())
64 }
65}
66
67pub type BatchMetrics = Arc<BatchMetricsInner>;
68
69pub struct BatchMetricsInner {
71 batch_manager_metrics: Arc<BatchManagerMetrics>,
72 executor_metrics: Arc<BatchExecutorMetrics>,
73 iceberg_scan_metrics: Arc<IcebergScanMetrics>,
74}
75
76impl BatchMetricsInner {
77 pub fn new(
78 batch_manager_metrics: Arc<BatchManagerMetrics>,
79 executor_metrics: Arc<BatchExecutorMetrics>,
80 iceberg_scan_metrics: Arc<IcebergScanMetrics>,
81 ) -> Self {
82 Self {
83 batch_manager_metrics,
84 executor_metrics,
85 iceberg_scan_metrics,
86 }
87 }
88
89 pub fn executor_metrics(&self) -> &BatchExecutorMetrics {
90 &self.executor_metrics
91 }
92
93 pub fn batch_manager_metrics(&self) -> &BatchManagerMetrics {
94 &self.batch_manager_metrics
95 }
96
97 pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
98 self.iceberg_scan_metrics.clone()
99 }
100
101 pub fn for_test() -> BatchMetrics {
102 Arc::new(Self {
103 batch_manager_metrics: BatchManagerMetrics::for_test(),
104 executor_metrics: BatchExecutorMetrics::for_test(),
105 iceberg_scan_metrics: IcebergScanMetrics::for_test(),
106 })
107 }
108}
109
110#[derive(Clone)]
111pub struct BatchManagerMetrics {
112 pub task_num: IntGauge,
113 pub batch_total_mem: TrAdderGauge,
114 pub batch_heartbeat_worker_num: IntGauge,
115}
116
117pub static GLOBAL_BATCH_MANAGER_METRICS: LazyLock<BatchManagerMetrics> =
118 LazyLock::new(|| BatchManagerMetrics::new(&GLOBAL_METRICS_REGISTRY));
119
120impl BatchManagerMetrics {
121 fn new(registry: &Registry) -> Self {
122 let task_num = IntGauge::new("batch_task_num", "Number of batch task in memory").unwrap();
123 let batch_total_mem = TrAdderGauge::new(
124 "compute_batch_total_mem",
125 "Total number of memory usage for batch tasks.",
126 )
127 .unwrap();
128 let batch_heartbeat_worker_num = IntGauge::new(
129 "batch_heartbeat_worker_num",
130 "Total number of heartbeat worker for batch tasks.",
131 )
132 .unwrap();
133
134 registry.register(Box::new(task_num.clone())).unwrap();
135 registry
136 .register(Box::new(batch_total_mem.clone()))
137 .unwrap();
138 registry
139 .register(Box::new(batch_heartbeat_worker_num.clone()))
140 .unwrap();
141 Self {
142 task_num,
143 batch_total_mem,
144 batch_heartbeat_worker_num,
145 }
146 }
147
148 pub fn for_test() -> Arc<Self> {
149 Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone())
150 }
151}
152
153#[derive(Clone)]
154pub struct BatchSpillMetrics {
155 pub batch_spill_read_bytes: GenericCounter<AtomicU64>,
156 pub batch_spill_write_bytes: GenericCounter<AtomicU64>,
157}
158
159pub static GLOBAL_BATCH_SPILL_METRICS: LazyLock<BatchSpillMetrics> =
160 LazyLock::new(|| BatchSpillMetrics::new(&GLOBAL_METRICS_REGISTRY));
161
162impl BatchSpillMetrics {
163 fn new(registry: &Registry) -> Self {
164 let batch_spill_read_bytes = register_int_counter_with_registry!(
165 "batch_spill_read_bytes",
166 "Total bytes of requests read from spill files",
167 registry,
168 )
169 .unwrap();
170 let batch_spill_write_bytes = register_int_counter_with_registry!(
171 "batch_spill_write_bytes",
172 "Total bytes of requests write to spill files",
173 registry,
174 )
175 .unwrap();
176 Self {
177 batch_spill_read_bytes,
178 batch_spill_write_bytes,
179 }
180 }
181
182 pub fn for_test() -> Arc<Self> {
183 Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
184 }
185}