risingwave_stream/executor/monitor/
streaming_stats.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45    pub level: MetricLevel,
46
47    // Executor metrics (disabled by default)
48    pub executor_row_count: RelabeledGuardedIntCounterVec,
49
50    // Profiling Metrics:
51    // Aggregated per operator rather than per actor.
52    // These are purely in-memory, never collected by prometheus.
53    pub mem_stream_node_output_row_count: CountMap,
54    pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56    // Streaming actor metrics from tokio (disabled by default)
57    actor_execution_time: LabelGuardedGaugeVec,
58    actor_scheduled_duration: LabelGuardedGaugeVec,
59    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
60    actor_fast_poll_duration: LabelGuardedGaugeVec,
61    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
62    actor_slow_poll_duration: LabelGuardedGaugeVec,
63    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
64    actor_poll_duration: LabelGuardedGaugeVec,
65    actor_poll_cnt: LabelGuardedIntGaugeVec,
66    actor_idle_duration: LabelGuardedGaugeVec,
67    actor_idle_cnt: LabelGuardedIntGaugeVec,
68
69    // Streaming actor
70    pub actor_count: LabelGuardedIntGaugeVec,
71    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
72    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
73    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
74
75    // Source
76    pub source_output_row_count: LabelGuardedIntCounterVec,
77    pub source_split_change_count: LabelGuardedIntCounterVec,
78    pub source_backfill_row_count: LabelGuardedIntCounterVec,
79
80    // Sink
81    sink_input_row_count: LabelGuardedIntCounterVec,
82    sink_input_bytes: LabelGuardedIntCounterVec,
83    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
84
85    // Exchange (see also `compute::ExchangeServiceMetrics`)
86    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
87
88    // Streaming Merge (We break out this metric from `barrier_align_duration` because
89    // the alignment happens on different levels)
90    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,
91
92    // Backpressure
93    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
94    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
95
96    // Streaming Join
97    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
98    pub join_lookup_total_count: LabelGuardedIntCounterVec,
99    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
100    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
101    pub join_match_duration_ns: LabelGuardedIntCounterVec,
102    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
103    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
104
105    // Streaming Join, Streaming Dynamic Filter and Streaming Union
106    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
107
108    // Streaming Aggregation
109    agg_lookup_miss_count: LabelGuardedIntCounterVec,
110    agg_total_lookup_count: LabelGuardedIntCounterVec,
111    agg_cached_entry_count: LabelGuardedIntGaugeVec,
112    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
113    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
114    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
115    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
116    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
117    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
118    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
119    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
120    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
121
122    // Streaming TopN
123    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
126    // TODO(rc): why not just use the above three?
127    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
128    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
129    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
130
131    // Lookup executor
132    lookup_cache_miss_count: LabelGuardedIntCounterVec,
133    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
134    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
135
136    // temporal join
137    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
138    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
139    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
140
141    // Backfill
142    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // CDC Backfill
146    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
147    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
148
149    // Snapshot Backfill
150    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
151
152    // Over Window
153    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
154    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
155    over_window_cache_miss_count: LabelGuardedIntCounterVec,
156    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
157    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
158    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
159    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
160    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
161    over_window_compute_count: LabelGuardedIntCounterVec,
162    over_window_same_output_count: LabelGuardedIntCounterVec,
163
164    /// The duration from receipt of barrier to all actors collection.
165    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
166    /// to flow through the graph.
167    pub barrier_inflight_latency: Histogram,
168    /// The duration of sync to storage.
169    pub barrier_sync_latency: Histogram,
170    pub barrier_batch_size: Histogram,
171    /// The progress made by the earliest in-flight barriers in the local barrier manager.
172    pub barrier_manager_progress: IntCounter,
173
174    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
177    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
178    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
179    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
180    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
181    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
182    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
183    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
184
185    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196
197    // Memory management
198    pub lru_runtime_loop_count: IntCounter,
199    pub lru_latest_sequence: IntGauge,
200    pub lru_watermark_sequence: IntGauge,
201    pub lru_eviction_policy: IntGauge,
202    pub jemalloc_allocated_bytes: IntGauge,
203    pub jemalloc_active_bytes: IntGauge,
204    pub jemalloc_resident_bytes: IntGauge,
205    pub jemalloc_metadata_bytes: IntGauge,
206    pub jvm_allocated_bytes: IntGauge,
207    pub jvm_active_bytes: IntGauge,
208    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210    // Materialized view
211    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214    materialize_input_row_count: RelabeledGuardedIntCounterVec,
215    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217    // PostgreSQL CDC LSN monitoring
218    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221    // Gap Fill
222    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228    GLOBAL_STREAMING_METRICS
229        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230        .clone()
231}
232
233impl StreamingMetrics {
234    fn new(registry: &Registry, level: MetricLevel) -> Self {
235        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236            "stream_executor_row_count",
237            "Total number of rows that have been output from each executor",
238            &["actor_id", "fragment_id", "executor_identity"],
239            registry
240        )
241        .unwrap()
242        .relabel_debug_1(level);
243
244        let stream_node_output_row_count = CountMap::new();
245        let stream_node_output_blocking_duration_ns = CountMap::new();
246
247        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248            "stream_source_output_rows_counts",
249            "Total number of rows that have been output from source",
250            &["source_id", "source_name", "actor_id", "fragment_id"],
251            registry
252        )
253        .unwrap();
254
255        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256            "stream_source_split_change_event_count",
257            "Total number of split change events that have been operated by source",
258            &["source_id", "source_name", "actor_id", "fragment_id"],
259            registry
260        )
261        .unwrap();
262
263        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264            "stream_source_backfill_rows_counts",
265            "Total number of rows that have been backfilled for source",
266            &["source_id", "source_name", "actor_id", "fragment_id"],
267            registry
268        )
269        .unwrap();
270
271        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272            "stream_sink_input_row_count",
273            "Total number of rows streamed into sink executors",
274            &["sink_id", "actor_id", "fragment_id"],
275            registry
276        )
277        .unwrap();
278
279        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280            "stream_sink_input_bytes",
281            "Total size of chunks streamed into sink executors",
282            &["sink_id", "actor_id", "fragment_id"],
283            registry
284        )
285        .unwrap();
286
287        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288            "stream_mview_input_row_count",
289            "Total number of rows streamed into materialize executors",
290            &["actor_id", "table_id", "fragment_id"],
291            registry
292        )
293        .unwrap()
294        .relabel_debug_1(level);
295
296        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297            "stream_mview_current_epoch",
298            "The current epoch of the materialized executor",
299            &["actor_id", "table_id", "fragment_id"],
300            registry
301        )
302        .unwrap()
303        .relabel_debug_1(level);
304
305        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306            "stream_pg_cdc_state_table_lsn",
307            "Current LSN value stored in PostgreSQL CDC state table",
308            &["source_id"],
309            registry,
310        )
311        .unwrap();
312
313        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314            "stream_pg_cdc_jni_commit_offset_lsn",
315            "LSN value when JNI commit offset is called for PostgreSQL CDC",
316            &["source_id"],
317            registry,
318        )
319        .unwrap();
320
321        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
322            "stream_sink_chunk_buffer_size",
323            "Total size of chunks buffered in a barrier",
324            &["sink_id", "actor_id", "fragment_id"],
325            registry
326        )
327        .unwrap();
328
329        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
330            "stream_actor_actor_execution_time",
331            "Total execution time (s) of an actor",
332            &["actor_id"],
333            registry
334        )
335        .unwrap();
336
337        let actor_output_buffer_blocking_duration_ns =
338            register_guarded_int_counter_vec_with_registry!(
339                "stream_actor_output_buffer_blocking_duration_ns",
340                "Total blocking duration (ns) of output buffer",
341                &["actor_id", "fragment_id", "downstream_fragment_id"],
342                registry
343            )
344            .unwrap()
345            // mask the first label `actor_id` if the level is less verbose than `Debug`
346            .relabel_debug_1(level);
347
348        let actor_input_buffer_blocking_duration_ns =
349            register_guarded_int_counter_vec_with_registry!(
350                "stream_actor_input_buffer_blocking_duration_ns",
351                "Total blocking duration (ns) of input buffer",
352                &["actor_id", "fragment_id", "upstream_fragment_id"],
353                registry
354            )
355            .unwrap()
356            // mask the first label `actor_id` if the level is less verbose than `Debug`
357            .relabel_debug_1(level);
358
359        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
360            "stream_exchange_frag_recv_size",
361            "Total size of messages that have been received from upstream Fragment",
362            &["up_fragment_id", "down_fragment_id"],
363            registry
364        )
365        .unwrap();
366
367        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
368            "stream_actor_fast_poll_duration",
369            "tokio's metrics",
370            &["actor_id"],
371            registry
372        )
373        .unwrap();
374
375        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
376            "stream_actor_fast_poll_cnt",
377            "tokio's metrics",
378            &["actor_id"],
379            registry
380        )
381        .unwrap();
382
383        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
384            "stream_actor_slow_poll_duration",
385            "tokio's metrics",
386            &["actor_id"],
387            registry
388        )
389        .unwrap();
390
391        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
392            "stream_actor_slow_poll_cnt",
393            "tokio's metrics",
394            &["actor_id"],
395            registry
396        )
397        .unwrap();
398
399        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
400            "stream_actor_poll_duration",
401            "tokio's metrics",
402            &["actor_id"],
403            registry
404        )
405        .unwrap();
406
407        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
408            "stream_actor_poll_cnt",
409            "tokio's metrics",
410            &["actor_id"],
411            registry
412        )
413        .unwrap();
414
415        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
416            "stream_actor_scheduled_duration",
417            "tokio's metrics",
418            &["actor_id"],
419            registry
420        )
421        .unwrap();
422
423        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
424            "stream_actor_scheduled_cnt",
425            "tokio's metrics",
426            &["actor_id"],
427            registry
428        )
429        .unwrap();
430
431        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
432            "stream_actor_idle_duration",
433            "tokio's metrics",
434            &["actor_id"],
435            registry
436        )
437        .unwrap();
438
439        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
440            "stream_actor_idle_cnt",
441            "tokio's metrics",
442            &["actor_id"],
443            registry
444        )
445        .unwrap();
446
447        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
448            "stream_actor_in_record_cnt",
449            "Total number of rows actor received",
450            &["actor_id", "fragment_id", "upstream_fragment_id"],
451            registry
452        )
453        .unwrap()
454        .relabel_debug_1(level);
455
456        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
457            "stream_actor_out_record_cnt",
458            "Total number of rows actor sent",
459            &["actor_id", "fragment_id"],
460            registry
461        )
462        .unwrap()
463        .relabel_debug_1(level);
464
465        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
466            "stream_actor_current_epoch",
467            "Current epoch of actor",
468            &["actor_id", "fragment_id"],
469            registry
470        )
471        .unwrap()
472        .relabel_debug_1(level);
473
474        let actor_count = register_guarded_int_gauge_vec_with_registry!(
475            "stream_actor_count",
476            "Total number of actors (parallelism)",
477            &["fragment_id"],
478            registry
479        )
480        .unwrap();
481
482        let opts = histogram_opts!(
483            "stream_merge_barrier_align_duration",
484            "Duration of merge align barrier",
485            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
486        );
487        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
488            opts,
489            &["actor_id", "fragment_id"],
490            registry
491        )
492        .unwrap()
493        .relabel_debug_1(level);
494
495        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
496            "stream_join_lookup_miss_count",
497            "Join executor lookup miss duration",
498            &["side", "join_table_id", "actor_id", "fragment_id"],
499            registry
500        )
501        .unwrap();
502
503        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
504            "stream_join_lookup_total_count",
505            "Join executor lookup total operation",
506            &["side", "join_table_id", "actor_id", "fragment_id"],
507            registry
508        )
509        .unwrap();
510
511        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
512            "stream_join_insert_cache_miss_count",
513            "Join executor cache miss when insert operation",
514            &["side", "join_table_id", "actor_id", "fragment_id"],
515            registry
516        )
517        .unwrap();
518
519        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
520            "stream_join_actor_input_waiting_duration_ns",
521            "Total waiting duration (ns) of input buffer of join actor",
522            &["actor_id", "fragment_id"],
523            registry
524        )
525        .unwrap();
526
527        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
528            "stream_join_match_duration_ns",
529            "Matching duration for each side",
530            &["actor_id", "fragment_id", "side"],
531            registry
532        )
533        .unwrap();
534
535        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
536            "stream_barrier_align_duration_ns",
537            "Duration of join align barrier",
538            &["actor_id", "fragment_id", "wait_side", "executor"],
539            registry
540        )
541        .unwrap()
542        .relabel_debug_1(level);
543
544        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
545            "stream_join_cached_entry_count",
546            "Number of cached entries in streaming join operators",
547            &["actor_id", "fragment_id", "side"],
548            registry
549        )
550        .unwrap();
551
552        let join_matched_join_keys_opts = histogram_opts!(
553            "stream_join_matched_join_keys",
554            "The number of keys matched in the opposite side",
555            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
556        );
557
558        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
559            join_matched_join_keys_opts,
560            &["actor_id", "fragment_id", "table_id"],
561            registry
562        )
563        .unwrap()
564        .relabel_debug_1(level);
565
566        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
567            "stream_agg_lookup_miss_count",
568            "Aggregation executor lookup miss duration",
569            &["table_id", "actor_id", "fragment_id"],
570            registry
571        )
572        .unwrap();
573
574        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
575            "stream_agg_lookup_total_count",
576            "Aggregation executor lookup total operation",
577            &["table_id", "actor_id", "fragment_id"],
578            registry
579        )
580        .unwrap();
581
582        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
583            "stream_agg_distinct_cache_miss_count",
584            "Aggregation executor dinsinct miss duration",
585            &["table_id", "actor_id", "fragment_id"],
586            registry
587        )
588        .unwrap();
589
590        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
591            "stream_agg_distinct_total_cache_count",
592            "Aggregation executor distinct total operation",
593            &["table_id", "actor_id", "fragment_id"],
594            registry
595        )
596        .unwrap();
597
598        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
599            "stream_agg_distinct_cached_entry_count",
600            "Total entry counts in distinct aggregation executor cache",
601            &["table_id", "actor_id", "fragment_id"],
602            registry
603        )
604        .unwrap();
605
606        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
607            "stream_agg_dirty_groups_count",
608            "Total dirty group counts in aggregation executor",
609            &["table_id", "actor_id", "fragment_id"],
610            registry
611        )
612        .unwrap();
613
614        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
615            "stream_agg_dirty_groups_heap_size",
616            "Total dirty group heap size in aggregation executor",
617            &["table_id", "actor_id", "fragment_id"],
618            registry
619        )
620        .unwrap();
621
622        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
623            "stream_agg_state_cache_lookup_count",
624            "Aggregation executor state cache lookup count",
625            &["table_id", "actor_id", "fragment_id"],
626            registry
627        )
628        .unwrap();
629
630        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
631            "stream_agg_state_cache_miss_count",
632            "Aggregation executor state cache miss count",
633            &["table_id", "actor_id", "fragment_id"],
634            registry
635        )
636        .unwrap();
637
638        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
639            "stream_group_top_n_cache_miss_count",
640            "Group top n executor cache miss count",
641            &["table_id", "actor_id", "fragment_id"],
642            registry
643        )
644        .unwrap();
645
646        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
647            "stream_group_top_n_total_query_cache_count",
648            "Group top n executor query cache total count",
649            &["table_id", "actor_id", "fragment_id"],
650            registry
651        )
652        .unwrap();
653
654        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
655            "stream_group_top_n_cached_entry_count",
656            "Total entry counts in group top n executor cache",
657            &["table_id", "actor_id", "fragment_id"],
658            registry
659        )
660        .unwrap();
661
662        let group_top_n_appendonly_cache_miss_count =
663            register_guarded_int_counter_vec_with_registry!(
664                "stream_group_top_n_appendonly_cache_miss_count",
665                "Group top n appendonly executor cache miss count",
666                &["table_id", "actor_id", "fragment_id"],
667                registry
668            )
669            .unwrap();
670
671        let group_top_n_appendonly_total_query_cache_count =
672            register_guarded_int_counter_vec_with_registry!(
673                "stream_group_top_n_appendonly_total_query_cache_count",
674                "Group top n appendonly executor total cache count",
675                &["table_id", "actor_id", "fragment_id"],
676                registry
677            )
678            .unwrap();
679
680        let group_top_n_appendonly_cached_entry_count =
681            register_guarded_int_gauge_vec_with_registry!(
682                "stream_group_top_n_appendonly_cached_entry_count",
683                "Total entry counts in group top n appendonly executor cache",
684                &["table_id", "actor_id", "fragment_id"],
685                registry
686            )
687            .unwrap();
688
689        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
690            "stream_lookup_cache_miss_count",
691            "Lookup executor cache miss count",
692            &["table_id", "actor_id", "fragment_id"],
693            registry
694        )
695        .unwrap();
696
697        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
698            "stream_lookup_total_query_cache_count",
699            "Lookup executor query cache total count",
700            &["table_id", "actor_id", "fragment_id"],
701            registry
702        )
703        .unwrap();
704
705        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
706            "stream_lookup_cached_entry_count",
707            "Total entry counts in lookup executor cache",
708            &["table_id", "actor_id", "fragment_id"],
709            registry
710        )
711        .unwrap();
712
713        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
714            "stream_temporal_join_cache_miss_count",
715            "Temporal join executor cache miss count",
716            &["table_id", "actor_id", "fragment_id"],
717            registry
718        )
719        .unwrap();
720
721        let temporal_join_total_query_cache_count =
722            register_guarded_int_counter_vec_with_registry!(
723                "stream_temporal_join_total_query_cache_count",
724                "Temporal join executor query cache total count",
725                &["table_id", "actor_id", "fragment_id"],
726                registry
727            )
728            .unwrap();
729
730        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
731            "stream_temporal_join_cached_entry_count",
732            "Total entry count in temporal join executor cache",
733            &["table_id", "actor_id", "fragment_id"],
734            registry
735        )
736        .unwrap();
737
738        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
739            "stream_agg_cached_entry_count",
740            "Number of cached keys in streaming aggregation operators",
741            &["table_id", "actor_id", "fragment_id"],
742            registry
743        )
744        .unwrap();
745
746        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
747            "stream_agg_chunk_lookup_miss_count",
748            "Aggregation executor chunk-level lookup miss duration",
749            &["table_id", "actor_id", "fragment_id"],
750            registry
751        )
752        .unwrap();
753
754        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
755            "stream_agg_chunk_lookup_total_count",
756            "Aggregation executor chunk-level lookup total operation",
757            &["table_id", "actor_id", "fragment_id"],
758            registry
759        )
760        .unwrap();
761
762        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
763            "stream_backfill_snapshot_read_row_count",
764            "Total number of rows that have been read from the backfill snapshot",
765            &["table_id", "actor_id"],
766            registry
767        )
768        .unwrap();
769
770        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
771            "stream_backfill_upstream_output_row_count",
772            "Total number of rows that have been output from the backfill upstream",
773            &["table_id", "actor_id"],
774            registry
775        )
776        .unwrap();
777
778        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
779            "stream_cdc_backfill_snapshot_read_row_count",
780            "Total number of rows that have been read from the cdc_backfill snapshot",
781            &["table_id", "actor_id"],
782            registry
783        )
784        .unwrap();
785
786        let cdc_backfill_upstream_output_row_count =
787            register_guarded_int_counter_vec_with_registry!(
788                "stream_cdc_backfill_upstream_output_row_count",
789                "Total number of rows that have been output from the cdc_backfill upstream",
790                &["table_id", "actor_id"],
791                registry
792            )
793            .unwrap();
794
795        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
796            "stream_snapshot_backfill_consume_snapshot_row_count",
797            "Total number of rows that have been output from snapshot backfill",
798            &["table_id", "actor_id", "stage"],
799            registry
800        )
801        .unwrap();
802
803        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
804            "stream_over_window_cached_entry_count",
805            "Total entry (partition) count in over window executor cache",
806            &["table_id", "actor_id", "fragment_id"],
807            registry
808        )
809        .unwrap();
810
811        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
812            "stream_over_window_cache_lookup_count",
813            "Over window executor cache lookup count",
814            &["table_id", "actor_id", "fragment_id"],
815            registry
816        )
817        .unwrap();
818
819        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
820            "stream_over_window_cache_miss_count",
821            "Over window executor cache miss count",
822            &["table_id", "actor_id", "fragment_id"],
823            registry
824        )
825        .unwrap();
826
827        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
828            "stream_over_window_range_cache_entry_count",
829            "Over window partition range cache entry count",
830            &["table_id", "actor_id", "fragment_id"],
831            registry,
832        )
833        .unwrap();
834
835        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
836            "stream_over_window_range_cache_lookup_count",
837            "Over window partition range cache lookup count",
838            &["table_id", "actor_id", "fragment_id"],
839            registry
840        )
841        .unwrap();
842
843        let over_window_range_cache_left_miss_count =
844            register_guarded_int_counter_vec_with_registry!(
845                "stream_over_window_range_cache_left_miss_count",
846                "Over window partition range cache left miss count",
847                &["table_id", "actor_id", "fragment_id"],
848                registry
849            )
850            .unwrap();
851
852        let over_window_range_cache_right_miss_count =
853            register_guarded_int_counter_vec_with_registry!(
854                "stream_over_window_range_cache_right_miss_count",
855                "Over window partition range cache right miss count",
856                &["table_id", "actor_id", "fragment_id"],
857                registry
858            )
859            .unwrap();
860
861        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
862            "stream_over_window_accessed_entry_count",
863            "Over window accessed entry count",
864            &["table_id", "actor_id", "fragment_id"],
865            registry
866        )
867        .unwrap();
868
869        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
870            "stream_over_window_compute_count",
871            "Over window compute count",
872            &["table_id", "actor_id", "fragment_id"],
873            registry
874        )
875        .unwrap();
876
877        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
878            "stream_over_window_same_output_count",
879            "Over window same output count",
880            &["table_id", "actor_id", "fragment_id"],
881            registry
882        )
883        .unwrap();
884
885        let opts = histogram_opts!(
886            "stream_barrier_inflight_duration_seconds",
887            "barrier_inflight_latency",
888            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
889        );
890        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
891
892        let opts = histogram_opts!(
893            "stream_barrier_sync_storage_duration_seconds",
894            "barrier_sync_latency",
895            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
896        );
897        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
898
899        let opts = histogram_opts!(
900            "stream_barrier_batch_size",
901            "barrier_batch_size",
902            exponential_buckets(1.0, 2.0, 8).unwrap()
903        );
904        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
905
906        let barrier_manager_progress = register_int_counter_with_registry!(
907            "stream_barrier_manager_progress",
908            "The number of actors that have processed the earliest in-flight barriers",
909            registry
910        )
911        .unwrap();
912
913        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
914            "sync_kv_log_store_wait_next_poll_ns",
915            "Total duration (ns) of waiting for next poll",
916            &["actor_id", "target", "fragment_id", "relation"],
917            registry
918        )
919        .unwrap();
920
921        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
922            "sync_kv_log_store_read_count",
923            "read row count throughput of sync_kv log store",
924            &["type", "actor_id", "target", "fragment_id", "relation"],
925            registry
926        )
927        .unwrap();
928
929        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
930            "sync_kv_log_store_read_size",
931            "read size throughput of sync_kv log store",
932            &["type", "actor_id", "target", "fragment_id", "relation"],
933            registry
934        )
935        .unwrap();
936
937        let sync_kv_log_store_write_pause_duration_ns =
938            register_guarded_int_counter_vec_with_registry!(
939                "sync_kv_log_store_write_pause_duration_ns",
940                "Duration (ns) of sync_kv log store write pause",
941                &["actor_id", "target", "fragment_id", "relation"],
942                registry
943            )
944            .unwrap();
945
946        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
947            "sync_kv_log_store_state",
948            "clean/unclean state transition for sync_kv log store",
949            &["state", "actor_id", "target", "fragment_id", "relation"],
950            registry
951        )
952        .unwrap();
953
954        let sync_kv_log_store_storage_write_count =
955            register_guarded_int_counter_vec_with_registry!(
956                "sync_kv_log_store_storage_write_count",
957                "Write row count throughput of sync_kv log store",
958                &["actor_id", "target", "fragment_id", "relation"],
959                registry
960            )
961            .unwrap();
962
963        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
964            "sync_kv_log_store_storage_write_size",
965            "Write size throughput of sync_kv log store",
966            &["actor_id", "target", "fragment_id", "relation"],
967            registry
968        )
969        .unwrap();
970
971        let sync_kv_log_store_buffer_unconsumed_item_count =
972            register_guarded_int_gauge_vec_with_registry!(
973                "sync_kv_log_store_buffer_unconsumed_item_count",
974                "Number of Unconsumed Item in buffer",
975                &["actor_id", "target", "fragment_id", "relation"],
976                registry
977            )
978            .unwrap();
979
980        let sync_kv_log_store_buffer_unconsumed_row_count =
981            register_guarded_int_gauge_vec_with_registry!(
982                "sync_kv_log_store_buffer_unconsumed_row_count",
983                "Number of Unconsumed Row in buffer",
984                &["actor_id", "target", "fragment_id", "relation"],
985                registry
986            )
987            .unwrap();
988
989        let sync_kv_log_store_buffer_unconsumed_epoch_count =
990            register_guarded_int_gauge_vec_with_registry!(
991                "sync_kv_log_store_buffer_unconsumed_epoch_count",
992                "Number of Unconsumed Epoch in buffer",
993                &["actor_id", "target", "fragment_id", "relation"],
994                registry
995            )
996            .unwrap();
997
998        let sync_kv_log_store_buffer_unconsumed_min_epoch =
999            register_guarded_int_gauge_vec_with_registry!(
1000                "sync_kv_log_store_buffer_unconsumed_min_epoch",
1001                "Number of Unconsumed Epoch in buffer",
1002                &["actor_id", "target", "fragment_id", "relation"],
1003                registry
1004            )
1005            .unwrap();
1006
1007        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1008            "kv_log_store_storage_write_count",
1009            "Write row count throughput of kv log store",
1010            &["actor_id", "connector", "sink_id", "sink_name"],
1011            registry
1012        )
1013        .unwrap();
1014
1015        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1016            "kv_log_store_storage_write_size",
1017            "Write size throughput of kv log store",
1018            &["actor_id", "connector", "sink_id", "sink_name"],
1019            registry
1020        )
1021        .unwrap();
1022
1023        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1024            "kv_log_store_storage_read_count",
1025            "Write row count throughput of kv log store",
1026            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1027            registry
1028        )
1029        .unwrap();
1030
1031        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1032            "kv_log_store_storage_read_size",
1033            "Write size throughput of kv log store",
1034            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1035            registry
1036        )
1037        .unwrap();
1038
1039        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1040            "kv_log_store_rewind_count",
1041            "Kv log store rewind rate",
1042            &["actor_id", "connector", "sink_id", "sink_name"],
1043            registry
1044        )
1045        .unwrap();
1046
1047        let kv_log_store_rewind_delay_opts = {
1048            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1049            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1050                - REWIND_BASE_DELAY.as_secs_f64().log2())
1051            .ceil() as usize;
1052            let buckets = exponential_buckets(
1053                REWIND_BASE_DELAY.as_secs_f64(),
1054                REWIND_BACKOFF_FACTOR as _,
1055                bucket_count,
1056            )
1057            .unwrap();
1058            histogram_opts!(
1059                "kv_log_store_rewind_delay",
1060                "Kv log store rewind delay",
1061                buckets,
1062            )
1063        };
1064
1065        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1066            kv_log_store_rewind_delay_opts,
1067            &["actor_id", "connector", "sink_id", "sink_name"],
1068            registry
1069        )
1070        .unwrap();
1071
1072        let kv_log_store_buffer_unconsumed_item_count =
1073            register_guarded_int_gauge_vec_with_registry!(
1074                "kv_log_store_buffer_unconsumed_item_count",
1075                "Number of Unconsumed Item in buffer",
1076                &["actor_id", "connector", "sink_id", "sink_name"],
1077                registry
1078            )
1079            .unwrap();
1080
1081        let kv_log_store_buffer_unconsumed_row_count =
1082            register_guarded_int_gauge_vec_with_registry!(
1083                "kv_log_store_buffer_unconsumed_row_count",
1084                "Number of Unconsumed Row in buffer",
1085                &["actor_id", "connector", "sink_id", "sink_name"],
1086                registry
1087            )
1088            .unwrap();
1089
1090        let kv_log_store_buffer_unconsumed_epoch_count =
1091            register_guarded_int_gauge_vec_with_registry!(
1092                "kv_log_store_buffer_unconsumed_epoch_count",
1093                "Number of Unconsumed Epoch in buffer",
1094                &["actor_id", "connector", "sink_id", "sink_name"],
1095                registry
1096            )
1097            .unwrap();
1098
1099        let kv_log_store_buffer_unconsumed_min_epoch =
1100            register_guarded_int_gauge_vec_with_registry!(
1101                "kv_log_store_buffer_unconsumed_min_epoch",
1102                "Number of Unconsumed Epoch in buffer",
1103                &["actor_id", "connector", "sink_id", "sink_name"],
1104                registry
1105            )
1106            .unwrap();
1107
1108        let lru_runtime_loop_count = register_int_counter_with_registry!(
1109            "lru_runtime_loop_count",
1110            "The counts of the eviction loop in LRU manager per second",
1111            registry
1112        )
1113        .unwrap();
1114
1115        let lru_latest_sequence = register_int_gauge_with_registry!(
1116            "lru_latest_sequence",
1117            "Current LRU global sequence",
1118            registry,
1119        )
1120        .unwrap();
1121
1122        let lru_watermark_sequence = register_int_gauge_with_registry!(
1123            "lru_watermark_sequence",
1124            "Current LRU watermark sequence",
1125            registry,
1126        )
1127        .unwrap();
1128
1129        let lru_eviction_policy = register_int_gauge_with_registry!(
1130            "lru_eviction_policy",
1131            "Current LRU eviction policy",
1132            registry,
1133        )
1134        .unwrap();
1135
1136        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1137            "jemalloc_allocated_bytes",
1138            "The allocated memory jemalloc, got from jemalloc_ctl",
1139            registry
1140        )
1141        .unwrap();
1142
1143        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1144            "jemalloc_active_bytes",
1145            "The active memory jemalloc, got from jemalloc_ctl",
1146            registry
1147        )
1148        .unwrap();
1149
1150        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1151            "jemalloc_resident_bytes",
1152            "The active memory jemalloc, got from jemalloc_ctl",
1153            registry
1154        )
1155        .unwrap();
1156
1157        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1158            "jemalloc_metadata_bytes",
1159            "The active memory jemalloc, got from jemalloc_ctl",
1160            registry
1161        )
1162        .unwrap();
1163
1164        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1165            "jvm_allocated_bytes",
1166            "The allocated jvm memory",
1167            registry
1168        )
1169        .unwrap();
1170
1171        let jvm_active_bytes = register_int_gauge_with_registry!(
1172            "jvm_active_bytes",
1173            "The active jvm memory",
1174            registry
1175        )
1176        .unwrap();
1177
1178        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1179            "stream_materialize_cache_hit_count",
1180            "Materialize executor cache hit count",
1181            &["actor_id", "table_id", "fragment_id"],
1182            registry
1183        )
1184        .unwrap()
1185        .relabel_debug_1(level);
1186
1187        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1188            "stream_materialize_data_exist_count",
1189            "Materialize executor data exist count",
1190            &["actor_id", "table_id", "fragment_id"],
1191            registry
1192        )
1193        .unwrap()
1194        .relabel_debug_1(level);
1195
1196        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1197            "stream_materialize_cache_total_count",
1198            "Materialize executor cache total operation",
1199            &["actor_id", "table_id", "fragment_id"],
1200            registry
1201        )
1202        .unwrap()
1203        .relabel_debug_1(level);
1204
1205        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1206            "stream_memory_usage",
1207            "Memory usage for stream executors",
1208            &["actor_id", "table_id", "desc"],
1209            registry
1210        )
1211        .unwrap()
1212        .relabel_debug_1(level);
1213
1214        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1215            "gap_fill_generated_rows_count",
1216            "Total number of rows generated by gap fill executor",
1217            &["actor_id", "fragment_id"],
1218            registry
1219        )
1220        .unwrap()
1221        .relabel_debug_1(level);
1222
1223        Self {
1224            level,
1225            executor_row_count,
1226            mem_stream_node_output_row_count: stream_node_output_row_count,
1227            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1228            actor_execution_time,
1229            actor_scheduled_duration,
1230            actor_scheduled_cnt,
1231            actor_fast_poll_duration,
1232            actor_fast_poll_cnt,
1233            actor_slow_poll_duration,
1234            actor_slow_poll_cnt,
1235            actor_poll_duration,
1236            actor_poll_cnt,
1237            actor_idle_duration,
1238            actor_idle_cnt,
1239            actor_count,
1240            actor_in_record_cnt,
1241            actor_out_record_cnt,
1242            actor_current_epoch,
1243            source_output_row_count,
1244            source_split_change_count,
1245            source_backfill_row_count,
1246            sink_input_row_count,
1247            sink_input_bytes,
1248            sink_chunk_buffer_size,
1249            exchange_frag_recv_size,
1250            merge_barrier_align_duration,
1251            actor_output_buffer_blocking_duration_ns,
1252            actor_input_buffer_blocking_duration_ns,
1253            join_lookup_miss_count,
1254            join_lookup_total_count,
1255            join_insert_cache_miss_count,
1256            join_actor_input_waiting_duration_ns,
1257            join_match_duration_ns,
1258            join_cached_entry_count,
1259            join_matched_join_keys,
1260            barrier_align_duration,
1261            agg_lookup_miss_count,
1262            agg_total_lookup_count,
1263            agg_cached_entry_count,
1264            agg_chunk_lookup_miss_count,
1265            agg_chunk_total_lookup_count,
1266            agg_dirty_groups_count,
1267            agg_dirty_groups_heap_size,
1268            agg_distinct_cache_miss_count,
1269            agg_distinct_total_cache_count,
1270            agg_distinct_cached_entry_count,
1271            agg_state_cache_lookup_count,
1272            agg_state_cache_miss_count,
1273            group_top_n_cache_miss_count,
1274            group_top_n_total_query_cache_count,
1275            group_top_n_cached_entry_count,
1276            group_top_n_appendonly_cache_miss_count,
1277            group_top_n_appendonly_total_query_cache_count,
1278            group_top_n_appendonly_cached_entry_count,
1279            lookup_cache_miss_count,
1280            lookup_total_query_cache_count,
1281            lookup_cached_entry_count,
1282            temporal_join_cache_miss_count,
1283            temporal_join_total_query_cache_count,
1284            temporal_join_cached_entry_count,
1285            backfill_snapshot_read_row_count,
1286            backfill_upstream_output_row_count,
1287            cdc_backfill_snapshot_read_row_count,
1288            cdc_backfill_upstream_output_row_count,
1289            snapshot_backfill_consume_row_count,
1290            over_window_cached_entry_count,
1291            over_window_cache_lookup_count,
1292            over_window_cache_miss_count,
1293            over_window_range_cache_entry_count,
1294            over_window_range_cache_lookup_count,
1295            over_window_range_cache_left_miss_count,
1296            over_window_range_cache_right_miss_count,
1297            over_window_accessed_entry_count,
1298            over_window_compute_count,
1299            over_window_same_output_count,
1300            barrier_inflight_latency,
1301            barrier_sync_latency,
1302            barrier_batch_size,
1303            barrier_manager_progress,
1304            kv_log_store_storage_write_count,
1305            kv_log_store_storage_write_size,
1306            kv_log_store_rewind_count,
1307            kv_log_store_rewind_delay,
1308            kv_log_store_storage_read_count,
1309            kv_log_store_storage_read_size,
1310            kv_log_store_buffer_unconsumed_item_count,
1311            kv_log_store_buffer_unconsumed_row_count,
1312            kv_log_store_buffer_unconsumed_epoch_count,
1313            kv_log_store_buffer_unconsumed_min_epoch,
1314            sync_kv_log_store_read_count,
1315            sync_kv_log_store_read_size,
1316            sync_kv_log_store_write_pause_duration_ns,
1317            sync_kv_log_store_state,
1318            sync_kv_log_store_wait_next_poll_ns,
1319            sync_kv_log_store_storage_write_count,
1320            sync_kv_log_store_storage_write_size,
1321            sync_kv_log_store_buffer_unconsumed_item_count,
1322            sync_kv_log_store_buffer_unconsumed_row_count,
1323            sync_kv_log_store_buffer_unconsumed_epoch_count,
1324            sync_kv_log_store_buffer_unconsumed_min_epoch,
1325            lru_runtime_loop_count,
1326            lru_latest_sequence,
1327            lru_watermark_sequence,
1328            lru_eviction_policy,
1329            jemalloc_allocated_bytes,
1330            jemalloc_active_bytes,
1331            jemalloc_resident_bytes,
1332            jemalloc_metadata_bytes,
1333            jvm_allocated_bytes,
1334            jvm_active_bytes,
1335            stream_memory_usage,
1336            materialize_cache_hit_count,
1337            materialize_data_exist_count,
1338            materialize_cache_total_count,
1339            materialize_input_row_count,
1340            materialize_current_epoch,
1341            pg_cdc_state_table_lsn,
1342            pg_cdc_jni_commit_offset_lsn,
1343            gap_fill_generated_rows_count,
1344        }
1345    }
1346
1347    /// Create a new `StreamingMetrics` instance used in tests or other places.
1348    pub fn unused() -> Self {
1349        global_streaming_metrics(MetricLevel::Disabled)
1350    }
1351
1352    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1353        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1354        let actor_execution_time = self
1355            .actor_execution_time
1356            .with_guarded_label_values(label_list);
1357        let actor_scheduled_duration = self
1358            .actor_scheduled_duration
1359            .with_guarded_label_values(label_list);
1360        let actor_scheduled_cnt = self
1361            .actor_scheduled_cnt
1362            .with_guarded_label_values(label_list);
1363        let actor_fast_poll_duration = self
1364            .actor_fast_poll_duration
1365            .with_guarded_label_values(label_list);
1366        let actor_fast_poll_cnt = self
1367            .actor_fast_poll_cnt
1368            .with_guarded_label_values(label_list);
1369        let actor_slow_poll_duration = self
1370            .actor_slow_poll_duration
1371            .with_guarded_label_values(label_list);
1372        let actor_slow_poll_cnt = self
1373            .actor_slow_poll_cnt
1374            .with_guarded_label_values(label_list);
1375        let actor_poll_duration = self
1376            .actor_poll_duration
1377            .with_guarded_label_values(label_list);
1378        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1379        let actor_idle_duration = self
1380            .actor_idle_duration
1381            .with_guarded_label_values(label_list);
1382        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1383        ActorMetrics {
1384            actor_execution_time,
1385            actor_scheduled_duration,
1386            actor_scheduled_cnt,
1387            actor_fast_poll_duration,
1388            actor_fast_poll_cnt,
1389            actor_slow_poll_duration,
1390            actor_slow_poll_cnt,
1391            actor_poll_duration,
1392            actor_poll_cnt,
1393            actor_idle_duration,
1394            actor_idle_cnt,
1395        }
1396    }
1397
1398    pub(crate) fn new_actor_input_metrics(
1399        &self,
1400        actor_id: ActorId,
1401        fragment_id: FragmentId,
1402        upstream_fragment_id: FragmentId,
1403    ) -> ActorInputMetrics {
1404        let actor_id_str = actor_id.to_string();
1405        let fragment_id_str = fragment_id.to_string();
1406        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1407        ActorInputMetrics {
1408            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1409                &actor_id_str,
1410                &fragment_id_str,
1411                &upstream_fragment_id_str,
1412            ]),
1413            actor_input_buffer_blocking_duration_ns: self
1414                .actor_input_buffer_blocking_duration_ns
1415                .with_guarded_label_values(&[
1416                    &actor_id_str,
1417                    &fragment_id_str,
1418                    &upstream_fragment_id_str,
1419                ]),
1420        }
1421    }
1422
1423    pub fn new_sink_exec_metrics(
1424        &self,
1425        id: SinkId,
1426        actor_id: ActorId,
1427        fragment_id: FragmentId,
1428    ) -> SinkExecutorMetrics {
1429        let label_list: &[&str; 3] = &[
1430            &id.to_string(),
1431            &actor_id.to_string(),
1432            &fragment_id.to_string(),
1433        ];
1434        SinkExecutorMetrics {
1435            sink_input_row_count: self
1436                .sink_input_row_count
1437                .with_guarded_label_values(label_list),
1438            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1439            sink_chunk_buffer_size: self
1440                .sink_chunk_buffer_size
1441                .with_guarded_label_values(label_list),
1442        }
1443    }
1444
1445    pub fn new_group_top_n_metrics(
1446        &self,
1447        table_id: TableId,
1448        actor_id: ActorId,
1449        fragment_id: FragmentId,
1450    ) -> GroupTopNMetrics {
1451        let label_list: &[&str; 3] = &[
1452            &table_id.to_string(),
1453            &actor_id.to_string(),
1454            &fragment_id.to_string(),
1455        ];
1456
1457        GroupTopNMetrics {
1458            group_top_n_cache_miss_count: self
1459                .group_top_n_cache_miss_count
1460                .with_guarded_label_values(label_list),
1461            group_top_n_total_query_cache_count: self
1462                .group_top_n_total_query_cache_count
1463                .with_guarded_label_values(label_list),
1464            group_top_n_cached_entry_count: self
1465                .group_top_n_cached_entry_count
1466                .with_guarded_label_values(label_list),
1467        }
1468    }
1469
1470    pub fn new_append_only_group_top_n_metrics(
1471        &self,
1472        table_id: TableId,
1473        actor_id: ActorId,
1474        fragment_id: FragmentId,
1475    ) -> GroupTopNMetrics {
1476        let label_list: &[&str; 3] = &[
1477            &table_id.to_string(),
1478            &actor_id.to_string(),
1479            &fragment_id.to_string(),
1480        ];
1481
1482        GroupTopNMetrics {
1483            group_top_n_cache_miss_count: self
1484                .group_top_n_appendonly_cache_miss_count
1485                .with_guarded_label_values(label_list),
1486            group_top_n_total_query_cache_count: self
1487                .group_top_n_appendonly_total_query_cache_count
1488                .with_guarded_label_values(label_list),
1489            group_top_n_cached_entry_count: self
1490                .group_top_n_appendonly_cached_entry_count
1491                .with_guarded_label_values(label_list),
1492        }
1493    }
1494
1495    pub fn new_lookup_executor_metrics(
1496        &self,
1497        table_id: TableId,
1498        actor_id: ActorId,
1499        fragment_id: FragmentId,
1500    ) -> LookupExecutorMetrics {
1501        let label_list: &[&str; 3] = &[
1502            &table_id.to_string(),
1503            &actor_id.to_string(),
1504            &fragment_id.to_string(),
1505        ];
1506
1507        LookupExecutorMetrics {
1508            lookup_cache_miss_count: self
1509                .lookup_cache_miss_count
1510                .with_guarded_label_values(label_list),
1511            lookup_total_query_cache_count: self
1512                .lookup_total_query_cache_count
1513                .with_guarded_label_values(label_list),
1514            lookup_cached_entry_count: self
1515                .lookup_cached_entry_count
1516                .with_guarded_label_values(label_list),
1517        }
1518    }
1519
1520    pub fn new_hash_agg_metrics(
1521        &self,
1522        table_id: TableId,
1523        actor_id: ActorId,
1524        fragment_id: FragmentId,
1525    ) -> HashAggMetrics {
1526        let label_list: &[&str; 3] = &[
1527            &table_id.to_string(),
1528            &actor_id.to_string(),
1529            &fragment_id.to_string(),
1530        ];
1531        HashAggMetrics {
1532            agg_lookup_miss_count: self
1533                .agg_lookup_miss_count
1534                .with_guarded_label_values(label_list),
1535            agg_total_lookup_count: self
1536                .agg_total_lookup_count
1537                .with_guarded_label_values(label_list),
1538            agg_cached_entry_count: self
1539                .agg_cached_entry_count
1540                .with_guarded_label_values(label_list),
1541            agg_chunk_lookup_miss_count: self
1542                .agg_chunk_lookup_miss_count
1543                .with_guarded_label_values(label_list),
1544            agg_chunk_total_lookup_count: self
1545                .agg_chunk_total_lookup_count
1546                .with_guarded_label_values(label_list),
1547            agg_dirty_groups_count: self
1548                .agg_dirty_groups_count
1549                .with_guarded_label_values(label_list),
1550            agg_dirty_groups_heap_size: self
1551                .agg_dirty_groups_heap_size
1552                .with_guarded_label_values(label_list),
1553            agg_state_cache_lookup_count: self
1554                .agg_state_cache_lookup_count
1555                .with_guarded_label_values(label_list),
1556            agg_state_cache_miss_count: self
1557                .agg_state_cache_miss_count
1558                .with_guarded_label_values(label_list),
1559        }
1560    }
1561
1562    pub fn new_agg_distinct_dedup_metrics(
1563        &self,
1564        table_id: TableId,
1565        actor_id: ActorId,
1566        fragment_id: FragmentId,
1567    ) -> AggDistinctDedupMetrics {
1568        let label_list: &[&str; 3] = &[
1569            &table_id.to_string(),
1570            &actor_id.to_string(),
1571            &fragment_id.to_string(),
1572        ];
1573        AggDistinctDedupMetrics {
1574            agg_distinct_cache_miss_count: self
1575                .agg_distinct_cache_miss_count
1576                .with_guarded_label_values(label_list),
1577            agg_distinct_total_cache_count: self
1578                .agg_distinct_total_cache_count
1579                .with_guarded_label_values(label_list),
1580            agg_distinct_cached_entry_count: self
1581                .agg_distinct_cached_entry_count
1582                .with_guarded_label_values(label_list),
1583        }
1584    }
1585
1586    pub fn new_temporal_join_metrics(
1587        &self,
1588        table_id: TableId,
1589        actor_id: ActorId,
1590        fragment_id: FragmentId,
1591    ) -> TemporalJoinMetrics {
1592        let label_list: &[&str; 3] = &[
1593            &table_id.to_string(),
1594            &actor_id.to_string(),
1595            &fragment_id.to_string(),
1596        ];
1597        TemporalJoinMetrics {
1598            temporal_join_cache_miss_count: self
1599                .temporal_join_cache_miss_count
1600                .with_guarded_label_values(label_list),
1601            temporal_join_total_query_cache_count: self
1602                .temporal_join_total_query_cache_count
1603                .with_guarded_label_values(label_list),
1604            temporal_join_cached_entry_count: self
1605                .temporal_join_cached_entry_count
1606                .with_guarded_label_values(label_list),
1607        }
1608    }
1609
1610    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1611        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1612        BackfillMetrics {
1613            backfill_snapshot_read_row_count: self
1614                .backfill_snapshot_read_row_count
1615                .with_guarded_label_values(label_list),
1616            backfill_upstream_output_row_count: self
1617                .backfill_upstream_output_row_count
1618                .with_guarded_label_values(label_list),
1619        }
1620    }
1621
1622    pub fn new_cdc_backfill_metrics(
1623        &self,
1624        table_id: TableId,
1625        actor_id: ActorId,
1626    ) -> CdcBackfillMetrics {
1627        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1628        CdcBackfillMetrics {
1629            cdc_backfill_snapshot_read_row_count: self
1630                .cdc_backfill_snapshot_read_row_count
1631                .with_guarded_label_values(label_list),
1632            cdc_backfill_upstream_output_row_count: self
1633                .cdc_backfill_upstream_output_row_count
1634                .with_guarded_label_values(label_list),
1635        }
1636    }
1637
1638    pub fn new_over_window_metrics(
1639        &self,
1640        table_id: TableId,
1641        actor_id: ActorId,
1642        fragment_id: FragmentId,
1643    ) -> OverWindowMetrics {
1644        let label_list: &[&str; 3] = &[
1645            &table_id.to_string(),
1646            &actor_id.to_string(),
1647            &fragment_id.to_string(),
1648        ];
1649        OverWindowMetrics {
1650            over_window_cached_entry_count: self
1651                .over_window_cached_entry_count
1652                .with_guarded_label_values(label_list),
1653            over_window_cache_lookup_count: self
1654                .over_window_cache_lookup_count
1655                .with_guarded_label_values(label_list),
1656            over_window_cache_miss_count: self
1657                .over_window_cache_miss_count
1658                .with_guarded_label_values(label_list),
1659            over_window_range_cache_entry_count: self
1660                .over_window_range_cache_entry_count
1661                .with_guarded_label_values(label_list),
1662            over_window_range_cache_lookup_count: self
1663                .over_window_range_cache_lookup_count
1664                .with_guarded_label_values(label_list),
1665            over_window_range_cache_left_miss_count: self
1666                .over_window_range_cache_left_miss_count
1667                .with_guarded_label_values(label_list),
1668            over_window_range_cache_right_miss_count: self
1669                .over_window_range_cache_right_miss_count
1670                .with_guarded_label_values(label_list),
1671            over_window_accessed_entry_count: self
1672                .over_window_accessed_entry_count
1673                .with_guarded_label_values(label_list),
1674            over_window_compute_count: self
1675                .over_window_compute_count
1676                .with_guarded_label_values(label_list),
1677            over_window_same_output_count: self
1678                .over_window_same_output_count
1679                .with_guarded_label_values(label_list),
1680        }
1681    }
1682
1683    pub fn new_materialize_metrics(
1684        &self,
1685        table_id: TableId,
1686        actor_id: ActorId,
1687        fragment_id: FragmentId,
1688    ) -> MaterializeMetrics {
1689        let label_list: &[&str; 3] = &[
1690            &actor_id.to_string(),
1691            &table_id.to_string(),
1692            &fragment_id.to_string(),
1693        ];
1694        MaterializeMetrics {
1695            materialize_cache_hit_count: self
1696                .materialize_cache_hit_count
1697                .with_guarded_label_values(label_list),
1698            materialize_data_exist_count: self
1699                .materialize_data_exist_count
1700                .with_guarded_label_values(label_list),
1701            materialize_cache_total_count: self
1702                .materialize_cache_total_count
1703                .with_guarded_label_values(label_list),
1704            materialize_input_row_count: self
1705                .materialize_input_row_count
1706                .with_guarded_label_values(label_list),
1707            materialize_current_epoch: self
1708                .materialize_current_epoch
1709                .with_guarded_label_values(label_list),
1710        }
1711    }
1712}
1713
1714pub(crate) struct ActorInputMetrics {
1715    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
1716    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
1717}
1718
1719/// Tokio metrics for actors
1720pub struct ActorMetrics {
1721    pub actor_execution_time: LabelGuardedGauge,
1722    pub actor_scheduled_duration: LabelGuardedGauge,
1723    pub actor_scheduled_cnt: LabelGuardedIntGauge,
1724    pub actor_fast_poll_duration: LabelGuardedGauge,
1725    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
1726    pub actor_slow_poll_duration: LabelGuardedGauge,
1727    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
1728    pub actor_poll_duration: LabelGuardedGauge,
1729    pub actor_poll_cnt: LabelGuardedIntGauge,
1730    pub actor_idle_duration: LabelGuardedGauge,
1731    pub actor_idle_cnt: LabelGuardedIntGauge,
1732}
1733
1734pub struct SinkExecutorMetrics {
1735    pub sink_input_row_count: LabelGuardedIntCounter,
1736    pub sink_input_bytes: LabelGuardedIntCounter,
1737    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
1738}
1739
1740pub struct MaterializeMetrics {
1741    pub materialize_cache_hit_count: LabelGuardedIntCounter,
1742    pub materialize_data_exist_count: LabelGuardedIntCounter,
1743    pub materialize_cache_total_count: LabelGuardedIntCounter,
1744    pub materialize_input_row_count: LabelGuardedIntCounter,
1745    pub materialize_current_epoch: LabelGuardedIntGauge,
1746}
1747
1748pub struct GroupTopNMetrics {
1749    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
1750    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
1751    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
1752}
1753
1754pub struct LookupExecutorMetrics {
1755    pub lookup_cache_miss_count: LabelGuardedIntCounter,
1756    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
1757    pub lookup_cached_entry_count: LabelGuardedIntGauge,
1758}
1759
1760pub struct HashAggMetrics {
1761    pub agg_lookup_miss_count: LabelGuardedIntCounter,
1762    pub agg_total_lookup_count: LabelGuardedIntCounter,
1763    pub agg_cached_entry_count: LabelGuardedIntGauge,
1764    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
1765    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
1766    pub agg_dirty_groups_count: LabelGuardedIntGauge,
1767    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
1768    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
1769    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
1770}
1771
1772pub struct AggDistinctDedupMetrics {
1773    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
1774    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
1775    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
1776}
1777
1778pub struct TemporalJoinMetrics {
1779    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
1780    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
1781    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
1782}
1783
1784pub struct BackfillMetrics {
1785    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1786    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
1787}
1788
1789#[derive(Clone)]
1790pub struct CdcBackfillMetrics {
1791    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1792    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
1793}
1794
1795pub struct OverWindowMetrics {
1796    pub over_window_cached_entry_count: LabelGuardedIntGauge,
1797    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
1798    pub over_window_cache_miss_count: LabelGuardedIntCounter,
1799    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
1800    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
1801    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
1802    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
1803    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
1804    pub over_window_compute_count: LabelGuardedIntCounter,
1805    pub over_window_same_output_count: LabelGuardedIntCounter,
1806}