risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45    pub level: MetricLevel,
46
47    // Executor metrics (disabled by default)
48    pub executor_row_count: RelabeledGuardedIntCounterVec,
49
50    // Profiling Metrics:
51    // Aggregated per operator rather than per actor.
52    // These are purely in-memory, never collected by prometheus.
53    pub mem_stream_node_output_row_count: CountMap,
54    pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56    // Streaming actor metrics from tokio (disabled by default)
57    actor_execution_time: LabelGuardedGaugeVec,
58    actor_scheduled_duration: LabelGuardedGaugeVec,
59    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
60    actor_fast_poll_duration: LabelGuardedGaugeVec,
61    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
62    actor_slow_poll_duration: LabelGuardedGaugeVec,
63    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
64    actor_poll_duration: LabelGuardedGaugeVec,
65    actor_poll_cnt: LabelGuardedIntGaugeVec,
66    actor_idle_duration: LabelGuardedGaugeVec,
67    actor_idle_cnt: LabelGuardedIntGaugeVec,
68
69    // Streaming actor
70    pub actor_count: LabelGuardedIntGaugeVec,
71    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
72    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
73    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
74
75    // Source
76    pub source_output_row_count: LabelGuardedIntCounterVec,
77    pub source_split_change_count: LabelGuardedIntCounterVec,
78    pub source_backfill_row_count: LabelGuardedIntCounterVec,
79
80    // Sink
81    sink_input_row_count: LabelGuardedIntCounterVec,
82    sink_input_bytes: LabelGuardedIntCounterVec,
83    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
84
85    // Exchange (see also `compute::ExchangeServiceMetrics`)
86    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
87
88    // Streaming Merge (We break out this metric from `barrier_align_duration` because
89    // the alignment happens on different levels)
90    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,
91
92    // Backpressure
93    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
94    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,
95
96    // Streaming Join
97    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
98    pub join_lookup_total_count: LabelGuardedIntCounterVec,
99    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
100    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
101    pub join_match_duration_ns: LabelGuardedIntCounterVec,
102    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
103    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
104
105    // Streaming Join, Streaming Dynamic Filter and Streaming Union
106    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
107
108    // Streaming Aggregation
109    agg_lookup_miss_count: LabelGuardedIntCounterVec,
110    agg_total_lookup_count: LabelGuardedIntCounterVec,
111    agg_cached_entry_count: LabelGuardedIntGaugeVec,
112    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
113    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
114    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
115    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
116    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
117    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
118    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
119    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
120    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
121
122    // Streaming TopN
123    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
126    // TODO(rc): why not just use the above three?
127    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
128    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
129    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
130
131    // Lookup executor
132    lookup_cache_miss_count: LabelGuardedIntCounterVec,
133    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
134    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
135
136    // temporal join
137    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
138    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
139    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
140
141    // Backfill
142    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // CDC Backfill
146    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
147    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
148
149    // Snapshot Backfill
150    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
151
152    // Over Window
153    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
154    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
155    over_window_cache_miss_count: LabelGuardedIntCounterVec,
156    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
157    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
158    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
159    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
160    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
161    over_window_compute_count: LabelGuardedIntCounterVec,
162    over_window_same_output_count: LabelGuardedIntCounterVec,
163
164    /// The duration from receipt of barrier to all actors collection.
165    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
166    /// to flow through the graph.
167    pub barrier_inflight_latency: Histogram,
168    /// The duration of sync to storage.
169    pub barrier_sync_latency: Histogram,
170    pub barrier_batch_size: Histogram,
171    /// The progress made by the earliest in-flight barriers in the local barrier manager.
172    pub barrier_manager_progress: IntCounter,
173
174    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
177    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
178    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
179    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
180    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
181    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
182    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
183    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
184
185    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196
197    // Memory management
198    pub lru_runtime_loop_count: IntCounter,
199    pub lru_latest_sequence: IntGauge,
200    pub lru_watermark_sequence: IntGauge,
201    pub lru_eviction_policy: IntGauge,
202    pub jemalloc_allocated_bytes: IntGauge,
203    pub jemalloc_active_bytes: IntGauge,
204    pub jemalloc_resident_bytes: IntGauge,
205    pub jemalloc_metadata_bytes: IntGauge,
206    pub jvm_allocated_bytes: IntGauge,
207    pub jvm_active_bytes: IntGauge,
208    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210    // Materialized view
211    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214    materialize_input_row_count: RelabeledGuardedIntCounterVec,
215    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217    // PostgreSQL CDC LSN monitoring
218    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221    // Gap Fill
222    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228    GLOBAL_STREAMING_METRICS
229        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230        .clone()
231}
232
233impl StreamingMetrics {
234    fn new(registry: &Registry, level: MetricLevel) -> Self {
235        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236            "stream_executor_row_count",
237            "Total number of rows that have been output from each executor",
238            &["actor_id", "fragment_id", "executor_identity"],
239            registry
240        )
241        .unwrap()
242        .relabel_debug_1(level);
243
244        let stream_node_output_row_count = CountMap::new();
245        let stream_node_output_blocking_duration_ns = CountMap::new();
246
247        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248            "stream_source_output_rows_counts",
249            "Total number of rows that have been output from source",
250            &["source_id", "source_name", "actor_id", "fragment_id"],
251            registry
252        )
253        .unwrap();
254
255        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256            "stream_source_split_change_event_count",
257            "Total number of split change events that have been operated by source",
258            &["source_id", "source_name", "actor_id", "fragment_id"],
259            registry
260        )
261        .unwrap();
262
263        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264            "stream_source_backfill_rows_counts",
265            "Total number of rows that have been backfilled for source",
266            &["source_id", "source_name", "actor_id", "fragment_id"],
267            registry
268        )
269        .unwrap();
270
271        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272            "stream_sink_input_row_count",
273            "Total number of rows streamed into sink executors",
274            &["sink_id", "actor_id", "fragment_id"],
275            registry
276        )
277        .unwrap();
278
279        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280            "stream_sink_input_bytes",
281            "Total size of chunks streamed into sink executors",
282            &["sink_id", "actor_id", "fragment_id"],
283            registry
284        )
285        .unwrap();
286
287        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288            "stream_mview_input_row_count",
289            "Total number of rows streamed into materialize executors",
290            &["actor_id", "table_id", "fragment_id"],
291            registry
292        )
293        .unwrap()
294        .relabel_debug_1(level);
295
296        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297            "stream_mview_current_epoch",
298            "The current epoch of the materialized executor",
299            &["actor_id", "table_id", "fragment_id"],
300            registry
301        )
302        .unwrap()
303        .relabel_debug_1(level);
304
305        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306            "stream_pg_cdc_state_table_lsn",
307            "Current LSN value stored in PostgreSQL CDC state table",
308            &["source_id"],
309            registry,
310        )
311        .unwrap();
312
313        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314            "stream_pg_cdc_jni_commit_offset_lsn",
315            "LSN value when JNI commit offset is called for PostgreSQL CDC",
316            &["source_id"],
317            registry,
318        )
319        .unwrap();
320
321        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
322            "stream_sink_chunk_buffer_size",
323            "Total size of chunks buffered in a barrier",
324            &["sink_id", "actor_id", "fragment_id"],
325            registry
326        )
327        .unwrap();
328
329        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
330            "stream_actor_actor_execution_time",
331            "Total execution time (s) of an actor",
332            &["actor_id"],
333            registry
334        )
335        .unwrap();
336
337        let actor_output_buffer_blocking_duration_ns =
338            register_guarded_int_counter_vec_with_registry!(
339                "stream_actor_output_buffer_blocking_duration_ns",
340                "Total blocking duration (ns) of output buffer",
341                &["actor_id", "fragment_id", "downstream_fragment_id"],
342                registry
343            )
344            .unwrap()
345            // mask the first label `actor_id` if the level is less verbose than `Debug`
346            .relabel_debug_1(level);
347
348        let actor_input_buffer_blocking_duration_ns =
349            register_guarded_int_counter_vec_with_registry!(
350                "stream_actor_input_buffer_blocking_duration_ns",
351                "Total blocking duration (ns) of input buffer",
352                &["actor_id", "fragment_id", "upstream_fragment_id"],
353                registry
354            )
355            .unwrap();
356
357        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
358            "stream_exchange_frag_recv_size",
359            "Total size of messages that have been received from upstream Fragment",
360            &["up_fragment_id", "down_fragment_id"],
361            registry
362        )
363        .unwrap();
364
365        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
366            "stream_actor_fast_poll_duration",
367            "tokio's metrics",
368            &["actor_id"],
369            registry
370        )
371        .unwrap();
372
373        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
374            "stream_actor_fast_poll_cnt",
375            "tokio's metrics",
376            &["actor_id"],
377            registry
378        )
379        .unwrap();
380
381        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
382            "stream_actor_slow_poll_duration",
383            "tokio's metrics",
384            &["actor_id"],
385            registry
386        )
387        .unwrap();
388
389        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
390            "stream_actor_slow_poll_cnt",
391            "tokio's metrics",
392            &["actor_id"],
393            registry
394        )
395        .unwrap();
396
397        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
398            "stream_actor_poll_duration",
399            "tokio's metrics",
400            &["actor_id"],
401            registry
402        )
403        .unwrap();
404
405        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
406            "stream_actor_poll_cnt",
407            "tokio's metrics",
408            &["actor_id"],
409            registry
410        )
411        .unwrap();
412
413        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
414            "stream_actor_scheduled_duration",
415            "tokio's metrics",
416            &["actor_id"],
417            registry
418        )
419        .unwrap();
420
421        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
422            "stream_actor_scheduled_cnt",
423            "tokio's metrics",
424            &["actor_id"],
425            registry
426        )
427        .unwrap();
428
429        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
430            "stream_actor_idle_duration",
431            "tokio's metrics",
432            &["actor_id"],
433            registry
434        )
435        .unwrap();
436
437        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
438            "stream_actor_idle_cnt",
439            "tokio's metrics",
440            &["actor_id"],
441            registry
442        )
443        .unwrap();
444
445        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
446            "stream_actor_in_record_cnt",
447            "Total number of rows actor received",
448            &["actor_id", "fragment_id", "upstream_fragment_id"],
449            registry
450        )
451        .unwrap()
452        .relabel_debug_1(level);
453
454        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
455            "stream_actor_out_record_cnt",
456            "Total number of rows actor sent",
457            &["actor_id", "fragment_id"],
458            registry
459        )
460        .unwrap()
461        .relabel_debug_1(level);
462
463        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
464            "stream_actor_current_epoch",
465            "Current epoch of actor",
466            &["actor_id", "fragment_id"],
467            registry
468        )
469        .unwrap()
470        .relabel_debug_1(level);
471
472        let actor_count = register_guarded_int_gauge_vec_with_registry!(
473            "stream_actor_count",
474            "Total number of actors (parallelism)",
475            &["fragment_id"],
476            registry
477        )
478        .unwrap();
479
480        let opts = histogram_opts!(
481            "stream_merge_barrier_align_duration",
482            "Duration of merge align barrier",
483            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
484        );
485        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
486            opts,
487            &["actor_id", "fragment_id"],
488            registry
489        )
490        .unwrap()
491        .relabel_debug_1(level);
492
493        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
494            "stream_join_lookup_miss_count",
495            "Join executor lookup miss duration",
496            &["side", "join_table_id", "actor_id", "fragment_id"],
497            registry
498        )
499        .unwrap();
500
501        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
502            "stream_join_lookup_total_count",
503            "Join executor lookup total operation",
504            &["side", "join_table_id", "actor_id", "fragment_id"],
505            registry
506        )
507        .unwrap();
508
509        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
510            "stream_join_insert_cache_miss_count",
511            "Join executor cache miss when insert operation",
512            &["side", "join_table_id", "actor_id", "fragment_id"],
513            registry
514        )
515        .unwrap();
516
517        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
518            "stream_join_actor_input_waiting_duration_ns",
519            "Total waiting duration (ns) of input buffer of join actor",
520            &["actor_id", "fragment_id"],
521            registry
522        )
523        .unwrap();
524
525        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
526            "stream_join_match_duration_ns",
527            "Matching duration for each side",
528            &["actor_id", "fragment_id", "side"],
529            registry
530        )
531        .unwrap();
532
533        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
534            "stream_barrier_align_duration_ns",
535            "Duration of join align barrier",
536            &["actor_id", "fragment_id", "wait_side", "executor"],
537            registry
538        )
539        .unwrap()
540        .relabel_debug_1(level);
541
542        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
543            "stream_join_cached_entry_count",
544            "Number of cached entries in streaming join operators",
545            &["actor_id", "fragment_id", "side"],
546            registry
547        )
548        .unwrap();
549
550        let join_matched_join_keys_opts = histogram_opts!(
551            "stream_join_matched_join_keys",
552            "The number of keys matched in the opposite side",
553            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
554        );
555
556        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
557            join_matched_join_keys_opts,
558            &["actor_id", "fragment_id", "table_id"],
559            registry
560        )
561        .unwrap()
562        .relabel_debug_1(level);
563
564        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
565            "stream_agg_lookup_miss_count",
566            "Aggregation executor lookup miss duration",
567            &["table_id", "actor_id", "fragment_id"],
568            registry
569        )
570        .unwrap();
571
572        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
573            "stream_agg_lookup_total_count",
574            "Aggregation executor lookup total operation",
575            &["table_id", "actor_id", "fragment_id"],
576            registry
577        )
578        .unwrap();
579
580        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
581            "stream_agg_distinct_cache_miss_count",
582            "Aggregation executor dinsinct miss duration",
583            &["table_id", "actor_id", "fragment_id"],
584            registry
585        )
586        .unwrap();
587
588        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
589            "stream_agg_distinct_total_cache_count",
590            "Aggregation executor distinct total operation",
591            &["table_id", "actor_id", "fragment_id"],
592            registry
593        )
594        .unwrap();
595
596        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
597            "stream_agg_distinct_cached_entry_count",
598            "Total entry counts in distinct aggregation executor cache",
599            &["table_id", "actor_id", "fragment_id"],
600            registry
601        )
602        .unwrap();
603
604        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
605            "stream_agg_dirty_groups_count",
606            "Total dirty group counts in aggregation executor",
607            &["table_id", "actor_id", "fragment_id"],
608            registry
609        )
610        .unwrap();
611
612        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
613            "stream_agg_dirty_groups_heap_size",
614            "Total dirty group heap size in aggregation executor",
615            &["table_id", "actor_id", "fragment_id"],
616            registry
617        )
618        .unwrap();
619
620        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
621            "stream_agg_state_cache_lookup_count",
622            "Aggregation executor state cache lookup count",
623            &["table_id", "actor_id", "fragment_id"],
624            registry
625        )
626        .unwrap();
627
628        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
629            "stream_agg_state_cache_miss_count",
630            "Aggregation executor state cache miss count",
631            &["table_id", "actor_id", "fragment_id"],
632            registry
633        )
634        .unwrap();
635
636        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
637            "stream_group_top_n_cache_miss_count",
638            "Group top n executor cache miss count",
639            &["table_id", "actor_id", "fragment_id"],
640            registry
641        )
642        .unwrap();
643
644        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
645            "stream_group_top_n_total_query_cache_count",
646            "Group top n executor query cache total count",
647            &["table_id", "actor_id", "fragment_id"],
648            registry
649        )
650        .unwrap();
651
652        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
653            "stream_group_top_n_cached_entry_count",
654            "Total entry counts in group top n executor cache",
655            &["table_id", "actor_id", "fragment_id"],
656            registry
657        )
658        .unwrap();
659
660        let group_top_n_appendonly_cache_miss_count =
661            register_guarded_int_counter_vec_with_registry!(
662                "stream_group_top_n_appendonly_cache_miss_count",
663                "Group top n appendonly executor cache miss count",
664                &["table_id", "actor_id", "fragment_id"],
665                registry
666            )
667            .unwrap();
668
669        let group_top_n_appendonly_total_query_cache_count =
670            register_guarded_int_counter_vec_with_registry!(
671                "stream_group_top_n_appendonly_total_query_cache_count",
672                "Group top n appendonly executor total cache count",
673                &["table_id", "actor_id", "fragment_id"],
674                registry
675            )
676            .unwrap();
677
678        let group_top_n_appendonly_cached_entry_count =
679            register_guarded_int_gauge_vec_with_registry!(
680                "stream_group_top_n_appendonly_cached_entry_count",
681                "Total entry counts in group top n appendonly executor cache",
682                &["table_id", "actor_id", "fragment_id"],
683                registry
684            )
685            .unwrap();
686
687        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
688            "stream_lookup_cache_miss_count",
689            "Lookup executor cache miss count",
690            &["table_id", "actor_id", "fragment_id"],
691            registry
692        )
693        .unwrap();
694
695        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
696            "stream_lookup_total_query_cache_count",
697            "Lookup executor query cache total count",
698            &["table_id", "actor_id", "fragment_id"],
699            registry
700        )
701        .unwrap();
702
703        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
704            "stream_lookup_cached_entry_count",
705            "Total entry counts in lookup executor cache",
706            &["table_id", "actor_id", "fragment_id"],
707            registry
708        )
709        .unwrap();
710
711        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
712            "stream_temporal_join_cache_miss_count",
713            "Temporal join executor cache miss count",
714            &["table_id", "actor_id", "fragment_id"],
715            registry
716        )
717        .unwrap();
718
719        let temporal_join_total_query_cache_count =
720            register_guarded_int_counter_vec_with_registry!(
721                "stream_temporal_join_total_query_cache_count",
722                "Temporal join executor query cache total count",
723                &["table_id", "actor_id", "fragment_id"],
724                registry
725            )
726            .unwrap();
727
728        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
729            "stream_temporal_join_cached_entry_count",
730            "Total entry count in temporal join executor cache",
731            &["table_id", "actor_id", "fragment_id"],
732            registry
733        )
734        .unwrap();
735
736        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
737            "stream_agg_cached_entry_count",
738            "Number of cached keys in streaming aggregation operators",
739            &["table_id", "actor_id", "fragment_id"],
740            registry
741        )
742        .unwrap();
743
744        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
745            "stream_agg_chunk_lookup_miss_count",
746            "Aggregation executor chunk-level lookup miss duration",
747            &["table_id", "actor_id", "fragment_id"],
748            registry
749        )
750        .unwrap();
751
752        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
753            "stream_agg_chunk_lookup_total_count",
754            "Aggregation executor chunk-level lookup total operation",
755            &["table_id", "actor_id", "fragment_id"],
756            registry
757        )
758        .unwrap();
759
760        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
761            "stream_backfill_snapshot_read_row_count",
762            "Total number of rows that have been read from the backfill snapshot",
763            &["table_id", "actor_id"],
764            registry
765        )
766        .unwrap();
767
768        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
769            "stream_backfill_upstream_output_row_count",
770            "Total number of rows that have been output from the backfill upstream",
771            &["table_id", "actor_id"],
772            registry
773        )
774        .unwrap();
775
776        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
777            "stream_cdc_backfill_snapshot_read_row_count",
778            "Total number of rows that have been read from the cdc_backfill snapshot",
779            &["table_id", "actor_id"],
780            registry
781        )
782        .unwrap();
783
784        let cdc_backfill_upstream_output_row_count =
785            register_guarded_int_counter_vec_with_registry!(
786                "stream_cdc_backfill_upstream_output_row_count",
787                "Total number of rows that have been output from the cdc_backfill upstream",
788                &["table_id", "actor_id"],
789                registry
790            )
791            .unwrap();
792
793        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
794            "stream_snapshot_backfill_consume_snapshot_row_count",
795            "Total number of rows that have been output from snapshot backfill",
796            &["table_id", "actor_id", "stage"],
797            registry
798        )
799        .unwrap();
800
801        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
802            "stream_over_window_cached_entry_count",
803            "Total entry (partition) count in over window executor cache",
804            &["table_id", "actor_id", "fragment_id"],
805            registry
806        )
807        .unwrap();
808
809        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
810            "stream_over_window_cache_lookup_count",
811            "Over window executor cache lookup count",
812            &["table_id", "actor_id", "fragment_id"],
813            registry
814        )
815        .unwrap();
816
817        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
818            "stream_over_window_cache_miss_count",
819            "Over window executor cache miss count",
820            &["table_id", "actor_id", "fragment_id"],
821            registry
822        )
823        .unwrap();
824
825        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
826            "stream_over_window_range_cache_entry_count",
827            "Over window partition range cache entry count",
828            &["table_id", "actor_id", "fragment_id"],
829            registry,
830        )
831        .unwrap();
832
833        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
834            "stream_over_window_range_cache_lookup_count",
835            "Over window partition range cache lookup count",
836            &["table_id", "actor_id", "fragment_id"],
837            registry
838        )
839        .unwrap();
840
841        let over_window_range_cache_left_miss_count =
842            register_guarded_int_counter_vec_with_registry!(
843                "stream_over_window_range_cache_left_miss_count",
844                "Over window partition range cache left miss count",
845                &["table_id", "actor_id", "fragment_id"],
846                registry
847            )
848            .unwrap();
849
850        let over_window_range_cache_right_miss_count =
851            register_guarded_int_counter_vec_with_registry!(
852                "stream_over_window_range_cache_right_miss_count",
853                "Over window partition range cache right miss count",
854                &["table_id", "actor_id", "fragment_id"],
855                registry
856            )
857            .unwrap();
858
859        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
860            "stream_over_window_accessed_entry_count",
861            "Over window accessed entry count",
862            &["table_id", "actor_id", "fragment_id"],
863            registry
864        )
865        .unwrap();
866
867        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
868            "stream_over_window_compute_count",
869            "Over window compute count",
870            &["table_id", "actor_id", "fragment_id"],
871            registry
872        )
873        .unwrap();
874
875        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
876            "stream_over_window_same_output_count",
877            "Over window same output count",
878            &["table_id", "actor_id", "fragment_id"],
879            registry
880        )
881        .unwrap();
882
883        let opts = histogram_opts!(
884            "stream_barrier_inflight_duration_seconds",
885            "barrier_inflight_latency",
886            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
887        );
888        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
889
890        let opts = histogram_opts!(
891            "stream_barrier_sync_storage_duration_seconds",
892            "barrier_sync_latency",
893            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
894        );
895        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
896
897        let opts = histogram_opts!(
898            "stream_barrier_batch_size",
899            "barrier_batch_size",
900            exponential_buckets(1.0, 2.0, 8).unwrap()
901        );
902        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
903
904        let barrier_manager_progress = register_int_counter_with_registry!(
905            "stream_barrier_manager_progress",
906            "The number of actors that have processed the earliest in-flight barriers",
907            registry
908        )
909        .unwrap();
910
911        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
912            "sync_kv_log_store_wait_next_poll_ns",
913            "Total duration (ns) of waiting for next poll",
914            &["actor_id", "target", "fragment_id", "relation"],
915            registry
916        )
917        .unwrap();
918
919        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
920            "sync_kv_log_store_read_count",
921            "read row count throughput of sync_kv log store",
922            &["type", "actor_id", "target", "fragment_id", "relation"],
923            registry
924        )
925        .unwrap();
926
927        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
928            "sync_kv_log_store_read_size",
929            "read size throughput of sync_kv log store",
930            &["type", "actor_id", "target", "fragment_id", "relation"],
931            registry
932        )
933        .unwrap();
934
935        let sync_kv_log_store_write_pause_duration_ns =
936            register_guarded_int_counter_vec_with_registry!(
937                "sync_kv_log_store_write_pause_duration_ns",
938                "Duration (ns) of sync_kv log store write pause",
939                &["actor_id", "target", "fragment_id", "relation"],
940                registry
941            )
942            .unwrap();
943
944        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
945            "sync_kv_log_store_state",
946            "clean/unclean state transition for sync_kv log store",
947            &["state", "actor_id", "target", "fragment_id", "relation"],
948            registry
949        )
950        .unwrap();
951
952        let sync_kv_log_store_storage_write_count =
953            register_guarded_int_counter_vec_with_registry!(
954                "sync_kv_log_store_storage_write_count",
955                "Write row count throughput of sync_kv log store",
956                &["actor_id", "target", "fragment_id", "relation"],
957                registry
958            )
959            .unwrap();
960
961        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
962            "sync_kv_log_store_storage_write_size",
963            "Write size throughput of sync_kv log store",
964            &["actor_id", "target", "fragment_id", "relation"],
965            registry
966        )
967        .unwrap();
968
969        let sync_kv_log_store_buffer_unconsumed_item_count =
970            register_guarded_int_gauge_vec_with_registry!(
971                "sync_kv_log_store_buffer_unconsumed_item_count",
972                "Number of Unconsumed Item in buffer",
973                &["actor_id", "target", "fragment_id", "relation"],
974                registry
975            )
976            .unwrap();
977
978        let sync_kv_log_store_buffer_unconsumed_row_count =
979            register_guarded_int_gauge_vec_with_registry!(
980                "sync_kv_log_store_buffer_unconsumed_row_count",
981                "Number of Unconsumed Row in buffer",
982                &["actor_id", "target", "fragment_id", "relation"],
983                registry
984            )
985            .unwrap();
986
987        let sync_kv_log_store_buffer_unconsumed_epoch_count =
988            register_guarded_int_gauge_vec_with_registry!(
989                "sync_kv_log_store_buffer_unconsumed_epoch_count",
990                "Number of Unconsumed Epoch in buffer",
991                &["actor_id", "target", "fragment_id", "relation"],
992                registry
993            )
994            .unwrap();
995
996        let sync_kv_log_store_buffer_unconsumed_min_epoch =
997            register_guarded_int_gauge_vec_with_registry!(
998                "sync_kv_log_store_buffer_unconsumed_min_epoch",
999                "Number of Unconsumed Epoch in buffer",
1000                &["actor_id", "target", "fragment_id", "relation"],
1001                registry
1002            )
1003            .unwrap();
1004
1005        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1006            "kv_log_store_storage_write_count",
1007            "Write row count throughput of kv log store",
1008            &["actor_id", "connector", "sink_id", "sink_name"],
1009            registry
1010        )
1011        .unwrap();
1012
1013        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1014            "kv_log_store_storage_write_size",
1015            "Write size throughput of kv log store",
1016            &["actor_id", "connector", "sink_id", "sink_name"],
1017            registry
1018        )
1019        .unwrap();
1020
1021        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1022            "kv_log_store_storage_read_count",
1023            "Write row count throughput of kv log store",
1024            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1025            registry
1026        )
1027        .unwrap();
1028
1029        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1030            "kv_log_store_storage_read_size",
1031            "Write size throughput of kv log store",
1032            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1033            registry
1034        )
1035        .unwrap();
1036
1037        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1038            "kv_log_store_rewind_count",
1039            "Kv log store rewind rate",
1040            &["actor_id", "connector", "sink_id", "sink_name"],
1041            registry
1042        )
1043        .unwrap();
1044
1045        let kv_log_store_rewind_delay_opts = {
1046            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1047            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1048                - REWIND_BASE_DELAY.as_secs_f64().log2())
1049            .ceil() as usize;
1050            let buckets = exponential_buckets(
1051                REWIND_BASE_DELAY.as_secs_f64(),
1052                REWIND_BACKOFF_FACTOR as _,
1053                bucket_count,
1054            )
1055            .unwrap();
1056            histogram_opts!(
1057                "kv_log_store_rewind_delay",
1058                "Kv log store rewind delay",
1059                buckets,
1060            )
1061        };
1062
1063        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1064            kv_log_store_rewind_delay_opts,
1065            &["actor_id", "connector", "sink_id", "sink_name"],
1066            registry
1067        )
1068        .unwrap();
1069
1070        let kv_log_store_buffer_unconsumed_item_count =
1071            register_guarded_int_gauge_vec_with_registry!(
1072                "kv_log_store_buffer_unconsumed_item_count",
1073                "Number of Unconsumed Item in buffer",
1074                &["actor_id", "connector", "sink_id", "sink_name"],
1075                registry
1076            )
1077            .unwrap();
1078
1079        let kv_log_store_buffer_unconsumed_row_count =
1080            register_guarded_int_gauge_vec_with_registry!(
1081                "kv_log_store_buffer_unconsumed_row_count",
1082                "Number of Unconsumed Row in buffer",
1083                &["actor_id", "connector", "sink_id", "sink_name"],
1084                registry
1085            )
1086            .unwrap();
1087
1088        let kv_log_store_buffer_unconsumed_epoch_count =
1089            register_guarded_int_gauge_vec_with_registry!(
1090                "kv_log_store_buffer_unconsumed_epoch_count",
1091                "Number of Unconsumed Epoch in buffer",
1092                &["actor_id", "connector", "sink_id", "sink_name"],
1093                registry
1094            )
1095            .unwrap();
1096
1097        let kv_log_store_buffer_unconsumed_min_epoch =
1098            register_guarded_int_gauge_vec_with_registry!(
1099                "kv_log_store_buffer_unconsumed_min_epoch",
1100                "Number of Unconsumed Epoch in buffer",
1101                &["actor_id", "connector", "sink_id", "sink_name"],
1102                registry
1103            )
1104            .unwrap();
1105
1106        let lru_runtime_loop_count = register_int_counter_with_registry!(
1107            "lru_runtime_loop_count",
1108            "The counts of the eviction loop in LRU manager per second",
1109            registry
1110        )
1111        .unwrap();
1112
1113        let lru_latest_sequence = register_int_gauge_with_registry!(
1114            "lru_latest_sequence",
1115            "Current LRU global sequence",
1116            registry,
1117        )
1118        .unwrap();
1119
1120        let lru_watermark_sequence = register_int_gauge_with_registry!(
1121            "lru_watermark_sequence",
1122            "Current LRU watermark sequence",
1123            registry,
1124        )
1125        .unwrap();
1126
1127        let lru_eviction_policy = register_int_gauge_with_registry!(
1128            "lru_eviction_policy",
1129            "Current LRU eviction policy",
1130            registry,
1131        )
1132        .unwrap();
1133
1134        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1135            "jemalloc_allocated_bytes",
1136            "The allocated memory jemalloc, got from jemalloc_ctl",
1137            registry
1138        )
1139        .unwrap();
1140
1141        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1142            "jemalloc_active_bytes",
1143            "The active memory jemalloc, got from jemalloc_ctl",
1144            registry
1145        )
1146        .unwrap();
1147
1148        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1149            "jemalloc_resident_bytes",
1150            "The active memory jemalloc, got from jemalloc_ctl",
1151            registry
1152        )
1153        .unwrap();
1154
1155        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1156            "jemalloc_metadata_bytes",
1157            "The active memory jemalloc, got from jemalloc_ctl",
1158            registry
1159        )
1160        .unwrap();
1161
1162        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1163            "jvm_allocated_bytes",
1164            "The allocated jvm memory",
1165            registry
1166        )
1167        .unwrap();
1168
1169        let jvm_active_bytes = register_int_gauge_with_registry!(
1170            "jvm_active_bytes",
1171            "The active jvm memory",
1172            registry
1173        )
1174        .unwrap();
1175
1176        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1177            "stream_materialize_cache_hit_count",
1178            "Materialize executor cache hit count",
1179            &["actor_id", "table_id", "fragment_id"],
1180            registry
1181        )
1182        .unwrap()
1183        .relabel_debug_1(level);
1184
1185        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1186            "stream_materialize_data_exist_count",
1187            "Materialize executor data exist count",
1188            &["actor_id", "table_id", "fragment_id"],
1189            registry
1190        )
1191        .unwrap()
1192        .relabel_debug_1(level);
1193
1194        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1195            "stream_materialize_cache_total_count",
1196            "Materialize executor cache total operation",
1197            &["actor_id", "table_id", "fragment_id"],
1198            registry
1199        )
1200        .unwrap()
1201        .relabel_debug_1(level);
1202
1203        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1204            "stream_memory_usage",
1205            "Memory usage for stream executors",
1206            &["actor_id", "table_id", "desc"],
1207            registry
1208        )
1209        .unwrap()
1210        .relabel_debug_1(level);
1211
1212        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1213            "gap_fill_generated_rows_count",
1214            "Total number of rows generated by gap fill executor",
1215            &["actor_id", "fragment_id"],
1216            registry
1217        )
1218        .unwrap()
1219        .relabel_debug_1(level);
1220
1221        Self {
1222            level,
1223            executor_row_count,
1224            mem_stream_node_output_row_count: stream_node_output_row_count,
1225            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1226            actor_execution_time,
1227            actor_scheduled_duration,
1228            actor_scheduled_cnt,
1229            actor_fast_poll_duration,
1230            actor_fast_poll_cnt,
1231            actor_slow_poll_duration,
1232            actor_slow_poll_cnt,
1233            actor_poll_duration,
1234            actor_poll_cnt,
1235            actor_idle_duration,
1236            actor_idle_cnt,
1237            actor_count,
1238            actor_in_record_cnt,
1239            actor_out_record_cnt,
1240            actor_current_epoch,
1241            source_output_row_count,
1242            source_split_change_count,
1243            source_backfill_row_count,
1244            sink_input_row_count,
1245            sink_input_bytes,
1246            sink_chunk_buffer_size,
1247            exchange_frag_recv_size,
1248            merge_barrier_align_duration,
1249            actor_output_buffer_blocking_duration_ns,
1250            actor_input_buffer_blocking_duration_ns,
1251            join_lookup_miss_count,
1252            join_lookup_total_count,
1253            join_insert_cache_miss_count,
1254            join_actor_input_waiting_duration_ns,
1255            join_match_duration_ns,
1256            join_cached_entry_count,
1257            join_matched_join_keys,
1258            barrier_align_duration,
1259            agg_lookup_miss_count,
1260            agg_total_lookup_count,
1261            agg_cached_entry_count,
1262            agg_chunk_lookup_miss_count,
1263            agg_chunk_total_lookup_count,
1264            agg_dirty_groups_count,
1265            agg_dirty_groups_heap_size,
1266            agg_distinct_cache_miss_count,
1267            agg_distinct_total_cache_count,
1268            agg_distinct_cached_entry_count,
1269            agg_state_cache_lookup_count,
1270            agg_state_cache_miss_count,
1271            group_top_n_cache_miss_count,
1272            group_top_n_total_query_cache_count,
1273            group_top_n_cached_entry_count,
1274            group_top_n_appendonly_cache_miss_count,
1275            group_top_n_appendonly_total_query_cache_count,
1276            group_top_n_appendonly_cached_entry_count,
1277            lookup_cache_miss_count,
1278            lookup_total_query_cache_count,
1279            lookup_cached_entry_count,
1280            temporal_join_cache_miss_count,
1281            temporal_join_total_query_cache_count,
1282            temporal_join_cached_entry_count,
1283            backfill_snapshot_read_row_count,
1284            backfill_upstream_output_row_count,
1285            cdc_backfill_snapshot_read_row_count,
1286            cdc_backfill_upstream_output_row_count,
1287            snapshot_backfill_consume_row_count,
1288            over_window_cached_entry_count,
1289            over_window_cache_lookup_count,
1290            over_window_cache_miss_count,
1291            over_window_range_cache_entry_count,
1292            over_window_range_cache_lookup_count,
1293            over_window_range_cache_left_miss_count,
1294            over_window_range_cache_right_miss_count,
1295            over_window_accessed_entry_count,
1296            over_window_compute_count,
1297            over_window_same_output_count,
1298            barrier_inflight_latency,
1299            barrier_sync_latency,
1300            barrier_batch_size,
1301            barrier_manager_progress,
1302            kv_log_store_storage_write_count,
1303            kv_log_store_storage_write_size,
1304            kv_log_store_rewind_count,
1305            kv_log_store_rewind_delay,
1306            kv_log_store_storage_read_count,
1307            kv_log_store_storage_read_size,
1308            kv_log_store_buffer_unconsumed_item_count,
1309            kv_log_store_buffer_unconsumed_row_count,
1310            kv_log_store_buffer_unconsumed_epoch_count,
1311            kv_log_store_buffer_unconsumed_min_epoch,
1312            sync_kv_log_store_read_count,
1313            sync_kv_log_store_read_size,
1314            sync_kv_log_store_write_pause_duration_ns,
1315            sync_kv_log_store_state,
1316            sync_kv_log_store_wait_next_poll_ns,
1317            sync_kv_log_store_storage_write_count,
1318            sync_kv_log_store_storage_write_size,
1319            sync_kv_log_store_buffer_unconsumed_item_count,
1320            sync_kv_log_store_buffer_unconsumed_row_count,
1321            sync_kv_log_store_buffer_unconsumed_epoch_count,
1322            sync_kv_log_store_buffer_unconsumed_min_epoch,
1323            lru_runtime_loop_count,
1324            lru_latest_sequence,
1325            lru_watermark_sequence,
1326            lru_eviction_policy,
1327            jemalloc_allocated_bytes,
1328            jemalloc_active_bytes,
1329            jemalloc_resident_bytes,
1330            jemalloc_metadata_bytes,
1331            jvm_allocated_bytes,
1332            jvm_active_bytes,
1333            stream_memory_usage,
1334            materialize_cache_hit_count,
1335            materialize_data_exist_count,
1336            materialize_cache_total_count,
1337            materialize_input_row_count,
1338            materialize_current_epoch,
1339            pg_cdc_state_table_lsn,
1340            pg_cdc_jni_commit_offset_lsn,
1341            gap_fill_generated_rows_count,
1342        }
1343    }
1344
1345    /// Create a new `StreamingMetrics` instance used in tests or other places.
1346    pub fn unused() -> Self {
1347        global_streaming_metrics(MetricLevel::Disabled)
1348    }
1349
1350    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1351        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1352        let actor_execution_time = self
1353            .actor_execution_time
1354            .with_guarded_label_values(label_list);
1355        let actor_scheduled_duration = self
1356            .actor_scheduled_duration
1357            .with_guarded_label_values(label_list);
1358        let actor_scheduled_cnt = self
1359            .actor_scheduled_cnt
1360            .with_guarded_label_values(label_list);
1361        let actor_fast_poll_duration = self
1362            .actor_fast_poll_duration
1363            .with_guarded_label_values(label_list);
1364        let actor_fast_poll_cnt = self
1365            .actor_fast_poll_cnt
1366            .with_guarded_label_values(label_list);
1367        let actor_slow_poll_duration = self
1368            .actor_slow_poll_duration
1369            .with_guarded_label_values(label_list);
1370        let actor_slow_poll_cnt = self
1371            .actor_slow_poll_cnt
1372            .with_guarded_label_values(label_list);
1373        let actor_poll_duration = self
1374            .actor_poll_duration
1375            .with_guarded_label_values(label_list);
1376        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1377        let actor_idle_duration = self
1378            .actor_idle_duration
1379            .with_guarded_label_values(label_list);
1380        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1381        ActorMetrics {
1382            actor_execution_time,
1383            actor_scheduled_duration,
1384            actor_scheduled_cnt,
1385            actor_fast_poll_duration,
1386            actor_fast_poll_cnt,
1387            actor_slow_poll_duration,
1388            actor_slow_poll_cnt,
1389            actor_poll_duration,
1390            actor_poll_cnt,
1391            actor_idle_duration,
1392            actor_idle_cnt,
1393        }
1394    }
1395
1396    pub(crate) fn new_actor_input_metrics(
1397        &self,
1398        actor_id: ActorId,
1399        fragment_id: FragmentId,
1400        upstream_fragment_id: FragmentId,
1401    ) -> ActorInputMetrics {
1402        let actor_id_str = actor_id.to_string();
1403        let fragment_id_str = fragment_id.to_string();
1404        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1405        ActorInputMetrics {
1406            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1407                &actor_id_str,
1408                &fragment_id_str,
1409                &upstream_fragment_id_str,
1410            ]),
1411            actor_input_buffer_blocking_duration_ns: self
1412                .actor_input_buffer_blocking_duration_ns
1413                .with_guarded_label_values(&[
1414                    &actor_id_str,
1415                    &fragment_id_str,
1416                    &upstream_fragment_id_str,
1417                ]),
1418        }
1419    }
1420
1421    pub fn new_sink_exec_metrics(
1422        &self,
1423        id: SinkId,
1424        actor_id: ActorId,
1425        fragment_id: FragmentId,
1426    ) -> SinkExecutorMetrics {
1427        let label_list: &[&str; 3] = &[
1428            &id.to_string(),
1429            &actor_id.to_string(),
1430            &fragment_id.to_string(),
1431        ];
1432        SinkExecutorMetrics {
1433            sink_input_row_count: self
1434                .sink_input_row_count
1435                .with_guarded_label_values(label_list),
1436            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1437            sink_chunk_buffer_size: self
1438                .sink_chunk_buffer_size
1439                .with_guarded_label_values(label_list),
1440        }
1441    }
1442
1443    pub fn new_group_top_n_metrics(
1444        &self,
1445        table_id: TableId,
1446        actor_id: ActorId,
1447        fragment_id: FragmentId,
1448    ) -> GroupTopNMetrics {
1449        let label_list: &[&str; 3] = &[
1450            &table_id.to_string(),
1451            &actor_id.to_string(),
1452            &fragment_id.to_string(),
1453        ];
1454
1455        GroupTopNMetrics {
1456            group_top_n_cache_miss_count: self
1457                .group_top_n_cache_miss_count
1458                .with_guarded_label_values(label_list),
1459            group_top_n_total_query_cache_count: self
1460                .group_top_n_total_query_cache_count
1461                .with_guarded_label_values(label_list),
1462            group_top_n_cached_entry_count: self
1463                .group_top_n_cached_entry_count
1464                .with_guarded_label_values(label_list),
1465        }
1466    }
1467
1468    pub fn new_append_only_group_top_n_metrics(
1469        &self,
1470        table_id: TableId,
1471        actor_id: ActorId,
1472        fragment_id: FragmentId,
1473    ) -> GroupTopNMetrics {
1474        let label_list: &[&str; 3] = &[
1475            &table_id.to_string(),
1476            &actor_id.to_string(),
1477            &fragment_id.to_string(),
1478        ];
1479
1480        GroupTopNMetrics {
1481            group_top_n_cache_miss_count: self
1482                .group_top_n_appendonly_cache_miss_count
1483                .with_guarded_label_values(label_list),
1484            group_top_n_total_query_cache_count: self
1485                .group_top_n_appendonly_total_query_cache_count
1486                .with_guarded_label_values(label_list),
1487            group_top_n_cached_entry_count: self
1488                .group_top_n_appendonly_cached_entry_count
1489                .with_guarded_label_values(label_list),
1490        }
1491    }
1492
1493    pub fn new_lookup_executor_metrics(
1494        &self,
1495        table_id: TableId,
1496        actor_id: ActorId,
1497        fragment_id: FragmentId,
1498    ) -> LookupExecutorMetrics {
1499        let label_list: &[&str; 3] = &[
1500            &table_id.to_string(),
1501            &actor_id.to_string(),
1502            &fragment_id.to_string(),
1503        ];
1504
1505        LookupExecutorMetrics {
1506            lookup_cache_miss_count: self
1507                .lookup_cache_miss_count
1508                .with_guarded_label_values(label_list),
1509            lookup_total_query_cache_count: self
1510                .lookup_total_query_cache_count
1511                .with_guarded_label_values(label_list),
1512            lookup_cached_entry_count: self
1513                .lookup_cached_entry_count
1514                .with_guarded_label_values(label_list),
1515        }
1516    }
1517
1518    pub fn new_hash_agg_metrics(
1519        &self,
1520        table_id: TableId,
1521        actor_id: ActorId,
1522        fragment_id: FragmentId,
1523    ) -> HashAggMetrics {
1524        let label_list: &[&str; 3] = &[
1525            &table_id.to_string(),
1526            &actor_id.to_string(),
1527            &fragment_id.to_string(),
1528        ];
1529        HashAggMetrics {
1530            agg_lookup_miss_count: self
1531                .agg_lookup_miss_count
1532                .with_guarded_label_values(label_list),
1533            agg_total_lookup_count: self
1534                .agg_total_lookup_count
1535                .with_guarded_label_values(label_list),
1536            agg_cached_entry_count: self
1537                .agg_cached_entry_count
1538                .with_guarded_label_values(label_list),
1539            agg_chunk_lookup_miss_count: self
1540                .agg_chunk_lookup_miss_count
1541                .with_guarded_label_values(label_list),
1542            agg_chunk_total_lookup_count: self
1543                .agg_chunk_total_lookup_count
1544                .with_guarded_label_values(label_list),
1545            agg_dirty_groups_count: self
1546                .agg_dirty_groups_count
1547                .with_guarded_label_values(label_list),
1548            agg_dirty_groups_heap_size: self
1549                .agg_dirty_groups_heap_size
1550                .with_guarded_label_values(label_list),
1551            agg_state_cache_lookup_count: self
1552                .agg_state_cache_lookup_count
1553                .with_guarded_label_values(label_list),
1554            agg_state_cache_miss_count: self
1555                .agg_state_cache_miss_count
1556                .with_guarded_label_values(label_list),
1557        }
1558    }
1559
1560    pub fn new_agg_distinct_dedup_metrics(
1561        &self,
1562        table_id: TableId,
1563        actor_id: ActorId,
1564        fragment_id: FragmentId,
1565    ) -> AggDistinctDedupMetrics {
1566        let label_list: &[&str; 3] = &[
1567            &table_id.to_string(),
1568            &actor_id.to_string(),
1569            &fragment_id.to_string(),
1570        ];
1571        AggDistinctDedupMetrics {
1572            agg_distinct_cache_miss_count: self
1573                .agg_distinct_cache_miss_count
1574                .with_guarded_label_values(label_list),
1575            agg_distinct_total_cache_count: self
1576                .agg_distinct_total_cache_count
1577                .with_guarded_label_values(label_list),
1578            agg_distinct_cached_entry_count: self
1579                .agg_distinct_cached_entry_count
1580                .with_guarded_label_values(label_list),
1581        }
1582    }
1583
1584    pub fn new_temporal_join_metrics(
1585        &self,
1586        table_id: TableId,
1587        actor_id: ActorId,
1588        fragment_id: FragmentId,
1589    ) -> TemporalJoinMetrics {
1590        let label_list: &[&str; 3] = &[
1591            &table_id.to_string(),
1592            &actor_id.to_string(),
1593            &fragment_id.to_string(),
1594        ];
1595        TemporalJoinMetrics {
1596            temporal_join_cache_miss_count: self
1597                .temporal_join_cache_miss_count
1598                .with_guarded_label_values(label_list),
1599            temporal_join_total_query_cache_count: self
1600                .temporal_join_total_query_cache_count
1601                .with_guarded_label_values(label_list),
1602            temporal_join_cached_entry_count: self
1603                .temporal_join_cached_entry_count
1604                .with_guarded_label_values(label_list),
1605        }
1606    }
1607
1608    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1609        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1610        BackfillMetrics {
1611            backfill_snapshot_read_row_count: self
1612                .backfill_snapshot_read_row_count
1613                .with_guarded_label_values(label_list),
1614            backfill_upstream_output_row_count: self
1615                .backfill_upstream_output_row_count
1616                .with_guarded_label_values(label_list),
1617        }
1618    }
1619
1620    pub fn new_cdc_backfill_metrics(
1621        &self,
1622        table_id: TableId,
1623        actor_id: ActorId,
1624    ) -> CdcBackfillMetrics {
1625        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1626        CdcBackfillMetrics {
1627            cdc_backfill_snapshot_read_row_count: self
1628                .cdc_backfill_snapshot_read_row_count
1629                .with_guarded_label_values(label_list),
1630            cdc_backfill_upstream_output_row_count: self
1631                .cdc_backfill_upstream_output_row_count
1632                .with_guarded_label_values(label_list),
1633        }
1634    }
1635
1636    pub fn new_over_window_metrics(
1637        &self,
1638        table_id: TableId,
1639        actor_id: ActorId,
1640        fragment_id: FragmentId,
1641    ) -> OverWindowMetrics {
1642        let label_list: &[&str; 3] = &[
1643            &table_id.to_string(),
1644            &actor_id.to_string(),
1645            &fragment_id.to_string(),
1646        ];
1647        OverWindowMetrics {
1648            over_window_cached_entry_count: self
1649                .over_window_cached_entry_count
1650                .with_guarded_label_values(label_list),
1651            over_window_cache_lookup_count: self
1652                .over_window_cache_lookup_count
1653                .with_guarded_label_values(label_list),
1654            over_window_cache_miss_count: self
1655                .over_window_cache_miss_count
1656                .with_guarded_label_values(label_list),
1657            over_window_range_cache_entry_count: self
1658                .over_window_range_cache_entry_count
1659                .with_guarded_label_values(label_list),
1660            over_window_range_cache_lookup_count: self
1661                .over_window_range_cache_lookup_count
1662                .with_guarded_label_values(label_list),
1663            over_window_range_cache_left_miss_count: self
1664                .over_window_range_cache_left_miss_count
1665                .with_guarded_label_values(label_list),
1666            over_window_range_cache_right_miss_count: self
1667                .over_window_range_cache_right_miss_count
1668                .with_guarded_label_values(label_list),
1669            over_window_accessed_entry_count: self
1670                .over_window_accessed_entry_count
1671                .with_guarded_label_values(label_list),
1672            over_window_compute_count: self
1673                .over_window_compute_count
1674                .with_guarded_label_values(label_list),
1675            over_window_same_output_count: self
1676                .over_window_same_output_count
1677                .with_guarded_label_values(label_list),
1678        }
1679    }
1680
1681    pub fn new_materialize_metrics(
1682        &self,
1683        table_id: TableId,
1684        actor_id: ActorId,
1685        fragment_id: FragmentId,
1686    ) -> MaterializeMetrics {
1687        let label_list: &[&str; 3] = &[
1688            &actor_id.to_string(),
1689            &table_id.to_string(),
1690            &fragment_id.to_string(),
1691        ];
1692        MaterializeMetrics {
1693            materialize_cache_hit_count: self
1694                .materialize_cache_hit_count
1695                .with_guarded_label_values(label_list),
1696            materialize_data_exist_count: self
1697                .materialize_data_exist_count
1698                .with_guarded_label_values(label_list),
1699            materialize_cache_total_count: self
1700                .materialize_cache_total_count
1701                .with_guarded_label_values(label_list),
1702            materialize_input_row_count: self
1703                .materialize_input_row_count
1704                .with_guarded_label_values(label_list),
1705            materialize_current_epoch: self
1706                .materialize_current_epoch
1707                .with_guarded_label_values(label_list),
1708        }
1709    }
1710}
1711
/// Label-guarded integer counters for an actor's input side.
///
/// Both fields are `LabelGuardedIntCounter`s. The constructor that binds their labels
/// lives elsewhere in this file.
pub(crate) struct ActorInputMetrics {
    /// Records received by the actor. Presumably counted in rows; confirm at the call site.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Time spent blocked on the input buffer. The `_ns` suffix suggests nanoseconds;
    /// confirm at the call site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1716
/// Tokio metrics for actors.
///
/// The field types fall into two groups:
/// - Time-valued fields are floating-point `LabelGuardedGauge`s: `actor_execution_time`
///   and every `*_duration` field.
/// - Every `*_cnt` field is an integer `LabelGuardedIntGauge`.
///
/// Because these are gauges rather than counters, they are presumably set to sampled
/// values instead of being incremented.
/// NOTE(review): the time unit of the duration gauges is not visible here; confirm
/// against the code that sets them.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1731
/// Label-guarded metric handles for a sink executor.
///
/// `sink_input_row_count` and `sink_input_bytes` are integer counters, and
/// `sink_chunk_buffer_size` is an integer gauge.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1737
/// Metric handles for a materialize executor, built by
/// [`StreamingMetrics::new_materialize_metrics`].
///
/// All handles share one label set, ordered `(actor_id, table_id, fragment_id)`.
/// `materialize_current_epoch` is an integer gauge; the other four fields are counters.
pub struct MaterializeMetrics {
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge rather than counter, so it can be set to the current value instead of only
    /// increasing.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1745
/// Cache metric handles for a group top-N executor.
///
/// [`StreamingMetrics::new_append_only_group_top_n_metrics`] fills these fields from the
/// `group_top_n_appendonly_*` metric vectors, labelled `(table_id, actor_id, fragment_id)`.
/// The two `*_count` counters come first; `group_top_n_cached_entry_count` is an integer
/// gauge.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1751
/// Cache metric handles for a lookup executor, built by
/// [`StreamingMetrics::new_lookup_executor_metrics`].
///
/// All handles are labelled `(table_id, actor_id, fragment_id)`. The miss and total-query
/// fields are counters; `lookup_cached_entry_count` is an integer gauge.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1757
/// Metric handles for a hash aggregation executor, built by
/// [`StreamingMetrics::new_hash_agg_metrics`].
///
/// All handles are labelled `(table_id, actor_id, fragment_id)`. Three fields are integer
/// gauges: `agg_cached_entry_count`, `agg_dirty_groups_count`, and
/// `agg_dirty_groups_heap_size`. The rest are counters.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1769
/// Cache metric handles for distinct-aggregation deduplication, built by
/// [`StreamingMetrics::new_agg_distinct_dedup_metrics`].
///
/// All handles are labelled `(table_id, actor_id, fragment_id)`. The miss and total fields
/// are counters; `agg_distinct_cached_entry_count` is an integer gauge.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1775
/// Cache metric handles for a temporal join executor, built by
/// [`StreamingMetrics::new_temporal_join_metrics`].
///
/// All handles are labelled `(table_id, actor_id, fragment_id)`. The miss and total-query
/// fields are counters; `temporal_join_cached_entry_count` is an integer gauge.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1781
/// Row counters for a backfill executor, built by
/// [`StreamingMetrics::new_backfill_metrics`].
///
/// Both counters are labelled `(table_id, actor_id)`, with no fragment id.
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1786
/// Row counters for a CDC backfill executor, built by
/// [`StreamingMetrics::new_cdc_backfill_metrics`].
///
/// Both counters are labelled `(table_id, actor_id)`, with no fragment id. This is the only
/// metrics struct in this section that derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1792
/// Cache and compute metric handles for an over-window executor, built by
/// [`StreamingMetrics::new_over_window_metrics`].
///
/// All handles are labelled `(table_id, actor_id, fragment_id)`. The two entry-count fields
/// (`over_window_cached_entry_count` and `over_window_range_cache_entry_count`) are integer
/// gauges. Every other field is a counter.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}