risingwave_stream/executor/monitor/
streaming_stats.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26    LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46    pub level: MetricLevel,
47
48    // Executor metrics (disabled by default)
49    pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51    // Profiling Metrics:
52    // Aggregated per operator rather than per actor.
53    // These are purely in-memory, never collected by prometheus.
54    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57    // Streaming actor metrics from tokio (disabled by default)
58    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60    actor_poll_duration: RelabeledGuardedIntCounterVec,
61    actor_poll_cnt: RelabeledGuardedIntCounterVec,
62    actor_idle_duration: RelabeledGuardedIntCounterVec,
63    actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65    // Streaming actor
66    pub actor_count: LabelGuardedIntGaugeVec,
67    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
70
71    // Source
72    pub source_output_row_count: LabelGuardedIntCounterVec,
73    pub source_split_change_count: LabelGuardedIntCounterVec,
74    pub source_backfill_row_count: LabelGuardedIntCounterVec,
75
76    // Sink
77    sink_input_row_count: LabelGuardedIntCounterVec,
78    sink_input_bytes: LabelGuardedIntCounterVec,
79    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
80
81    // Exchange (see also `compute::ExchangeServiceMetrics`)
82    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
83
84    // Streaming Merge (We breakout this metric from `barrier_align_duration` because
85    // the alignment happens on different levels)
86    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
87
88    // Backpressure
89    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
90    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91
92    // Streaming Join
93    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
94    pub join_lookup_total_count: LabelGuardedIntCounterVec,
95    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
96    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
97    pub join_match_duration_ns: LabelGuardedIntCounterVec,
98    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
99    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
100
101    // Streaming Join, Streaming Dynamic Filter and Streaming Union
102    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
103
104    // Streaming Aggregation
105    agg_lookup_miss_count: LabelGuardedIntCounterVec,
106    agg_total_lookup_count: LabelGuardedIntCounterVec,
107    agg_cached_entry_count: LabelGuardedIntGaugeVec,
108    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
109    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
110    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
111    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
112    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
113    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
114    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
115    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
116    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
117
118    // Streaming TopN
119    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
120    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
121    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
122    // TODO(rc): why not just use the above three?
123    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
126
127    // Lookup executor
128    lookup_cache_miss_count: LabelGuardedIntCounterVec,
129    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
130    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
131
132    // temporal join
133    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
134    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
135    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
136
137    // Backfill
138    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
139    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
140
141    // CDC Backfill
142    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // Snapshot Backfill
146    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
147
148    // Over Window
149    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
150    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
151    over_window_cache_miss_count: LabelGuardedIntCounterVec,
152    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
153    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
154    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
155    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
156    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
157    over_window_compute_count: LabelGuardedIntCounterVec,
158    over_window_same_output_count: LabelGuardedIntCounterVec,
159
160    /// The duration from receipt of barrier to all actors collection.
161    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
162    /// to flow through the graph.
163    pub barrier_inflight_latency: Histogram,
164    /// The duration of sync to storage.
165    pub barrier_sync_latency: Histogram,
166    pub barrier_batch_size: Histogram,
167    /// The progress made by the earliest in-flight barriers in the local barrier manager.
168    pub barrier_manager_progress: IntCounter,
169
170    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
171    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
172    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
173    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
174    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
177    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
178    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
179    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
180
181    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
182    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
183    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
184    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
185    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
189    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
190    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
191    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
192
193    // Memory management
194    pub lru_runtime_loop_count: IntCounter,
195    pub lru_latest_sequence: IntGauge,
196    pub lru_watermark_sequence: IntGauge,
197    pub lru_eviction_policy: IntGauge,
198    pub jemalloc_allocated_bytes: IntGauge,
199    pub jemalloc_active_bytes: IntGauge,
200    pub jemalloc_resident_bytes: IntGauge,
201    pub jemalloc_metadata_bytes: IntGauge,
202    pub jvm_allocated_bytes: IntGauge,
203    pub jvm_active_bytes: IntGauge,
204    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
205
206    // Materialized view
207    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
208    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
209    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
210    materialize_input_row_count: RelabeledGuardedIntCounterVec,
211    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
212
213    // PostgreSQL CDC LSN monitoring
214    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
215    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
216
217    // MySQL CDC binlog monitoring
218    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
219    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
220
221    // Gap Fill
222    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228    GLOBAL_STREAMING_METRICS
229        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230        .clone()
231}
232
233impl StreamingMetrics {
234    fn new(registry: &Registry, level: MetricLevel) -> Self {
235        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236            "stream_executor_row_count",
237            "Total number of rows that have been output from each executor",
238            &["actor_id", "fragment_id", "executor_identity"],
239            registry
240        )
241        .unwrap()
242        .relabel_debug_1(level);
243
244        let stream_node_output_row_count = CountMap::new();
245        let stream_node_output_blocking_duration_ns = CountMap::new();
246
247        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248            "stream_source_output_rows_counts",
249            "Total number of rows that have been output from source",
250            &["source_id", "source_name", "actor_id", "fragment_id"],
251            registry
252        )
253        .unwrap();
254
255        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256            "stream_source_split_change_event_count",
257            "Total number of split change events that have been operated by source",
258            &["source_id", "source_name", "actor_id", "fragment_id"],
259            registry
260        )
261        .unwrap();
262
263        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264            "stream_source_backfill_rows_counts",
265            "Total number of rows that have been backfilled for source",
266            &["source_id", "source_name", "actor_id", "fragment_id"],
267            registry
268        )
269        .unwrap();
270
271        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272            "stream_sink_input_row_count",
273            "Total number of rows streamed into sink executors",
274            &["sink_id", "actor_id", "fragment_id"],
275            registry
276        )
277        .unwrap();
278
279        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280            "stream_sink_input_bytes",
281            "Total size of chunks streamed into sink executors",
282            &["sink_id", "actor_id", "fragment_id"],
283            registry
284        )
285        .unwrap();
286
287        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288            "stream_mview_input_row_count",
289            "Total number of rows streamed into materialize executors",
290            &["actor_id", "table_id", "fragment_id"],
291            registry
292        )
293        .unwrap()
294        .relabel_debug_1(level);
295
296        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297            "stream_mview_current_epoch",
298            "The current epoch of the materialized executor",
299            &["actor_id", "table_id", "fragment_id"],
300            registry
301        )
302        .unwrap()
303        .relabel_debug_1(level);
304
305        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306            "stream_pg_cdc_state_table_lsn",
307            "Current LSN value stored in PostgreSQL CDC state table",
308            &["source_id"],
309            registry,
310        )
311        .unwrap();
312
313        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314            "stream_pg_cdc_jni_commit_offset_lsn",
315            "LSN value when JNI commit offset is called for PostgreSQL CDC",
316            &["source_id"],
317            registry,
318        )
319        .unwrap();
320
321        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
322            "stream_mysql_cdc_state_binlog_file_seq",
323            "Current binlog file sequence number stored in MySQL CDC state table",
324            &["source_id"],
325            registry,
326        )
327        .unwrap();
328
329        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
330            "stream_mysql_cdc_state_binlog_position",
331            "Current binlog position stored in MySQL CDC state table",
332            &["source_id"],
333            registry,
334        )
335        .unwrap();
336
337        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
338            "stream_sink_chunk_buffer_size",
339            "Total size of chunks buffered in a barrier",
340            &["sink_id", "actor_id", "fragment_id"],
341            registry
342        )
343        .unwrap();
344        let actor_output_buffer_blocking_duration_ns =
345            register_guarded_int_counter_vec_with_registry!(
346                "stream_actor_output_buffer_blocking_duration_ns",
347                "Total blocking duration (ns) of output buffer",
348                &["actor_id", "fragment_id", "downstream_fragment_id"],
349                registry
350            )
351            .unwrap()
352            // mask the first label `actor_id` if the level is less verbose than `Debug`
353            .relabel_debug_1(level);
354
355        let actor_input_buffer_blocking_duration_ns =
356            register_guarded_int_counter_vec_with_registry!(
357                "stream_actor_input_buffer_blocking_duration_ns",
358                "Total blocking duration (ns) of input buffer",
359                &["actor_id", "fragment_id", "upstream_fragment_id"],
360                registry
361            )
362            .unwrap()
363            // mask the first label `actor_id` if the level is less verbose than `Debug`
364            .relabel_debug_1(level);
365
366        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
367            "stream_exchange_frag_recv_size",
368            "Total size of messages that have been received from upstream Fragment",
369            &["up_fragment_id", "down_fragment_id"],
370            registry
371        )
372        .unwrap();
373
374        let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
375            "stream_actor_poll_duration",
376            "tokio's metrics",
377            &["actor_id", "fragment_id"],
378            registry
379        )
380        .unwrap()
381        .relabel_debug_1(level);
382
383        let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
384            "stream_actor_poll_cnt",
385            "tokio's metrics",
386            &["actor_id", "fragment_id"],
387            registry
388        )
389        .unwrap()
390        .relabel_debug_1(level);
391
392        let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
393            "stream_actor_scheduled_duration",
394            "tokio's metrics",
395            &["actor_id", "fragment_id"],
396            registry
397        )
398        .unwrap()
399        .relabel_debug_1(level);
400
401        let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
402            "stream_actor_scheduled_cnt",
403            "tokio's metrics",
404            &["actor_id", "fragment_id"],
405            registry
406        )
407        .unwrap()
408        .relabel_debug_1(level);
409
410        let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
411            "stream_actor_idle_duration",
412            "tokio's metrics",
413            &["actor_id", "fragment_id"],
414            registry
415        )
416        .unwrap()
417        .relabel_debug_1(level);
418
419        let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
420            "stream_actor_idle_cnt",
421            "tokio's metrics",
422            &["actor_id", "fragment_id"],
423            registry
424        )
425        .unwrap()
426        .relabel_debug_1(level);
427
428        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
429            "stream_actor_in_record_cnt",
430            "Total number of rows actor received",
431            &["actor_id", "fragment_id", "upstream_fragment_id"],
432            registry
433        )
434        .unwrap()
435        .relabel_debug_1(level);
436
437        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
438            "stream_actor_out_record_cnt",
439            "Total number of rows actor sent",
440            &["actor_id", "fragment_id"],
441            registry
442        )
443        .unwrap()
444        .relabel_debug_1(level);
445
446        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
447            "stream_actor_current_epoch",
448            "Current epoch of actor",
449            &["actor_id", "fragment_id"],
450            registry
451        )
452        .unwrap()
453        .relabel_debug_1(level);
454
455        let actor_count = register_guarded_int_gauge_vec_with_registry!(
456            "stream_actor_count",
457            "Total number of actors (parallelism)",
458            &["fragment_id"],
459            registry
460        )
461        .unwrap();
462
463        let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
464            "stream_merge_barrier_align_duration_ns",
465            "Total merge barrier alignment duration (ns)",
466            &["actor_id", "fragment_id"],
467            registry
468        )
469        .unwrap()
470        .relabel_debug_1(level);
471
472        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
473            "stream_join_lookup_miss_count",
474            "Join executor lookup miss duration",
475            &["side", "join_table_id", "actor_id", "fragment_id"],
476            registry
477        )
478        .unwrap();
479
480        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
481            "stream_join_lookup_total_count",
482            "Join executor lookup total operation",
483            &["side", "join_table_id", "actor_id", "fragment_id"],
484            registry
485        )
486        .unwrap();
487
488        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
489            "stream_join_insert_cache_miss_count",
490            "Join executor cache miss when insert operation",
491            &["side", "join_table_id", "actor_id", "fragment_id"],
492            registry
493        )
494        .unwrap();
495
496        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
497            "stream_join_actor_input_waiting_duration_ns",
498            "Total waiting duration (ns) of input buffer of join actor",
499            &["actor_id", "fragment_id"],
500            registry
501        )
502        .unwrap();
503
504        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
505            "stream_join_match_duration_ns",
506            "Matching duration for each side",
507            &["actor_id", "fragment_id", "side"],
508            registry
509        )
510        .unwrap();
511
512        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
513            "stream_barrier_align_duration_ns",
514            "Duration of join align barrier",
515            &["actor_id", "fragment_id", "wait_side", "executor"],
516            registry
517        )
518        .unwrap()
519        .relabel_debug_1(level);
520
521        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
522            "stream_join_cached_entry_count",
523            "Number of cached entries in streaming join operators",
524            &["actor_id", "fragment_id", "side"],
525            registry
526        )
527        .unwrap();
528
529        let join_matched_join_keys_opts = histogram_opts!(
530            "stream_join_matched_join_keys",
531            "The number of keys matched in the opposite side",
532            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
533        );
534
535        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
536            join_matched_join_keys_opts,
537            &["actor_id", "fragment_id", "table_id"],
538            registry
539        )
540        .unwrap()
541        .relabel_debug_1(level);
542
543        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
544            "stream_agg_lookup_miss_count",
545            "Aggregation executor lookup miss duration",
546            &["table_id", "actor_id", "fragment_id"],
547            registry
548        )
549        .unwrap();
550
551        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
552            "stream_agg_lookup_total_count",
553            "Aggregation executor lookup total operation",
554            &["table_id", "actor_id", "fragment_id"],
555            registry
556        )
557        .unwrap();
558
559        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
560            "stream_agg_distinct_cache_miss_count",
561            "Aggregation executor dinsinct miss duration",
562            &["table_id", "actor_id", "fragment_id"],
563            registry
564        )
565        .unwrap();
566
567        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
568            "stream_agg_distinct_total_cache_count",
569            "Aggregation executor distinct total operation",
570            &["table_id", "actor_id", "fragment_id"],
571            registry
572        )
573        .unwrap();
574
575        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
576            "stream_agg_distinct_cached_entry_count",
577            "Total entry counts in distinct aggregation executor cache",
578            &["table_id", "actor_id", "fragment_id"],
579            registry
580        )
581        .unwrap();
582
583        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
584            "stream_agg_dirty_groups_count",
585            "Total dirty group counts in aggregation executor",
586            &["table_id", "actor_id", "fragment_id"],
587            registry
588        )
589        .unwrap();
590
591        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
592            "stream_agg_dirty_groups_heap_size",
593            "Total dirty group heap size in aggregation executor",
594            &["table_id", "actor_id", "fragment_id"],
595            registry
596        )
597        .unwrap();
598
599        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
600            "stream_agg_state_cache_lookup_count",
601            "Aggregation executor state cache lookup count",
602            &["table_id", "actor_id", "fragment_id"],
603            registry
604        )
605        .unwrap();
606
607        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
608            "stream_agg_state_cache_miss_count",
609            "Aggregation executor state cache miss count",
610            &["table_id", "actor_id", "fragment_id"],
611            registry
612        )
613        .unwrap();
614
615        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
616            "stream_group_top_n_cache_miss_count",
617            "Group top n executor cache miss count",
618            &["table_id", "actor_id", "fragment_id"],
619            registry
620        )
621        .unwrap();
622
623        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
624            "stream_group_top_n_total_query_cache_count",
625            "Group top n executor query cache total count",
626            &["table_id", "actor_id", "fragment_id"],
627            registry
628        )
629        .unwrap();
630
631        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
632            "stream_group_top_n_cached_entry_count",
633            "Total entry counts in group top n executor cache",
634            &["table_id", "actor_id", "fragment_id"],
635            registry
636        )
637        .unwrap();
638
639        let group_top_n_appendonly_cache_miss_count =
640            register_guarded_int_counter_vec_with_registry!(
641                "stream_group_top_n_appendonly_cache_miss_count",
642                "Group top n appendonly executor cache miss count",
643                &["table_id", "actor_id", "fragment_id"],
644                registry
645            )
646            .unwrap();
647
648        let group_top_n_appendonly_total_query_cache_count =
649            register_guarded_int_counter_vec_with_registry!(
650                "stream_group_top_n_appendonly_total_query_cache_count",
651                "Group top n appendonly executor total cache count",
652                &["table_id", "actor_id", "fragment_id"],
653                registry
654            )
655            .unwrap();
656
657        let group_top_n_appendonly_cached_entry_count =
658            register_guarded_int_gauge_vec_with_registry!(
659                "stream_group_top_n_appendonly_cached_entry_count",
660                "Total entry counts in group top n appendonly executor cache",
661                &["table_id", "actor_id", "fragment_id"],
662                registry
663            )
664            .unwrap();
665
666        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
667            "stream_lookup_cache_miss_count",
668            "Lookup executor cache miss count",
669            &["table_id", "actor_id", "fragment_id"],
670            registry
671        )
672        .unwrap();
673
674        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
675            "stream_lookup_total_query_cache_count",
676            "Lookup executor query cache total count",
677            &["table_id", "actor_id", "fragment_id"],
678            registry
679        )
680        .unwrap();
681
682        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
683            "stream_lookup_cached_entry_count",
684            "Total entry counts in lookup executor cache",
685            &["table_id", "actor_id", "fragment_id"],
686            registry
687        )
688        .unwrap();
689
690        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
691            "stream_temporal_join_cache_miss_count",
692            "Temporal join executor cache miss count",
693            &["table_id", "actor_id", "fragment_id"],
694            registry
695        )
696        .unwrap();
697
698        let temporal_join_total_query_cache_count =
699            register_guarded_int_counter_vec_with_registry!(
700                "stream_temporal_join_total_query_cache_count",
701                "Temporal join executor query cache total count",
702                &["table_id", "actor_id", "fragment_id"],
703                registry
704            )
705            .unwrap();
706
707        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
708            "stream_temporal_join_cached_entry_count",
709            "Total entry count in temporal join executor cache",
710            &["table_id", "actor_id", "fragment_id"],
711            registry
712        )
713        .unwrap();
714
715        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
716            "stream_agg_cached_entry_count",
717            "Number of cached keys in streaming aggregation operators",
718            &["table_id", "actor_id", "fragment_id"],
719            registry
720        )
721        .unwrap();
722
723        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
724            "stream_agg_chunk_lookup_miss_count",
725            "Aggregation executor chunk-level lookup miss duration",
726            &["table_id", "actor_id", "fragment_id"],
727            registry
728        )
729        .unwrap();
730
731        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
732            "stream_agg_chunk_lookup_total_count",
733            "Aggregation executor chunk-level lookup total operation",
734            &["table_id", "actor_id", "fragment_id"],
735            registry
736        )
737        .unwrap();
738
739        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
740            "stream_backfill_snapshot_read_row_count",
741            "Total number of rows that have been read from the backfill snapshot",
742            &["table_id", "actor_id"],
743            registry
744        )
745        .unwrap();
746
747        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
748            "stream_backfill_upstream_output_row_count",
749            "Total number of rows that have been output from the backfill upstream",
750            &["table_id", "actor_id"],
751            registry
752        )
753        .unwrap();
754
755        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
756            "stream_cdc_backfill_snapshot_read_row_count",
757            "Total number of rows that have been read from the cdc_backfill snapshot",
758            &["table_id", "actor_id"],
759            registry
760        )
761        .unwrap();
762
763        let cdc_backfill_upstream_output_row_count =
764            register_guarded_int_counter_vec_with_registry!(
765                "stream_cdc_backfill_upstream_output_row_count",
766                "Total number of rows that have been output from the cdc_backfill upstream",
767                &["table_id", "actor_id"],
768                registry
769            )
770            .unwrap();
771
772        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
773            "stream_snapshot_backfill_consume_snapshot_row_count",
774            "Total number of rows that have been output from snapshot backfill",
775            &["table_id", "actor_id", "stage"],
776            registry
777        )
778        .unwrap();
779
780        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
781            "stream_over_window_cached_entry_count",
782            "Total entry (partition) count in over window executor cache",
783            &["table_id", "actor_id", "fragment_id"],
784            registry
785        )
786        .unwrap();
787
788        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
789            "stream_over_window_cache_lookup_count",
790            "Over window executor cache lookup count",
791            &["table_id", "actor_id", "fragment_id"],
792            registry
793        )
794        .unwrap();
795
796        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
797            "stream_over_window_cache_miss_count",
798            "Over window executor cache miss count",
799            &["table_id", "actor_id", "fragment_id"],
800            registry
801        )
802        .unwrap();
803
804        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
805            "stream_over_window_range_cache_entry_count",
806            "Over window partition range cache entry count",
807            &["table_id", "actor_id", "fragment_id"],
808            registry,
809        )
810        .unwrap();
811
812        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
813            "stream_over_window_range_cache_lookup_count",
814            "Over window partition range cache lookup count",
815            &["table_id", "actor_id", "fragment_id"],
816            registry
817        )
818        .unwrap();
819
820        let over_window_range_cache_left_miss_count =
821            register_guarded_int_counter_vec_with_registry!(
822                "stream_over_window_range_cache_left_miss_count",
823                "Over window partition range cache left miss count",
824                &["table_id", "actor_id", "fragment_id"],
825                registry
826            )
827            .unwrap();
828
829        let over_window_range_cache_right_miss_count =
830            register_guarded_int_counter_vec_with_registry!(
831                "stream_over_window_range_cache_right_miss_count",
832                "Over window partition range cache right miss count",
833                &["table_id", "actor_id", "fragment_id"],
834                registry
835            )
836            .unwrap();
837
838        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
839            "stream_over_window_accessed_entry_count",
840            "Over window accessed entry count",
841            &["table_id", "actor_id", "fragment_id"],
842            registry
843        )
844        .unwrap();
845
846        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
847            "stream_over_window_compute_count",
848            "Over window compute count",
849            &["table_id", "actor_id", "fragment_id"],
850            registry
851        )
852        .unwrap();
853
854        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
855            "stream_over_window_same_output_count",
856            "Over window same output count",
857            &["table_id", "actor_id", "fragment_id"],
858            registry
859        )
860        .unwrap();
861
862        let opts = histogram_opts!(
863            "stream_barrier_inflight_duration_seconds",
864            "barrier_inflight_latency",
865            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
866        );
867        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
868
869        let opts = histogram_opts!(
870            "stream_barrier_sync_storage_duration_seconds",
871            "barrier_sync_latency",
872            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
873        );
874        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
875
876        let opts = histogram_opts!(
877            "stream_barrier_batch_size",
878            "barrier_batch_size",
879            exponential_buckets(1.0, 2.0, 8).unwrap()
880        );
881        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
882
883        let barrier_manager_progress = register_int_counter_with_registry!(
884            "stream_barrier_manager_progress",
885            "The number of actors that have processed the earliest in-flight barriers",
886            registry
887        )
888        .unwrap();
889
890        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
891            "sync_kv_log_store_wait_next_poll_ns",
892            "Total duration (ns) of waiting for next poll",
893            &["actor_id", "target", "fragment_id", "relation"],
894            registry
895        )
896        .unwrap();
897
898        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
899            "sync_kv_log_store_read_count",
900            "read row count throughput of sync_kv log store",
901            &["type", "actor_id", "target", "fragment_id", "relation"],
902            registry
903        )
904        .unwrap();
905
906        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
907            "sync_kv_log_store_read_size",
908            "read size throughput of sync_kv log store",
909            &["type", "actor_id", "target", "fragment_id", "relation"],
910            registry
911        )
912        .unwrap();
913
914        let sync_kv_log_store_write_pause_duration_ns =
915            register_guarded_int_counter_vec_with_registry!(
916                "sync_kv_log_store_write_pause_duration_ns",
917                "Duration (ns) of sync_kv log store write pause",
918                &["actor_id", "target", "fragment_id", "relation"],
919                registry
920            )
921            .unwrap();
922
923        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
924            "sync_kv_log_store_state",
925            "clean/unclean state transition for sync_kv log store",
926            &["state", "actor_id", "target", "fragment_id", "relation"],
927            registry
928        )
929        .unwrap();
930
931        let sync_kv_log_store_storage_write_count =
932            register_guarded_int_counter_vec_with_registry!(
933                "sync_kv_log_store_storage_write_count",
934                "Write row count throughput of sync_kv log store",
935                &["actor_id", "target", "fragment_id", "relation"],
936                registry
937            )
938            .unwrap();
939
940        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
941            "sync_kv_log_store_storage_write_size",
942            "Write size throughput of sync_kv log store",
943            &["actor_id", "target", "fragment_id", "relation"],
944            registry
945        )
946        .unwrap();
947
948        let sync_kv_log_store_buffer_unconsumed_item_count =
949            register_guarded_int_gauge_vec_with_registry!(
950                "sync_kv_log_store_buffer_unconsumed_item_count",
951                "Number of Unconsumed Item in buffer",
952                &["actor_id", "target", "fragment_id", "relation"],
953                registry
954            )
955            .unwrap();
956
957        let sync_kv_log_store_buffer_unconsumed_row_count =
958            register_guarded_int_gauge_vec_with_registry!(
959                "sync_kv_log_store_buffer_unconsumed_row_count",
960                "Number of Unconsumed Row in buffer",
961                &["actor_id", "target", "fragment_id", "relation"],
962                registry
963            )
964            .unwrap();
965
966        let sync_kv_log_store_buffer_unconsumed_epoch_count =
967            register_guarded_int_gauge_vec_with_registry!(
968                "sync_kv_log_store_buffer_unconsumed_epoch_count",
969                "Number of Unconsumed Epoch in buffer",
970                &["actor_id", "target", "fragment_id", "relation"],
971                registry
972            )
973            .unwrap();
974
975        let sync_kv_log_store_buffer_unconsumed_min_epoch =
976            register_guarded_int_gauge_vec_with_registry!(
977                "sync_kv_log_store_buffer_unconsumed_min_epoch",
978                "Number of Unconsumed Epoch in buffer",
979                &["actor_id", "target", "fragment_id", "relation"],
980                registry
981            )
982            .unwrap();
983
984        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
985            "kv_log_store_storage_write_count",
986            "Write row count throughput of kv log store",
987            &["actor_id", "connector", "sink_id", "sink_name"],
988            registry
989        )
990        .unwrap();
991
992        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
993            "kv_log_store_storage_write_size",
994            "Write size throughput of kv log store",
995            &["actor_id", "connector", "sink_id", "sink_name"],
996            registry
997        )
998        .unwrap();
999
1000        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1001            "kv_log_store_storage_read_count",
1002            "Write row count throughput of kv log store",
1003            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1004            registry
1005        )
1006        .unwrap();
1007
1008        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1009            "kv_log_store_storage_read_size",
1010            "Write size throughput of kv log store",
1011            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1012            registry
1013        )
1014        .unwrap();
1015
1016        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1017            "kv_log_store_rewind_count",
1018            "Kv log store rewind rate",
1019            &["actor_id", "connector", "sink_id", "sink_name"],
1020            registry
1021        )
1022        .unwrap();
1023
1024        let kv_log_store_rewind_delay_opts = {
1025            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1026            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1027                - REWIND_BASE_DELAY.as_secs_f64().log2())
1028            .ceil() as usize;
1029            let buckets = exponential_buckets(
1030                REWIND_BASE_DELAY.as_secs_f64(),
1031                REWIND_BACKOFF_FACTOR as _,
1032                bucket_count,
1033            )
1034            .unwrap();
1035            histogram_opts!(
1036                "kv_log_store_rewind_delay",
1037                "Kv log store rewind delay",
1038                buckets,
1039            )
1040        };
1041
1042        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1043            kv_log_store_rewind_delay_opts,
1044            &["actor_id", "connector", "sink_id", "sink_name"],
1045            registry
1046        )
1047        .unwrap();
1048
1049        let kv_log_store_buffer_unconsumed_item_count =
1050            register_guarded_int_gauge_vec_with_registry!(
1051                "kv_log_store_buffer_unconsumed_item_count",
1052                "Number of Unconsumed Item in buffer",
1053                &["actor_id", "connector", "sink_id", "sink_name"],
1054                registry
1055            )
1056            .unwrap();
1057
1058        let kv_log_store_buffer_unconsumed_row_count =
1059            register_guarded_int_gauge_vec_with_registry!(
1060                "kv_log_store_buffer_unconsumed_row_count",
1061                "Number of Unconsumed Row in buffer",
1062                &["actor_id", "connector", "sink_id", "sink_name"],
1063                registry
1064            )
1065            .unwrap();
1066
1067        let kv_log_store_buffer_unconsumed_epoch_count =
1068            register_guarded_int_gauge_vec_with_registry!(
1069                "kv_log_store_buffer_unconsumed_epoch_count",
1070                "Number of Unconsumed Epoch in buffer",
1071                &["actor_id", "connector", "sink_id", "sink_name"],
1072                registry
1073            )
1074            .unwrap();
1075
1076        let kv_log_store_buffer_unconsumed_min_epoch =
1077            register_guarded_int_gauge_vec_with_registry!(
1078                "kv_log_store_buffer_unconsumed_min_epoch",
1079                "Number of Unconsumed Epoch in buffer",
1080                &["actor_id", "connector", "sink_id", "sink_name"],
1081                registry
1082            )
1083            .unwrap();
1084
1085        let lru_runtime_loop_count = register_int_counter_with_registry!(
1086            "lru_runtime_loop_count",
1087            "The counts of the eviction loop in LRU manager per second",
1088            registry
1089        )
1090        .unwrap();
1091
1092        let lru_latest_sequence = register_int_gauge_with_registry!(
1093            "lru_latest_sequence",
1094            "Current LRU global sequence",
1095            registry,
1096        )
1097        .unwrap();
1098
1099        let lru_watermark_sequence = register_int_gauge_with_registry!(
1100            "lru_watermark_sequence",
1101            "Current LRU watermark sequence",
1102            registry,
1103        )
1104        .unwrap();
1105
1106        let lru_eviction_policy = register_int_gauge_with_registry!(
1107            "lru_eviction_policy",
1108            "Current LRU eviction policy",
1109            registry,
1110        )
1111        .unwrap();
1112
1113        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1114            "jemalloc_allocated_bytes",
1115            "The allocated memory jemalloc, got from jemalloc_ctl",
1116            registry
1117        )
1118        .unwrap();
1119
1120        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1121            "jemalloc_active_bytes",
1122            "The active memory jemalloc, got from jemalloc_ctl",
1123            registry
1124        )
1125        .unwrap();
1126
1127        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1128            "jemalloc_resident_bytes",
1129            "The active memory jemalloc, got from jemalloc_ctl",
1130            registry
1131        )
1132        .unwrap();
1133
1134        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1135            "jemalloc_metadata_bytes",
1136            "The active memory jemalloc, got from jemalloc_ctl",
1137            registry
1138        )
1139        .unwrap();
1140
1141        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1142            "jvm_allocated_bytes",
1143            "The allocated jvm memory",
1144            registry
1145        )
1146        .unwrap();
1147
1148        let jvm_active_bytes = register_int_gauge_with_registry!(
1149            "jvm_active_bytes",
1150            "The active jvm memory",
1151            registry
1152        )
1153        .unwrap();
1154
1155        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1156            "stream_materialize_cache_hit_count",
1157            "Materialize executor cache hit count",
1158            &["actor_id", "table_id", "fragment_id"],
1159            registry
1160        )
1161        .unwrap()
1162        .relabel_debug_1(level);
1163
1164        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1165            "stream_materialize_data_exist_count",
1166            "Materialize executor data exist count",
1167            &["actor_id", "table_id", "fragment_id"],
1168            registry
1169        )
1170        .unwrap()
1171        .relabel_debug_1(level);
1172
1173        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1174            "stream_materialize_cache_total_count",
1175            "Materialize executor cache total operation",
1176            &["actor_id", "table_id", "fragment_id"],
1177            registry
1178        )
1179        .unwrap()
1180        .relabel_debug_1(level);
1181
1182        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1183            "stream_memory_usage",
1184            "Memory usage for stream executors",
1185            &["actor_id", "table_id", "desc"],
1186            registry
1187        )
1188        .unwrap()
1189        .relabel_debug_1(level);
1190
1191        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1192            "gap_fill_generated_rows_count",
1193            "Total number of rows generated by gap fill executor",
1194            &["actor_id", "fragment_id"],
1195            registry
1196        )
1197        .unwrap()
1198        .relabel_debug_1(level);
1199
1200        Self {
1201            level,
1202            executor_row_count,
1203            mem_stream_node_output_row_count: stream_node_output_row_count,
1204            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1205            actor_scheduled_duration,
1206            actor_scheduled_cnt,
1207            actor_poll_duration,
1208            actor_poll_cnt,
1209            actor_idle_duration,
1210            actor_idle_cnt,
1211            actor_count,
1212            actor_in_record_cnt,
1213            actor_out_record_cnt,
1214            actor_current_epoch,
1215            source_output_row_count,
1216            source_split_change_count,
1217            source_backfill_row_count,
1218            sink_input_row_count,
1219            sink_input_bytes,
1220            sink_chunk_buffer_size,
1221            exchange_frag_recv_size,
1222            merge_barrier_align_duration,
1223            actor_output_buffer_blocking_duration_ns,
1224            actor_input_buffer_blocking_duration_ns,
1225            join_lookup_miss_count,
1226            join_lookup_total_count,
1227            join_insert_cache_miss_count,
1228            join_actor_input_waiting_duration_ns,
1229            join_match_duration_ns,
1230            join_cached_entry_count,
1231            join_matched_join_keys,
1232            barrier_align_duration,
1233            agg_lookup_miss_count,
1234            agg_total_lookup_count,
1235            agg_cached_entry_count,
1236            agg_chunk_lookup_miss_count,
1237            agg_chunk_total_lookup_count,
1238            agg_dirty_groups_count,
1239            agg_dirty_groups_heap_size,
1240            agg_distinct_cache_miss_count,
1241            agg_distinct_total_cache_count,
1242            agg_distinct_cached_entry_count,
1243            agg_state_cache_lookup_count,
1244            agg_state_cache_miss_count,
1245            group_top_n_cache_miss_count,
1246            group_top_n_total_query_cache_count,
1247            group_top_n_cached_entry_count,
1248            group_top_n_appendonly_cache_miss_count,
1249            group_top_n_appendonly_total_query_cache_count,
1250            group_top_n_appendonly_cached_entry_count,
1251            lookup_cache_miss_count,
1252            lookup_total_query_cache_count,
1253            lookup_cached_entry_count,
1254            temporal_join_cache_miss_count,
1255            temporal_join_total_query_cache_count,
1256            temporal_join_cached_entry_count,
1257            backfill_snapshot_read_row_count,
1258            backfill_upstream_output_row_count,
1259            cdc_backfill_snapshot_read_row_count,
1260            cdc_backfill_upstream_output_row_count,
1261            snapshot_backfill_consume_row_count,
1262            over_window_cached_entry_count,
1263            over_window_cache_lookup_count,
1264            over_window_cache_miss_count,
1265            over_window_range_cache_entry_count,
1266            over_window_range_cache_lookup_count,
1267            over_window_range_cache_left_miss_count,
1268            over_window_range_cache_right_miss_count,
1269            over_window_accessed_entry_count,
1270            over_window_compute_count,
1271            over_window_same_output_count,
1272            barrier_inflight_latency,
1273            barrier_sync_latency,
1274            barrier_batch_size,
1275            barrier_manager_progress,
1276            kv_log_store_storage_write_count,
1277            kv_log_store_storage_write_size,
1278            kv_log_store_rewind_count,
1279            kv_log_store_rewind_delay,
1280            kv_log_store_storage_read_count,
1281            kv_log_store_storage_read_size,
1282            kv_log_store_buffer_unconsumed_item_count,
1283            kv_log_store_buffer_unconsumed_row_count,
1284            kv_log_store_buffer_unconsumed_epoch_count,
1285            kv_log_store_buffer_unconsumed_min_epoch,
1286            sync_kv_log_store_read_count,
1287            sync_kv_log_store_read_size,
1288            sync_kv_log_store_write_pause_duration_ns,
1289            sync_kv_log_store_state,
1290            sync_kv_log_store_wait_next_poll_ns,
1291            sync_kv_log_store_storage_write_count,
1292            sync_kv_log_store_storage_write_size,
1293            sync_kv_log_store_buffer_unconsumed_item_count,
1294            sync_kv_log_store_buffer_unconsumed_row_count,
1295            sync_kv_log_store_buffer_unconsumed_epoch_count,
1296            sync_kv_log_store_buffer_unconsumed_min_epoch,
1297            lru_runtime_loop_count,
1298            lru_latest_sequence,
1299            lru_watermark_sequence,
1300            lru_eviction_policy,
1301            jemalloc_allocated_bytes,
1302            jemalloc_active_bytes,
1303            jemalloc_resident_bytes,
1304            jemalloc_metadata_bytes,
1305            jvm_allocated_bytes,
1306            jvm_active_bytes,
1307            stream_memory_usage,
1308            materialize_cache_hit_count,
1309            materialize_data_exist_count,
1310            materialize_cache_total_count,
1311            materialize_input_row_count,
1312            materialize_current_epoch,
1313            pg_cdc_state_table_lsn,
1314            pg_cdc_jni_commit_offset_lsn,
1315            mysql_cdc_state_binlog_file_seq,
1316            mysql_cdc_state_binlog_position,
1317            gap_fill_generated_rows_count,
1318        }
1319    }
1320
1321    /// Create a new `StreamingMetrics` instance used in tests or other places.
1322    pub fn unused() -> Self {
1323        global_streaming_metrics(MetricLevel::Disabled)
1324    }
1325
1326    pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1327        let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1328        let actor_scheduled_duration = self
1329            .actor_scheduled_duration
1330            .with_guarded_label_values(label_list);
1331        let actor_scheduled_cnt = self
1332            .actor_scheduled_cnt
1333            .with_guarded_label_values(label_list);
1334        let actor_poll_duration = self
1335            .actor_poll_duration
1336            .with_guarded_label_values(label_list);
1337        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1338        let actor_idle_duration = self
1339            .actor_idle_duration
1340            .with_guarded_label_values(label_list);
1341        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1342        ActorMetrics {
1343            actor_scheduled_duration,
1344            actor_scheduled_cnt,
1345            actor_poll_duration,
1346            actor_poll_cnt,
1347            actor_idle_duration,
1348            actor_idle_cnt,
1349        }
1350    }
1351
1352    pub(crate) fn new_actor_input_metrics(
1353        &self,
1354        actor_id: ActorId,
1355        fragment_id: FragmentId,
1356        upstream_fragment_id: FragmentId,
1357    ) -> ActorInputMetrics {
1358        let actor_id_str = actor_id.to_string();
1359        let fragment_id_str = fragment_id.to_string();
1360        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1361        ActorInputMetrics {
1362            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1363                &actor_id_str,
1364                &fragment_id_str,
1365                &upstream_fragment_id_str,
1366            ]),
1367            actor_input_buffer_blocking_duration_ns: self
1368                .actor_input_buffer_blocking_duration_ns
1369                .with_guarded_label_values(&[
1370                    &actor_id_str,
1371                    &fragment_id_str,
1372                    &upstream_fragment_id_str,
1373                ]),
1374        }
1375    }
1376
1377    pub fn new_sink_exec_metrics(
1378        &self,
1379        id: SinkId,
1380        actor_id: ActorId,
1381        fragment_id: FragmentId,
1382    ) -> SinkExecutorMetrics {
1383        let label_list: &[&str; 3] = &[
1384            &id.to_string(),
1385            &actor_id.to_string(),
1386            &fragment_id.to_string(),
1387        ];
1388        SinkExecutorMetrics {
1389            sink_input_row_count: self
1390                .sink_input_row_count
1391                .with_guarded_label_values(label_list),
1392            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1393            sink_chunk_buffer_size: self
1394                .sink_chunk_buffer_size
1395                .with_guarded_label_values(label_list),
1396        }
1397    }
1398
1399    pub fn new_group_top_n_metrics(
1400        &self,
1401        table_id: TableId,
1402        actor_id: ActorId,
1403        fragment_id: FragmentId,
1404    ) -> GroupTopNMetrics {
1405        let label_list: &[&str; 3] = &[
1406            &table_id.to_string(),
1407            &actor_id.to_string(),
1408            &fragment_id.to_string(),
1409        ];
1410
1411        GroupTopNMetrics {
1412            group_top_n_cache_miss_count: self
1413                .group_top_n_cache_miss_count
1414                .with_guarded_label_values(label_list),
1415            group_top_n_total_query_cache_count: self
1416                .group_top_n_total_query_cache_count
1417                .with_guarded_label_values(label_list),
1418            group_top_n_cached_entry_count: self
1419                .group_top_n_cached_entry_count
1420                .with_guarded_label_values(label_list),
1421        }
1422    }
1423
1424    pub fn new_append_only_group_top_n_metrics(
1425        &self,
1426        table_id: TableId,
1427        actor_id: ActorId,
1428        fragment_id: FragmentId,
1429    ) -> GroupTopNMetrics {
1430        let label_list: &[&str; 3] = &[
1431            &table_id.to_string(),
1432            &actor_id.to_string(),
1433            &fragment_id.to_string(),
1434        ];
1435
1436        GroupTopNMetrics {
1437            group_top_n_cache_miss_count: self
1438                .group_top_n_appendonly_cache_miss_count
1439                .with_guarded_label_values(label_list),
1440            group_top_n_total_query_cache_count: self
1441                .group_top_n_appendonly_total_query_cache_count
1442                .with_guarded_label_values(label_list),
1443            group_top_n_cached_entry_count: self
1444                .group_top_n_appendonly_cached_entry_count
1445                .with_guarded_label_values(label_list),
1446        }
1447    }
1448
1449    pub fn new_lookup_executor_metrics(
1450        &self,
1451        table_id: TableId,
1452        actor_id: ActorId,
1453        fragment_id: FragmentId,
1454    ) -> LookupExecutorMetrics {
1455        let label_list: &[&str; 3] = &[
1456            &table_id.to_string(),
1457            &actor_id.to_string(),
1458            &fragment_id.to_string(),
1459        ];
1460
1461        LookupExecutorMetrics {
1462            lookup_cache_miss_count: self
1463                .lookup_cache_miss_count
1464                .with_guarded_label_values(label_list),
1465            lookup_total_query_cache_count: self
1466                .lookup_total_query_cache_count
1467                .with_guarded_label_values(label_list),
1468            lookup_cached_entry_count: self
1469                .lookup_cached_entry_count
1470                .with_guarded_label_values(label_list),
1471        }
1472    }
1473
1474    pub fn new_hash_agg_metrics(
1475        &self,
1476        table_id: TableId,
1477        actor_id: ActorId,
1478        fragment_id: FragmentId,
1479    ) -> HashAggMetrics {
1480        let label_list: &[&str; 3] = &[
1481            &table_id.to_string(),
1482            &actor_id.to_string(),
1483            &fragment_id.to_string(),
1484        ];
1485        HashAggMetrics {
1486            agg_lookup_miss_count: self
1487                .agg_lookup_miss_count
1488                .with_guarded_label_values(label_list),
1489            agg_total_lookup_count: self
1490                .agg_total_lookup_count
1491                .with_guarded_label_values(label_list),
1492            agg_cached_entry_count: self
1493                .agg_cached_entry_count
1494                .with_guarded_label_values(label_list),
1495            agg_chunk_lookup_miss_count: self
1496                .agg_chunk_lookup_miss_count
1497                .with_guarded_label_values(label_list),
1498            agg_chunk_total_lookup_count: self
1499                .agg_chunk_total_lookup_count
1500                .with_guarded_label_values(label_list),
1501            agg_dirty_groups_count: self
1502                .agg_dirty_groups_count
1503                .with_guarded_label_values(label_list),
1504            agg_dirty_groups_heap_size: self
1505                .agg_dirty_groups_heap_size
1506                .with_guarded_label_values(label_list),
1507            agg_state_cache_lookup_count: self
1508                .agg_state_cache_lookup_count
1509                .with_guarded_label_values(label_list),
1510            agg_state_cache_miss_count: self
1511                .agg_state_cache_miss_count
1512                .with_guarded_label_values(label_list),
1513        }
1514    }
1515
1516    pub fn new_agg_distinct_dedup_metrics(
1517        &self,
1518        table_id: TableId,
1519        actor_id: ActorId,
1520        fragment_id: FragmentId,
1521    ) -> AggDistinctDedupMetrics {
1522        let label_list: &[&str; 3] = &[
1523            &table_id.to_string(),
1524            &actor_id.to_string(),
1525            &fragment_id.to_string(),
1526        ];
1527        AggDistinctDedupMetrics {
1528            agg_distinct_cache_miss_count: self
1529                .agg_distinct_cache_miss_count
1530                .with_guarded_label_values(label_list),
1531            agg_distinct_total_cache_count: self
1532                .agg_distinct_total_cache_count
1533                .with_guarded_label_values(label_list),
1534            agg_distinct_cached_entry_count: self
1535                .agg_distinct_cached_entry_count
1536                .with_guarded_label_values(label_list),
1537        }
1538    }
1539
1540    pub fn new_temporal_join_metrics(
1541        &self,
1542        table_id: TableId,
1543        actor_id: ActorId,
1544        fragment_id: FragmentId,
1545    ) -> TemporalJoinMetrics {
1546        let label_list: &[&str; 3] = &[
1547            &table_id.to_string(),
1548            &actor_id.to_string(),
1549            &fragment_id.to_string(),
1550        ];
1551        TemporalJoinMetrics {
1552            temporal_join_cache_miss_count: self
1553                .temporal_join_cache_miss_count
1554                .with_guarded_label_values(label_list),
1555            temporal_join_total_query_cache_count: self
1556                .temporal_join_total_query_cache_count
1557                .with_guarded_label_values(label_list),
1558            temporal_join_cached_entry_count: self
1559                .temporal_join_cached_entry_count
1560                .with_guarded_label_values(label_list),
1561        }
1562    }
1563
1564    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1565        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1566        BackfillMetrics {
1567            backfill_snapshot_read_row_count: self
1568                .backfill_snapshot_read_row_count
1569                .with_guarded_label_values(label_list),
1570            backfill_upstream_output_row_count: self
1571                .backfill_upstream_output_row_count
1572                .with_guarded_label_values(label_list),
1573        }
1574    }
1575
1576    pub fn new_cdc_backfill_metrics(
1577        &self,
1578        table_id: TableId,
1579        actor_id: ActorId,
1580    ) -> CdcBackfillMetrics {
1581        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1582        CdcBackfillMetrics {
1583            cdc_backfill_snapshot_read_row_count: self
1584                .cdc_backfill_snapshot_read_row_count
1585                .with_guarded_label_values(label_list),
1586            cdc_backfill_upstream_output_row_count: self
1587                .cdc_backfill_upstream_output_row_count
1588                .with_guarded_label_values(label_list),
1589        }
1590    }
1591
1592    pub fn new_over_window_metrics(
1593        &self,
1594        table_id: TableId,
1595        actor_id: ActorId,
1596        fragment_id: FragmentId,
1597    ) -> OverWindowMetrics {
1598        let label_list: &[&str; 3] = &[
1599            &table_id.to_string(),
1600            &actor_id.to_string(),
1601            &fragment_id.to_string(),
1602        ];
1603        OverWindowMetrics {
1604            over_window_cached_entry_count: self
1605                .over_window_cached_entry_count
1606                .with_guarded_label_values(label_list),
1607            over_window_cache_lookup_count: self
1608                .over_window_cache_lookup_count
1609                .with_guarded_label_values(label_list),
1610            over_window_cache_miss_count: self
1611                .over_window_cache_miss_count
1612                .with_guarded_label_values(label_list),
1613            over_window_range_cache_entry_count: self
1614                .over_window_range_cache_entry_count
1615                .with_guarded_label_values(label_list),
1616            over_window_range_cache_lookup_count: self
1617                .over_window_range_cache_lookup_count
1618                .with_guarded_label_values(label_list),
1619            over_window_range_cache_left_miss_count: self
1620                .over_window_range_cache_left_miss_count
1621                .with_guarded_label_values(label_list),
1622            over_window_range_cache_right_miss_count: self
1623                .over_window_range_cache_right_miss_count
1624                .with_guarded_label_values(label_list),
1625            over_window_accessed_entry_count: self
1626                .over_window_accessed_entry_count
1627                .with_guarded_label_values(label_list),
1628            over_window_compute_count: self
1629                .over_window_compute_count
1630                .with_guarded_label_values(label_list),
1631            over_window_same_output_count: self
1632                .over_window_same_output_count
1633                .with_guarded_label_values(label_list),
1634        }
1635    }
1636
1637    pub fn new_materialize_cache_metrics(
1638        &self,
1639        table_id: TableId,
1640        actor_id: ActorId,
1641        fragment_id: FragmentId,
1642    ) -> MaterializeCacheMetrics {
1643        let label_list: &[&str; 3] = &[
1644            &actor_id.to_string(),
1645            &table_id.to_string(),
1646            &fragment_id.to_string(),
1647        ];
1648        MaterializeCacheMetrics {
1649            materialize_cache_hit_count: self
1650                .materialize_cache_hit_count
1651                .with_guarded_label_values(label_list),
1652            materialize_data_exist_count: self
1653                .materialize_data_exist_count
1654                .with_guarded_label_values(label_list),
1655            materialize_cache_total_count: self
1656                .materialize_cache_total_count
1657                .with_guarded_label_values(label_list),
1658        }
1659    }
1660
1661    pub fn new_materialize_metrics(
1662        &self,
1663        table_id: TableId,
1664        actor_id: ActorId,
1665        fragment_id: FragmentId,
1666    ) -> MaterializeMetrics {
1667        let label_list: &[&str; 3] = &[
1668            &actor_id.to_string(),
1669            &table_id.to_string(),
1670            &fragment_id.to_string(),
1671        ];
1672        MaterializeMetrics {
1673            materialize_input_row_count: self
1674                .materialize_input_row_count
1675                .with_guarded_label_values(label_list),
1676            materialize_current_epoch: self
1677                .materialize_current_epoch
1678                .with_guarded_label_values(label_list),
1679        }
1680    }
1681}
1682
/// Crate-internal pair of counter handles for an actor's input side.
///
/// NOTE(review): the per-field meanings below are inferred from the field names; the
/// constructor and metric registrations are outside this part of the file — confirm there.
pub(crate) struct ActorInputMetrics {
    /// Counter; by name, the number of records the actor has received.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Counter; by name, accumulated nanoseconds spent blocked on the input buffer.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1687
/// Tokio metrics for actors
///
/// Six counter handles arranged as duration/count pairs for the scheduled, poll and idle
/// states.
///
/// NOTE(review): the constructor and the units of the duration counters are not visible in
/// this part of the file — confirm against the registration and update sites.
pub struct ActorMetrics {
    // Scheduled state.
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    // Poll state.
    pub actor_poll_duration: LabelGuardedIntCounter,
    pub actor_poll_cnt: LabelGuardedIntCounter,
    // Idle state.
    pub actor_idle_duration: LabelGuardedIntCounter,
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1697
/// Metric handles for a sink executor: two counters and one gauge.
///
/// NOTE(review): the per-field meanings below are inferred from the field names; the
/// constructor and metric registrations are outside this part of the file — confirm there.
pub struct SinkExecutorMetrics {
    /// Counter; by name, the number of rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter; by name, the number of bytes fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge; by name, the current size of the sink's chunk buffer.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1703
/// Three counter handles returned by `new_materialize_cache_metrics`.
///
/// That constructor binds every field to the same label values, in the order actor id,
/// table id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct MaterializeCacheMetrics {
    /// Counter; by name, cache hits.
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Counter; by name, lookups for which the data already existed.
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Counter; by name, total cache accesses.
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1709
/// One counter and one gauge returned by `new_materialize_metrics`.
///
/// That constructor binds both fields to the same label values, in the order actor id,
/// table id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct MaterializeMetrics {
    /// Counter; by name, the number of input rows seen.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge; by name, the epoch currently being processed.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1714
/// Cache metric handles for group top-N: two counters and one gauge.
///
/// NOTE(review): the per-field meanings below are inferred from the field names; the
/// constructor and metric registrations are outside this part of the file — confirm there.
pub struct GroupTopNMetrics {
    /// Counter; by name, cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total cache queries.
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries currently cached.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1720
/// Cache metric handles for the lookup executor: two counters and one gauge.
///
/// Each field is filled from the same-named metric vec via `with_guarded_label_values`, all
/// with one shared label list.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct LookupExecutorMetrics {
    /// Counter; by name, cache misses.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total cache queries.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries currently cached.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1726
/// Metric handles returned by `new_hash_agg_metrics`: six counters and three gauges.
///
/// That constructor binds every field to the same label values, in the order table id,
/// actor id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct HashAggMetrics {
    /// Counter; by name, lookups that missed.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total lookups.
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries currently cached.
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    /// Counter; by name, chunk-level lookups that missed.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total chunk-level lookups.
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of dirty groups.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    /// Gauge; by name, the heap size of the dirty groups.
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    /// Counter; by name, state-cache lookups.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter; by name, state-cache lookups that missed.
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1738
/// Metric handles returned by `new_agg_distinct_dedup_metrics`: two counters and one gauge.
///
/// That constructor binds every field to the same label values, in the order table id,
/// actor id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct AggDistinctDedupMetrics {
    /// Counter; by name, cache misses.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total cache accesses.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries currently cached.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1744
/// Metric handles returned by `new_temporal_join_metrics`: two counters and one gauge.
///
/// That constructor binds every field to the same label values, in the order table id,
/// actor id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct TemporalJoinMetrics {
    /// Counter; by name, cache misses.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, total cache queries.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries currently cached.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1750
/// Two counter handles returned by `new_backfill_metrics`.
///
/// That constructor binds both fields to the same label values, in the order table id,
/// actor id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct BackfillMetrics {
    /// Counter; by name, rows read from the snapshot.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter; by name, upstream rows emitted.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1755
/// Two counter handles returned by `new_cdc_backfill_metrics`; the struct is `Clone`.
///
/// That constructor binds both fields to the same label values, in the order table id,
/// actor id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter; by name, rows read from the snapshot.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter; by name, upstream rows emitted.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1761
/// Metric handles returned by `new_over_window_metrics`: eight counters and two gauges.
///
/// That constructor binds every field to the same label values, in the order table id,
/// actor id, fragment id.
///
/// NOTE(review): the per-field meanings below are inferred from the field names — confirm
/// against the metric registrations.
pub struct OverWindowMetrics {
    /// Gauge; by name, the number of entries currently cached.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    /// Counter; by name, cache lookups.
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter; by name, cache lookups that missed.
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    /// Gauge; by name, the number of entries in the range cache.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    /// Counter; by name, range-cache lookups.
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter; by name, range-cache misses on the left side.
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, range-cache misses on the right side.
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    /// Counter; by name, entries accessed.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    /// Counter; by name, computations performed.
    pub over_window_compute_count: LabelGuardedIntCounter,
    /// Counter; by name, computations whose output was unchanged.
    pub over_window_same_output_count: LabelGuardedIntCounter,
}