risingwave_stream/executor/monitor/streaming_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45    pub level: MetricLevel,
46
47    // Executor metrics (disabled by default)
48    pub executor_row_count: RelabeledGuardedIntCounterVec,
49
50    // Profiling Metrics:
51    // Aggregated per operator rather than per actor.
52    // These are purely in-memory, never collected by prometheus.
53    pub mem_stream_node_output_row_count: CountMap,
54    pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56    // Streaming actor metrics from tokio (disabled by default)
57    actor_execution_time: LabelGuardedGaugeVec,
58    actor_scheduled_duration: LabelGuardedGaugeVec,
59    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
60    actor_fast_poll_duration: LabelGuardedGaugeVec,
61    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
62    actor_slow_poll_duration: LabelGuardedGaugeVec,
63    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
64    actor_poll_duration: LabelGuardedGaugeVec,
65    actor_poll_cnt: LabelGuardedIntGaugeVec,
66    actor_idle_duration: LabelGuardedGaugeVec,
67    actor_idle_cnt: LabelGuardedIntGaugeVec,
68
69    // Streaming actor
70    pub actor_count: LabelGuardedIntGaugeVec,
71    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
72    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
73    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
74
75    // Source
76    pub source_output_row_count: LabelGuardedIntCounterVec,
77    pub source_split_change_count: LabelGuardedIntCounterVec,
78    pub source_backfill_row_count: LabelGuardedIntCounterVec,
79
80    // Sink
81    sink_input_row_count: LabelGuardedIntCounterVec,
82    sink_input_bytes: LabelGuardedIntCounterVec,
83    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
84
85    // Exchange (see also `compute::ExchangeServiceMetrics`)
86    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
87
88    // Streaming Merge (We break out this metric from `barrier_align_duration` because
89    // the alignment happens on different levels)
90    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,
91
92    // Backpressure
93    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
94    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
95
96    // Streaming Join
97    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
98    pub join_lookup_total_count: LabelGuardedIntCounterVec,
99    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
100    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
101    pub join_match_duration_ns: LabelGuardedIntCounterVec,
102    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
103    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
104
105    // Streaming Join, Streaming Dynamic Filter and Streaming Union
106    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
107
108    // Streaming Aggregation
109    agg_lookup_miss_count: LabelGuardedIntCounterVec,
110    agg_total_lookup_count: LabelGuardedIntCounterVec,
111    agg_cached_entry_count: LabelGuardedIntGaugeVec,
112    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
113    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
114    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
115    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
116    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
117    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
118    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
119    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
120    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
121
122    // Streaming TopN
123    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
126    // TODO(rc): why not just use the above three?
127    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
128    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
129    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
130
131    // Lookup executor
132    lookup_cache_miss_count: LabelGuardedIntCounterVec,
133    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
134    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
135
136    // temporal join
137    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
138    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
139    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
140
141    // Backfill
142    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // CDC Backfill
146    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
147    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
148
149    // Snapshot Backfill
150    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
151
152    // Over Window
153    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
154    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
155    over_window_cache_miss_count: LabelGuardedIntCounterVec,
156    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
157    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
158    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
159    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
160    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
161    over_window_compute_count: LabelGuardedIntCounterVec,
162    over_window_same_output_count: LabelGuardedIntCounterVec,
163
164    /// The duration from receipt of barrier to all actors collection.
165    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
166    /// to flow through the graph.
167    pub barrier_inflight_latency: Histogram,
168    /// The duration of sync to storage.
169    pub barrier_sync_latency: Histogram,
170    pub barrier_batch_size: Histogram,
171    /// The progress made by the earliest in-flight barriers in the local barrier manager.
172    pub barrier_manager_progress: IntCounter,
173
174    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
177    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
178    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
179    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
180    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
181    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
182    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
183    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
184
185    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196
197    // Memory management
198    pub lru_runtime_loop_count: IntCounter,
199    pub lru_latest_sequence: IntGauge,
200    pub lru_watermark_sequence: IntGauge,
201    pub lru_eviction_policy: IntGauge,
202    pub jemalloc_allocated_bytes: IntGauge,
203    pub jemalloc_active_bytes: IntGauge,
204    pub jemalloc_resident_bytes: IntGauge,
205    pub jemalloc_metadata_bytes: IntGauge,
206    pub jvm_allocated_bytes: IntGauge,
207    pub jvm_active_bytes: IntGauge,
208    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210    // Materialized view
211    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214    materialize_input_row_count: RelabeledGuardedIntCounterVec,
215    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217    // PostgreSQL CDC LSN monitoring
218    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221    // MySQL CDC binlog monitoring
222    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
223    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
224
225    // Gap Fill
226    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
227}
228
229pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
230
231pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
232    GLOBAL_STREAMING_METRICS
233        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
234        .clone()
235}
236
237impl StreamingMetrics {
238    fn new(registry: &Registry, level: MetricLevel) -> Self {
239        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
240            "stream_executor_row_count",
241            "Total number of rows that have been output from each executor",
242            &["actor_id", "fragment_id", "executor_identity"],
243            registry
244        )
245        .unwrap()
246        .relabel_debug_1(level);
247
248        let stream_node_output_row_count = CountMap::new();
249        let stream_node_output_blocking_duration_ns = CountMap::new();
250
251        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
252            "stream_source_output_rows_counts",
253            "Total number of rows that have been output from source",
254            &["source_id", "source_name", "actor_id", "fragment_id"],
255            registry
256        )
257        .unwrap();
258
259        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
260            "stream_source_split_change_event_count",
261            "Total number of split change events that have been operated by source",
262            &["source_id", "source_name", "actor_id", "fragment_id"],
263            registry
264        )
265        .unwrap();
266
267        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
268            "stream_source_backfill_rows_counts",
269            "Total number of rows that have been backfilled for source",
270            &["source_id", "source_name", "actor_id", "fragment_id"],
271            registry
272        )
273        .unwrap();
274
275        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
276            "stream_sink_input_row_count",
277            "Total number of rows streamed into sink executors",
278            &["sink_id", "actor_id", "fragment_id"],
279            registry
280        )
281        .unwrap();
282
283        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
284            "stream_sink_input_bytes",
285            "Total size of chunks streamed into sink executors",
286            &["sink_id", "actor_id", "fragment_id"],
287            registry
288        )
289        .unwrap();
290
291        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
292            "stream_mview_input_row_count",
293            "Total number of rows streamed into materialize executors",
294            &["actor_id", "table_id", "fragment_id"],
295            registry
296        )
297        .unwrap()
298        .relabel_debug_1(level);
299
300        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
301            "stream_mview_current_epoch",
302            "The current epoch of the materialized executor",
303            &["actor_id", "table_id", "fragment_id"],
304            registry
305        )
306        .unwrap()
307        .relabel_debug_1(level);
308
309        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
310            "stream_pg_cdc_state_table_lsn",
311            "Current LSN value stored in PostgreSQL CDC state table",
312            &["source_id"],
313            registry,
314        )
315        .unwrap();
316
317        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
318            "stream_pg_cdc_jni_commit_offset_lsn",
319            "LSN value when JNI commit offset is called for PostgreSQL CDC",
320            &["source_id"],
321            registry,
322        )
323        .unwrap();
324
325        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
326            "stream_mysql_cdc_state_binlog_file_seq",
327            "Current binlog file sequence number stored in MySQL CDC state table",
328            &["source_id"],
329            registry,
330        )
331        .unwrap();
332
333        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
334            "stream_mysql_cdc_state_binlog_position",
335            "Current binlog position stored in MySQL CDC state table",
336            &["source_id"],
337            registry,
338        )
339        .unwrap();
340
341        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
342            "stream_sink_chunk_buffer_size",
343            "Total size of chunks buffered in a barrier",
344            &["sink_id", "actor_id", "fragment_id"],
345            registry
346        )
347        .unwrap();
348
349        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
350            "stream_actor_actor_execution_time",
351            "Total execution time (s) of an actor",
352            &["actor_id"],
353            registry
354        )
355        .unwrap();
356
357        let actor_output_buffer_blocking_duration_ns =
358            register_guarded_int_counter_vec_with_registry!(
359                "stream_actor_output_buffer_blocking_duration_ns",
360                "Total blocking duration (ns) of output buffer",
361                &["actor_id", "fragment_id", "downstream_fragment_id"],
362                registry
363            )
364            .unwrap()
365            // mask the first label `actor_id` if the level is less verbose than `Debug`
366            .relabel_debug_1(level);
367
368        let actor_input_buffer_blocking_duration_ns =
369            register_guarded_int_counter_vec_with_registry!(
370                "stream_actor_input_buffer_blocking_duration_ns",
371                "Total blocking duration (ns) of input buffer",
372                &["actor_id", "fragment_id", "upstream_fragment_id"],
373                registry
374            )
375            .unwrap()
376            // mask the first label `actor_id` if the level is less verbose than `Debug`
377            .relabel_debug_1(level);
378
379        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
380            "stream_exchange_frag_recv_size",
381            "Total size of messages that have been received from upstream Fragment",
382            &["up_fragment_id", "down_fragment_id"],
383            registry
384        )
385        .unwrap();
386
387        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
388            "stream_actor_fast_poll_duration",
389            "tokio's metrics",
390            &["actor_id"],
391            registry
392        )
393        .unwrap();
394
395        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
396            "stream_actor_fast_poll_cnt",
397            "tokio's metrics",
398            &["actor_id"],
399            registry
400        )
401        .unwrap();
402
403        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
404            "stream_actor_slow_poll_duration",
405            "tokio's metrics",
406            &["actor_id"],
407            registry
408        )
409        .unwrap();
410
411        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
412            "stream_actor_slow_poll_cnt",
413            "tokio's metrics",
414            &["actor_id"],
415            registry
416        )
417        .unwrap();
418
419        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
420            "stream_actor_poll_duration",
421            "tokio's metrics",
422            &["actor_id"],
423            registry
424        )
425        .unwrap();
426
427        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
428            "stream_actor_poll_cnt",
429            "tokio's metrics",
430            &["actor_id"],
431            registry
432        )
433        .unwrap();
434
435        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
436            "stream_actor_scheduled_duration",
437            "tokio's metrics",
438            &["actor_id"],
439            registry
440        )
441        .unwrap();
442
443        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
444            "stream_actor_scheduled_cnt",
445            "tokio's metrics",
446            &["actor_id"],
447            registry
448        )
449        .unwrap();
450
451        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
452            "stream_actor_idle_duration",
453            "tokio's metrics",
454            &["actor_id"],
455            registry
456        )
457        .unwrap();
458
459        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
460            "stream_actor_idle_cnt",
461            "tokio's metrics",
462            &["actor_id"],
463            registry
464        )
465        .unwrap();
466
467        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
468            "stream_actor_in_record_cnt",
469            "Total number of rows actor received",
470            &["actor_id", "fragment_id", "upstream_fragment_id"],
471            registry
472        )
473        .unwrap()
474        .relabel_debug_1(level);
475
476        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
477            "stream_actor_out_record_cnt",
478            "Total number of rows actor sent",
479            &["actor_id", "fragment_id"],
480            registry
481        )
482        .unwrap()
483        .relabel_debug_1(level);
484
485        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
486            "stream_actor_current_epoch",
487            "Current epoch of actor",
488            &["actor_id", "fragment_id"],
489            registry
490        )
491        .unwrap()
492        .relabel_debug_1(level);
493
494        let actor_count = register_guarded_int_gauge_vec_with_registry!(
495            "stream_actor_count",
496            "Total number of actors (parallelism)",
497            &["fragment_id"],
498            registry
499        )
500        .unwrap();
501
502        let opts = histogram_opts!(
503            "stream_merge_barrier_align_duration",
504            "Duration of merge align barrier",
505            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
506        );
507        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
508            opts,
509            &["actor_id", "fragment_id"],
510            registry
511        )
512        .unwrap()
513        .relabel_debug_1(level);
514
515        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
516            "stream_join_lookup_miss_count",
517            "Join executor lookup miss duration",
518            &["side", "join_table_id", "actor_id", "fragment_id"],
519            registry
520        )
521        .unwrap();
522
523        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
524            "stream_join_lookup_total_count",
525            "Join executor lookup total operation",
526            &["side", "join_table_id", "actor_id", "fragment_id"],
527            registry
528        )
529        .unwrap();
530
531        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
532            "stream_join_insert_cache_miss_count",
533            "Join executor cache miss when insert operation",
534            &["side", "join_table_id", "actor_id", "fragment_id"],
535            registry
536        )
537        .unwrap();
538
539        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
540            "stream_join_actor_input_waiting_duration_ns",
541            "Total waiting duration (ns) of input buffer of join actor",
542            &["actor_id", "fragment_id"],
543            registry
544        )
545        .unwrap();
546
547        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
548            "stream_join_match_duration_ns",
549            "Matching duration for each side",
550            &["actor_id", "fragment_id", "side"],
551            registry
552        )
553        .unwrap();
554
555        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
556            "stream_barrier_align_duration_ns",
557            "Duration of join align barrier",
558            &["actor_id", "fragment_id", "wait_side", "executor"],
559            registry
560        )
561        .unwrap()
562        .relabel_debug_1(level);
563
564        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
565            "stream_join_cached_entry_count",
566            "Number of cached entries in streaming join operators",
567            &["actor_id", "fragment_id", "side"],
568            registry
569        )
570        .unwrap();
571
572        let join_matched_join_keys_opts = histogram_opts!(
573            "stream_join_matched_join_keys",
574            "The number of keys matched in the opposite side",
575            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
576        );
577
578        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
579            join_matched_join_keys_opts,
580            &["actor_id", "fragment_id", "table_id"],
581            registry
582        )
583        .unwrap()
584        .relabel_debug_1(level);
585
586        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
587            "stream_agg_lookup_miss_count",
588            "Aggregation executor lookup miss duration",
589            &["table_id", "actor_id", "fragment_id"],
590            registry
591        )
592        .unwrap();
593
594        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
595            "stream_agg_lookup_total_count",
596            "Aggregation executor lookup total operation",
597            &["table_id", "actor_id", "fragment_id"],
598            registry
599        )
600        .unwrap();
601
602        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
603            "stream_agg_distinct_cache_miss_count",
604            "Aggregation executor dinsinct miss duration",
605            &["table_id", "actor_id", "fragment_id"],
606            registry
607        )
608        .unwrap();
609
610        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
611            "stream_agg_distinct_total_cache_count",
612            "Aggregation executor distinct total operation",
613            &["table_id", "actor_id", "fragment_id"],
614            registry
615        )
616        .unwrap();
617
618        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
619            "stream_agg_distinct_cached_entry_count",
620            "Total entry counts in distinct aggregation executor cache",
621            &["table_id", "actor_id", "fragment_id"],
622            registry
623        )
624        .unwrap();
625
626        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
627            "stream_agg_dirty_groups_count",
628            "Total dirty group counts in aggregation executor",
629            &["table_id", "actor_id", "fragment_id"],
630            registry
631        )
632        .unwrap();
633
634        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
635            "stream_agg_dirty_groups_heap_size",
636            "Total dirty group heap size in aggregation executor",
637            &["table_id", "actor_id", "fragment_id"],
638            registry
639        )
640        .unwrap();
641
642        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
643            "stream_agg_state_cache_lookup_count",
644            "Aggregation executor state cache lookup count",
645            &["table_id", "actor_id", "fragment_id"],
646            registry
647        )
648        .unwrap();
649
650        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
651            "stream_agg_state_cache_miss_count",
652            "Aggregation executor state cache miss count",
653            &["table_id", "actor_id", "fragment_id"],
654            registry
655        )
656        .unwrap();
657
658        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
659            "stream_group_top_n_cache_miss_count",
660            "Group top n executor cache miss count",
661            &["table_id", "actor_id", "fragment_id"],
662            registry
663        )
664        .unwrap();
665
666        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
667            "stream_group_top_n_total_query_cache_count",
668            "Group top n executor query cache total count",
669            &["table_id", "actor_id", "fragment_id"],
670            registry
671        )
672        .unwrap();
673
674        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
675            "stream_group_top_n_cached_entry_count",
676            "Total entry counts in group top n executor cache",
677            &["table_id", "actor_id", "fragment_id"],
678            registry
679        )
680        .unwrap();
681
682        let group_top_n_appendonly_cache_miss_count =
683            register_guarded_int_counter_vec_with_registry!(
684                "stream_group_top_n_appendonly_cache_miss_count",
685                "Group top n appendonly executor cache miss count",
686                &["table_id", "actor_id", "fragment_id"],
687                registry
688            )
689            .unwrap();
690
691        let group_top_n_appendonly_total_query_cache_count =
692            register_guarded_int_counter_vec_with_registry!(
693                "stream_group_top_n_appendonly_total_query_cache_count",
694                "Group top n appendonly executor total cache count",
695                &["table_id", "actor_id", "fragment_id"],
696                registry
697            )
698            .unwrap();
699
700        let group_top_n_appendonly_cached_entry_count =
701            register_guarded_int_gauge_vec_with_registry!(
702                "stream_group_top_n_appendonly_cached_entry_count",
703                "Total entry counts in group top n appendonly executor cache",
704                &["table_id", "actor_id", "fragment_id"],
705                registry
706            )
707            .unwrap();
708
709        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
710            "stream_lookup_cache_miss_count",
711            "Lookup executor cache miss count",
712            &["table_id", "actor_id", "fragment_id"],
713            registry
714        )
715        .unwrap();
716
717        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
718            "stream_lookup_total_query_cache_count",
719            "Lookup executor query cache total count",
720            &["table_id", "actor_id", "fragment_id"],
721            registry
722        )
723        .unwrap();
724
725        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726            "stream_lookup_cached_entry_count",
727            "Total entry counts in lookup executor cache",
728            &["table_id", "actor_id", "fragment_id"],
729            registry
730        )
731        .unwrap();
732
733        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
734            "stream_temporal_join_cache_miss_count",
735            "Temporal join executor cache miss count",
736            &["table_id", "actor_id", "fragment_id"],
737            registry
738        )
739        .unwrap();
740
741        let temporal_join_total_query_cache_count =
742            register_guarded_int_counter_vec_with_registry!(
743                "stream_temporal_join_total_query_cache_count",
744                "Temporal join executor query cache total count",
745                &["table_id", "actor_id", "fragment_id"],
746                registry
747            )
748            .unwrap();
749
750        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
751            "stream_temporal_join_cached_entry_count",
752            "Total entry count in temporal join executor cache",
753            &["table_id", "actor_id", "fragment_id"],
754            registry
755        )
756        .unwrap();
757
758        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
759            "stream_agg_cached_entry_count",
760            "Number of cached keys in streaming aggregation operators",
761            &["table_id", "actor_id", "fragment_id"],
762            registry
763        )
764        .unwrap();
765
766        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
767            "stream_agg_chunk_lookup_miss_count",
768            "Aggregation executor chunk-level lookup miss duration",
769            &["table_id", "actor_id", "fragment_id"],
770            registry
771        )
772        .unwrap();
773
774        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
775            "stream_agg_chunk_lookup_total_count",
776            "Aggregation executor chunk-level lookup total operation",
777            &["table_id", "actor_id", "fragment_id"],
778            registry
779        )
780        .unwrap();
781
782        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
783            "stream_backfill_snapshot_read_row_count",
784            "Total number of rows that have been read from the backfill snapshot",
785            &["table_id", "actor_id"],
786            registry
787        )
788        .unwrap();
789
790        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
791            "stream_backfill_upstream_output_row_count",
792            "Total number of rows that have been output from the backfill upstream",
793            &["table_id", "actor_id"],
794            registry
795        )
796        .unwrap();
797
798        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
799            "stream_cdc_backfill_snapshot_read_row_count",
800            "Total number of rows that have been read from the cdc_backfill snapshot",
801            &["table_id", "actor_id"],
802            registry
803        )
804        .unwrap();
805
806        let cdc_backfill_upstream_output_row_count =
807            register_guarded_int_counter_vec_with_registry!(
808                "stream_cdc_backfill_upstream_output_row_count",
809                "Total number of rows that have been output from the cdc_backfill upstream",
810                &["table_id", "actor_id"],
811                registry
812            )
813            .unwrap();
814
815        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
816            "stream_snapshot_backfill_consume_snapshot_row_count",
817            "Total number of rows that have been output from snapshot backfill",
818            &["table_id", "actor_id", "stage"],
819            registry
820        )
821        .unwrap();
822
823        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
824            "stream_over_window_cached_entry_count",
825            "Total entry (partition) count in over window executor cache",
826            &["table_id", "actor_id", "fragment_id"],
827            registry
828        )
829        .unwrap();
830
831        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
832            "stream_over_window_cache_lookup_count",
833            "Over window executor cache lookup count",
834            &["table_id", "actor_id", "fragment_id"],
835            registry
836        )
837        .unwrap();
838
839        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
840            "stream_over_window_cache_miss_count",
841            "Over window executor cache miss count",
842            &["table_id", "actor_id", "fragment_id"],
843            registry
844        )
845        .unwrap();
846
847        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
848            "stream_over_window_range_cache_entry_count",
849            "Over window partition range cache entry count",
850            &["table_id", "actor_id", "fragment_id"],
851            registry,
852        )
853        .unwrap();
854
855        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
856            "stream_over_window_range_cache_lookup_count",
857            "Over window partition range cache lookup count",
858            &["table_id", "actor_id", "fragment_id"],
859            registry
860        )
861        .unwrap();
862
863        let over_window_range_cache_left_miss_count =
864            register_guarded_int_counter_vec_with_registry!(
865                "stream_over_window_range_cache_left_miss_count",
866                "Over window partition range cache left miss count",
867                &["table_id", "actor_id", "fragment_id"],
868                registry
869            )
870            .unwrap();
871
872        let over_window_range_cache_right_miss_count =
873            register_guarded_int_counter_vec_with_registry!(
874                "stream_over_window_range_cache_right_miss_count",
875                "Over window partition range cache right miss count",
876                &["table_id", "actor_id", "fragment_id"],
877                registry
878            )
879            .unwrap();
880
881        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
882            "stream_over_window_accessed_entry_count",
883            "Over window accessed entry count",
884            &["table_id", "actor_id", "fragment_id"],
885            registry
886        )
887        .unwrap();
888
889        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
890            "stream_over_window_compute_count",
891            "Over window compute count",
892            &["table_id", "actor_id", "fragment_id"],
893            registry
894        )
895        .unwrap();
896
897        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
898            "stream_over_window_same_output_count",
899            "Over window same output count",
900            &["table_id", "actor_id", "fragment_id"],
901            registry
902        )
903        .unwrap();
904
905        let opts = histogram_opts!(
906            "stream_barrier_inflight_duration_seconds",
907            "barrier_inflight_latency",
908            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
909        );
910        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
911
912        let opts = histogram_opts!(
913            "stream_barrier_sync_storage_duration_seconds",
914            "barrier_sync_latency",
915            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
916        );
917        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
918
919        let opts = histogram_opts!(
920            "stream_barrier_batch_size",
921            "barrier_batch_size",
922            exponential_buckets(1.0, 2.0, 8).unwrap()
923        );
924        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
925
926        let barrier_manager_progress = register_int_counter_with_registry!(
927            "stream_barrier_manager_progress",
928            "The number of actors that have processed the earliest in-flight barriers",
929            registry
930        )
931        .unwrap();
932
933        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
934            "sync_kv_log_store_wait_next_poll_ns",
935            "Total duration (ns) of waiting for next poll",
936            &["actor_id", "target", "fragment_id", "relation"],
937            registry
938        )
939        .unwrap();
940
941        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
942            "sync_kv_log_store_read_count",
943            "read row count throughput of sync_kv log store",
944            &["type", "actor_id", "target", "fragment_id", "relation"],
945            registry
946        )
947        .unwrap();
948
949        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
950            "sync_kv_log_store_read_size",
951            "read size throughput of sync_kv log store",
952            &["type", "actor_id", "target", "fragment_id", "relation"],
953            registry
954        )
955        .unwrap();
956
957        let sync_kv_log_store_write_pause_duration_ns =
958            register_guarded_int_counter_vec_with_registry!(
959                "sync_kv_log_store_write_pause_duration_ns",
960                "Duration (ns) of sync_kv log store write pause",
961                &["actor_id", "target", "fragment_id", "relation"],
962                registry
963            )
964            .unwrap();
965
966        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
967            "sync_kv_log_store_state",
968            "clean/unclean state transition for sync_kv log store",
969            &["state", "actor_id", "target", "fragment_id", "relation"],
970            registry
971        )
972        .unwrap();
973
974        let sync_kv_log_store_storage_write_count =
975            register_guarded_int_counter_vec_with_registry!(
976                "sync_kv_log_store_storage_write_count",
977                "Write row count throughput of sync_kv log store",
978                &["actor_id", "target", "fragment_id", "relation"],
979                registry
980            )
981            .unwrap();
982
983        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
984            "sync_kv_log_store_storage_write_size",
985            "Write size throughput of sync_kv log store",
986            &["actor_id", "target", "fragment_id", "relation"],
987            registry
988        )
989        .unwrap();
990
991        let sync_kv_log_store_buffer_unconsumed_item_count =
992            register_guarded_int_gauge_vec_with_registry!(
993                "sync_kv_log_store_buffer_unconsumed_item_count",
994                "Number of Unconsumed Item in buffer",
995                &["actor_id", "target", "fragment_id", "relation"],
996                registry
997            )
998            .unwrap();
999
1000        let sync_kv_log_store_buffer_unconsumed_row_count =
1001            register_guarded_int_gauge_vec_with_registry!(
1002                "sync_kv_log_store_buffer_unconsumed_row_count",
1003                "Number of Unconsumed Row in buffer",
1004                &["actor_id", "target", "fragment_id", "relation"],
1005                registry
1006            )
1007            .unwrap();
1008
1009        let sync_kv_log_store_buffer_unconsumed_epoch_count =
1010            register_guarded_int_gauge_vec_with_registry!(
1011                "sync_kv_log_store_buffer_unconsumed_epoch_count",
1012                "Number of Unconsumed Epoch in buffer",
1013                &["actor_id", "target", "fragment_id", "relation"],
1014                registry
1015            )
1016            .unwrap();
1017
1018        let sync_kv_log_store_buffer_unconsumed_min_epoch =
1019            register_guarded_int_gauge_vec_with_registry!(
1020                "sync_kv_log_store_buffer_unconsumed_min_epoch",
1021                "Number of Unconsumed Epoch in buffer",
1022                &["actor_id", "target", "fragment_id", "relation"],
1023                registry
1024            )
1025            .unwrap();
1026
1027        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1028            "kv_log_store_storage_write_count",
1029            "Write row count throughput of kv log store",
1030            &["actor_id", "connector", "sink_id", "sink_name"],
1031            registry
1032        )
1033        .unwrap();
1034
1035        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1036            "kv_log_store_storage_write_size",
1037            "Write size throughput of kv log store",
1038            &["actor_id", "connector", "sink_id", "sink_name"],
1039            registry
1040        )
1041        .unwrap();
1042
1043        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1044            "kv_log_store_storage_read_count",
1045            "Write row count throughput of kv log store",
1046            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1047            registry
1048        )
1049        .unwrap();
1050
1051        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1052            "kv_log_store_storage_read_size",
1053            "Write size throughput of kv log store",
1054            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1055            registry
1056        )
1057        .unwrap();
1058
1059        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1060            "kv_log_store_rewind_count",
1061            "Kv log store rewind rate",
1062            &["actor_id", "connector", "sink_id", "sink_name"],
1063            registry
1064        )
1065        .unwrap();
1066
1067        let kv_log_store_rewind_delay_opts = {
1068            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1069            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1070                - REWIND_BASE_DELAY.as_secs_f64().log2())
1071            .ceil() as usize;
1072            let buckets = exponential_buckets(
1073                REWIND_BASE_DELAY.as_secs_f64(),
1074                REWIND_BACKOFF_FACTOR as _,
1075                bucket_count,
1076            )
1077            .unwrap();
1078            histogram_opts!(
1079                "kv_log_store_rewind_delay",
1080                "Kv log store rewind delay",
1081                buckets,
1082            )
1083        };
1084
1085        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1086            kv_log_store_rewind_delay_opts,
1087            &["actor_id", "connector", "sink_id", "sink_name"],
1088            registry
1089        )
1090        .unwrap();
1091
1092        let kv_log_store_buffer_unconsumed_item_count =
1093            register_guarded_int_gauge_vec_with_registry!(
1094                "kv_log_store_buffer_unconsumed_item_count",
1095                "Number of Unconsumed Item in buffer",
1096                &["actor_id", "connector", "sink_id", "sink_name"],
1097                registry
1098            )
1099            .unwrap();
1100
1101        let kv_log_store_buffer_unconsumed_row_count =
1102            register_guarded_int_gauge_vec_with_registry!(
1103                "kv_log_store_buffer_unconsumed_row_count",
1104                "Number of Unconsumed Row in buffer",
1105                &["actor_id", "connector", "sink_id", "sink_name"],
1106                registry
1107            )
1108            .unwrap();
1109
1110        let kv_log_store_buffer_unconsumed_epoch_count =
1111            register_guarded_int_gauge_vec_with_registry!(
1112                "kv_log_store_buffer_unconsumed_epoch_count",
1113                "Number of Unconsumed Epoch in buffer",
1114                &["actor_id", "connector", "sink_id", "sink_name"],
1115                registry
1116            )
1117            .unwrap();
1118
1119        let kv_log_store_buffer_unconsumed_min_epoch =
1120            register_guarded_int_gauge_vec_with_registry!(
1121                "kv_log_store_buffer_unconsumed_min_epoch",
1122                "Number of Unconsumed Epoch in buffer",
1123                &["actor_id", "connector", "sink_id", "sink_name"],
1124                registry
1125            )
1126            .unwrap();
1127
1128        let lru_runtime_loop_count = register_int_counter_with_registry!(
1129            "lru_runtime_loop_count",
1130            "The counts of the eviction loop in LRU manager per second",
1131            registry
1132        )
1133        .unwrap();
1134
1135        let lru_latest_sequence = register_int_gauge_with_registry!(
1136            "lru_latest_sequence",
1137            "Current LRU global sequence",
1138            registry,
1139        )
1140        .unwrap();
1141
1142        let lru_watermark_sequence = register_int_gauge_with_registry!(
1143            "lru_watermark_sequence",
1144            "Current LRU watermark sequence",
1145            registry,
1146        )
1147        .unwrap();
1148
1149        let lru_eviction_policy = register_int_gauge_with_registry!(
1150            "lru_eviction_policy",
1151            "Current LRU eviction policy",
1152            registry,
1153        )
1154        .unwrap();
1155
1156        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1157            "jemalloc_allocated_bytes",
1158            "The allocated memory jemalloc, got from jemalloc_ctl",
1159            registry
1160        )
1161        .unwrap();
1162
1163        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1164            "jemalloc_active_bytes",
1165            "The active memory jemalloc, got from jemalloc_ctl",
1166            registry
1167        )
1168        .unwrap();
1169
1170        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1171            "jemalloc_resident_bytes",
1172            "The active memory jemalloc, got from jemalloc_ctl",
1173            registry
1174        )
1175        .unwrap();
1176
1177        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1178            "jemalloc_metadata_bytes",
1179            "The active memory jemalloc, got from jemalloc_ctl",
1180            registry
1181        )
1182        .unwrap();
1183
1184        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1185            "jvm_allocated_bytes",
1186            "The allocated jvm memory",
1187            registry
1188        )
1189        .unwrap();
1190
1191        let jvm_active_bytes = register_int_gauge_with_registry!(
1192            "jvm_active_bytes",
1193            "The active jvm memory",
1194            registry
1195        )
1196        .unwrap();
1197
1198        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1199            "stream_materialize_cache_hit_count",
1200            "Materialize executor cache hit count",
1201            &["actor_id", "table_id", "fragment_id"],
1202            registry
1203        )
1204        .unwrap()
1205        .relabel_debug_1(level);
1206
1207        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1208            "stream_materialize_data_exist_count",
1209            "Materialize executor data exist count",
1210            &["actor_id", "table_id", "fragment_id"],
1211            registry
1212        )
1213        .unwrap()
1214        .relabel_debug_1(level);
1215
1216        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1217            "stream_materialize_cache_total_count",
1218            "Materialize executor cache total operation",
1219            &["actor_id", "table_id", "fragment_id"],
1220            registry
1221        )
1222        .unwrap()
1223        .relabel_debug_1(level);
1224
1225        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1226            "stream_memory_usage",
1227            "Memory usage for stream executors",
1228            &["actor_id", "table_id", "desc"],
1229            registry
1230        )
1231        .unwrap()
1232        .relabel_debug_1(level);
1233
1234        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1235            "gap_fill_generated_rows_count",
1236            "Total number of rows generated by gap fill executor",
1237            &["actor_id", "fragment_id"],
1238            registry
1239        )
1240        .unwrap()
1241        .relabel_debug_1(level);
1242
1243        Self {
1244            level,
1245            executor_row_count,
1246            mem_stream_node_output_row_count: stream_node_output_row_count,
1247            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1248            actor_execution_time,
1249            actor_scheduled_duration,
1250            actor_scheduled_cnt,
1251            actor_fast_poll_duration,
1252            actor_fast_poll_cnt,
1253            actor_slow_poll_duration,
1254            actor_slow_poll_cnt,
1255            actor_poll_duration,
1256            actor_poll_cnt,
1257            actor_idle_duration,
1258            actor_idle_cnt,
1259            actor_count,
1260            actor_in_record_cnt,
1261            actor_out_record_cnt,
1262            actor_current_epoch,
1263            source_output_row_count,
1264            source_split_change_count,
1265            source_backfill_row_count,
1266            sink_input_row_count,
1267            sink_input_bytes,
1268            sink_chunk_buffer_size,
1269            exchange_frag_recv_size,
1270            merge_barrier_align_duration,
1271            actor_output_buffer_blocking_duration_ns,
1272            actor_input_buffer_blocking_duration_ns,
1273            join_lookup_miss_count,
1274            join_lookup_total_count,
1275            join_insert_cache_miss_count,
1276            join_actor_input_waiting_duration_ns,
1277            join_match_duration_ns,
1278            join_cached_entry_count,
1279            join_matched_join_keys,
1280            barrier_align_duration,
1281            agg_lookup_miss_count,
1282            agg_total_lookup_count,
1283            agg_cached_entry_count,
1284            agg_chunk_lookup_miss_count,
1285            agg_chunk_total_lookup_count,
1286            agg_dirty_groups_count,
1287            agg_dirty_groups_heap_size,
1288            agg_distinct_cache_miss_count,
1289            agg_distinct_total_cache_count,
1290            agg_distinct_cached_entry_count,
1291            agg_state_cache_lookup_count,
1292            agg_state_cache_miss_count,
1293            group_top_n_cache_miss_count,
1294            group_top_n_total_query_cache_count,
1295            group_top_n_cached_entry_count,
1296            group_top_n_appendonly_cache_miss_count,
1297            group_top_n_appendonly_total_query_cache_count,
1298            group_top_n_appendonly_cached_entry_count,
1299            lookup_cache_miss_count,
1300            lookup_total_query_cache_count,
1301            lookup_cached_entry_count,
1302            temporal_join_cache_miss_count,
1303            temporal_join_total_query_cache_count,
1304            temporal_join_cached_entry_count,
1305            backfill_snapshot_read_row_count,
1306            backfill_upstream_output_row_count,
1307            cdc_backfill_snapshot_read_row_count,
1308            cdc_backfill_upstream_output_row_count,
1309            snapshot_backfill_consume_row_count,
1310            over_window_cached_entry_count,
1311            over_window_cache_lookup_count,
1312            over_window_cache_miss_count,
1313            over_window_range_cache_entry_count,
1314            over_window_range_cache_lookup_count,
1315            over_window_range_cache_left_miss_count,
1316            over_window_range_cache_right_miss_count,
1317            over_window_accessed_entry_count,
1318            over_window_compute_count,
1319            over_window_same_output_count,
1320            barrier_inflight_latency,
1321            barrier_sync_latency,
1322            barrier_batch_size,
1323            barrier_manager_progress,
1324            kv_log_store_storage_write_count,
1325            kv_log_store_storage_write_size,
1326            kv_log_store_rewind_count,
1327            kv_log_store_rewind_delay,
1328            kv_log_store_storage_read_count,
1329            kv_log_store_storage_read_size,
1330            kv_log_store_buffer_unconsumed_item_count,
1331            kv_log_store_buffer_unconsumed_row_count,
1332            kv_log_store_buffer_unconsumed_epoch_count,
1333            kv_log_store_buffer_unconsumed_min_epoch,
1334            sync_kv_log_store_read_count,
1335            sync_kv_log_store_read_size,
1336            sync_kv_log_store_write_pause_duration_ns,
1337            sync_kv_log_store_state,
1338            sync_kv_log_store_wait_next_poll_ns,
1339            sync_kv_log_store_storage_write_count,
1340            sync_kv_log_store_storage_write_size,
1341            sync_kv_log_store_buffer_unconsumed_item_count,
1342            sync_kv_log_store_buffer_unconsumed_row_count,
1343            sync_kv_log_store_buffer_unconsumed_epoch_count,
1344            sync_kv_log_store_buffer_unconsumed_min_epoch,
1345            lru_runtime_loop_count,
1346            lru_latest_sequence,
1347            lru_watermark_sequence,
1348            lru_eviction_policy,
1349            jemalloc_allocated_bytes,
1350            jemalloc_active_bytes,
1351            jemalloc_resident_bytes,
1352            jemalloc_metadata_bytes,
1353            jvm_allocated_bytes,
1354            jvm_active_bytes,
1355            stream_memory_usage,
1356            materialize_cache_hit_count,
1357            materialize_data_exist_count,
1358            materialize_cache_total_count,
1359            materialize_input_row_count,
1360            materialize_current_epoch,
1361            pg_cdc_state_table_lsn,
1362            pg_cdc_jni_commit_offset_lsn,
1363            mysql_cdc_state_binlog_file_seq,
1364            mysql_cdc_state_binlog_position,
1365            gap_fill_generated_rows_count,
1366        }
1367    }
1368
1369    /// Create a new `StreamingMetrics` instance used in tests or other places.
1370    pub fn unused() -> Self {
1371        global_streaming_metrics(MetricLevel::Disabled)
1372    }
1373
1374    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1375        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1376        let actor_execution_time = self
1377            .actor_execution_time
1378            .with_guarded_label_values(label_list);
1379        let actor_scheduled_duration = self
1380            .actor_scheduled_duration
1381            .with_guarded_label_values(label_list);
1382        let actor_scheduled_cnt = self
1383            .actor_scheduled_cnt
1384            .with_guarded_label_values(label_list);
1385        let actor_fast_poll_duration = self
1386            .actor_fast_poll_duration
1387            .with_guarded_label_values(label_list);
1388        let actor_fast_poll_cnt = self
1389            .actor_fast_poll_cnt
1390            .with_guarded_label_values(label_list);
1391        let actor_slow_poll_duration = self
1392            .actor_slow_poll_duration
1393            .with_guarded_label_values(label_list);
1394        let actor_slow_poll_cnt = self
1395            .actor_slow_poll_cnt
1396            .with_guarded_label_values(label_list);
1397        let actor_poll_duration = self
1398            .actor_poll_duration
1399            .with_guarded_label_values(label_list);
1400        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1401        let actor_idle_duration = self
1402            .actor_idle_duration
1403            .with_guarded_label_values(label_list);
1404        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1405        ActorMetrics {
1406            actor_execution_time,
1407            actor_scheduled_duration,
1408            actor_scheduled_cnt,
1409            actor_fast_poll_duration,
1410            actor_fast_poll_cnt,
1411            actor_slow_poll_duration,
1412            actor_slow_poll_cnt,
1413            actor_poll_duration,
1414            actor_poll_cnt,
1415            actor_idle_duration,
1416            actor_idle_cnt,
1417        }
1418    }
1419
1420    pub(crate) fn new_actor_input_metrics(
1421        &self,
1422        actor_id: ActorId,
1423        fragment_id: FragmentId,
1424        upstream_fragment_id: FragmentId,
1425    ) -> ActorInputMetrics {
1426        let actor_id_str = actor_id.to_string();
1427        let fragment_id_str = fragment_id.to_string();
1428        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1429        ActorInputMetrics {
1430            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1431                &actor_id_str,
1432                &fragment_id_str,
1433                &upstream_fragment_id_str,
1434            ]),
1435            actor_input_buffer_blocking_duration_ns: self
1436                .actor_input_buffer_blocking_duration_ns
1437                .with_guarded_label_values(&[
1438                    &actor_id_str,
1439                    &fragment_id_str,
1440                    &upstream_fragment_id_str,
1441                ]),
1442        }
1443    }
1444
1445    pub fn new_sink_exec_metrics(
1446        &self,
1447        id: SinkId,
1448        actor_id: ActorId,
1449        fragment_id: FragmentId,
1450    ) -> SinkExecutorMetrics {
1451        let label_list: &[&str; 3] = &[
1452            &id.to_string(),
1453            &actor_id.to_string(),
1454            &fragment_id.to_string(),
1455        ];
1456        SinkExecutorMetrics {
1457            sink_input_row_count: self
1458                .sink_input_row_count
1459                .with_guarded_label_values(label_list),
1460            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1461            sink_chunk_buffer_size: self
1462                .sink_chunk_buffer_size
1463                .with_guarded_label_values(label_list),
1464        }
1465    }
1466
1467    pub fn new_group_top_n_metrics(
1468        &self,
1469        table_id: TableId,
1470        actor_id: ActorId,
1471        fragment_id: FragmentId,
1472    ) -> GroupTopNMetrics {
1473        let label_list: &[&str; 3] = &[
1474            &table_id.to_string(),
1475            &actor_id.to_string(),
1476            &fragment_id.to_string(),
1477        ];
1478
1479        GroupTopNMetrics {
1480            group_top_n_cache_miss_count: self
1481                .group_top_n_cache_miss_count
1482                .with_guarded_label_values(label_list),
1483            group_top_n_total_query_cache_count: self
1484                .group_top_n_total_query_cache_count
1485                .with_guarded_label_values(label_list),
1486            group_top_n_cached_entry_count: self
1487                .group_top_n_cached_entry_count
1488                .with_guarded_label_values(label_list),
1489        }
1490    }
1491
1492    pub fn new_append_only_group_top_n_metrics(
1493        &self,
1494        table_id: TableId,
1495        actor_id: ActorId,
1496        fragment_id: FragmentId,
1497    ) -> GroupTopNMetrics {
1498        let label_list: &[&str; 3] = &[
1499            &table_id.to_string(),
1500            &actor_id.to_string(),
1501            &fragment_id.to_string(),
1502        ];
1503
1504        GroupTopNMetrics {
1505            group_top_n_cache_miss_count: self
1506                .group_top_n_appendonly_cache_miss_count
1507                .with_guarded_label_values(label_list),
1508            group_top_n_total_query_cache_count: self
1509                .group_top_n_appendonly_total_query_cache_count
1510                .with_guarded_label_values(label_list),
1511            group_top_n_cached_entry_count: self
1512                .group_top_n_appendonly_cached_entry_count
1513                .with_guarded_label_values(label_list),
1514        }
1515    }
1516
1517    pub fn new_lookup_executor_metrics(
1518        &self,
1519        table_id: TableId,
1520        actor_id: ActorId,
1521        fragment_id: FragmentId,
1522    ) -> LookupExecutorMetrics {
1523        let label_list: &[&str; 3] = &[
1524            &table_id.to_string(),
1525            &actor_id.to_string(),
1526            &fragment_id.to_string(),
1527        ];
1528
1529        LookupExecutorMetrics {
1530            lookup_cache_miss_count: self
1531                .lookup_cache_miss_count
1532                .with_guarded_label_values(label_list),
1533            lookup_total_query_cache_count: self
1534                .lookup_total_query_cache_count
1535                .with_guarded_label_values(label_list),
1536            lookup_cached_entry_count: self
1537                .lookup_cached_entry_count
1538                .with_guarded_label_values(label_list),
1539        }
1540    }
1541
1542    pub fn new_hash_agg_metrics(
1543        &self,
1544        table_id: TableId,
1545        actor_id: ActorId,
1546        fragment_id: FragmentId,
1547    ) -> HashAggMetrics {
1548        let label_list: &[&str; 3] = &[
1549            &table_id.to_string(),
1550            &actor_id.to_string(),
1551            &fragment_id.to_string(),
1552        ];
1553        HashAggMetrics {
1554            agg_lookup_miss_count: self
1555                .agg_lookup_miss_count
1556                .with_guarded_label_values(label_list),
1557            agg_total_lookup_count: self
1558                .agg_total_lookup_count
1559                .with_guarded_label_values(label_list),
1560            agg_cached_entry_count: self
1561                .agg_cached_entry_count
1562                .with_guarded_label_values(label_list),
1563            agg_chunk_lookup_miss_count: self
1564                .agg_chunk_lookup_miss_count
1565                .with_guarded_label_values(label_list),
1566            agg_chunk_total_lookup_count: self
1567                .agg_chunk_total_lookup_count
1568                .with_guarded_label_values(label_list),
1569            agg_dirty_groups_count: self
1570                .agg_dirty_groups_count
1571                .with_guarded_label_values(label_list),
1572            agg_dirty_groups_heap_size: self
1573                .agg_dirty_groups_heap_size
1574                .with_guarded_label_values(label_list),
1575            agg_state_cache_lookup_count: self
1576                .agg_state_cache_lookup_count
1577                .with_guarded_label_values(label_list),
1578            agg_state_cache_miss_count: self
1579                .agg_state_cache_miss_count
1580                .with_guarded_label_values(label_list),
1581        }
1582    }
1583
1584    pub fn new_agg_distinct_dedup_metrics(
1585        &self,
1586        table_id: TableId,
1587        actor_id: ActorId,
1588        fragment_id: FragmentId,
1589    ) -> AggDistinctDedupMetrics {
1590        let label_list: &[&str; 3] = &[
1591            &table_id.to_string(),
1592            &actor_id.to_string(),
1593            &fragment_id.to_string(),
1594        ];
1595        AggDistinctDedupMetrics {
1596            agg_distinct_cache_miss_count: self
1597                .agg_distinct_cache_miss_count
1598                .with_guarded_label_values(label_list),
1599            agg_distinct_total_cache_count: self
1600                .agg_distinct_total_cache_count
1601                .with_guarded_label_values(label_list),
1602            agg_distinct_cached_entry_count: self
1603                .agg_distinct_cached_entry_count
1604                .with_guarded_label_values(label_list),
1605        }
1606    }
1607
1608    pub fn new_temporal_join_metrics(
1609        &self,
1610        table_id: TableId,
1611        actor_id: ActorId,
1612        fragment_id: FragmentId,
1613    ) -> TemporalJoinMetrics {
1614        let label_list: &[&str; 3] = &[
1615            &table_id.to_string(),
1616            &actor_id.to_string(),
1617            &fragment_id.to_string(),
1618        ];
1619        TemporalJoinMetrics {
1620            temporal_join_cache_miss_count: self
1621                .temporal_join_cache_miss_count
1622                .with_guarded_label_values(label_list),
1623            temporal_join_total_query_cache_count: self
1624                .temporal_join_total_query_cache_count
1625                .with_guarded_label_values(label_list),
1626            temporal_join_cached_entry_count: self
1627                .temporal_join_cached_entry_count
1628                .with_guarded_label_values(label_list),
1629        }
1630    }
1631
1632    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1633        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1634        BackfillMetrics {
1635            backfill_snapshot_read_row_count: self
1636                .backfill_snapshot_read_row_count
1637                .with_guarded_label_values(label_list),
1638            backfill_upstream_output_row_count: self
1639                .backfill_upstream_output_row_count
1640                .with_guarded_label_values(label_list),
1641        }
1642    }
1643
1644    pub fn new_cdc_backfill_metrics(
1645        &self,
1646        table_id: TableId,
1647        actor_id: ActorId,
1648    ) -> CdcBackfillMetrics {
1649        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1650        CdcBackfillMetrics {
1651            cdc_backfill_snapshot_read_row_count: self
1652                .cdc_backfill_snapshot_read_row_count
1653                .with_guarded_label_values(label_list),
1654            cdc_backfill_upstream_output_row_count: self
1655                .cdc_backfill_upstream_output_row_count
1656                .with_guarded_label_values(label_list),
1657        }
1658    }
1659
1660    pub fn new_over_window_metrics(
1661        &self,
1662        table_id: TableId,
1663        actor_id: ActorId,
1664        fragment_id: FragmentId,
1665    ) -> OverWindowMetrics {
1666        let label_list: &[&str; 3] = &[
1667            &table_id.to_string(),
1668            &actor_id.to_string(),
1669            &fragment_id.to_string(),
1670        ];
1671        OverWindowMetrics {
1672            over_window_cached_entry_count: self
1673                .over_window_cached_entry_count
1674                .with_guarded_label_values(label_list),
1675            over_window_cache_lookup_count: self
1676                .over_window_cache_lookup_count
1677                .with_guarded_label_values(label_list),
1678            over_window_cache_miss_count: self
1679                .over_window_cache_miss_count
1680                .with_guarded_label_values(label_list),
1681            over_window_range_cache_entry_count: self
1682                .over_window_range_cache_entry_count
1683                .with_guarded_label_values(label_list),
1684            over_window_range_cache_lookup_count: self
1685                .over_window_range_cache_lookup_count
1686                .with_guarded_label_values(label_list),
1687            over_window_range_cache_left_miss_count: self
1688                .over_window_range_cache_left_miss_count
1689                .with_guarded_label_values(label_list),
1690            over_window_range_cache_right_miss_count: self
1691                .over_window_range_cache_right_miss_count
1692                .with_guarded_label_values(label_list),
1693            over_window_accessed_entry_count: self
1694                .over_window_accessed_entry_count
1695                .with_guarded_label_values(label_list),
1696            over_window_compute_count: self
1697                .over_window_compute_count
1698                .with_guarded_label_values(label_list),
1699            over_window_same_output_count: self
1700                .over_window_same_output_count
1701                .with_guarded_label_values(label_list),
1702        }
1703    }
1704
1705    pub fn new_materialize_metrics(
1706        &self,
1707        table_id: TableId,
1708        actor_id: ActorId,
1709        fragment_id: FragmentId,
1710    ) -> MaterializeMetrics {
1711        let label_list: &[&str; 3] = &[
1712            &actor_id.to_string(),
1713            &table_id.to_string(),
1714            &fragment_id.to_string(),
1715        ];
1716        MaterializeMetrics {
1717            materialize_cache_hit_count: self
1718                .materialize_cache_hit_count
1719                .with_guarded_label_values(label_list),
1720            materialize_data_exist_count: self
1721                .materialize_data_exist_count
1722                .with_guarded_label_values(label_list),
1723            materialize_cache_total_count: self
1724                .materialize_cache_total_count
1725                .with_guarded_label_values(label_list),
1726            materialize_input_row_count: self
1727                .materialize_input_row_count
1728                .with_guarded_label_values(label_list),
1729            materialize_current_epoch: self
1730                .materialize_current_epoch
1731                .with_guarded_label_values(label_list),
1732        }
1733    }
1734}
1735
/// Label-guarded counters for an actor's input side.
///
/// NOTE(review): the label values these handles carry are chosen where the struct is built,
/// which is not visible in this section.
pub(crate) struct ActorInputMetrics {
    /// Counter of records coming into the actor (inferred from the name; confirm at the update site).
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Counter of time spent blocked on the input buffer. The `_ns` suffix suggests nanoseconds.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1740
/// Tokio metrics for actors.
///
/// Each `*_duration` / `*_time` field is a `LabelGuardedGauge` and each `*_cnt` field is a
/// `LabelGuardedIntGauge`. The fields come in duration/count pairs per poll category.
/// NOTE(review): the categories presumably mirror tokio task metrics; confirm at the
/// place these gauges are updated.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    // Time spent scheduled (waiting to run) and how often that happened.
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    // Fast polls: duration and count.
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    // Slow polls: duration and count.
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    // All polls: duration and count.
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    // Idle periods: duration and count.
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1755
/// Label-guarded metric handles for one sink executor.
pub struct SinkExecutorMetrics {
    /// Counter of rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter of bytes fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge of the sink's chunk buffer size (unit not visible here; confirm at the update site).
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1761
/// Label-guarded metric handles for one materialize executor.
///
/// Built by `StreamingMetrics::new_materialize_metrics`, which labels every handle with
/// `(actor_id, table_id, fragment_id)`.
pub struct MaterializeMetrics {
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// The only gauge here. It holds the executor's current epoch, so it is set rather than
    /// accumulated (inferred from the name).
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1769
/// Label-guarded cache metric handles for a group top-n executor.
///
/// Shared by `StreamingMetrics::new_group_top_n_metrics` and
/// `StreamingMetrics::new_append_only_group_top_n_metrics`. The latter fills these fields from
/// the `group_top_n_appendonly_*` vectors. Both label with `(table_id, actor_id, fragment_id)`.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge (not a counter) of entries held in the cache.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1775
/// Label-guarded cache metric handles for a lookup executor.
///
/// Built by `StreamingMetrics::new_lookup_executor_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge (not a counter) of entries held in the cache.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1781
/// Label-guarded metric handles for a hash aggregation executor.
///
/// Built by `StreamingMetrics::new_hash_agg_metrics` with labels
/// `(table_id, actor_id, fragment_id)`. Fields typed `LabelGuardedIntGauge` are gauges; the
/// rest are monotonically incremented counters.
pub struct HashAggMetrics {
    // Group lookup statistics.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    // Per-chunk lookup statistics.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    // Dirty group gauges: how many groups are dirty and their heap size (unit not shown here).
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    // State cache statistics.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1793
/// Label-guarded cache metric handles for an aggregation's distinct deduplication.
///
/// Built by `StreamingMetrics::new_agg_distinct_dedup_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge (not a counter) of entries held in the dedup cache.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1799
/// Label-guarded cache metric handles for a temporal join executor.
///
/// Built by `StreamingMetrics::new_temporal_join_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge (not a counter) of entries held in the cache.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1805
/// Label-guarded row counters for a backfill executor.
///
/// Built by `StreamingMetrics::new_backfill_metrics` with labels `(table_id, actor_id)`.
/// There is no fragment label.
pub struct BackfillMetrics {
    /// Rows read from the snapshot side.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Rows emitted from the upstream side.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1810
/// Label-guarded row counters for a CDC backfill executor.
///
/// Built by `StreamingMetrics::new_cdc_backfill_metrics` with labels `(table_id, actor_id)`.
/// Unlike the other handle structs here, it derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Rows read from the snapshot side.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Rows emitted from the upstream side.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1816
/// Label-guarded metric handles for an over-window executor.
///
/// Built by `StreamingMetrics::new_over_window_metrics` with labels
/// `(table_id, actor_id, fragment_id)`. The two `*_entry_count` cache fields are gauges; the
/// rest are counters.
pub struct OverWindowMetrics {
    // Partition cache statistics.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    // Range cache statistics. Misses are split by side (left / right).
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    // Computation statistics.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}