risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26    LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Metrics collected by the streaming engine.
///
/// Most fields are Prometheus metrics registered by `StreamingMetrics::new` on the registry
/// passed to it. The `mem_*` fields are in-memory [`CountMap`]s that are never exported to
/// Prometheus. The process-wide instance is handed out, as a clone, by
/// [`global_streaming_metrics`].
///
/// Fields typed `Relabeled*` are wrapped with `relabel_debug_1(level)` right after
/// registration. Per the comment in `new`, this masks the first label (typically `actor_id`)
/// when the metric level is less verbose than `Debug`.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric verbosity level; `new` uses it when relabeling the `Relabeled*` metrics
    /// (presumably the same level that was passed to `new` — TODO confirm).
    pub level: MetricLevel,

    // Executor metrics (disabled by default)
    /// `stream_executor_row_count`: rows output by each executor, labeled by `actor_id`,
    /// `fragment_id`, `executor_identity`.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // Profiling Metrics:
    // Aggregated per operator rather than per actor.
    // These are purely in-memory, never collected by prometheus.
    pub mem_stream_node_output_row_count: CountMap,
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // Streaming actor metrics from tokio (disabled by default)
    // Each one is exported as `stream_<field name>` and labeled by `actor_id`, `fragment_id`.
    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
    actor_poll_duration: RelabeledGuardedIntCounterVec,
    actor_poll_cnt: RelabeledGuardedIntCounterVec,
    actor_idle_duration: RelabeledGuardedIntCounterVec,
    actor_idle_cnt: RelabeledGuardedIntCounterVec,

    // Streaming actor
    /// `stream_actor_count`: number of actors (parallelism) per `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: rows received by an actor, labeled by `actor_id`,
    /// `fragment_id`, `upstream_fragment_id`.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: rows sent by an actor, labeled by `actor_id`,
    /// `fragment_id`.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor, labeled by `actor_id`,
    /// `fragment_id`.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // Source
    // Labeled by `source_id`, `source_name`, `actor_id`, `fragment_id`.
    /// `stream_source_output_rows_counts`: rows output from the source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled by the source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: rows backfilled for the source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // Sink
    // Labeled by `sink_id`, `actor_id`, `fragment_id`.
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // Exchange (see also `compute::ExchangeServiceMetrics`)
    /// `stream_exchange_frag_recv_size`: total size of messages received from an upstream
    /// fragment, labeled by `up_fragment_id`, `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // Streaming Merge (We break out this metric from `barrier_align_duration` because
    // the alignment happens on different levels)
    /// Histogram `stream_merge_barrier_align_duration`, labeled by `actor_id`, `fragment_id`.
    /// Buckets: 21 exponential buckets starting at 0.0001 and doubling each time
    /// (max ~104 s).
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // Backpressure
    /// `stream_actor_output_buffer_blocking_duration_ns`: labeled by `actor_id`,
    /// `fragment_id`, `downstream_fragment_id`.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`: labeled by `actor_id`,
    /// `fragment_id`, `upstream_fragment_id`.
    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,

    // Streaming Join
    /// `stream_join_lookup_miss_count`: labeled by `side`, `join_table_id`, `actor_id`,
    /// `fragment_id`.
    /// NOTE(review): its registered help text says "lookup miss duration", but this is a count.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`: labeled by `side`, `join_table_id`, `actor_id`,
    /// `fragment_id`.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert, labeled by `side`,
    /// `join_table_id`, `actor_id`, `fragment_id`.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: labeled by `actor_id`, `fragment_id`.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: labeled by `actor_id`, `fragment_id`, `side`.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count`: labeled by `actor_id`, `fragment_id`, `side`.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// Histogram `stream_join_matched_join_keys`: keys matched on the opposite side, labeled
    /// by `actor_id`, `fragment_id`, `table_id`. Buckets: 16 doubling up to 2^31.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // Streaming Join, Streaming Dynamic Filter and Streaming Union
    /// Exported as `stream_barrier_align_duration_ns` (nanoseconds), labeled by `actor_id`,
    /// `fragment_id`, `wait_side`, `executor`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // Streaming Aggregation
    // Labeled by `table_id`, `actor_id`, `fragment_id`; exported as `stream_<field name>`
    // unless noted otherwise.
    /// NOTE(review): its registered help text says "lookup miss duration", but this is a count.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// Exported as `stream_agg_lookup_total_count`.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// NOTE(review): its registered help text says "chunk-level lookup miss duration", but this
    /// is a count.
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// Exported as `stream_agg_chunk_lookup_total_count`.
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // Streaming TopN
    // Labeled by `table_id`, `actor_id`, `fragment_id`; exported as `stream_<field name>`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    // TODO(rc): why not just use the above three?
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // Lookup executor
    // Labeled by `table_id`, `actor_id`, `fragment_id`; exported as `stream_<field name>`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // temporal join
    // Labeled by `table_id`, `actor_id`, `fragment_id`; exported as `stream_<field name>`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // Backfill
    // Labeled by `table_id`, `actor_id` (no `fragment_id`); exported as `stream_<field name>`.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // CDC Backfill
    // Labeled by `table_id`, `actor_id` (no `fragment_id`); exported as `stream_<field name>`.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // Snapshot Backfill
    /// Exported as `stream_snapshot_backfill_consume_snapshot_row_count`, labeled by
    /// `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // Over Window
    /// `stream_over_window_cached_entry_count`: entry (partition) count in the over window
    /// executor cache, labeled by `table_id`, `actor_id`, `fragment_id`.
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    over_window_compute_count: LabelGuardedIntCounterVec,
    over_window_same_output_count: LabelGuardedIntCounterVec,

    /// The duration from receipt of barrier to all actors collection.
    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
    /// to flow through the graph.
    pub barrier_inflight_latency: Histogram,
    /// The duration of sync to storage.
    pub barrier_sync_latency: Histogram,
    pub barrier_batch_size: Histogram,
    /// The progress made by the earliest in-flight barriers in the local barrier manager.
    pub barrier_manager_progress: IntCounter,

    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Memory management
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // Materialized view
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: rows streamed into materialize executors, labeled by
    /// `actor_id`, `table_id`, `fragment_id`.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor, labeled by
    /// `actor_id`, `table_id`, `fragment_id`.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // PostgreSQL CDC LSN monitoring
    /// `stream_pg_cdc_state_table_lsn`: current LSN stored in the PostgreSQL CDC state table,
    /// labeled by `source_id`.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN value when the JNI commit offset is called,
    /// labeled by `source_id`.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,

    // MySQL CDC binlog monitoring
    /// `stream_mysql_cdc_state_binlog_file_seq`: binlog file sequence number stored in the
    /// MySQL CDC state table, labeled by `source_id`.
    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
    /// `stream_mysql_cdc_state_binlog_position`: binlog position stored in the MySQL CDC
    /// state table, labeled by `source_id`.
    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,

    // Gap Fill
    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
}
223
224pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
225
226pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
227    GLOBAL_STREAMING_METRICS
228        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
229        .clone()
230}
231
232impl StreamingMetrics {
233    fn new(registry: &Registry, level: MetricLevel) -> Self {
234        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
235            "stream_executor_row_count",
236            "Total number of rows that have been output from each executor",
237            &["actor_id", "fragment_id", "executor_identity"],
238            registry
239        )
240        .unwrap()
241        .relabel_debug_1(level);
242
243        let stream_node_output_row_count = CountMap::new();
244        let stream_node_output_blocking_duration_ns = CountMap::new();
245
246        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
247            "stream_source_output_rows_counts",
248            "Total number of rows that have been output from source",
249            &["source_id", "source_name", "actor_id", "fragment_id"],
250            registry
251        )
252        .unwrap();
253
254        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
255            "stream_source_split_change_event_count",
256            "Total number of split change events that have been operated by source",
257            &["source_id", "source_name", "actor_id", "fragment_id"],
258            registry
259        )
260        .unwrap();
261
262        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
263            "stream_source_backfill_rows_counts",
264            "Total number of rows that have been backfilled for source",
265            &["source_id", "source_name", "actor_id", "fragment_id"],
266            registry
267        )
268        .unwrap();
269
270        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
271            "stream_sink_input_row_count",
272            "Total number of rows streamed into sink executors",
273            &["sink_id", "actor_id", "fragment_id"],
274            registry
275        )
276        .unwrap();
277
278        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
279            "stream_sink_input_bytes",
280            "Total size of chunks streamed into sink executors",
281            &["sink_id", "actor_id", "fragment_id"],
282            registry
283        )
284        .unwrap();
285
286        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
287            "stream_mview_input_row_count",
288            "Total number of rows streamed into materialize executors",
289            &["actor_id", "table_id", "fragment_id"],
290            registry
291        )
292        .unwrap()
293        .relabel_debug_1(level);
294
295        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
296            "stream_mview_current_epoch",
297            "The current epoch of the materialized executor",
298            &["actor_id", "table_id", "fragment_id"],
299            registry
300        )
301        .unwrap()
302        .relabel_debug_1(level);
303
304        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
305            "stream_pg_cdc_state_table_lsn",
306            "Current LSN value stored in PostgreSQL CDC state table",
307            &["source_id"],
308            registry,
309        )
310        .unwrap();
311
312        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
313            "stream_pg_cdc_jni_commit_offset_lsn",
314            "LSN value when JNI commit offset is called for PostgreSQL CDC",
315            &["source_id"],
316            registry,
317        )
318        .unwrap();
319
320        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
321            "stream_mysql_cdc_state_binlog_file_seq",
322            "Current binlog file sequence number stored in MySQL CDC state table",
323            &["source_id"],
324            registry,
325        )
326        .unwrap();
327
328        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
329            "stream_mysql_cdc_state_binlog_position",
330            "Current binlog position stored in MySQL CDC state table",
331            &["source_id"],
332            registry,
333        )
334        .unwrap();
335
336        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
337            "stream_sink_chunk_buffer_size",
338            "Total size of chunks buffered in a barrier",
339            &["sink_id", "actor_id", "fragment_id"],
340            registry
341        )
342        .unwrap();
343        let actor_output_buffer_blocking_duration_ns =
344            register_guarded_int_counter_vec_with_registry!(
345                "stream_actor_output_buffer_blocking_duration_ns",
346                "Total blocking duration (ns) of output buffer",
347                &["actor_id", "fragment_id", "downstream_fragment_id"],
348                registry
349            )
350            .unwrap()
351            // mask the first label `actor_id` if the level is less verbose than `Debug`
352            .relabel_debug_1(level);
353
354        let actor_input_buffer_blocking_duration_ns =
355            register_guarded_int_counter_vec_with_registry!(
356                "stream_actor_input_buffer_blocking_duration_ns",
357                "Total blocking duration (ns) of input buffer",
358                &["actor_id", "fragment_id", "upstream_fragment_id"],
359                registry
360            )
361            .unwrap()
362            // mask the first label `actor_id` if the level is less verbose than `Debug`
363            .relabel_debug_1(level);
364
365        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
366            "stream_exchange_frag_recv_size",
367            "Total size of messages that have been received from upstream Fragment",
368            &["up_fragment_id", "down_fragment_id"],
369            registry
370        )
371        .unwrap();
372
373        let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
374            "stream_actor_poll_duration",
375            "tokio's metrics",
376            &["actor_id", "fragment_id"],
377            registry
378        )
379        .unwrap()
380        .relabel_debug_1(level);
381
382        let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
383            "stream_actor_poll_cnt",
384            "tokio's metrics",
385            &["actor_id", "fragment_id"],
386            registry
387        )
388        .unwrap()
389        .relabel_debug_1(level);
390
391        let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
392            "stream_actor_scheduled_duration",
393            "tokio's metrics",
394            &["actor_id", "fragment_id"],
395            registry
396        )
397        .unwrap()
398        .relabel_debug_1(level);
399
400        let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
401            "stream_actor_scheduled_cnt",
402            "tokio's metrics",
403            &["actor_id", "fragment_id"],
404            registry
405        )
406        .unwrap()
407        .relabel_debug_1(level);
408
409        let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
410            "stream_actor_idle_duration",
411            "tokio's metrics",
412            &["actor_id", "fragment_id"],
413            registry
414        )
415        .unwrap()
416        .relabel_debug_1(level);
417
418        let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
419            "stream_actor_idle_cnt",
420            "tokio's metrics",
421            &["actor_id", "fragment_id"],
422            registry
423        )
424        .unwrap()
425        .relabel_debug_1(level);
426
427        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
428            "stream_actor_in_record_cnt",
429            "Total number of rows actor received",
430            &["actor_id", "fragment_id", "upstream_fragment_id"],
431            registry
432        )
433        .unwrap()
434        .relabel_debug_1(level);
435
436        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
437            "stream_actor_out_record_cnt",
438            "Total number of rows actor sent",
439            &["actor_id", "fragment_id"],
440            registry
441        )
442        .unwrap()
443        .relabel_debug_1(level);
444
445        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
446            "stream_actor_current_epoch",
447            "Current epoch of actor",
448            &["actor_id", "fragment_id"],
449            registry
450        )
451        .unwrap()
452        .relabel_debug_1(level);
453
454        let actor_count = register_guarded_int_gauge_vec_with_registry!(
455            "stream_actor_count",
456            "Total number of actors (parallelism)",
457            &["fragment_id"],
458            registry
459        )
460        .unwrap();
461
462        let opts = histogram_opts!(
463            "stream_merge_barrier_align_duration",
464            "Duration of merge align barrier",
465            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
466        );
467        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
468            opts,
469            &["actor_id", "fragment_id"],
470            registry
471        )
472        .unwrap()
473        .relabel_debug_1(level);
474
475        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
476            "stream_join_lookup_miss_count",
477            "Join executor lookup miss duration",
478            &["side", "join_table_id", "actor_id", "fragment_id"],
479            registry
480        )
481        .unwrap();
482
483        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
484            "stream_join_lookup_total_count",
485            "Join executor lookup total operation",
486            &["side", "join_table_id", "actor_id", "fragment_id"],
487            registry
488        )
489        .unwrap();
490
491        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
492            "stream_join_insert_cache_miss_count",
493            "Join executor cache miss when insert operation",
494            &["side", "join_table_id", "actor_id", "fragment_id"],
495            registry
496        )
497        .unwrap();
498
499        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
500            "stream_join_actor_input_waiting_duration_ns",
501            "Total waiting duration (ns) of input buffer of join actor",
502            &["actor_id", "fragment_id"],
503            registry
504        )
505        .unwrap();
506
507        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
508            "stream_join_match_duration_ns",
509            "Matching duration for each side",
510            &["actor_id", "fragment_id", "side"],
511            registry
512        )
513        .unwrap();
514
515        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
516            "stream_barrier_align_duration_ns",
517            "Duration of join align barrier",
518            &["actor_id", "fragment_id", "wait_side", "executor"],
519            registry
520        )
521        .unwrap()
522        .relabel_debug_1(level);
523
524        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
525            "stream_join_cached_entry_count",
526            "Number of cached entries in streaming join operators",
527            &["actor_id", "fragment_id", "side"],
528            registry
529        )
530        .unwrap();
531
532        let join_matched_join_keys_opts = histogram_opts!(
533            "stream_join_matched_join_keys",
534            "The number of keys matched in the opposite side",
535            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
536        );
537
538        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
539            join_matched_join_keys_opts,
540            &["actor_id", "fragment_id", "table_id"],
541            registry
542        )
543        .unwrap()
544        .relabel_debug_1(level);
545
546        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
547            "stream_agg_lookup_miss_count",
548            "Aggregation executor lookup miss duration",
549            &["table_id", "actor_id", "fragment_id"],
550            registry
551        )
552        .unwrap();
553
554        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
555            "stream_agg_lookup_total_count",
556            "Aggregation executor lookup total operation",
557            &["table_id", "actor_id", "fragment_id"],
558            registry
559        )
560        .unwrap();
561
562        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
563            "stream_agg_distinct_cache_miss_count",
564            "Aggregation executor dinsinct miss duration",
565            &["table_id", "actor_id", "fragment_id"],
566            registry
567        )
568        .unwrap();
569
570        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
571            "stream_agg_distinct_total_cache_count",
572            "Aggregation executor distinct total operation",
573            &["table_id", "actor_id", "fragment_id"],
574            registry
575        )
576        .unwrap();
577
578        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
579            "stream_agg_distinct_cached_entry_count",
580            "Total entry counts in distinct aggregation executor cache",
581            &["table_id", "actor_id", "fragment_id"],
582            registry
583        )
584        .unwrap();
585
586        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
587            "stream_agg_dirty_groups_count",
588            "Total dirty group counts in aggregation executor",
589            &["table_id", "actor_id", "fragment_id"],
590            registry
591        )
592        .unwrap();
593
594        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
595            "stream_agg_dirty_groups_heap_size",
596            "Total dirty group heap size in aggregation executor",
597            &["table_id", "actor_id", "fragment_id"],
598            registry
599        )
600        .unwrap();
601
602        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
603            "stream_agg_state_cache_lookup_count",
604            "Aggregation executor state cache lookup count",
605            &["table_id", "actor_id", "fragment_id"],
606            registry
607        )
608        .unwrap();
609
610        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
611            "stream_agg_state_cache_miss_count",
612            "Aggregation executor state cache miss count",
613            &["table_id", "actor_id", "fragment_id"],
614            registry
615        )
616        .unwrap();
617
618        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
619            "stream_group_top_n_cache_miss_count",
620            "Group top n executor cache miss count",
621            &["table_id", "actor_id", "fragment_id"],
622            registry
623        )
624        .unwrap();
625
626        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
627            "stream_group_top_n_total_query_cache_count",
628            "Group top n executor query cache total count",
629            &["table_id", "actor_id", "fragment_id"],
630            registry
631        )
632        .unwrap();
633
634        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
635            "stream_group_top_n_cached_entry_count",
636            "Total entry counts in group top n executor cache",
637            &["table_id", "actor_id", "fragment_id"],
638            registry
639        )
640        .unwrap();
641
642        let group_top_n_appendonly_cache_miss_count =
643            register_guarded_int_counter_vec_with_registry!(
644                "stream_group_top_n_appendonly_cache_miss_count",
645                "Group top n appendonly executor cache miss count",
646                &["table_id", "actor_id", "fragment_id"],
647                registry
648            )
649            .unwrap();
650
651        let group_top_n_appendonly_total_query_cache_count =
652            register_guarded_int_counter_vec_with_registry!(
653                "stream_group_top_n_appendonly_total_query_cache_count",
654                "Group top n appendonly executor total cache count",
655                &["table_id", "actor_id", "fragment_id"],
656                registry
657            )
658            .unwrap();
659
660        let group_top_n_appendonly_cached_entry_count =
661            register_guarded_int_gauge_vec_with_registry!(
662                "stream_group_top_n_appendonly_cached_entry_count",
663                "Total entry counts in group top n appendonly executor cache",
664                &["table_id", "actor_id", "fragment_id"],
665                registry
666            )
667            .unwrap();
668
669        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
670            "stream_lookup_cache_miss_count",
671            "Lookup executor cache miss count",
672            &["table_id", "actor_id", "fragment_id"],
673            registry
674        )
675        .unwrap();
676
677        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
678            "stream_lookup_total_query_cache_count",
679            "Lookup executor query cache total count",
680            &["table_id", "actor_id", "fragment_id"],
681            registry
682        )
683        .unwrap();
684
685        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
686            "stream_lookup_cached_entry_count",
687            "Total entry counts in lookup executor cache",
688            &["table_id", "actor_id", "fragment_id"],
689            registry
690        )
691        .unwrap();
692
693        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
694            "stream_temporal_join_cache_miss_count",
695            "Temporal join executor cache miss count",
696            &["table_id", "actor_id", "fragment_id"],
697            registry
698        )
699        .unwrap();
700
701        let temporal_join_total_query_cache_count =
702            register_guarded_int_counter_vec_with_registry!(
703                "stream_temporal_join_total_query_cache_count",
704                "Temporal join executor query cache total count",
705                &["table_id", "actor_id", "fragment_id"],
706                registry
707            )
708            .unwrap();
709
710        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
711            "stream_temporal_join_cached_entry_count",
712            "Total entry count in temporal join executor cache",
713            &["table_id", "actor_id", "fragment_id"],
714            registry
715        )
716        .unwrap();
717
718        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
719            "stream_agg_cached_entry_count",
720            "Number of cached keys in streaming aggregation operators",
721            &["table_id", "actor_id", "fragment_id"],
722            registry
723        )
724        .unwrap();
725
726        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
727            "stream_agg_chunk_lookup_miss_count",
728            "Aggregation executor chunk-level lookup miss duration",
729            &["table_id", "actor_id", "fragment_id"],
730            registry
731        )
732        .unwrap();
733
734        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
735            "stream_agg_chunk_lookup_total_count",
736            "Aggregation executor chunk-level lookup total operation",
737            &["table_id", "actor_id", "fragment_id"],
738            registry
739        )
740        .unwrap();
741
742        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
743            "stream_backfill_snapshot_read_row_count",
744            "Total number of rows that have been read from the backfill snapshot",
745            &["table_id", "actor_id"],
746            registry
747        )
748        .unwrap();
749
750        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
751            "stream_backfill_upstream_output_row_count",
752            "Total number of rows that have been output from the backfill upstream",
753            &["table_id", "actor_id"],
754            registry
755        )
756        .unwrap();
757
758        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
759            "stream_cdc_backfill_snapshot_read_row_count",
760            "Total number of rows that have been read from the cdc_backfill snapshot",
761            &["table_id", "actor_id"],
762            registry
763        )
764        .unwrap();
765
766        let cdc_backfill_upstream_output_row_count =
767            register_guarded_int_counter_vec_with_registry!(
768                "stream_cdc_backfill_upstream_output_row_count",
769                "Total number of rows that have been output from the cdc_backfill upstream",
770                &["table_id", "actor_id"],
771                registry
772            )
773            .unwrap();
774
775        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
776            "stream_snapshot_backfill_consume_snapshot_row_count",
777            "Total number of rows that have been output from snapshot backfill",
778            &["table_id", "actor_id", "stage"],
779            registry
780        )
781        .unwrap();
782
783        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
784            "stream_over_window_cached_entry_count",
785            "Total entry (partition) count in over window executor cache",
786            &["table_id", "actor_id", "fragment_id"],
787            registry
788        )
789        .unwrap();
790
791        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
792            "stream_over_window_cache_lookup_count",
793            "Over window executor cache lookup count",
794            &["table_id", "actor_id", "fragment_id"],
795            registry
796        )
797        .unwrap();
798
799        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
800            "stream_over_window_cache_miss_count",
801            "Over window executor cache miss count",
802            &["table_id", "actor_id", "fragment_id"],
803            registry
804        )
805        .unwrap();
806
807        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
808            "stream_over_window_range_cache_entry_count",
809            "Over window partition range cache entry count",
810            &["table_id", "actor_id", "fragment_id"],
811            registry,
812        )
813        .unwrap();
814
815        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
816            "stream_over_window_range_cache_lookup_count",
817            "Over window partition range cache lookup count",
818            &["table_id", "actor_id", "fragment_id"],
819            registry
820        )
821        .unwrap();
822
823        let over_window_range_cache_left_miss_count =
824            register_guarded_int_counter_vec_with_registry!(
825                "stream_over_window_range_cache_left_miss_count",
826                "Over window partition range cache left miss count",
827                &["table_id", "actor_id", "fragment_id"],
828                registry
829            )
830            .unwrap();
831
832        let over_window_range_cache_right_miss_count =
833            register_guarded_int_counter_vec_with_registry!(
834                "stream_over_window_range_cache_right_miss_count",
835                "Over window partition range cache right miss count",
836                &["table_id", "actor_id", "fragment_id"],
837                registry
838            )
839            .unwrap();
840
841        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
842            "stream_over_window_accessed_entry_count",
843            "Over window accessed entry count",
844            &["table_id", "actor_id", "fragment_id"],
845            registry
846        )
847        .unwrap();
848
849        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
850            "stream_over_window_compute_count",
851            "Over window compute count",
852            &["table_id", "actor_id", "fragment_id"],
853            registry
854        )
855        .unwrap();
856
857        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
858            "stream_over_window_same_output_count",
859            "Over window same output count",
860            &["table_id", "actor_id", "fragment_id"],
861            registry
862        )
863        .unwrap();
864
865        let opts = histogram_opts!(
866            "stream_barrier_inflight_duration_seconds",
867            "barrier_inflight_latency",
868            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
869        );
870        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
871
872        let opts = histogram_opts!(
873            "stream_barrier_sync_storage_duration_seconds",
874            "barrier_sync_latency",
875            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
876        );
877        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
878
879        let opts = histogram_opts!(
880            "stream_barrier_batch_size",
881            "barrier_batch_size",
882            exponential_buckets(1.0, 2.0, 8).unwrap()
883        );
884        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
885
886        let barrier_manager_progress = register_int_counter_with_registry!(
887            "stream_barrier_manager_progress",
888            "The number of actors that have processed the earliest in-flight barriers",
889            registry
890        )
891        .unwrap();
892
893        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
894            "sync_kv_log_store_wait_next_poll_ns",
895            "Total duration (ns) of waiting for next poll",
896            &["actor_id", "target", "fragment_id", "relation"],
897            registry
898        )
899        .unwrap();
900
901        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
902            "sync_kv_log_store_read_count",
903            "read row count throughput of sync_kv log store",
904            &["type", "actor_id", "target", "fragment_id", "relation"],
905            registry
906        )
907        .unwrap();
908
909        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
910            "sync_kv_log_store_read_size",
911            "read size throughput of sync_kv log store",
912            &["type", "actor_id", "target", "fragment_id", "relation"],
913            registry
914        )
915        .unwrap();
916
917        let sync_kv_log_store_write_pause_duration_ns =
918            register_guarded_int_counter_vec_with_registry!(
919                "sync_kv_log_store_write_pause_duration_ns",
920                "Duration (ns) of sync_kv log store write pause",
921                &["actor_id", "target", "fragment_id", "relation"],
922                registry
923            )
924            .unwrap();
925
926        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
927            "sync_kv_log_store_state",
928            "clean/unclean state transition for sync_kv log store",
929            &["state", "actor_id", "target", "fragment_id", "relation"],
930            registry
931        )
932        .unwrap();
933
934        let sync_kv_log_store_storage_write_count =
935            register_guarded_int_counter_vec_with_registry!(
936                "sync_kv_log_store_storage_write_count",
937                "Write row count throughput of sync_kv log store",
938                &["actor_id", "target", "fragment_id", "relation"],
939                registry
940            )
941            .unwrap();
942
943        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
944            "sync_kv_log_store_storage_write_size",
945            "Write size throughput of sync_kv log store",
946            &["actor_id", "target", "fragment_id", "relation"],
947            registry
948        )
949        .unwrap();
950
951        let sync_kv_log_store_buffer_unconsumed_item_count =
952            register_guarded_int_gauge_vec_with_registry!(
953                "sync_kv_log_store_buffer_unconsumed_item_count",
954                "Number of Unconsumed Item in buffer",
955                &["actor_id", "target", "fragment_id", "relation"],
956                registry
957            )
958            .unwrap();
959
960        let sync_kv_log_store_buffer_unconsumed_row_count =
961            register_guarded_int_gauge_vec_with_registry!(
962                "sync_kv_log_store_buffer_unconsumed_row_count",
963                "Number of Unconsumed Row in buffer",
964                &["actor_id", "target", "fragment_id", "relation"],
965                registry
966            )
967            .unwrap();
968
969        let sync_kv_log_store_buffer_unconsumed_epoch_count =
970            register_guarded_int_gauge_vec_with_registry!(
971                "sync_kv_log_store_buffer_unconsumed_epoch_count",
972                "Number of Unconsumed Epoch in buffer",
973                &["actor_id", "target", "fragment_id", "relation"],
974                registry
975            )
976            .unwrap();
977
978        let sync_kv_log_store_buffer_unconsumed_min_epoch =
979            register_guarded_int_gauge_vec_with_registry!(
980                "sync_kv_log_store_buffer_unconsumed_min_epoch",
981                "Number of Unconsumed Epoch in buffer",
982                &["actor_id", "target", "fragment_id", "relation"],
983                registry
984            )
985            .unwrap();
986
987        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
988            "kv_log_store_storage_write_count",
989            "Write row count throughput of kv log store",
990            &["actor_id", "connector", "sink_id", "sink_name"],
991            registry
992        )
993        .unwrap();
994
995        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
996            "kv_log_store_storage_write_size",
997            "Write size throughput of kv log store",
998            &["actor_id", "connector", "sink_id", "sink_name"],
999            registry
1000        )
1001        .unwrap();
1002
1003        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1004            "kv_log_store_storage_read_count",
1005            "Write row count throughput of kv log store",
1006            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1007            registry
1008        )
1009        .unwrap();
1010
1011        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1012            "kv_log_store_storage_read_size",
1013            "Write size throughput of kv log store",
1014            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1015            registry
1016        )
1017        .unwrap();
1018
1019        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1020            "kv_log_store_rewind_count",
1021            "Kv log store rewind rate",
1022            &["actor_id", "connector", "sink_id", "sink_name"],
1023            registry
1024        )
1025        .unwrap();
1026
1027        let kv_log_store_rewind_delay_opts = {
1028            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1029            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1030                - REWIND_BASE_DELAY.as_secs_f64().log2())
1031            .ceil() as usize;
1032            let buckets = exponential_buckets(
1033                REWIND_BASE_DELAY.as_secs_f64(),
1034                REWIND_BACKOFF_FACTOR as _,
1035                bucket_count,
1036            )
1037            .unwrap();
1038            histogram_opts!(
1039                "kv_log_store_rewind_delay",
1040                "Kv log store rewind delay",
1041                buckets,
1042            )
1043        };
1044
1045        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1046            kv_log_store_rewind_delay_opts,
1047            &["actor_id", "connector", "sink_id", "sink_name"],
1048            registry
1049        )
1050        .unwrap();
1051
1052        let kv_log_store_buffer_unconsumed_item_count =
1053            register_guarded_int_gauge_vec_with_registry!(
1054                "kv_log_store_buffer_unconsumed_item_count",
1055                "Number of Unconsumed Item in buffer",
1056                &["actor_id", "connector", "sink_id", "sink_name"],
1057                registry
1058            )
1059            .unwrap();
1060
1061        let kv_log_store_buffer_unconsumed_row_count =
1062            register_guarded_int_gauge_vec_with_registry!(
1063                "kv_log_store_buffer_unconsumed_row_count",
1064                "Number of Unconsumed Row in buffer",
1065                &["actor_id", "connector", "sink_id", "sink_name"],
1066                registry
1067            )
1068            .unwrap();
1069
1070        let kv_log_store_buffer_unconsumed_epoch_count =
1071            register_guarded_int_gauge_vec_with_registry!(
1072                "kv_log_store_buffer_unconsumed_epoch_count",
1073                "Number of Unconsumed Epoch in buffer",
1074                &["actor_id", "connector", "sink_id", "sink_name"],
1075                registry
1076            )
1077            .unwrap();
1078
1079        let kv_log_store_buffer_unconsumed_min_epoch =
1080            register_guarded_int_gauge_vec_with_registry!(
1081                "kv_log_store_buffer_unconsumed_min_epoch",
1082                "Number of Unconsumed Epoch in buffer",
1083                &["actor_id", "connector", "sink_id", "sink_name"],
1084                registry
1085            )
1086            .unwrap();
1087
1088        let lru_runtime_loop_count = register_int_counter_with_registry!(
1089            "lru_runtime_loop_count",
1090            "The counts of the eviction loop in LRU manager per second",
1091            registry
1092        )
1093        .unwrap();
1094
1095        let lru_latest_sequence = register_int_gauge_with_registry!(
1096            "lru_latest_sequence",
1097            "Current LRU global sequence",
1098            registry,
1099        )
1100        .unwrap();
1101
1102        let lru_watermark_sequence = register_int_gauge_with_registry!(
1103            "lru_watermark_sequence",
1104            "Current LRU watermark sequence",
1105            registry,
1106        )
1107        .unwrap();
1108
1109        let lru_eviction_policy = register_int_gauge_with_registry!(
1110            "lru_eviction_policy",
1111            "Current LRU eviction policy",
1112            registry,
1113        )
1114        .unwrap();
1115
1116        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1117            "jemalloc_allocated_bytes",
1118            "The allocated memory jemalloc, got from jemalloc_ctl",
1119            registry
1120        )
1121        .unwrap();
1122
1123        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1124            "jemalloc_active_bytes",
1125            "The active memory jemalloc, got from jemalloc_ctl",
1126            registry
1127        )
1128        .unwrap();
1129
1130        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1131            "jemalloc_resident_bytes",
1132            "The active memory jemalloc, got from jemalloc_ctl",
1133            registry
1134        )
1135        .unwrap();
1136
1137        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1138            "jemalloc_metadata_bytes",
1139            "The active memory jemalloc, got from jemalloc_ctl",
1140            registry
1141        )
1142        .unwrap();
1143
1144        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1145            "jvm_allocated_bytes",
1146            "The allocated jvm memory",
1147            registry
1148        )
1149        .unwrap();
1150
1151        let jvm_active_bytes = register_int_gauge_with_registry!(
1152            "jvm_active_bytes",
1153            "The active jvm memory",
1154            registry
1155        )
1156        .unwrap();
1157
1158        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1159            "stream_materialize_cache_hit_count",
1160            "Materialize executor cache hit count",
1161            &["actor_id", "table_id", "fragment_id"],
1162            registry
1163        )
1164        .unwrap()
1165        .relabel_debug_1(level);
1166
1167        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1168            "stream_materialize_data_exist_count",
1169            "Materialize executor data exist count",
1170            &["actor_id", "table_id", "fragment_id"],
1171            registry
1172        )
1173        .unwrap()
1174        .relabel_debug_1(level);
1175
1176        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1177            "stream_materialize_cache_total_count",
1178            "Materialize executor cache total operation",
1179            &["actor_id", "table_id", "fragment_id"],
1180            registry
1181        )
1182        .unwrap()
1183        .relabel_debug_1(level);
1184
1185        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1186            "stream_memory_usage",
1187            "Memory usage for stream executors",
1188            &["actor_id", "table_id", "desc"],
1189            registry
1190        )
1191        .unwrap()
1192        .relabel_debug_1(level);
1193
1194        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1195            "gap_fill_generated_rows_count",
1196            "Total number of rows generated by gap fill executor",
1197            &["actor_id", "fragment_id"],
1198            registry
1199        )
1200        .unwrap()
1201        .relabel_debug_1(level);
1202
1203        Self {
1204            level,
1205            executor_row_count,
1206            mem_stream_node_output_row_count: stream_node_output_row_count,
1207            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1208            actor_scheduled_duration,
1209            actor_scheduled_cnt,
1210            actor_poll_duration,
1211            actor_poll_cnt,
1212            actor_idle_duration,
1213            actor_idle_cnt,
1214            actor_count,
1215            actor_in_record_cnt,
1216            actor_out_record_cnt,
1217            actor_current_epoch,
1218            source_output_row_count,
1219            source_split_change_count,
1220            source_backfill_row_count,
1221            sink_input_row_count,
1222            sink_input_bytes,
1223            sink_chunk_buffer_size,
1224            exchange_frag_recv_size,
1225            merge_barrier_align_duration,
1226            actor_output_buffer_blocking_duration_ns,
1227            actor_input_buffer_blocking_duration_ns,
1228            join_lookup_miss_count,
1229            join_lookup_total_count,
1230            join_insert_cache_miss_count,
1231            join_actor_input_waiting_duration_ns,
1232            join_match_duration_ns,
1233            join_cached_entry_count,
1234            join_matched_join_keys,
1235            barrier_align_duration,
1236            agg_lookup_miss_count,
1237            agg_total_lookup_count,
1238            agg_cached_entry_count,
1239            agg_chunk_lookup_miss_count,
1240            agg_chunk_total_lookup_count,
1241            agg_dirty_groups_count,
1242            agg_dirty_groups_heap_size,
1243            agg_distinct_cache_miss_count,
1244            agg_distinct_total_cache_count,
1245            agg_distinct_cached_entry_count,
1246            agg_state_cache_lookup_count,
1247            agg_state_cache_miss_count,
1248            group_top_n_cache_miss_count,
1249            group_top_n_total_query_cache_count,
1250            group_top_n_cached_entry_count,
1251            group_top_n_appendonly_cache_miss_count,
1252            group_top_n_appendonly_total_query_cache_count,
1253            group_top_n_appendonly_cached_entry_count,
1254            lookup_cache_miss_count,
1255            lookup_total_query_cache_count,
1256            lookup_cached_entry_count,
1257            temporal_join_cache_miss_count,
1258            temporal_join_total_query_cache_count,
1259            temporal_join_cached_entry_count,
1260            backfill_snapshot_read_row_count,
1261            backfill_upstream_output_row_count,
1262            cdc_backfill_snapshot_read_row_count,
1263            cdc_backfill_upstream_output_row_count,
1264            snapshot_backfill_consume_row_count,
1265            over_window_cached_entry_count,
1266            over_window_cache_lookup_count,
1267            over_window_cache_miss_count,
1268            over_window_range_cache_entry_count,
1269            over_window_range_cache_lookup_count,
1270            over_window_range_cache_left_miss_count,
1271            over_window_range_cache_right_miss_count,
1272            over_window_accessed_entry_count,
1273            over_window_compute_count,
1274            over_window_same_output_count,
1275            barrier_inflight_latency,
1276            barrier_sync_latency,
1277            barrier_batch_size,
1278            barrier_manager_progress,
1279            kv_log_store_storage_write_count,
1280            kv_log_store_storage_write_size,
1281            kv_log_store_rewind_count,
1282            kv_log_store_rewind_delay,
1283            kv_log_store_storage_read_count,
1284            kv_log_store_storage_read_size,
1285            kv_log_store_buffer_unconsumed_item_count,
1286            kv_log_store_buffer_unconsumed_row_count,
1287            kv_log_store_buffer_unconsumed_epoch_count,
1288            kv_log_store_buffer_unconsumed_min_epoch,
1289            sync_kv_log_store_read_count,
1290            sync_kv_log_store_read_size,
1291            sync_kv_log_store_write_pause_duration_ns,
1292            sync_kv_log_store_state,
1293            sync_kv_log_store_wait_next_poll_ns,
1294            sync_kv_log_store_storage_write_count,
1295            sync_kv_log_store_storage_write_size,
1296            sync_kv_log_store_buffer_unconsumed_item_count,
1297            sync_kv_log_store_buffer_unconsumed_row_count,
1298            sync_kv_log_store_buffer_unconsumed_epoch_count,
1299            sync_kv_log_store_buffer_unconsumed_min_epoch,
1300            lru_runtime_loop_count,
1301            lru_latest_sequence,
1302            lru_watermark_sequence,
1303            lru_eviction_policy,
1304            jemalloc_allocated_bytes,
1305            jemalloc_active_bytes,
1306            jemalloc_resident_bytes,
1307            jemalloc_metadata_bytes,
1308            jvm_allocated_bytes,
1309            jvm_active_bytes,
1310            stream_memory_usage,
1311            materialize_cache_hit_count,
1312            materialize_data_exist_count,
1313            materialize_cache_total_count,
1314            materialize_input_row_count,
1315            materialize_current_epoch,
1316            pg_cdc_state_table_lsn,
1317            pg_cdc_jni_commit_offset_lsn,
1318            mysql_cdc_state_binlog_file_seq,
1319            mysql_cdc_state_binlog_position,
1320            gap_fill_generated_rows_count,
1321        }
1322    }
1323
1324    /// Create a new `StreamingMetrics` instance used in tests or other places.
1325    pub fn unused() -> Self {
1326        global_streaming_metrics(MetricLevel::Disabled)
1327    }
1328
1329    pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1330        let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1331        let actor_scheduled_duration = self
1332            .actor_scheduled_duration
1333            .with_guarded_label_values(label_list);
1334        let actor_scheduled_cnt = self
1335            .actor_scheduled_cnt
1336            .with_guarded_label_values(label_list);
1337        let actor_poll_duration = self
1338            .actor_poll_duration
1339            .with_guarded_label_values(label_list);
1340        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1341        let actor_idle_duration = self
1342            .actor_idle_duration
1343            .with_guarded_label_values(label_list);
1344        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1345        ActorMetrics {
1346            actor_scheduled_duration,
1347            actor_scheduled_cnt,
1348            actor_poll_duration,
1349            actor_poll_cnt,
1350            actor_idle_duration,
1351            actor_idle_cnt,
1352        }
1353    }
1354
1355    pub(crate) fn new_actor_input_metrics(
1356        &self,
1357        actor_id: ActorId,
1358        fragment_id: FragmentId,
1359        upstream_fragment_id: FragmentId,
1360    ) -> ActorInputMetrics {
1361        let actor_id_str = actor_id.to_string();
1362        let fragment_id_str = fragment_id.to_string();
1363        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1364        ActorInputMetrics {
1365            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1366                &actor_id_str,
1367                &fragment_id_str,
1368                &upstream_fragment_id_str,
1369            ]),
1370            actor_input_buffer_blocking_duration_ns: self
1371                .actor_input_buffer_blocking_duration_ns
1372                .with_guarded_label_values(&[
1373                    &actor_id_str,
1374                    &fragment_id_str,
1375                    &upstream_fragment_id_str,
1376                ]),
1377        }
1378    }
1379
1380    pub fn new_sink_exec_metrics(
1381        &self,
1382        id: SinkId,
1383        actor_id: ActorId,
1384        fragment_id: FragmentId,
1385    ) -> SinkExecutorMetrics {
1386        let label_list: &[&str; 3] = &[
1387            &id.to_string(),
1388            &actor_id.to_string(),
1389            &fragment_id.to_string(),
1390        ];
1391        SinkExecutorMetrics {
1392            sink_input_row_count: self
1393                .sink_input_row_count
1394                .with_guarded_label_values(label_list),
1395            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1396            sink_chunk_buffer_size: self
1397                .sink_chunk_buffer_size
1398                .with_guarded_label_values(label_list),
1399        }
1400    }
1401
1402    pub fn new_group_top_n_metrics(
1403        &self,
1404        table_id: TableId,
1405        actor_id: ActorId,
1406        fragment_id: FragmentId,
1407    ) -> GroupTopNMetrics {
1408        let label_list: &[&str; 3] = &[
1409            &table_id.to_string(),
1410            &actor_id.to_string(),
1411            &fragment_id.to_string(),
1412        ];
1413
1414        GroupTopNMetrics {
1415            group_top_n_cache_miss_count: self
1416                .group_top_n_cache_miss_count
1417                .with_guarded_label_values(label_list),
1418            group_top_n_total_query_cache_count: self
1419                .group_top_n_total_query_cache_count
1420                .with_guarded_label_values(label_list),
1421            group_top_n_cached_entry_count: self
1422                .group_top_n_cached_entry_count
1423                .with_guarded_label_values(label_list),
1424        }
1425    }
1426
1427    pub fn new_append_only_group_top_n_metrics(
1428        &self,
1429        table_id: TableId,
1430        actor_id: ActorId,
1431        fragment_id: FragmentId,
1432    ) -> GroupTopNMetrics {
1433        let label_list: &[&str; 3] = &[
1434            &table_id.to_string(),
1435            &actor_id.to_string(),
1436            &fragment_id.to_string(),
1437        ];
1438
1439        GroupTopNMetrics {
1440            group_top_n_cache_miss_count: self
1441                .group_top_n_appendonly_cache_miss_count
1442                .with_guarded_label_values(label_list),
1443            group_top_n_total_query_cache_count: self
1444                .group_top_n_appendonly_total_query_cache_count
1445                .with_guarded_label_values(label_list),
1446            group_top_n_cached_entry_count: self
1447                .group_top_n_appendonly_cached_entry_count
1448                .with_guarded_label_values(label_list),
1449        }
1450    }
1451
1452    pub fn new_lookup_executor_metrics(
1453        &self,
1454        table_id: TableId,
1455        actor_id: ActorId,
1456        fragment_id: FragmentId,
1457    ) -> LookupExecutorMetrics {
1458        let label_list: &[&str; 3] = &[
1459            &table_id.to_string(),
1460            &actor_id.to_string(),
1461            &fragment_id.to_string(),
1462        ];
1463
1464        LookupExecutorMetrics {
1465            lookup_cache_miss_count: self
1466                .lookup_cache_miss_count
1467                .with_guarded_label_values(label_list),
1468            lookup_total_query_cache_count: self
1469                .lookup_total_query_cache_count
1470                .with_guarded_label_values(label_list),
1471            lookup_cached_entry_count: self
1472                .lookup_cached_entry_count
1473                .with_guarded_label_values(label_list),
1474        }
1475    }
1476
1477    pub fn new_hash_agg_metrics(
1478        &self,
1479        table_id: TableId,
1480        actor_id: ActorId,
1481        fragment_id: FragmentId,
1482    ) -> HashAggMetrics {
1483        let label_list: &[&str; 3] = &[
1484            &table_id.to_string(),
1485            &actor_id.to_string(),
1486            &fragment_id.to_string(),
1487        ];
1488        HashAggMetrics {
1489            agg_lookup_miss_count: self
1490                .agg_lookup_miss_count
1491                .with_guarded_label_values(label_list),
1492            agg_total_lookup_count: self
1493                .agg_total_lookup_count
1494                .with_guarded_label_values(label_list),
1495            agg_cached_entry_count: self
1496                .agg_cached_entry_count
1497                .with_guarded_label_values(label_list),
1498            agg_chunk_lookup_miss_count: self
1499                .agg_chunk_lookup_miss_count
1500                .with_guarded_label_values(label_list),
1501            agg_chunk_total_lookup_count: self
1502                .agg_chunk_total_lookup_count
1503                .with_guarded_label_values(label_list),
1504            agg_dirty_groups_count: self
1505                .agg_dirty_groups_count
1506                .with_guarded_label_values(label_list),
1507            agg_dirty_groups_heap_size: self
1508                .agg_dirty_groups_heap_size
1509                .with_guarded_label_values(label_list),
1510            agg_state_cache_lookup_count: self
1511                .agg_state_cache_lookup_count
1512                .with_guarded_label_values(label_list),
1513            agg_state_cache_miss_count: self
1514                .agg_state_cache_miss_count
1515                .with_guarded_label_values(label_list),
1516        }
1517    }
1518
1519    pub fn new_agg_distinct_dedup_metrics(
1520        &self,
1521        table_id: TableId,
1522        actor_id: ActorId,
1523        fragment_id: FragmentId,
1524    ) -> AggDistinctDedupMetrics {
1525        let label_list: &[&str; 3] = &[
1526            &table_id.to_string(),
1527            &actor_id.to_string(),
1528            &fragment_id.to_string(),
1529        ];
1530        AggDistinctDedupMetrics {
1531            agg_distinct_cache_miss_count: self
1532                .agg_distinct_cache_miss_count
1533                .with_guarded_label_values(label_list),
1534            agg_distinct_total_cache_count: self
1535                .agg_distinct_total_cache_count
1536                .with_guarded_label_values(label_list),
1537            agg_distinct_cached_entry_count: self
1538                .agg_distinct_cached_entry_count
1539                .with_guarded_label_values(label_list),
1540        }
1541    }
1542
1543    pub fn new_temporal_join_metrics(
1544        &self,
1545        table_id: TableId,
1546        actor_id: ActorId,
1547        fragment_id: FragmentId,
1548    ) -> TemporalJoinMetrics {
1549        let label_list: &[&str; 3] = &[
1550            &table_id.to_string(),
1551            &actor_id.to_string(),
1552            &fragment_id.to_string(),
1553        ];
1554        TemporalJoinMetrics {
1555            temporal_join_cache_miss_count: self
1556                .temporal_join_cache_miss_count
1557                .with_guarded_label_values(label_list),
1558            temporal_join_total_query_cache_count: self
1559                .temporal_join_total_query_cache_count
1560                .with_guarded_label_values(label_list),
1561            temporal_join_cached_entry_count: self
1562                .temporal_join_cached_entry_count
1563                .with_guarded_label_values(label_list),
1564        }
1565    }
1566
1567    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1568        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1569        BackfillMetrics {
1570            backfill_snapshot_read_row_count: self
1571                .backfill_snapshot_read_row_count
1572                .with_guarded_label_values(label_list),
1573            backfill_upstream_output_row_count: self
1574                .backfill_upstream_output_row_count
1575                .with_guarded_label_values(label_list),
1576        }
1577    }
1578
1579    pub fn new_cdc_backfill_metrics(
1580        &self,
1581        table_id: TableId,
1582        actor_id: ActorId,
1583    ) -> CdcBackfillMetrics {
1584        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1585        CdcBackfillMetrics {
1586            cdc_backfill_snapshot_read_row_count: self
1587                .cdc_backfill_snapshot_read_row_count
1588                .with_guarded_label_values(label_list),
1589            cdc_backfill_upstream_output_row_count: self
1590                .cdc_backfill_upstream_output_row_count
1591                .with_guarded_label_values(label_list),
1592        }
1593    }
1594
1595    pub fn new_over_window_metrics(
1596        &self,
1597        table_id: TableId,
1598        actor_id: ActorId,
1599        fragment_id: FragmentId,
1600    ) -> OverWindowMetrics {
1601        let label_list: &[&str; 3] = &[
1602            &table_id.to_string(),
1603            &actor_id.to_string(),
1604            &fragment_id.to_string(),
1605        ];
1606        OverWindowMetrics {
1607            over_window_cached_entry_count: self
1608                .over_window_cached_entry_count
1609                .with_guarded_label_values(label_list),
1610            over_window_cache_lookup_count: self
1611                .over_window_cache_lookup_count
1612                .with_guarded_label_values(label_list),
1613            over_window_cache_miss_count: self
1614                .over_window_cache_miss_count
1615                .with_guarded_label_values(label_list),
1616            over_window_range_cache_entry_count: self
1617                .over_window_range_cache_entry_count
1618                .with_guarded_label_values(label_list),
1619            over_window_range_cache_lookup_count: self
1620                .over_window_range_cache_lookup_count
1621                .with_guarded_label_values(label_list),
1622            over_window_range_cache_left_miss_count: self
1623                .over_window_range_cache_left_miss_count
1624                .with_guarded_label_values(label_list),
1625            over_window_range_cache_right_miss_count: self
1626                .over_window_range_cache_right_miss_count
1627                .with_guarded_label_values(label_list),
1628            over_window_accessed_entry_count: self
1629                .over_window_accessed_entry_count
1630                .with_guarded_label_values(label_list),
1631            over_window_compute_count: self
1632                .over_window_compute_count
1633                .with_guarded_label_values(label_list),
1634            over_window_same_output_count: self
1635                .over_window_same_output_count
1636                .with_guarded_label_values(label_list),
1637        }
1638    }
1639
1640    pub fn new_materialize_cache_metrics(
1641        &self,
1642        table_id: TableId,
1643        actor_id: ActorId,
1644        fragment_id: FragmentId,
1645    ) -> MaterializeCacheMetrics {
1646        let label_list: &[&str; 3] = &[
1647            &actor_id.to_string(),
1648            &table_id.to_string(),
1649            &fragment_id.to_string(),
1650        ];
1651        MaterializeCacheMetrics {
1652            materialize_cache_hit_count: self
1653                .materialize_cache_hit_count
1654                .with_guarded_label_values(label_list),
1655            materialize_data_exist_count: self
1656                .materialize_data_exist_count
1657                .with_guarded_label_values(label_list),
1658            materialize_cache_total_count: self
1659                .materialize_cache_total_count
1660                .with_guarded_label_values(label_list),
1661        }
1662    }
1663
1664    pub fn new_materialize_metrics(
1665        &self,
1666        table_id: TableId,
1667        actor_id: ActorId,
1668        fragment_id: FragmentId,
1669    ) -> MaterializeMetrics {
1670        let label_list: &[&str; 3] = &[
1671            &actor_id.to_string(),
1672            &table_id.to_string(),
1673            &fragment_id.to_string(),
1674        ];
1675        MaterializeMetrics {
1676            materialize_input_row_count: self
1677                .materialize_input_row_count
1678                .with_guarded_label_values(label_list),
1679            materialize_current_epoch: self
1680                .materialize_current_epoch
1681                .with_guarded_label_values(label_list),
1682        }
1683    }
1684}
1685
/// Metric handles for an actor's input side.
///
/// Both fields are label-guarded integer counters. The label values they are bound to are
/// chosen by whichever code constructs this struct.
pub(crate) struct ActorInputMetrics {
    /// Counter named for the number of records the actor has received.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Counter named for time spent blocked on the input buffer. The `_ns` suffix suggests
    /// nanoseconds. NOTE(review): confirm the unit at the increment site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1690
/// Tokio metrics for actors.
///
/// The fields form duration/count pairs for three states: scheduled, poll and idle. All are
/// label-guarded integer counters. NOTE(review): the duration unit and the exact tokio
/// statistic feeding each field are decided where these counters are incremented; confirm
/// there before relying on them.
pub struct ActorMetrics {
    /// Accumulated time attributed to the "scheduled" state.
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    /// Number of "scheduled" events, paired with `actor_scheduled_duration`.
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    /// Accumulated time spent being polled.
    pub actor_poll_duration: LabelGuardedIntCounter,
    /// Number of polls, paired with `actor_poll_duration`.
    pub actor_poll_cnt: LabelGuardedIntCounter,
    /// Accumulated idle time.
    pub actor_idle_duration: LabelGuardedIntCounter,
    /// Number of idle periods, paired with `actor_idle_duration`.
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1700
/// Metric handles for a sink executor: two label-guarded counters and one gauge.
pub struct SinkExecutorMetrics {
    /// Counter named for rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter named for bytes fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge for the sink's chunk buffer size. A gauge, so it can move both up and down.
    /// NOTE(review): unit (rows, chunks or bytes) is not visible here; confirm at the set site.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1706
/// Cache metric handles for a materialize executor.
///
/// Built by `StreamingMetrics::new_materialize_cache_metrics`, which binds every field to
/// the label values `(actor_id, table_id, fragment_id)`.
pub struct MaterializeCacheMetrics {
    /// Counter named for cache hits.
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Counter named for lookups that found existing data for the key.
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Counter named for all cache lookups; the denominator for a hit ratio.
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1712
/// Row and epoch metric handles for a materialize executor.
///
/// Built by `StreamingMetrics::new_materialize_metrics`, which binds both fields to the
/// label values `(actor_id, table_id, fragment_id)`.
pub struct MaterializeMetrics {
    /// Counter named for rows received by the executor.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge named for the executor's current epoch.
    /// NOTE(review): presumably set on each barrier; confirm at the set site.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1717
/// Cache metric handles for a group top-N executor.
pub struct GroupTopNMetrics {
    /// Counter named for cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter named for all cache queries; pair with the miss count for a miss ratio.
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries currently cached.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1723
/// Cache metric handles for a lookup executor.
///
/// Its `StreamingMetrics` constructor binds all three fields to one shared label list.
pub struct LookupExecutorMetrics {
    /// Counter named for cache misses.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter named for all cache queries; pair with the miss count for a miss ratio.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries currently cached.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1729
/// Metric handles for a hash aggregation executor.
///
/// Built by `StreamingMetrics::new_hash_agg_metrics`, which binds every field to the label
/// values `(table_id, actor_id, fragment_id)`.
pub struct HashAggMetrics {
    /// Counter named for per-key group-cache lookup misses.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter named for all per-key group-cache lookups.
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries currently in the group cache.
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    /// Counter named for chunk-level lookup misses.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter named for all chunk-level lookups.
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge named for the number of dirty (modified, not yet flushed) groups.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    /// Gauge named for the heap size of the dirty groups.
    /// NOTE(review): presumably bytes; confirm at the set site.
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    /// Counter named for aggregation-state cache lookups.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter named for aggregation-state cache misses.
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1741
/// Dedup-cache metric handles for distinct aggregation.
///
/// Built by `StreamingMetrics::new_agg_distinct_dedup_metrics`, which binds every field to
/// the label values `(table_id, actor_id, fragment_id)`.
pub struct AggDistinctDedupMetrics {
    /// Counter named for dedup-cache misses.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter named for all dedup-cache accesses.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries currently in the dedup cache.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1747
/// Cache metric handles for a temporal join executor.
///
/// Built by `StreamingMetrics::new_temporal_join_metrics`, which binds every field to the
/// label values `(table_id, actor_id, fragment_id)`.
pub struct TemporalJoinMetrics {
    /// Counter named for cache misses.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter named for all cache queries; pair with the miss count for a miss ratio.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries currently cached.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1753
/// Row-count metric handles for a backfill executor.
///
/// Built by `StreamingMetrics::new_backfill_metrics`, which binds both fields to the label
/// values `(table_id, actor_id)`. There is no fragment label.
pub struct BackfillMetrics {
    /// Counter named for rows read from the snapshot.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter named for upstream rows emitted downstream.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1758
/// Row-count metric handles for a CDC backfill executor.
///
/// Built by `StreamingMetrics::new_cdc_backfill_metrics`, which binds both fields to the
/// label values `(table_id, actor_id)`. Unlike the other metric structs in this section,
/// it derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter named for rows read from the snapshot.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter named for upstream rows emitted downstream.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1764
/// Metric handles for an over-window executor.
///
/// Built by `StreamingMetrics::new_over_window_metrics`, which binds every field to the
/// label values `(table_id, actor_id, fragment_id)`.
pub struct OverWindowMetrics {
    /// Gauge named for the number of entries currently cached.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    /// Counter named for cache lookups.
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter named for cache misses.
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    /// Gauge named for the number of entries in the range cache.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    /// Counter named for range-cache lookups.
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter named for range-cache misses on the left boundary.
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    /// Counter named for range-cache misses on the right boundary.
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    /// Counter named for entries accessed while computing window results.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    /// Counter named for window-function computations performed.
    pub over_window_compute_count: LabelGuardedIntCounter,
    /// Counter named for computations whose output matched the previous output.
    pub over_window_same_output_count: LabelGuardedIntCounter,
}