risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Metrics used by the streaming engine: per-executor, per-actor, per-operator,
/// barrier, log-store and memory-management statistics. A few fields are purely
/// in-memory profiling counters that Prometheus never scrapes.
///
/// The process-wide instance is handed out by [`global_streaming_metrics`].
///
/// In `new`, each `Relabeled*` field whose construction is visible is produced with
/// `relabel_debug_1(level)`. Per the comment there, this masks the first label
/// (`actor_id`) when `level` is less verbose than `Debug`.
// NOTE(review): `Clone` is derived so `global_streaming_metrics` can return copies of the
// singleton. Presumably the metric handles are shared (reference-counted), so clones
// report into the same registered series. Confirm against the `LabelGuarded*` /
// `Relabeled*` types.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric verbosity level. In `new`, the same `level` argument is passed to
    /// `relabel_debug_1` when building the `Relabeled*` metric vectors.
    pub level: MetricLevel,

    // Executor metrics (disabled by default)
    /// `stream_executor_row_count{actor_id, fragment_id, executor_identity}`: total rows
    /// output from each executor.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // Profiling Metrics:
    // Aggregated per operator rather than per actor.
    // These are purely in-memory, never collected by prometheus.
    pub mem_stream_node_output_row_count: CountMap,
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // Streaming actor metrics from tokio (disabled by default)
    // Each is registered as `stream_<field name>{actor_id}` with help "tokio's metrics",
    // except `actor_execution_time` (see its own doc).
    /// `stream_actor_actor_execution_time{actor_id}`: total execution time (s) of an actor.
    actor_execution_time: LabelGuardedGaugeVec,
    actor_scheduled_duration: LabelGuardedGaugeVec,
    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
    actor_fast_poll_duration: LabelGuardedGaugeVec,
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
    actor_slow_poll_duration: LabelGuardedGaugeVec,
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
    actor_poll_duration: LabelGuardedGaugeVec,
    actor_poll_cnt: LabelGuardedIntGaugeVec,
    actor_idle_duration: LabelGuardedGaugeVec,
    actor_idle_cnt: LabelGuardedIntGaugeVec,

    // Streaming actor
    /// `stream_actor_count{fragment_id}`: total number of actors (parallelism).
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt{actor_id, fragment_id, upstream_fragment_id}`: total
    /// rows received by an actor.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt{actor_id, fragment_id}`: total rows sent by an actor.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch{actor_id, fragment_id}`: current epoch of an actor.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // Source
    // All three are labelled `{source_id, source_name, actor_id, fragment_id}`.
    /// `stream_source_output_rows_counts`: total rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: total split change events handled by a
    /// source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: total rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // Sink
    // All three are registered as `stream_<field name>{sink_id, actor_id, fragment_id}`.
    /// Total rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// Total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// Total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // Exchange (see also `compute::ExchangeServiceMetrics`)
    /// `stream_exchange_frag_recv_size{up_fragment_id, down_fragment_id}`: total size of
    /// messages received from the upstream fragment.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // Streaming Merge (We break out this metric from `barrier_align_duration` because
    // the alignment happens on different levels)
    /// Histogram `stream_merge_barrier_align_duration{actor_id, fragment_id}`. It has 21
    /// exponential buckets starting at 0.0001 and doubling each step (largest ~104s).
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // Backpressure
    /// `stream_actor_output_buffer_blocking_duration_ns{actor_id, fragment_id,
    /// downstream_fragment_id}`: total blocking duration (ns) of the output buffer.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns{actor_id, fragment_id,
    /// upstream_fragment_id}`: total blocking duration (ns) of the input buffer.
    /// Unlike the output-side metric, `new` does not relabel this one.
    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,

    // Streaming Join
    /// `stream_join_lookup_miss_count{side, join_table_id, actor_id, fragment_id}`.
    // NOTE(review): this counter (and `agg_lookup_miss_count` /
    // `agg_chunk_lookup_miss_count`) is registered with help text that says
    // "miss duration". The name and the counter type suggest a miss count. Confirm and
    // fix the help strings in `new`.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count{side, join_table_id, actor_id, fragment_id}`.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count{side, join_table_id, actor_id, fragment_id}`:
    /// cache misses on insert operations.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns{actor_id, fragment_id}`: total waiting
    /// duration (ns) of the join actor's input buffer.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns{actor_id, fragment_id, side}`: matching duration per
    /// side.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count{actor_id, fragment_id, side}`: cached entries in
    /// streaming join operators.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// Histogram `stream_join_matched_join_keys{actor_id, fragment_id, table_id}`: number of
    /// keys matched on the opposite side. It has 28 exponential buckets from 16 (largest
    /// 2^31).
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // Streaming Join, Streaming Dynamic Filter and Streaming Union
    /// `stream_barrier_align_duration_ns{actor_id, fragment_id, wait_side, executor}`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // Streaming Aggregation
    // All of these are labelled `{table_id, actor_id, fragment_id}` and registered as
    // `stream_<field name>`, with two exceptions:
    // - `agg_total_lookup_count` is `stream_agg_lookup_total_count`;
    // - `agg_chunk_total_lookup_count` is `stream_agg_chunk_lookup_total_count`.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // Streaming TopN
    // All registered as `stream_<field name>{table_id, actor_id, fragment_id}`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    // TODO(rc): why not just use the above three?
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // Lookup executor
    // All registered as `stream_<field name>{table_id, actor_id, fragment_id}`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // temporal join
    // All registered as `stream_<field name>{table_id, actor_id, fragment_id}`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // Backfill
    // Both registered as `stream_<field name>{table_id, actor_id}`.
    /// Total rows read from the backfill snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Total rows output from the backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // CDC Backfill
    // Both registered as `stream_<field name>{table_id, actor_id}`.
    /// Total rows read from the cdc_backfill snapshot.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Total rows output from the cdc_backfill upstream.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // Snapshot Backfill
    /// `stream_snapshot_backfill_consume_snapshot_row_count{table_id, actor_id, stage}`:
    /// total rows output from snapshot backfill.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // Over Window
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    over_window_compute_count: LabelGuardedIntCounterVec,
    over_window_same_output_count: LabelGuardedIntCounterVec,

    /// The duration from receiving a barrier until it has been collected from all actors.
    /// The max of `barrier_inflight_latency` across all nodes is the latency for a barrier
    /// to flow through the whole graph.
    pub barrier_inflight_latency: Histogram,
    /// The duration of the sync to storage.
    pub barrier_sync_latency: Histogram,
    pub barrier_batch_size: Histogram,
    /// The progress made by the earliest in-flight barriers in the local barrier manager.
    pub barrier_manager_progress: IntCounter,

    // KV log store
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Sync KV log store
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Memory management
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // Materialized view
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count{actor_id, table_id, fragment_id}`: total rows streamed
    /// into materialize executors.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch{actor_id, table_id, fragment_id}`: current epoch of the
    /// materialize executor.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // PostgreSQL CDC LSN monitoring
    /// `stream_pg_cdc_state_table_lsn{source_id}`: current LSN value stored in the
    /// PostgreSQL CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn{source_id}`: LSN value at the time JNI commit
    /// offset is called for PostgreSQL CDC.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
}
221
222pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
223
224pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
225    GLOBAL_STREAMING_METRICS
226        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
227        .clone()
228}
229
230impl StreamingMetrics {
231    fn new(registry: &Registry, level: MetricLevel) -> Self {
232        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
233            "stream_executor_row_count",
234            "Total number of rows that have been output from each executor",
235            &["actor_id", "fragment_id", "executor_identity"],
236            registry
237        )
238        .unwrap()
239        .relabel_debug_1(level);
240
241        let stream_node_output_row_count = CountMap::new();
242        let stream_node_output_blocking_duration_ns = CountMap::new();
243
244        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
245            "stream_source_output_rows_counts",
246            "Total number of rows that have been output from source",
247            &["source_id", "source_name", "actor_id", "fragment_id"],
248            registry
249        )
250        .unwrap();
251
252        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
253            "stream_source_split_change_event_count",
254            "Total number of split change events that have been operated by source",
255            &["source_id", "source_name", "actor_id", "fragment_id"],
256            registry
257        )
258        .unwrap();
259
260        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
261            "stream_source_backfill_rows_counts",
262            "Total number of rows that have been backfilled for source",
263            &["source_id", "source_name", "actor_id", "fragment_id"],
264            registry
265        )
266        .unwrap();
267
268        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
269            "stream_sink_input_row_count",
270            "Total number of rows streamed into sink executors",
271            &["sink_id", "actor_id", "fragment_id"],
272            registry
273        )
274        .unwrap();
275
276        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
277            "stream_sink_input_bytes",
278            "Total size of chunks streamed into sink executors",
279            &["sink_id", "actor_id", "fragment_id"],
280            registry
281        )
282        .unwrap();
283
284        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
285            "stream_mview_input_row_count",
286            "Total number of rows streamed into materialize executors",
287            &["actor_id", "table_id", "fragment_id"],
288            registry
289        )
290        .unwrap()
291        .relabel_debug_1(level);
292
293        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
294            "stream_mview_current_epoch",
295            "The current epoch of the materialized executor",
296            &["actor_id", "table_id", "fragment_id"],
297            registry
298        )
299        .unwrap()
300        .relabel_debug_1(level);
301
302        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
303            "stream_pg_cdc_state_table_lsn",
304            "Current LSN value stored in PostgreSQL CDC state table",
305            &["source_id"],
306            registry,
307        )
308        .unwrap();
309
310        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
311            "stream_pg_cdc_jni_commit_offset_lsn",
312            "LSN value when JNI commit offset is called for PostgreSQL CDC",
313            &["source_id"],
314            registry,
315        )
316        .unwrap();
317
318        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
319            "stream_sink_chunk_buffer_size",
320            "Total size of chunks buffered in a barrier",
321            &["sink_id", "actor_id", "fragment_id"],
322            registry
323        )
324        .unwrap();
325
326        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
327            "stream_actor_actor_execution_time",
328            "Total execution time (s) of an actor",
329            &["actor_id"],
330            registry
331        )
332        .unwrap();
333
334        let actor_output_buffer_blocking_duration_ns =
335            register_guarded_int_counter_vec_with_registry!(
336                "stream_actor_output_buffer_blocking_duration_ns",
337                "Total blocking duration (ns) of output buffer",
338                &["actor_id", "fragment_id", "downstream_fragment_id"],
339                registry
340            )
341            .unwrap()
342            // mask the first label `actor_id` if the level is less verbose than `Debug`
343            .relabel_debug_1(level);
344
345        let actor_input_buffer_blocking_duration_ns =
346            register_guarded_int_counter_vec_with_registry!(
347                "stream_actor_input_buffer_blocking_duration_ns",
348                "Total blocking duration (ns) of input buffer",
349                &["actor_id", "fragment_id", "upstream_fragment_id"],
350                registry
351            )
352            .unwrap();
353
354        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
355            "stream_exchange_frag_recv_size",
356            "Total size of messages that have been received from upstream Fragment",
357            &["up_fragment_id", "down_fragment_id"],
358            registry
359        )
360        .unwrap();
361
362        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
363            "stream_actor_fast_poll_duration",
364            "tokio's metrics",
365            &["actor_id"],
366            registry
367        )
368        .unwrap();
369
370        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
371            "stream_actor_fast_poll_cnt",
372            "tokio's metrics",
373            &["actor_id"],
374            registry
375        )
376        .unwrap();
377
378        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
379            "stream_actor_slow_poll_duration",
380            "tokio's metrics",
381            &["actor_id"],
382            registry
383        )
384        .unwrap();
385
386        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
387            "stream_actor_slow_poll_cnt",
388            "tokio's metrics",
389            &["actor_id"],
390            registry
391        )
392        .unwrap();
393
394        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
395            "stream_actor_poll_duration",
396            "tokio's metrics",
397            &["actor_id"],
398            registry
399        )
400        .unwrap();
401
402        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
403            "stream_actor_poll_cnt",
404            "tokio's metrics",
405            &["actor_id"],
406            registry
407        )
408        .unwrap();
409
410        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
411            "stream_actor_scheduled_duration",
412            "tokio's metrics",
413            &["actor_id"],
414            registry
415        )
416        .unwrap();
417
418        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
419            "stream_actor_scheduled_cnt",
420            "tokio's metrics",
421            &["actor_id"],
422            registry
423        )
424        .unwrap();
425
426        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
427            "stream_actor_idle_duration",
428            "tokio's metrics",
429            &["actor_id"],
430            registry
431        )
432        .unwrap();
433
434        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
435            "stream_actor_idle_cnt",
436            "tokio's metrics",
437            &["actor_id"],
438            registry
439        )
440        .unwrap();
441
442        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
443            "stream_actor_in_record_cnt",
444            "Total number of rows actor received",
445            &["actor_id", "fragment_id", "upstream_fragment_id"],
446            registry
447        )
448        .unwrap()
449        .relabel_debug_1(level);
450
451        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
452            "stream_actor_out_record_cnt",
453            "Total number of rows actor sent",
454            &["actor_id", "fragment_id"],
455            registry
456        )
457        .unwrap()
458        .relabel_debug_1(level);
459
460        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
461            "stream_actor_current_epoch",
462            "Current epoch of actor",
463            &["actor_id", "fragment_id"],
464            registry
465        )
466        .unwrap()
467        .relabel_debug_1(level);
468
469        let actor_count = register_guarded_int_gauge_vec_with_registry!(
470            "stream_actor_count",
471            "Total number of actors (parallelism)",
472            &["fragment_id"],
473            registry
474        )
475        .unwrap();
476
477        let opts = histogram_opts!(
478            "stream_merge_barrier_align_duration",
479            "Duration of merge align barrier",
480            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
481        );
482        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
483            opts,
484            &["actor_id", "fragment_id"],
485            registry
486        )
487        .unwrap()
488        .relabel_debug_1(level);
489
490        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
491            "stream_join_lookup_miss_count",
492            "Join executor lookup miss duration",
493            &["side", "join_table_id", "actor_id", "fragment_id"],
494            registry
495        )
496        .unwrap();
497
498        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
499            "stream_join_lookup_total_count",
500            "Join executor lookup total operation",
501            &["side", "join_table_id", "actor_id", "fragment_id"],
502            registry
503        )
504        .unwrap();
505
506        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
507            "stream_join_insert_cache_miss_count",
508            "Join executor cache miss when insert operation",
509            &["side", "join_table_id", "actor_id", "fragment_id"],
510            registry
511        )
512        .unwrap();
513
514        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
515            "stream_join_actor_input_waiting_duration_ns",
516            "Total waiting duration (ns) of input buffer of join actor",
517            &["actor_id", "fragment_id"],
518            registry
519        )
520        .unwrap();
521
522        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
523            "stream_join_match_duration_ns",
524            "Matching duration for each side",
525            &["actor_id", "fragment_id", "side"],
526            registry
527        )
528        .unwrap();
529
530        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
531            "stream_barrier_align_duration_ns",
532            "Duration of join align barrier",
533            &["actor_id", "fragment_id", "wait_side", "executor"],
534            registry
535        )
536        .unwrap()
537        .relabel_debug_1(level);
538
539        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
540            "stream_join_cached_entry_count",
541            "Number of cached entries in streaming join operators",
542            &["actor_id", "fragment_id", "side"],
543            registry
544        )
545        .unwrap();
546
547        let join_matched_join_keys_opts = histogram_opts!(
548            "stream_join_matched_join_keys",
549            "The number of keys matched in the opposite side",
550            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
551        );
552
553        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
554            join_matched_join_keys_opts,
555            &["actor_id", "fragment_id", "table_id"],
556            registry
557        )
558        .unwrap()
559        .relabel_debug_1(level);
560
561        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
562            "stream_agg_lookup_miss_count",
563            "Aggregation executor lookup miss duration",
564            &["table_id", "actor_id", "fragment_id"],
565            registry
566        )
567        .unwrap();
568
569        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
570            "stream_agg_lookup_total_count",
571            "Aggregation executor lookup total operation",
572            &["table_id", "actor_id", "fragment_id"],
573            registry
574        )
575        .unwrap();
576
577        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
578            "stream_agg_distinct_cache_miss_count",
579            "Aggregation executor dinsinct miss duration",
580            &["table_id", "actor_id", "fragment_id"],
581            registry
582        )
583        .unwrap();
584
585        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
586            "stream_agg_distinct_total_cache_count",
587            "Aggregation executor distinct total operation",
588            &["table_id", "actor_id", "fragment_id"],
589            registry
590        )
591        .unwrap();
592
593        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
594            "stream_agg_distinct_cached_entry_count",
595            "Total entry counts in distinct aggregation executor cache",
596            &["table_id", "actor_id", "fragment_id"],
597            registry
598        )
599        .unwrap();
600
601        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
602            "stream_agg_dirty_groups_count",
603            "Total dirty group counts in aggregation executor",
604            &["table_id", "actor_id", "fragment_id"],
605            registry
606        )
607        .unwrap();
608
609        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
610            "stream_agg_dirty_groups_heap_size",
611            "Total dirty group heap size in aggregation executor",
612            &["table_id", "actor_id", "fragment_id"],
613            registry
614        )
615        .unwrap();
616
617        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
618            "stream_agg_state_cache_lookup_count",
619            "Aggregation executor state cache lookup count",
620            &["table_id", "actor_id", "fragment_id"],
621            registry
622        )
623        .unwrap();
624
625        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
626            "stream_agg_state_cache_miss_count",
627            "Aggregation executor state cache miss count",
628            &["table_id", "actor_id", "fragment_id"],
629            registry
630        )
631        .unwrap();
632
633        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
634            "stream_group_top_n_cache_miss_count",
635            "Group top n executor cache miss count",
636            &["table_id", "actor_id", "fragment_id"],
637            registry
638        )
639        .unwrap();
640
641        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
642            "stream_group_top_n_total_query_cache_count",
643            "Group top n executor query cache total count",
644            &["table_id", "actor_id", "fragment_id"],
645            registry
646        )
647        .unwrap();
648
649        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
650            "stream_group_top_n_cached_entry_count",
651            "Total entry counts in group top n executor cache",
652            &["table_id", "actor_id", "fragment_id"],
653            registry
654        )
655        .unwrap();
656
657        let group_top_n_appendonly_cache_miss_count =
658            register_guarded_int_counter_vec_with_registry!(
659                "stream_group_top_n_appendonly_cache_miss_count",
660                "Group top n appendonly executor cache miss count",
661                &["table_id", "actor_id", "fragment_id"],
662                registry
663            )
664            .unwrap();
665
666        let group_top_n_appendonly_total_query_cache_count =
667            register_guarded_int_counter_vec_with_registry!(
668                "stream_group_top_n_appendonly_total_query_cache_count",
669                "Group top n appendonly executor total cache count",
670                &["table_id", "actor_id", "fragment_id"],
671                registry
672            )
673            .unwrap();
674
675        let group_top_n_appendonly_cached_entry_count =
676            register_guarded_int_gauge_vec_with_registry!(
677                "stream_group_top_n_appendonly_cached_entry_count",
678                "Total entry counts in group top n appendonly executor cache",
679                &["table_id", "actor_id", "fragment_id"],
680                registry
681            )
682            .unwrap();
683
684        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
685            "stream_lookup_cache_miss_count",
686            "Lookup executor cache miss count",
687            &["table_id", "actor_id", "fragment_id"],
688            registry
689        )
690        .unwrap();
691
692        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
693            "stream_lookup_total_query_cache_count",
694            "Lookup executor query cache total count",
695            &["table_id", "actor_id", "fragment_id"],
696            registry
697        )
698        .unwrap();
699
700        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
701            "stream_lookup_cached_entry_count",
702            "Total entry counts in lookup executor cache",
703            &["table_id", "actor_id", "fragment_id"],
704            registry
705        )
706        .unwrap();
707
708        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
709            "stream_temporal_join_cache_miss_count",
710            "Temporal join executor cache miss count",
711            &["table_id", "actor_id", "fragment_id"],
712            registry
713        )
714        .unwrap();
715
716        let temporal_join_total_query_cache_count =
717            register_guarded_int_counter_vec_with_registry!(
718                "stream_temporal_join_total_query_cache_count",
719                "Temporal join executor query cache total count",
720                &["table_id", "actor_id", "fragment_id"],
721                registry
722            )
723            .unwrap();
724
725        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726            "stream_temporal_join_cached_entry_count",
727            "Total entry count in temporal join executor cache",
728            &["table_id", "actor_id", "fragment_id"],
729            registry
730        )
731        .unwrap();
732
733        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
734            "stream_agg_cached_entry_count",
735            "Number of cached keys in streaming aggregation operators",
736            &["table_id", "actor_id", "fragment_id"],
737            registry
738        )
739        .unwrap();
740
741        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
742            "stream_agg_chunk_lookup_miss_count",
743            "Aggregation executor chunk-level lookup miss duration",
744            &["table_id", "actor_id", "fragment_id"],
745            registry
746        )
747        .unwrap();
748
749        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
750            "stream_agg_chunk_lookup_total_count",
751            "Aggregation executor chunk-level lookup total operation",
752            &["table_id", "actor_id", "fragment_id"],
753            registry
754        )
755        .unwrap();
756
757        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
758            "stream_backfill_snapshot_read_row_count",
759            "Total number of rows that have been read from the backfill snapshot",
760            &["table_id", "actor_id"],
761            registry
762        )
763        .unwrap();
764
765        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
766            "stream_backfill_upstream_output_row_count",
767            "Total number of rows that have been output from the backfill upstream",
768            &["table_id", "actor_id"],
769            registry
770        )
771        .unwrap();
772
773        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
774            "stream_cdc_backfill_snapshot_read_row_count",
775            "Total number of rows that have been read from the cdc_backfill snapshot",
776            &["table_id", "actor_id"],
777            registry
778        )
779        .unwrap();
780
781        let cdc_backfill_upstream_output_row_count =
782            register_guarded_int_counter_vec_with_registry!(
783                "stream_cdc_backfill_upstream_output_row_count",
784                "Total number of rows that have been output from the cdc_backfill upstream",
785                &["table_id", "actor_id"],
786                registry
787            )
788            .unwrap();
789
790        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
791            "stream_snapshot_backfill_consume_snapshot_row_count",
792            "Total number of rows that have been output from snapshot backfill",
793            &["table_id", "actor_id", "stage"],
794            registry
795        )
796        .unwrap();
797
798        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
799            "stream_over_window_cached_entry_count",
800            "Total entry (partition) count in over window executor cache",
801            &["table_id", "actor_id", "fragment_id"],
802            registry
803        )
804        .unwrap();
805
806        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
807            "stream_over_window_cache_lookup_count",
808            "Over window executor cache lookup count",
809            &["table_id", "actor_id", "fragment_id"],
810            registry
811        )
812        .unwrap();
813
814        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
815            "stream_over_window_cache_miss_count",
816            "Over window executor cache miss count",
817            &["table_id", "actor_id", "fragment_id"],
818            registry
819        )
820        .unwrap();
821
822        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
823            "stream_over_window_range_cache_entry_count",
824            "Over window partition range cache entry count",
825            &["table_id", "actor_id", "fragment_id"],
826            registry,
827        )
828        .unwrap();
829
830        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
831            "stream_over_window_range_cache_lookup_count",
832            "Over window partition range cache lookup count",
833            &["table_id", "actor_id", "fragment_id"],
834            registry
835        )
836        .unwrap();
837
838        let over_window_range_cache_left_miss_count =
839            register_guarded_int_counter_vec_with_registry!(
840                "stream_over_window_range_cache_left_miss_count",
841                "Over window partition range cache left miss count",
842                &["table_id", "actor_id", "fragment_id"],
843                registry
844            )
845            .unwrap();
846
847        let over_window_range_cache_right_miss_count =
848            register_guarded_int_counter_vec_with_registry!(
849                "stream_over_window_range_cache_right_miss_count",
850                "Over window partition range cache right miss count",
851                &["table_id", "actor_id", "fragment_id"],
852                registry
853            )
854            .unwrap();
855
856        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
857            "stream_over_window_accessed_entry_count",
858            "Over window accessed entry count",
859            &["table_id", "actor_id", "fragment_id"],
860            registry
861        )
862        .unwrap();
863
864        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
865            "stream_over_window_compute_count",
866            "Over window compute count",
867            &["table_id", "actor_id", "fragment_id"],
868            registry
869        )
870        .unwrap();
871
872        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
873            "stream_over_window_same_output_count",
874            "Over window same output count",
875            &["table_id", "actor_id", "fragment_id"],
876            registry
877        )
878        .unwrap();
879
880        let opts = histogram_opts!(
881            "stream_barrier_inflight_duration_seconds",
882            "barrier_inflight_latency",
883            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
884        );
885        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
886
887        let opts = histogram_opts!(
888            "stream_barrier_sync_storage_duration_seconds",
889            "barrier_sync_latency",
890            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
891        );
892        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
893
894        let opts = histogram_opts!(
895            "stream_barrier_batch_size",
896            "barrier_batch_size",
897            exponential_buckets(1.0, 2.0, 8).unwrap()
898        );
899        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
900
901        let barrier_manager_progress = register_int_counter_with_registry!(
902            "stream_barrier_manager_progress",
903            "The number of actors that have processed the earliest in-flight barriers",
904            registry
905        )
906        .unwrap();
907
908        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
909            "sync_kv_log_store_wait_next_poll_ns",
910            "Total duration (ns) of waiting for next poll",
911            &["actor_id", "target", "fragment_id", "relation"],
912            registry
913        )
914        .unwrap();
915
916        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
917            "sync_kv_log_store_read_count",
918            "read row count throughput of sync_kv log store",
919            &["type", "actor_id", "target", "fragment_id", "relation"],
920            registry
921        )
922        .unwrap();
923
924        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
925            "sync_kv_log_store_read_size",
926            "read size throughput of sync_kv log store",
927            &["type", "actor_id", "target", "fragment_id", "relation"],
928            registry
929        )
930        .unwrap();
931
932        let sync_kv_log_store_write_pause_duration_ns =
933            register_guarded_int_counter_vec_with_registry!(
934                "sync_kv_log_store_write_pause_duration_ns",
935                "Duration (ns) of sync_kv log store write pause",
936                &["actor_id", "target", "fragment_id", "relation"],
937                registry
938            )
939            .unwrap();
940
941        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
942            "sync_kv_log_store_state",
943            "clean/unclean state transition for sync_kv log store",
944            &["state", "actor_id", "target", "fragment_id", "relation"],
945            registry
946        )
947        .unwrap();
948
949        let sync_kv_log_store_storage_write_count =
950            register_guarded_int_counter_vec_with_registry!(
951                "sync_kv_log_store_storage_write_count",
952                "Write row count throughput of sync_kv log store",
953                &["actor_id", "target", "fragment_id", "relation"],
954                registry
955            )
956            .unwrap();
957
958        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
959            "sync_kv_log_store_storage_write_size",
960            "Write size throughput of sync_kv log store",
961            &["actor_id", "target", "fragment_id", "relation"],
962            registry
963        )
964        .unwrap();
965
966        let sync_kv_log_store_buffer_unconsumed_item_count =
967            register_guarded_int_gauge_vec_with_registry!(
968                "sync_kv_log_store_buffer_unconsumed_item_count",
969                "Number of Unconsumed Item in buffer",
970                &["actor_id", "target", "fragment_id", "relation"],
971                registry
972            )
973            .unwrap();
974
975        let sync_kv_log_store_buffer_unconsumed_row_count =
976            register_guarded_int_gauge_vec_with_registry!(
977                "sync_kv_log_store_buffer_unconsumed_row_count",
978                "Number of Unconsumed Row in buffer",
979                &["actor_id", "target", "fragment_id", "relation"],
980                registry
981            )
982            .unwrap();
983
984        let sync_kv_log_store_buffer_unconsumed_epoch_count =
985            register_guarded_int_gauge_vec_with_registry!(
986                "sync_kv_log_store_buffer_unconsumed_epoch_count",
987                "Number of Unconsumed Epoch in buffer",
988                &["actor_id", "target", "fragment_id", "relation"],
989                registry
990            )
991            .unwrap();
992
993        let sync_kv_log_store_buffer_unconsumed_min_epoch =
994            register_guarded_int_gauge_vec_with_registry!(
995                "sync_kv_log_store_buffer_unconsumed_min_epoch",
996                "Number of Unconsumed Epoch in buffer",
997                &["actor_id", "target", "fragment_id", "relation"],
998                registry
999            )
1000            .unwrap();
1001
1002        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1003            "kv_log_store_storage_write_count",
1004            "Write row count throughput of kv log store",
1005            &["actor_id", "connector", "sink_id", "sink_name"],
1006            registry
1007        )
1008        .unwrap();
1009
1010        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1011            "kv_log_store_storage_write_size",
1012            "Write size throughput of kv log store",
1013            &["actor_id", "connector", "sink_id", "sink_name"],
1014            registry
1015        )
1016        .unwrap();
1017
1018        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1019            "kv_log_store_storage_read_count",
1020            "Write row count throughput of kv log store",
1021            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1022            registry
1023        )
1024        .unwrap();
1025
1026        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1027            "kv_log_store_storage_read_size",
1028            "Write size throughput of kv log store",
1029            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1030            registry
1031        )
1032        .unwrap();
1033
1034        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1035            "kv_log_store_rewind_count",
1036            "Kv log store rewind rate",
1037            &["actor_id", "connector", "sink_id", "sink_name"],
1038            registry
1039        )
1040        .unwrap();
1041
1042        let kv_log_store_rewind_delay_opts = {
1043            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1044            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1045                - REWIND_BASE_DELAY.as_secs_f64().log2())
1046            .ceil() as usize;
1047            let buckets = exponential_buckets(
1048                REWIND_BASE_DELAY.as_secs_f64(),
1049                REWIND_BACKOFF_FACTOR as _,
1050                bucket_count,
1051            )
1052            .unwrap();
1053            histogram_opts!(
1054                "kv_log_store_rewind_delay",
1055                "Kv log store rewind delay",
1056                buckets,
1057            )
1058        };
1059
1060        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1061            kv_log_store_rewind_delay_opts,
1062            &["actor_id", "connector", "sink_id", "sink_name"],
1063            registry
1064        )
1065        .unwrap();
1066
1067        let kv_log_store_buffer_unconsumed_item_count =
1068            register_guarded_int_gauge_vec_with_registry!(
1069                "kv_log_store_buffer_unconsumed_item_count",
1070                "Number of Unconsumed Item in buffer",
1071                &["actor_id", "connector", "sink_id", "sink_name"],
1072                registry
1073            )
1074            .unwrap();
1075
1076        let kv_log_store_buffer_unconsumed_row_count =
1077            register_guarded_int_gauge_vec_with_registry!(
1078                "kv_log_store_buffer_unconsumed_row_count",
1079                "Number of Unconsumed Row in buffer",
1080                &["actor_id", "connector", "sink_id", "sink_name"],
1081                registry
1082            )
1083            .unwrap();
1084
1085        let kv_log_store_buffer_unconsumed_epoch_count =
1086            register_guarded_int_gauge_vec_with_registry!(
1087                "kv_log_store_buffer_unconsumed_epoch_count",
1088                "Number of Unconsumed Epoch in buffer",
1089                &["actor_id", "connector", "sink_id", "sink_name"],
1090                registry
1091            )
1092            .unwrap();
1093
1094        let kv_log_store_buffer_unconsumed_min_epoch =
1095            register_guarded_int_gauge_vec_with_registry!(
1096                "kv_log_store_buffer_unconsumed_min_epoch",
1097                "Number of Unconsumed Epoch in buffer",
1098                &["actor_id", "connector", "sink_id", "sink_name"],
1099                registry
1100            )
1101            .unwrap();
1102
1103        let lru_runtime_loop_count = register_int_counter_with_registry!(
1104            "lru_runtime_loop_count",
1105            "The counts of the eviction loop in LRU manager per second",
1106            registry
1107        )
1108        .unwrap();
1109
1110        let lru_latest_sequence = register_int_gauge_with_registry!(
1111            "lru_latest_sequence",
1112            "Current LRU global sequence",
1113            registry,
1114        )
1115        .unwrap();
1116
1117        let lru_watermark_sequence = register_int_gauge_with_registry!(
1118            "lru_watermark_sequence",
1119            "Current LRU watermark sequence",
1120            registry,
1121        )
1122        .unwrap();
1123
1124        let lru_eviction_policy = register_int_gauge_with_registry!(
1125            "lru_eviction_policy",
1126            "Current LRU eviction policy",
1127            registry,
1128        )
1129        .unwrap();
1130
1131        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1132            "jemalloc_allocated_bytes",
1133            "The allocated memory jemalloc, got from jemalloc_ctl",
1134            registry
1135        )
1136        .unwrap();
1137
1138        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1139            "jemalloc_active_bytes",
1140            "The active memory jemalloc, got from jemalloc_ctl",
1141            registry
1142        )
1143        .unwrap();
1144
1145        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1146            "jemalloc_resident_bytes",
1147            "The active memory jemalloc, got from jemalloc_ctl",
1148            registry
1149        )
1150        .unwrap();
1151
1152        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1153            "jemalloc_metadata_bytes",
1154            "The active memory jemalloc, got from jemalloc_ctl",
1155            registry
1156        )
1157        .unwrap();
1158
1159        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1160            "jvm_allocated_bytes",
1161            "The allocated jvm memory",
1162            registry
1163        )
1164        .unwrap();
1165
1166        let jvm_active_bytes = register_int_gauge_with_registry!(
1167            "jvm_active_bytes",
1168            "The active jvm memory",
1169            registry
1170        )
1171        .unwrap();
1172
1173        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1174            "stream_materialize_cache_hit_count",
1175            "Materialize executor cache hit count",
1176            &["actor_id", "table_id", "fragment_id"],
1177            registry
1178        )
1179        .unwrap()
1180        .relabel_debug_1(level);
1181
1182        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1183            "stream_materialize_data_exist_count",
1184            "Materialize executor data exist count",
1185            &["actor_id", "table_id", "fragment_id"],
1186            registry
1187        )
1188        .unwrap()
1189        .relabel_debug_1(level);
1190
1191        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1192            "stream_materialize_cache_total_count",
1193            "Materialize executor cache total operation",
1194            &["actor_id", "table_id", "fragment_id"],
1195            registry
1196        )
1197        .unwrap()
1198        .relabel_debug_1(level);
1199
1200        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1201            "stream_memory_usage",
1202            "Memory usage for stream executors",
1203            &["actor_id", "table_id", "desc"],
1204            registry
1205        )
1206        .unwrap()
1207        .relabel_debug_1(level);
1208
1209        Self {
1210            level,
1211            executor_row_count,
1212            mem_stream_node_output_row_count: stream_node_output_row_count,
1213            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1214            actor_execution_time,
1215            actor_scheduled_duration,
1216            actor_scheduled_cnt,
1217            actor_fast_poll_duration,
1218            actor_fast_poll_cnt,
1219            actor_slow_poll_duration,
1220            actor_slow_poll_cnt,
1221            actor_poll_duration,
1222            actor_poll_cnt,
1223            actor_idle_duration,
1224            actor_idle_cnt,
1225            actor_count,
1226            actor_in_record_cnt,
1227            actor_out_record_cnt,
1228            actor_current_epoch,
1229            source_output_row_count,
1230            source_split_change_count,
1231            source_backfill_row_count,
1232            sink_input_row_count,
1233            sink_input_bytes,
1234            sink_chunk_buffer_size,
1235            exchange_frag_recv_size,
1236            merge_barrier_align_duration,
1237            actor_output_buffer_blocking_duration_ns,
1238            actor_input_buffer_blocking_duration_ns,
1239            join_lookup_miss_count,
1240            join_lookup_total_count,
1241            join_insert_cache_miss_count,
1242            join_actor_input_waiting_duration_ns,
1243            join_match_duration_ns,
1244            join_cached_entry_count,
1245            join_matched_join_keys,
1246            barrier_align_duration,
1247            agg_lookup_miss_count,
1248            agg_total_lookup_count,
1249            agg_cached_entry_count,
1250            agg_chunk_lookup_miss_count,
1251            agg_chunk_total_lookup_count,
1252            agg_dirty_groups_count,
1253            agg_dirty_groups_heap_size,
1254            agg_distinct_cache_miss_count,
1255            agg_distinct_total_cache_count,
1256            agg_distinct_cached_entry_count,
1257            agg_state_cache_lookup_count,
1258            agg_state_cache_miss_count,
1259            group_top_n_cache_miss_count,
1260            group_top_n_total_query_cache_count,
1261            group_top_n_cached_entry_count,
1262            group_top_n_appendonly_cache_miss_count,
1263            group_top_n_appendonly_total_query_cache_count,
1264            group_top_n_appendonly_cached_entry_count,
1265            lookup_cache_miss_count,
1266            lookup_total_query_cache_count,
1267            lookup_cached_entry_count,
1268            temporal_join_cache_miss_count,
1269            temporal_join_total_query_cache_count,
1270            temporal_join_cached_entry_count,
1271            backfill_snapshot_read_row_count,
1272            backfill_upstream_output_row_count,
1273            cdc_backfill_snapshot_read_row_count,
1274            cdc_backfill_upstream_output_row_count,
1275            snapshot_backfill_consume_row_count,
1276            over_window_cached_entry_count,
1277            over_window_cache_lookup_count,
1278            over_window_cache_miss_count,
1279            over_window_range_cache_entry_count,
1280            over_window_range_cache_lookup_count,
1281            over_window_range_cache_left_miss_count,
1282            over_window_range_cache_right_miss_count,
1283            over_window_accessed_entry_count,
1284            over_window_compute_count,
1285            over_window_same_output_count,
1286            barrier_inflight_latency,
1287            barrier_sync_latency,
1288            barrier_batch_size,
1289            barrier_manager_progress,
1290            kv_log_store_storage_write_count,
1291            kv_log_store_storage_write_size,
1292            kv_log_store_rewind_count,
1293            kv_log_store_rewind_delay,
1294            kv_log_store_storage_read_count,
1295            kv_log_store_storage_read_size,
1296            kv_log_store_buffer_unconsumed_item_count,
1297            kv_log_store_buffer_unconsumed_row_count,
1298            kv_log_store_buffer_unconsumed_epoch_count,
1299            kv_log_store_buffer_unconsumed_min_epoch,
1300            sync_kv_log_store_read_count,
1301            sync_kv_log_store_read_size,
1302            sync_kv_log_store_write_pause_duration_ns,
1303            sync_kv_log_store_state,
1304            sync_kv_log_store_wait_next_poll_ns,
1305            sync_kv_log_store_storage_write_count,
1306            sync_kv_log_store_storage_write_size,
1307            sync_kv_log_store_buffer_unconsumed_item_count,
1308            sync_kv_log_store_buffer_unconsumed_row_count,
1309            sync_kv_log_store_buffer_unconsumed_epoch_count,
1310            sync_kv_log_store_buffer_unconsumed_min_epoch,
1311            lru_runtime_loop_count,
1312            lru_latest_sequence,
1313            lru_watermark_sequence,
1314            lru_eviction_policy,
1315            jemalloc_allocated_bytes,
1316            jemalloc_active_bytes,
1317            jemalloc_resident_bytes,
1318            jemalloc_metadata_bytes,
1319            jvm_allocated_bytes,
1320            jvm_active_bytes,
1321            stream_memory_usage,
1322            materialize_cache_hit_count,
1323            materialize_data_exist_count,
1324            materialize_cache_total_count,
1325            materialize_input_row_count,
1326            materialize_current_epoch,
1327            pg_cdc_state_table_lsn,
1328            pg_cdc_jni_commit_offset_lsn,
1329        }
1330    }
1331
1332    /// Create a new `StreamingMetrics` instance used in tests or other places.
1333    pub fn unused() -> Self {
1334        global_streaming_metrics(MetricLevel::Disabled)
1335    }
1336
1337    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1338        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1339        let actor_execution_time = self
1340            .actor_execution_time
1341            .with_guarded_label_values(label_list);
1342        let actor_scheduled_duration = self
1343            .actor_scheduled_duration
1344            .with_guarded_label_values(label_list);
1345        let actor_scheduled_cnt = self
1346            .actor_scheduled_cnt
1347            .with_guarded_label_values(label_list);
1348        let actor_fast_poll_duration = self
1349            .actor_fast_poll_duration
1350            .with_guarded_label_values(label_list);
1351        let actor_fast_poll_cnt = self
1352            .actor_fast_poll_cnt
1353            .with_guarded_label_values(label_list);
1354        let actor_slow_poll_duration = self
1355            .actor_slow_poll_duration
1356            .with_guarded_label_values(label_list);
1357        let actor_slow_poll_cnt = self
1358            .actor_slow_poll_cnt
1359            .with_guarded_label_values(label_list);
1360        let actor_poll_duration = self
1361            .actor_poll_duration
1362            .with_guarded_label_values(label_list);
1363        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1364        let actor_idle_duration = self
1365            .actor_idle_duration
1366            .with_guarded_label_values(label_list);
1367        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1368        ActorMetrics {
1369            actor_execution_time,
1370            actor_scheduled_duration,
1371            actor_scheduled_cnt,
1372            actor_fast_poll_duration,
1373            actor_fast_poll_cnt,
1374            actor_slow_poll_duration,
1375            actor_slow_poll_cnt,
1376            actor_poll_duration,
1377            actor_poll_cnt,
1378            actor_idle_duration,
1379            actor_idle_cnt,
1380        }
1381    }
1382
1383    pub(crate) fn new_actor_input_metrics(
1384        &self,
1385        actor_id: ActorId,
1386        fragment_id: FragmentId,
1387        upstream_fragment_id: FragmentId,
1388    ) -> ActorInputMetrics {
1389        let actor_id_str = actor_id.to_string();
1390        let fragment_id_str = fragment_id.to_string();
1391        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1392        ActorInputMetrics {
1393            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1394                &actor_id_str,
1395                &fragment_id_str,
1396                &upstream_fragment_id_str,
1397            ]),
1398            actor_input_buffer_blocking_duration_ns: self
1399                .actor_input_buffer_blocking_duration_ns
1400                .with_guarded_label_values(&[
1401                    &actor_id_str,
1402                    &fragment_id_str,
1403                    &upstream_fragment_id_str,
1404                ]),
1405        }
1406    }
1407
1408    pub fn new_sink_exec_metrics(
1409        &self,
1410        id: SinkId,
1411        actor_id: ActorId,
1412        fragment_id: FragmentId,
1413    ) -> SinkExecutorMetrics {
1414        let label_list: &[&str; 3] = &[
1415            &id.to_string(),
1416            &actor_id.to_string(),
1417            &fragment_id.to_string(),
1418        ];
1419        SinkExecutorMetrics {
1420            sink_input_row_count: self
1421                .sink_input_row_count
1422                .with_guarded_label_values(label_list),
1423            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1424            sink_chunk_buffer_size: self
1425                .sink_chunk_buffer_size
1426                .with_guarded_label_values(label_list),
1427        }
1428    }
1429
1430    pub fn new_group_top_n_metrics(
1431        &self,
1432        table_id: u32,
1433        actor_id: ActorId,
1434        fragment_id: FragmentId,
1435    ) -> GroupTopNMetrics {
1436        let label_list: &[&str; 3] = &[
1437            &table_id.to_string(),
1438            &actor_id.to_string(),
1439            &fragment_id.to_string(),
1440        ];
1441
1442        GroupTopNMetrics {
1443            group_top_n_cache_miss_count: self
1444                .group_top_n_cache_miss_count
1445                .with_guarded_label_values(label_list),
1446            group_top_n_total_query_cache_count: self
1447                .group_top_n_total_query_cache_count
1448                .with_guarded_label_values(label_list),
1449            group_top_n_cached_entry_count: self
1450                .group_top_n_cached_entry_count
1451                .with_guarded_label_values(label_list),
1452        }
1453    }
1454
1455    pub fn new_append_only_group_top_n_metrics(
1456        &self,
1457        table_id: u32,
1458        actor_id: ActorId,
1459        fragment_id: FragmentId,
1460    ) -> GroupTopNMetrics {
1461        let label_list: &[&str; 3] = &[
1462            &table_id.to_string(),
1463            &actor_id.to_string(),
1464            &fragment_id.to_string(),
1465        ];
1466
1467        GroupTopNMetrics {
1468            group_top_n_cache_miss_count: self
1469                .group_top_n_appendonly_cache_miss_count
1470                .with_guarded_label_values(label_list),
1471            group_top_n_total_query_cache_count: self
1472                .group_top_n_appendonly_total_query_cache_count
1473                .with_guarded_label_values(label_list),
1474            group_top_n_cached_entry_count: self
1475                .group_top_n_appendonly_cached_entry_count
1476                .with_guarded_label_values(label_list),
1477        }
1478    }
1479
1480    pub fn new_lookup_executor_metrics(
1481        &self,
1482        table_id: TableId,
1483        actor_id: ActorId,
1484        fragment_id: FragmentId,
1485    ) -> LookupExecutorMetrics {
1486        let label_list: &[&str; 3] = &[
1487            &table_id.to_string(),
1488            &actor_id.to_string(),
1489            &fragment_id.to_string(),
1490        ];
1491
1492        LookupExecutorMetrics {
1493            lookup_cache_miss_count: self
1494                .lookup_cache_miss_count
1495                .with_guarded_label_values(label_list),
1496            lookup_total_query_cache_count: self
1497                .lookup_total_query_cache_count
1498                .with_guarded_label_values(label_list),
1499            lookup_cached_entry_count: self
1500                .lookup_cached_entry_count
1501                .with_guarded_label_values(label_list),
1502        }
1503    }
1504
1505    pub fn new_hash_agg_metrics(
1506        &self,
1507        table_id: u32,
1508        actor_id: ActorId,
1509        fragment_id: FragmentId,
1510    ) -> HashAggMetrics {
1511        let label_list: &[&str; 3] = &[
1512            &table_id.to_string(),
1513            &actor_id.to_string(),
1514            &fragment_id.to_string(),
1515        ];
1516        HashAggMetrics {
1517            agg_lookup_miss_count: self
1518                .agg_lookup_miss_count
1519                .with_guarded_label_values(label_list),
1520            agg_total_lookup_count: self
1521                .agg_total_lookup_count
1522                .with_guarded_label_values(label_list),
1523            agg_cached_entry_count: self
1524                .agg_cached_entry_count
1525                .with_guarded_label_values(label_list),
1526            agg_chunk_lookup_miss_count: self
1527                .agg_chunk_lookup_miss_count
1528                .with_guarded_label_values(label_list),
1529            agg_chunk_total_lookup_count: self
1530                .agg_chunk_total_lookup_count
1531                .with_guarded_label_values(label_list),
1532            agg_dirty_groups_count: self
1533                .agg_dirty_groups_count
1534                .with_guarded_label_values(label_list),
1535            agg_dirty_groups_heap_size: self
1536                .agg_dirty_groups_heap_size
1537                .with_guarded_label_values(label_list),
1538            agg_state_cache_lookup_count: self
1539                .agg_state_cache_lookup_count
1540                .with_guarded_label_values(label_list),
1541            agg_state_cache_miss_count: self
1542                .agg_state_cache_miss_count
1543                .with_guarded_label_values(label_list),
1544        }
1545    }
1546
1547    pub fn new_agg_distinct_dedup_metrics(
1548        &self,
1549        table_id: u32,
1550        actor_id: ActorId,
1551        fragment_id: FragmentId,
1552    ) -> AggDistinctDedupMetrics {
1553        let label_list: &[&str; 3] = &[
1554            &table_id.to_string(),
1555            &actor_id.to_string(),
1556            &fragment_id.to_string(),
1557        ];
1558        AggDistinctDedupMetrics {
1559            agg_distinct_cache_miss_count: self
1560                .agg_distinct_cache_miss_count
1561                .with_guarded_label_values(label_list),
1562            agg_distinct_total_cache_count: self
1563                .agg_distinct_total_cache_count
1564                .with_guarded_label_values(label_list),
1565            agg_distinct_cached_entry_count: self
1566                .agg_distinct_cached_entry_count
1567                .with_guarded_label_values(label_list),
1568        }
1569    }
1570
1571    pub fn new_temporal_join_metrics(
1572        &self,
1573        table_id: TableId,
1574        actor_id: ActorId,
1575        fragment_id: FragmentId,
1576    ) -> TemporalJoinMetrics {
1577        let label_list: &[&str; 3] = &[
1578            &table_id.to_string(),
1579            &actor_id.to_string(),
1580            &fragment_id.to_string(),
1581        ];
1582        TemporalJoinMetrics {
1583            temporal_join_cache_miss_count: self
1584                .temporal_join_cache_miss_count
1585                .with_guarded_label_values(label_list),
1586            temporal_join_total_query_cache_count: self
1587                .temporal_join_total_query_cache_count
1588                .with_guarded_label_values(label_list),
1589            temporal_join_cached_entry_count: self
1590                .temporal_join_cached_entry_count
1591                .with_guarded_label_values(label_list),
1592        }
1593    }
1594
1595    pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1596        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1597        BackfillMetrics {
1598            backfill_snapshot_read_row_count: self
1599                .backfill_snapshot_read_row_count
1600                .with_guarded_label_values(label_list),
1601            backfill_upstream_output_row_count: self
1602                .backfill_upstream_output_row_count
1603                .with_guarded_label_values(label_list),
1604        }
1605    }
1606
1607    pub fn new_cdc_backfill_metrics(
1608        &self,
1609        table_id: TableId,
1610        actor_id: ActorId,
1611    ) -> CdcBackfillMetrics {
1612        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1613        CdcBackfillMetrics {
1614            cdc_backfill_snapshot_read_row_count: self
1615                .cdc_backfill_snapshot_read_row_count
1616                .with_guarded_label_values(label_list),
1617            cdc_backfill_upstream_output_row_count: self
1618                .cdc_backfill_upstream_output_row_count
1619                .with_guarded_label_values(label_list),
1620        }
1621    }
1622
1623    pub fn new_over_window_metrics(
1624        &self,
1625        table_id: u32,
1626        actor_id: ActorId,
1627        fragment_id: FragmentId,
1628    ) -> OverWindowMetrics {
1629        let label_list: &[&str; 3] = &[
1630            &table_id.to_string(),
1631            &actor_id.to_string(),
1632            &fragment_id.to_string(),
1633        ];
1634        OverWindowMetrics {
1635            over_window_cached_entry_count: self
1636                .over_window_cached_entry_count
1637                .with_guarded_label_values(label_list),
1638            over_window_cache_lookup_count: self
1639                .over_window_cache_lookup_count
1640                .with_guarded_label_values(label_list),
1641            over_window_cache_miss_count: self
1642                .over_window_cache_miss_count
1643                .with_guarded_label_values(label_list),
1644            over_window_range_cache_entry_count: self
1645                .over_window_range_cache_entry_count
1646                .with_guarded_label_values(label_list),
1647            over_window_range_cache_lookup_count: self
1648                .over_window_range_cache_lookup_count
1649                .with_guarded_label_values(label_list),
1650            over_window_range_cache_left_miss_count: self
1651                .over_window_range_cache_left_miss_count
1652                .with_guarded_label_values(label_list),
1653            over_window_range_cache_right_miss_count: self
1654                .over_window_range_cache_right_miss_count
1655                .with_guarded_label_values(label_list),
1656            over_window_accessed_entry_count: self
1657                .over_window_accessed_entry_count
1658                .with_guarded_label_values(label_list),
1659            over_window_compute_count: self
1660                .over_window_compute_count
1661                .with_guarded_label_values(label_list),
1662            over_window_same_output_count: self
1663                .over_window_same_output_count
1664                .with_guarded_label_values(label_list),
1665        }
1666    }
1667
1668    pub fn new_materialize_metrics(
1669        &self,
1670        table_id: TableId,
1671        actor_id: ActorId,
1672        fragment_id: FragmentId,
1673    ) -> MaterializeMetrics {
1674        let label_list: &[&str; 3] = &[
1675            &actor_id.to_string(),
1676            &table_id.to_string(),
1677            &fragment_id.to_string(),
1678        ];
1679        MaterializeMetrics {
1680            materialize_cache_hit_count: self
1681                .materialize_cache_hit_count
1682                .with_guarded_label_values(label_list),
1683            materialize_data_exist_count: self
1684                .materialize_data_exist_count
1685                .with_guarded_label_values(label_list),
1686            materialize_cache_total_count: self
1687                .materialize_cache_total_count
1688                .with_guarded_label_values(label_list),
1689            materialize_input_row_count: self
1690                .materialize_input_row_count
1691                .with_guarded_label_values(label_list),
1692            materialize_current_epoch: self
1693                .materialize_current_epoch
1694                .with_guarded_label_values(label_list),
1695        }
1696    }
1697}
1698
/// Per-actor metrics for the input side of an actor.
///
/// Both fields are label-bound integer counters.
pub(crate) struct ActorInputMetrics {
    /// Presumably the number of records received on the actor's input (inferred from
    /// the name; confirm at the update site).
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Accumulated blocking time on the input buffer. The value is in nanoseconds per
    /// the `_ns` suffix; what exactly counts as "blocking" should be confirmed at the
    /// update site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1703
/// Tokio metrics for actors
///
/// Each statistic comes as a duration/count pair:
/// - durations are float gauges (`LabelGuardedGauge`);
/// - counts are integer gauges (`LabelGuardedIntGauge`), not counters.
///
/// Because they are gauges, they are presumably overwritten with periodically sampled
/// values rather than incremented. TODO confirm with the code that updates them.
pub struct ActorMetrics {
    // Total execution time of the actor's task.
    pub actor_execution_time: LabelGuardedGauge,
    // Scheduling: time spent scheduled-but-not-running, and how often.
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    // Poll breakdown: fast polls, slow polls, and all polls.
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    // Idle periods of the actor's task.
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1718
/// Label-bound metrics for a single sink executor.
pub struct SinkExecutorMetrics {
    /// Counter of rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter of bytes fed into the sink. How the byte size is computed is determined
    /// at the update site.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge of the sink's chunk buffer size. It is a gauge, so it can move both up and
    /// down; the unit (chunks vs rows) should be confirmed at the update site.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1724
/// Label-bound metrics for one materialize executor, produced by
/// `StreamingMetrics::new_materialize_metrics`.
pub struct MaterializeMetrics {
    /// Counter of cache hits.
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Counter of lookups that found existing data (hit-path semantics to be confirmed
    /// at the update site).
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Counter of all cache lookups; pairs with the hit count to derive a hit ratio.
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    /// Counter of rows received by the executor.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge holding the epoch currently being processed.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1732
/// Label-bound cache metrics for one group top-N executor.
///
/// The same struct is also used for the append-only variant: the constructor fills
/// it from the `group_top_n_appendonly_*` metric vectors.
pub struct GroupTopNMetrics {
    /// Counter of cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of all cache queries (denominator for the miss ratio).
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently held in the cache.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1738
/// Label-bound cache metrics for one lookup executor, produced by
/// `StreamingMetrics::new_lookup_executor_metrics`.
pub struct LookupExecutorMetrics {
    /// Counter of cache misses.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of all cache queries (denominator for the miss ratio).
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently held in the cache.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1744
/// Label-bound metrics for one hash-aggregation executor, produced by
/// `StreamingMetrics::new_hash_agg_metrics`.
pub struct HashAggMetrics {
    // Per-group cache lookups: misses, total lookups, and current cache size.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    // Lookups counted at chunk granularity (exact semantics defined at the update site).
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    // Gauges for the set of dirty (modified, not yet flushed — presumably) groups.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    // Aggregation state cache lookups and misses.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1756
/// Label-bound cache metrics for distinct-aggregation deduplication, produced by
/// `StreamingMetrics::new_agg_distinct_dedup_metrics`.
pub struct AggDistinctDedupMetrics {
    /// Counter of dedup cache misses.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of all dedup cache queries (denominator for the miss ratio).
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently held in the dedup cache.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1762
/// Label-bound cache metrics for one temporal-join executor, produced by
/// `StreamingMetrics::new_temporal_join_metrics`.
pub struct TemporalJoinMetrics {
    /// Counter of cache misses.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of all cache queries (denominator for the miss ratio).
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently held in the cache.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1768
/// Label-bound row counters for one backfill executor, produced by
/// `StreamingMetrics::new_backfill_metrics` (labelled by table id and actor id).
pub struct BackfillMetrics {
    /// Counter of rows read from the snapshot side of the backfill.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows emitted from the upstream side of the backfill.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1773
/// Label-bound row counters for one CDC backfill executor, produced by
/// `StreamingMetrics::new_cdc_backfill_metrics` (labelled by table id and actor id).
///
/// Derives `Clone`, so several owners can hold handles to the same labelled counters.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter of rows read from the upstream snapshot during CDC backfill.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows emitted from the upstream change stream during CDC backfill.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1779
/// Label-bound metrics for one over-window (window function) executor, produced by
/// `StreamingMetrics::new_over_window_metrics`.
pub struct OverWindowMetrics {
    // Main cache: current size, lookups, and misses.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    // Range cache: current size, lookups, and misses split by side (left/right).
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    // Work done: entries accessed, computations performed, and computations whose
    // output was unchanged (exact semantics defined at the update site).
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}