risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26    LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46    pub level: MetricLevel,
47
48    // Executor metrics (disabled by default)
49    pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51    // Profiling Metrics:
52    // Aggregated per operator rather than per actor.
53    // These are purely in-memory, never collected by prometheus.
54    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57    // Streaming actor metrics from tokio (disabled by default)
58    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60    actor_poll_duration: RelabeledGuardedIntCounterVec,
61    actor_poll_cnt: RelabeledGuardedIntCounterVec,
62    actor_idle_duration: RelabeledGuardedIntCounterVec,
63    actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65    // Streaming actor
66    pub actor_count: LabelGuardedIntGaugeVec,
67    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69    pub fragment_channel_buffered_bytes: LabelGuardedIntGaugeVec,
70    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
71
72    // Source
73    pub source_output_row_count: LabelGuardedIntCounterVec,
74    pub source_split_change_count: LabelGuardedIntCounterVec,
75    pub source_backfill_row_count: LabelGuardedIntCounterVec,
76
77    // Sink
78    sink_input_row_count: LabelGuardedIntCounterVec,
79    sink_input_bytes: LabelGuardedIntCounterVec,
80    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
81
82    // Exchange (see also `compute::ExchangeServiceMetrics`)
83    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
84
85    // Streaming Merge (We breakout this metric from `barrier_align_duration` because
86    // the alignment happens on different levels)
87    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
88
89    // Backpressure
90    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
92
93    // Streaming Join
94    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
95    pub join_lookup_total_count: LabelGuardedIntCounterVec,
96    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
97    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
98    pub join_match_duration_ns: LabelGuardedIntCounterVec,
99    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
100    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
101
102    // Streaming Join, Streaming Dynamic Filter and Streaming Union
103    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
104
105    // Streaming Aggregation
106    agg_lookup_miss_count: LabelGuardedIntCounterVec,
107    agg_total_lookup_count: LabelGuardedIntCounterVec,
108    agg_cached_entry_count: LabelGuardedIntGaugeVec,
109    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
110    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
111    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
112    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
113    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
114    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
115    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
116    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
117    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
118
119    // Streaming TopN
120    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
121    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
122    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
123    // TODO(rc): why not just use the above three?
124    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
125    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
126    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
127
128    // Lookup executor
129    lookup_cache_miss_count: LabelGuardedIntCounterVec,
130    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
131    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
132
133    // temporal join
134    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
135    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
136    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
137
138    // Backfill
139    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
140    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
141
142    // CDC Backfill
143    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
144    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
145
146    // Snapshot Backfill
147    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
148
149    // Over Window
150    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
151    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
152    over_window_cache_miss_count: LabelGuardedIntCounterVec,
153    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
154    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
155    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
156    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
157    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
158    over_window_compute_count: LabelGuardedIntCounterVec,
159    over_window_same_output_count: LabelGuardedIntCounterVec,
160
161    /// The duration from receipt of barrier to all actors collection.
162    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
163    /// to flow through the graph.
164    pub barrier_inflight_latency: Histogram,
165    /// The duration of sync to storage.
166    pub barrier_sync_latency: Histogram,
167    pub barrier_batch_size: Histogram,
168    /// The progress made by the earliest in-flight barriers in the local barrier manager.
169    pub barrier_manager_progress: IntCounter,
170
171    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
172    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
173    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
174    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
175    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
176    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
177    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
178    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
179    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
180    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
181    pub kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
182
183    pub crossdb_last_consumed_min_epoch: LabelGuardedIntGaugeVec,
184
185    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196    pub sync_kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
197
198    // Memory management
199    pub lru_runtime_loop_count: IntCounter,
200    pub lru_latest_sequence: IntGauge,
201    pub lru_watermark_sequence: IntGauge,
202    pub lru_eviction_policy: IntGauge,
203    pub jemalloc_allocated_bytes: IntGauge,
204    pub jemalloc_active_bytes: IntGauge,
205    pub jemalloc_resident_bytes: IntGauge,
206    pub jemalloc_metadata_bytes: IntGauge,
207    pub jvm_allocated_bytes: IntGauge,
208    pub jvm_active_bytes: IntGauge,
209    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
210
211    // Materialized view
212    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
213    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
214    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
215    materialize_input_row_count: RelabeledGuardedIntCounterVec,
216    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
217
218    // PostgreSQL CDC LSN monitoring
219    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
220    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
221
222    // MySQL CDC binlog monitoring
223    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
224    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
225
226    // SQL Server CDC LSN monitoring
227    pub sqlserver_cdc_state_change_lsn: LabelGuardedIntGaugeVec,
228    pub sqlserver_cdc_state_commit_lsn: LabelGuardedIntGaugeVec,
229    pub sqlserver_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
230
231    // Gap Fill
232    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
233
234    // State Table
235    pub state_table_iter_count: RelabeledGuardedIntCounterVec,
236    pub state_table_get_count: RelabeledGuardedIntCounterVec,
237    pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
238    pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
239}
240
241pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
242
243pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
244    GLOBAL_STREAMING_METRICS
245        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
246        .clone()
247}
248
249impl StreamingMetrics {
250    pub fn new(registry: &Registry, level: MetricLevel) -> Self {
251        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
252            "stream_executor_row_count",
253            "Total number of rows that have been output from each executor",
254            &["actor_id", "fragment_id", "executor_identity"],
255            registry
256        )
257        .unwrap()
258        .relabel_debug_1(level);
259
260        let stream_node_output_row_count = CountMap::new();
261        let stream_node_output_blocking_duration_ns = CountMap::new();
262
263        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
264            "stream_source_output_rows_counts",
265            "Total number of rows that have been output from source",
266            &["source_id", "source_name", "actor_id", "fragment_id"],
267            registry
268        )
269        .unwrap();
270
271        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
272            "stream_source_split_change_event_count",
273            "Total number of split change events that have been operated by source",
274            &["source_id", "source_name", "actor_id", "fragment_id"],
275            registry
276        )
277        .unwrap();
278
279        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
280            "stream_source_backfill_rows_counts",
281            "Total number of rows that have been backfilled for source",
282            &["source_id", "source_name", "actor_id", "fragment_id"],
283            registry
284        )
285        .unwrap();
286
287        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
288            "stream_sink_input_row_count",
289            "Total number of rows streamed into sink executors",
290            &["sink_id", "actor_id", "fragment_id"],
291            registry
292        )
293        .unwrap();
294
295        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
296            "stream_sink_input_bytes",
297            "Total size of chunks streamed into sink executors",
298            &["sink_id", "actor_id", "fragment_id"],
299            registry
300        )
301        .unwrap();
302
303        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
304            "stream_mview_input_row_count",
305            "Total number of rows streamed into materialize executors",
306            &["actor_id", "table_id", "fragment_id"],
307            registry
308        )
309        .unwrap()
310        .relabel_debug_1(level);
311
312        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
313            "stream_mview_current_epoch",
314            "The current epoch of the materialized executor",
315            &["actor_id", "table_id", "fragment_id"],
316            registry
317        )
318        .unwrap()
319        .relabel_debug_1(level);
320
321        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
322            "stream_pg_cdc_state_table_lsn",
323            "Current LSN value stored in PostgreSQL CDC state table",
324            &["source_id"],
325            registry,
326        )
327        .unwrap();
328
329        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
330            "stream_pg_cdc_jni_commit_offset_lsn",
331            "LSN value when JNI commit offset is called for PostgreSQL CDC",
332            &["source_id"],
333            registry,
334        )
335        .unwrap();
336
337        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
338            "stream_mysql_cdc_state_binlog_file_seq",
339            "Current binlog file sequence number stored in MySQL CDC state table",
340            &["source_id"],
341            registry,
342        )
343        .unwrap();
344
345        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
346            "stream_mysql_cdc_state_binlog_position",
347            "Current binlog position stored in MySQL CDC state table",
348            &["source_id"],
349            registry,
350        )
351        .unwrap();
352
353        let sqlserver_cdc_state_change_lsn = register_guarded_int_gauge_vec_with_registry!(
354            "stream_sqlserver_cdc_state_change_lsn",
355            "Current change_lsn value stored in SQL Server CDC state table",
356            &["source_id"],
357            registry,
358        )
359        .unwrap();
360
361        let sqlserver_cdc_state_commit_lsn = register_guarded_int_gauge_vec_with_registry!(
362            "stream_sqlserver_cdc_state_commit_lsn",
363            "Current commit_lsn value stored in SQL Server CDC state table",
364            &["source_id"],
365            registry,
366        )
367        .unwrap();
368
369        let sqlserver_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
370            "stream_sqlserver_cdc_jni_commit_offset_lsn",
371            "LSN value when JNI commit offset is called for SQL Server CDC",
372            &["source_id"],
373            registry,
374        )
375        .unwrap();
376
377        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
378            "stream_sink_chunk_buffer_size",
379            "Total size of chunks buffered in a barrier",
380            &["sink_id", "actor_id", "fragment_id"],
381            registry
382        )
383        .unwrap();
384        let actor_output_buffer_blocking_duration_ns =
385            register_guarded_int_counter_vec_with_registry!(
386                "stream_actor_output_buffer_blocking_duration_ns",
387                "Total blocking duration (ns) of output buffer",
388                &["actor_id", "fragment_id", "downstream_fragment_id"],
389                registry
390            )
391            .unwrap()
392            // mask the first label `actor_id` if the level is less verbose than `Debug`
393            .relabel_debug_1(level);
394
395        let actor_input_buffer_blocking_duration_ns =
396            register_guarded_int_counter_vec_with_registry!(
397                "stream_actor_input_buffer_blocking_duration_ns",
398                "Total blocking duration (ns) of input buffer",
399                &["actor_id", "fragment_id", "upstream_fragment_id"],
400                registry
401            )
402            .unwrap()
403            // mask the first label `actor_id` if the level is less verbose than `Debug`
404            .relabel_debug_1(level);
405
406        let fragment_channel_buffered_bytes = register_guarded_int_gauge_vec_with_registry!(
407            "stream_fragment_channel_buffered_bytes",
408            "Estimated buffered bytes for actor channels by fragment",
409            &["fragment_id"],
410            registry
411        )
412        .unwrap();
413
414        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
415            "stream_exchange_frag_recv_size",
416            "Total size of messages that have been received from upstream Fragment",
417            &["up_fragment_id", "down_fragment_id"],
418            registry
419        )
420        .unwrap();
421
422        let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
423            "stream_actor_poll_duration",
424            "tokio's metrics",
425            &["actor_id", "fragment_id"],
426            registry
427        )
428        .unwrap()
429        .relabel_debug_1(level);
430
431        let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
432            "stream_actor_poll_cnt",
433            "tokio's metrics",
434            &["actor_id", "fragment_id"],
435            registry
436        )
437        .unwrap()
438        .relabel_debug_1(level);
439
440        let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
441            "stream_actor_scheduled_duration",
442            "tokio's metrics",
443            &["actor_id", "fragment_id"],
444            registry
445        )
446        .unwrap()
447        .relabel_debug_1(level);
448
449        let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
450            "stream_actor_scheduled_cnt",
451            "tokio's metrics",
452            &["actor_id", "fragment_id"],
453            registry
454        )
455        .unwrap()
456        .relabel_debug_1(level);
457
458        let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
459            "stream_actor_idle_duration",
460            "tokio's metrics",
461            &["actor_id", "fragment_id"],
462            registry
463        )
464        .unwrap()
465        .relabel_debug_1(level);
466
467        let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
468            "stream_actor_idle_cnt",
469            "tokio's metrics",
470            &["actor_id", "fragment_id"],
471            registry
472        )
473        .unwrap()
474        .relabel_debug_1(level);
475
476        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
477            "stream_actor_in_record_cnt",
478            "Total number of rows actor received",
479            &["actor_id", "fragment_id", "upstream_fragment_id"],
480            registry
481        )
482        .unwrap()
483        .relabel_debug_1(level);
484
485        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
486            "stream_actor_out_record_cnt",
487            "Total number of rows actor sent",
488            &["actor_id", "fragment_id"],
489            registry
490        )
491        .unwrap()
492        .relabel_debug_1(level);
493
494        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
495            "stream_actor_current_epoch",
496            "Current epoch of actor",
497            &["actor_id", "fragment_id"],
498            registry
499        )
500        .unwrap()
501        .relabel_debug_1(level);
502
503        let actor_count = register_guarded_int_gauge_vec_with_registry!(
504            "stream_actor_count",
505            "Total number of actors (parallelism)",
506            &["fragment_id"],
507            registry
508        )
509        .unwrap();
510
511        let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
512            "stream_merge_barrier_align_duration_ns",
513            "Total merge barrier alignment duration (ns)",
514            &["actor_id", "fragment_id"],
515            registry
516        )
517        .unwrap()
518        .relabel_debug_1(level);
519
520        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
521            "stream_join_lookup_miss_count",
522            "Join executor lookup miss duration",
523            &["side", "join_table_id", "actor_id", "fragment_id"],
524            registry
525        )
526        .unwrap();
527
528        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
529            "stream_join_lookup_total_count",
530            "Join executor lookup total operation",
531            &["side", "join_table_id", "actor_id", "fragment_id"],
532            registry
533        )
534        .unwrap();
535
536        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
537            "stream_join_insert_cache_miss_count",
538            "Join executor cache miss when insert operation",
539            &["side", "join_table_id", "actor_id", "fragment_id"],
540            registry
541        )
542        .unwrap();
543
544        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
545            "stream_join_actor_input_waiting_duration_ns",
546            "Total waiting duration (ns) of input buffer of join actor",
547            &["actor_id", "fragment_id"],
548            registry
549        )
550        .unwrap();
551
552        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
553            "stream_join_match_duration_ns",
554            "Matching duration for each side",
555            &["actor_id", "fragment_id", "side"],
556            registry
557        )
558        .unwrap();
559
560        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
561            "stream_barrier_align_duration_ns",
562            "Duration of join align barrier",
563            &["actor_id", "fragment_id", "wait_side", "executor"],
564            registry
565        )
566        .unwrap()
567        .relabel_debug_1(level);
568
569        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
570            "stream_join_cached_entry_count",
571            "Number of cached entries in streaming join operators",
572            &["actor_id", "fragment_id", "side"],
573            registry
574        )
575        .unwrap();
576
577        let join_matched_join_keys_opts = histogram_opts!(
578            "stream_join_matched_join_keys",
579            "The number of keys matched in the opposite side",
580            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
581        );
582
583        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
584            join_matched_join_keys_opts,
585            &["actor_id", "fragment_id", "table_id"],
586            registry
587        )
588        .unwrap()
589        .relabel_debug_1(level);
590
591        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
592            "stream_agg_lookup_miss_count",
593            "Aggregation executor lookup miss duration",
594            &["table_id", "actor_id", "fragment_id"],
595            registry
596        )
597        .unwrap();
598
599        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
600            "stream_agg_lookup_total_count",
601            "Aggregation executor lookup total operation",
602            &["table_id", "actor_id", "fragment_id"],
603            registry
604        )
605        .unwrap();
606
607        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
608            "stream_agg_distinct_cache_miss_count",
609            "Aggregation executor dinsinct miss duration",
610            &["table_id", "actor_id", "fragment_id"],
611            registry
612        )
613        .unwrap();
614
615        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
616            "stream_agg_distinct_total_cache_count",
617            "Aggregation executor distinct total operation",
618            &["table_id", "actor_id", "fragment_id"],
619            registry
620        )
621        .unwrap();
622
623        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
624            "stream_agg_distinct_cached_entry_count",
625            "Total entry counts in distinct aggregation executor cache",
626            &["table_id", "actor_id", "fragment_id"],
627            registry
628        )
629        .unwrap();
630
631        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
632            "stream_agg_dirty_groups_count",
633            "Total dirty group counts in aggregation executor",
634            &["table_id", "actor_id", "fragment_id"],
635            registry
636        )
637        .unwrap();
638
639        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
640            "stream_agg_dirty_groups_heap_size",
641            "Total dirty group heap size in aggregation executor",
642            &["table_id", "actor_id", "fragment_id"],
643            registry
644        )
645        .unwrap();
646
647        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
648            "stream_agg_state_cache_lookup_count",
649            "Aggregation executor state cache lookup count",
650            &["table_id", "actor_id", "fragment_id"],
651            registry
652        )
653        .unwrap();
654
655        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
656            "stream_agg_state_cache_miss_count",
657            "Aggregation executor state cache miss count",
658            &["table_id", "actor_id", "fragment_id"],
659            registry
660        )
661        .unwrap();
662
663        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
664            "stream_group_top_n_cache_miss_count",
665            "Group top n executor cache miss count",
666            &["table_id", "actor_id", "fragment_id"],
667            registry
668        )
669        .unwrap();
670
671        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
672            "stream_group_top_n_total_query_cache_count",
673            "Group top n executor query cache total count",
674            &["table_id", "actor_id", "fragment_id"],
675            registry
676        )
677        .unwrap();
678
679        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
680            "stream_group_top_n_cached_entry_count",
681            "Total entry counts in group top n executor cache",
682            &["table_id", "actor_id", "fragment_id"],
683            registry
684        )
685        .unwrap();
686
687        let group_top_n_appendonly_cache_miss_count =
688            register_guarded_int_counter_vec_with_registry!(
689                "stream_group_top_n_appendonly_cache_miss_count",
690                "Group top n appendonly executor cache miss count",
691                &["table_id", "actor_id", "fragment_id"],
692                registry
693            )
694            .unwrap();
695
696        let group_top_n_appendonly_total_query_cache_count =
697            register_guarded_int_counter_vec_with_registry!(
698                "stream_group_top_n_appendonly_total_query_cache_count",
699                "Group top n appendonly executor total cache count",
700                &["table_id", "actor_id", "fragment_id"],
701                registry
702            )
703            .unwrap();
704
705        let group_top_n_appendonly_cached_entry_count =
706            register_guarded_int_gauge_vec_with_registry!(
707                "stream_group_top_n_appendonly_cached_entry_count",
708                "Total entry counts in group top n appendonly executor cache",
709                &["table_id", "actor_id", "fragment_id"],
710                registry
711            )
712            .unwrap();
713
714        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
715            "stream_lookup_cache_miss_count",
716            "Lookup executor cache miss count",
717            &["table_id", "actor_id", "fragment_id"],
718            registry
719        )
720        .unwrap();
721
722        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
723            "stream_lookup_total_query_cache_count",
724            "Lookup executor query cache total count",
725            &["table_id", "actor_id", "fragment_id"],
726            registry
727        )
728        .unwrap();
729
730        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
731            "stream_lookup_cached_entry_count",
732            "Total entry counts in lookup executor cache",
733            &["table_id", "actor_id", "fragment_id"],
734            registry
735        )
736        .unwrap();
737
738        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
739            "stream_temporal_join_cache_miss_count",
740            "Temporal join executor cache miss count",
741            &["table_id", "actor_id", "fragment_id"],
742            registry
743        )
744        .unwrap();
745
746        let temporal_join_total_query_cache_count =
747            register_guarded_int_counter_vec_with_registry!(
748                "stream_temporal_join_total_query_cache_count",
749                "Temporal join executor query cache total count",
750                &["table_id", "actor_id", "fragment_id"],
751                registry
752            )
753            .unwrap();
754
755        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
756            "stream_temporal_join_cached_entry_count",
757            "Total entry count in temporal join executor cache",
758            &["table_id", "actor_id", "fragment_id"],
759            registry
760        )
761        .unwrap();
762
763        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
764            "stream_agg_cached_entry_count",
765            "Number of cached keys in streaming aggregation operators",
766            &["table_id", "actor_id", "fragment_id"],
767            registry
768        )
769        .unwrap();
770
771        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
772            "stream_agg_chunk_lookup_miss_count",
773            "Aggregation executor chunk-level lookup miss duration",
774            &["table_id", "actor_id", "fragment_id"],
775            registry
776        )
777        .unwrap();
778
779        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
780            "stream_agg_chunk_lookup_total_count",
781            "Aggregation executor chunk-level lookup total operation",
782            &["table_id", "actor_id", "fragment_id"],
783            registry
784        )
785        .unwrap();
786
787        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
788            "stream_backfill_snapshot_read_row_count",
789            "Total number of rows that have been read from the backfill snapshot",
790            &["table_id", "actor_id"],
791            registry
792        )
793        .unwrap();
794
795        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
796            "stream_backfill_upstream_output_row_count",
797            "Total number of rows that have been output from the backfill upstream",
798            &["table_id", "actor_id"],
799            registry
800        )
801        .unwrap();
802
803        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
804            "stream_cdc_backfill_snapshot_read_row_count",
805            "Total number of rows that have been read from the cdc_backfill snapshot",
806            &["table_id", "actor_id"],
807            registry
808        )
809        .unwrap();
810
811        let cdc_backfill_upstream_output_row_count =
812            register_guarded_int_counter_vec_with_registry!(
813                "stream_cdc_backfill_upstream_output_row_count",
814                "Total number of rows that have been output from the cdc_backfill upstream",
815                &["table_id", "actor_id"],
816                registry
817            )
818            .unwrap();
819
820        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
821            "stream_snapshot_backfill_consume_snapshot_row_count",
822            "Total number of rows that have been output from snapshot backfill",
823            &["table_id", "actor_id", "stage"],
824            registry
825        )
826        .unwrap();
827
828        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
829            "stream_over_window_cached_entry_count",
830            "Total entry (partition) count in over window executor cache",
831            &["table_id", "actor_id", "fragment_id"],
832            registry
833        )
834        .unwrap();
835
836        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
837            "stream_over_window_cache_lookup_count",
838            "Over window executor cache lookup count",
839            &["table_id", "actor_id", "fragment_id"],
840            registry
841        )
842        .unwrap();
843
844        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
845            "stream_over_window_cache_miss_count",
846            "Over window executor cache miss count",
847            &["table_id", "actor_id", "fragment_id"],
848            registry
849        )
850        .unwrap();
851
852        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
853            "stream_over_window_range_cache_entry_count",
854            "Over window partition range cache entry count",
855            &["table_id", "actor_id", "fragment_id"],
856            registry,
857        )
858        .unwrap();
859
860        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
861            "stream_over_window_range_cache_lookup_count",
862            "Over window partition range cache lookup count",
863            &["table_id", "actor_id", "fragment_id"],
864            registry
865        )
866        .unwrap();
867
868        let over_window_range_cache_left_miss_count =
869            register_guarded_int_counter_vec_with_registry!(
870                "stream_over_window_range_cache_left_miss_count",
871                "Over window partition range cache left miss count",
872                &["table_id", "actor_id", "fragment_id"],
873                registry
874            )
875            .unwrap();
876
877        let over_window_range_cache_right_miss_count =
878            register_guarded_int_counter_vec_with_registry!(
879                "stream_over_window_range_cache_right_miss_count",
880                "Over window partition range cache right miss count",
881                &["table_id", "actor_id", "fragment_id"],
882                registry
883            )
884            .unwrap();
885
886        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
887            "stream_over_window_accessed_entry_count",
888            "Over window accessed entry count",
889            &["table_id", "actor_id", "fragment_id"],
890            registry
891        )
892        .unwrap();
893
894        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
895            "stream_over_window_compute_count",
896            "Over window compute count",
897            &["table_id", "actor_id", "fragment_id"],
898            registry
899        )
900        .unwrap();
901
902        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
903            "stream_over_window_same_output_count",
904            "Over window same output count",
905            &["table_id", "actor_id", "fragment_id"],
906            registry
907        )
908        .unwrap();
909
910        let opts = histogram_opts!(
911            "stream_barrier_inflight_duration_seconds",
912            "barrier_inflight_latency",
913            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
914        );
915        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
916
917        let opts = histogram_opts!(
918            "stream_barrier_sync_storage_duration_seconds",
919            "barrier_sync_latency",
920            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
921        );
922        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
923
924        let opts = histogram_opts!(
925            "stream_barrier_batch_size",
926            "barrier_batch_size",
927            exponential_buckets(1.0, 2.0, 8).unwrap()
928        );
929        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
930
931        let barrier_manager_progress = register_int_counter_with_registry!(
932            "stream_barrier_manager_progress",
933            "The number of actors that have processed the earliest in-flight barriers",
934            registry
935        )
936        .unwrap();
937
938        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
939            "sync_kv_log_store_wait_next_poll_ns",
940            "Total duration (ns) of waiting for next poll",
941            &["actor_id", "target", "fragment_id", "relation"],
942            registry
943        )
944        .unwrap();
945
946        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
947            "sync_kv_log_store_read_count",
948            "read row count throughput of sync_kv log store",
949            &["type", "actor_id", "target", "fragment_id", "relation"],
950            registry
951        )
952        .unwrap();
953
954        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
955            "sync_kv_log_store_read_size",
956            "read size throughput of sync_kv log store",
957            &["type", "actor_id", "target", "fragment_id", "relation"],
958            registry
959        )
960        .unwrap();
961
962        let sync_kv_log_store_write_pause_duration_ns =
963            register_guarded_int_counter_vec_with_registry!(
964                "sync_kv_log_store_write_pause_duration_ns",
965                "Duration (ns) of sync_kv log store write pause",
966                &["actor_id", "target", "fragment_id", "relation"],
967                registry
968            )
969            .unwrap();
970
971        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
972            "sync_kv_log_store_state",
973            "clean/unclean state transition for sync_kv log store",
974            &["state", "actor_id", "target", "fragment_id", "relation"],
975            registry
976        )
977        .unwrap();
978
979        let sync_kv_log_store_storage_write_count =
980            register_guarded_int_counter_vec_with_registry!(
981                "sync_kv_log_store_storage_write_count",
982                "Write row count throughput of sync_kv log store",
983                &["actor_id", "target", "fragment_id", "relation"],
984                registry
985            )
986            .unwrap();
987
988        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
989            "sync_kv_log_store_storage_write_size",
990            "Write size throughput of sync_kv log store",
991            &["actor_id", "target", "fragment_id", "relation"],
992            registry
993        )
994        .unwrap();
995
996        let sync_kv_log_store_buffer_unconsumed_item_count =
997            register_guarded_int_gauge_vec_with_registry!(
998                "sync_kv_log_store_buffer_unconsumed_item_count",
999                "Number of Unconsumed Item in buffer",
1000                &["actor_id", "target", "fragment_id", "relation"],
1001                registry
1002            )
1003            .unwrap();
1004
1005        let sync_kv_log_store_buffer_unconsumed_row_count =
1006            register_guarded_int_gauge_vec_with_registry!(
1007                "sync_kv_log_store_buffer_unconsumed_row_count",
1008                "Number of Unconsumed Row in buffer",
1009                &["actor_id", "target", "fragment_id", "relation"],
1010                registry
1011            )
1012            .unwrap();
1013
1014        let sync_kv_log_store_buffer_unconsumed_epoch_count =
1015            register_guarded_int_gauge_vec_with_registry!(
1016                "sync_kv_log_store_buffer_unconsumed_epoch_count",
1017                "Number of Unconsumed Epoch in buffer",
1018                &["actor_id", "target", "fragment_id", "relation"],
1019                registry
1020            )
1021            .unwrap();
1022
1023        let sync_kv_log_store_buffer_unconsumed_min_epoch =
1024            register_guarded_int_gauge_vec_with_registry!(
1025                "sync_kv_log_store_buffer_unconsumed_min_epoch",
1026                "Number of Unconsumed Epoch in buffer",
1027                &["actor_id", "target", "fragment_id", "relation"],
1028                registry
1029            )
1030            .unwrap();
1031        let sync_kv_log_store_buffer_memory_bytes =
1032            register_guarded_int_gauge_vec_with_registry!(
1033                "sync_kv_log_store_buffer_memory_bytes",
1034                "Estimated heap bytes used by synced kv log store buffer (unconsumed + consumed but not truncated)",
1035                &["actor_id", "target", "fragment_id", "relation"],
1036                registry
1037            )
1038            .unwrap();
1039
1040        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1041            "kv_log_store_storage_write_count",
1042            "Write row count throughput of kv log store",
1043            &["actor_id", "connector", "sink_id", "sink_name"],
1044            registry
1045        )
1046        .unwrap();
1047
1048        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1049            "kv_log_store_storage_write_size",
1050            "Write size throughput of kv log store",
1051            &["actor_id", "connector", "sink_id", "sink_name"],
1052            registry
1053        )
1054        .unwrap();
1055
1056        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1057            "kv_log_store_storage_read_count",
1058            "Write row count throughput of kv log store",
1059            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1060            registry
1061        )
1062        .unwrap();
1063
1064        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1065            "kv_log_store_storage_read_size",
1066            "Write size throughput of kv log store",
1067            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1068            registry
1069        )
1070        .unwrap();
1071
1072        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1073            "kv_log_store_rewind_count",
1074            "Kv log store rewind rate",
1075            &["actor_id", "connector", "sink_id", "sink_name"],
1076            registry
1077        )
1078        .unwrap();
1079
1080        let kv_log_store_rewind_delay_opts = {
1081            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1082            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1083                - REWIND_BASE_DELAY.as_secs_f64().log2())
1084            .ceil() as usize;
1085            let buckets = exponential_buckets(
1086                REWIND_BASE_DELAY.as_secs_f64(),
1087                REWIND_BACKOFF_FACTOR as _,
1088                bucket_count,
1089            )
1090            .unwrap();
1091            histogram_opts!(
1092                "kv_log_store_rewind_delay",
1093                "Kv log store rewind delay",
1094                buckets,
1095            )
1096        };
1097
1098        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1099            kv_log_store_rewind_delay_opts,
1100            &["actor_id", "connector", "sink_id", "sink_name"],
1101            registry
1102        )
1103        .unwrap();
1104
1105        let kv_log_store_buffer_unconsumed_item_count =
1106            register_guarded_int_gauge_vec_with_registry!(
1107                "kv_log_store_buffer_unconsumed_item_count",
1108                "Number of Unconsumed Item in buffer",
1109                &["actor_id", "connector", "sink_id", "sink_name"],
1110                registry
1111            )
1112            .unwrap();
1113
1114        let kv_log_store_buffer_unconsumed_row_count =
1115            register_guarded_int_gauge_vec_with_registry!(
1116                "kv_log_store_buffer_unconsumed_row_count",
1117                "Number of Unconsumed Row in buffer",
1118                &["actor_id", "connector", "sink_id", "sink_name"],
1119                registry
1120            )
1121            .unwrap();
1122
1123        let kv_log_store_buffer_unconsumed_epoch_count =
1124            register_guarded_int_gauge_vec_with_registry!(
1125                "kv_log_store_buffer_unconsumed_epoch_count",
1126                "Number of Unconsumed Epoch in buffer",
1127                &["actor_id", "connector", "sink_id", "sink_name"],
1128                registry
1129            )
1130            .unwrap();
1131
1132        let kv_log_store_buffer_unconsumed_min_epoch =
1133            register_guarded_int_gauge_vec_with_registry!(
1134                "kv_log_store_buffer_unconsumed_min_epoch",
1135                "Number of Unconsumed Epoch in buffer",
1136                &["actor_id", "connector", "sink_id", "sink_name"],
1137                registry
1138            )
1139            .unwrap();
1140
1141        let crossdb_last_consumed_min_epoch = register_guarded_int_gauge_vec_with_registry!(
1142            "crossdb_last_consumed_min_epoch",
1143            "Last consumed min epoch for cross-database changelog stream scan",
1144            &["table_id", "actor_id", "fragment_id"],
1145            registry
1146        )
1147        .unwrap();
1148
1149        let kv_log_store_buffer_memory_bytes =
1150            register_guarded_int_gauge_vec_with_registry!(
1151                "kv_log_store_buffer_memory_bytes",
1152                "Estimated heap bytes used by kv log store buffer (unconsumed + consumed but not truncated)",
1153                &["actor_id", "connector", "sink_id", "sink_name"],
1154                registry
1155            )
1156            .unwrap();
1157
1158        let lru_runtime_loop_count = register_int_counter_with_registry!(
1159            "lru_runtime_loop_count",
1160            "The counts of the eviction loop in LRU manager per second",
1161            registry
1162        )
1163        .unwrap();
1164
1165        let lru_latest_sequence = register_int_gauge_with_registry!(
1166            "lru_latest_sequence",
1167            "Current LRU global sequence",
1168            registry,
1169        )
1170        .unwrap();
1171
1172        let lru_watermark_sequence = register_int_gauge_with_registry!(
1173            "lru_watermark_sequence",
1174            "Current LRU watermark sequence",
1175            registry,
1176        )
1177        .unwrap();
1178
1179        let lru_eviction_policy = register_int_gauge_with_registry!(
1180            "lru_eviction_policy",
1181            "Current LRU eviction policy",
1182            registry,
1183        )
1184        .unwrap();
1185
1186        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1187            "jemalloc_allocated_bytes",
1188            "The allocated memory jemalloc, got from jemalloc_ctl",
1189            registry
1190        )
1191        .unwrap();
1192
1193        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1194            "jemalloc_active_bytes",
1195            "The active memory jemalloc, got from jemalloc_ctl",
1196            registry
1197        )
1198        .unwrap();
1199
1200        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1201            "jemalloc_resident_bytes",
1202            "The active memory jemalloc, got from jemalloc_ctl",
1203            registry
1204        )
1205        .unwrap();
1206
1207        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1208            "jemalloc_metadata_bytes",
1209            "The active memory jemalloc, got from jemalloc_ctl",
1210            registry
1211        )
1212        .unwrap();
1213
1214        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1215            "jvm_allocated_bytes",
1216            "The allocated jvm memory",
1217            registry
1218        )
1219        .unwrap();
1220
1221        let jvm_active_bytes = register_int_gauge_with_registry!(
1222            "jvm_active_bytes",
1223            "The active jvm memory",
1224            registry
1225        )
1226        .unwrap();
1227
1228        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1229            "stream_materialize_cache_hit_count",
1230            "Materialize executor cache hit count",
1231            &["actor_id", "table_id", "fragment_id"],
1232            registry
1233        )
1234        .unwrap()
1235        .relabel_debug_1(level);
1236
1237        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1238            "stream_materialize_data_exist_count",
1239            "Materialize executor data exist count",
1240            &["actor_id", "table_id", "fragment_id"],
1241            registry
1242        )
1243        .unwrap()
1244        .relabel_debug_1(level);
1245
1246        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1247            "stream_materialize_cache_total_count",
1248            "Materialize executor cache total operation",
1249            &["actor_id", "table_id", "fragment_id"],
1250            registry
1251        )
1252        .unwrap()
1253        .relabel_debug_1(level);
1254
1255        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1256            "stream_memory_usage",
1257            "Memory usage for stream executors",
1258            &["actor_id", "table_id", "desc"],
1259            registry
1260        )
1261        .unwrap()
1262        .relabel_debug_1(level);
1263
1264        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1265            "gap_fill_generated_rows_count",
1266            "Total number of rows generated by gap fill executor",
1267            &["actor_id", "fragment_id"],
1268            registry
1269        )
1270        .unwrap()
1271        .relabel_debug_1(level);
1272
1273        let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1274            "state_table_iter_count",
1275            "Total number of state table iter operations",
1276            &["actor_id", "fragment_id", "table_id"],
1277            registry
1278        )
1279        .unwrap()
1280        .relabel_debug_1(level);
1281
1282        let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1283            "state_table_get_count",
1284            "Total number of state table get operations",
1285            &["actor_id", "fragment_id", "table_id"],
1286            registry
1287        )
1288        .unwrap()
1289        .relabel_debug_1(level);
1290
1291        let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1292            "state_table_iter_vnode_pruned_count",
1293            "Total number of state table iter operations pruned by vnode statistics",
1294            &["actor_id", "fragment_id", "table_id"],
1295            registry
1296        )
1297        .unwrap()
1298        .relabel_debug_1(level);
1299
1300        let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1301            "state_table_get_vnode_pruned_count",
1302            "Total number of state table get operations pruned by vnode statistics",
1303            &["actor_id", "fragment_id", "table_id"],
1304            registry
1305        )
1306        .unwrap()
1307        .relabel_debug_1(level);
1308
1309        Self {
1310            level,
1311            executor_row_count,
1312            mem_stream_node_output_row_count: stream_node_output_row_count,
1313            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1314            actor_scheduled_duration,
1315            actor_scheduled_cnt,
1316            actor_poll_duration,
1317            actor_poll_cnt,
1318            actor_idle_duration,
1319            actor_idle_cnt,
1320            actor_count,
1321            actor_in_record_cnt,
1322            actor_out_record_cnt,
1323            fragment_channel_buffered_bytes,
1324            actor_current_epoch,
1325            source_output_row_count,
1326            source_split_change_count,
1327            source_backfill_row_count,
1328            sink_input_row_count,
1329            sink_input_bytes,
1330            sink_chunk_buffer_size,
1331            exchange_frag_recv_size,
1332            merge_barrier_align_duration,
1333            actor_output_buffer_blocking_duration_ns,
1334            actor_input_buffer_blocking_duration_ns,
1335            join_lookup_miss_count,
1336            join_lookup_total_count,
1337            join_insert_cache_miss_count,
1338            join_actor_input_waiting_duration_ns,
1339            join_match_duration_ns,
1340            join_cached_entry_count,
1341            join_matched_join_keys,
1342            barrier_align_duration,
1343            agg_lookup_miss_count,
1344            agg_total_lookup_count,
1345            agg_cached_entry_count,
1346            agg_chunk_lookup_miss_count,
1347            agg_chunk_total_lookup_count,
1348            agg_dirty_groups_count,
1349            agg_dirty_groups_heap_size,
1350            agg_distinct_cache_miss_count,
1351            agg_distinct_total_cache_count,
1352            agg_distinct_cached_entry_count,
1353            agg_state_cache_lookup_count,
1354            agg_state_cache_miss_count,
1355            group_top_n_cache_miss_count,
1356            group_top_n_total_query_cache_count,
1357            group_top_n_cached_entry_count,
1358            group_top_n_appendonly_cache_miss_count,
1359            group_top_n_appendonly_total_query_cache_count,
1360            group_top_n_appendonly_cached_entry_count,
1361            lookup_cache_miss_count,
1362            lookup_total_query_cache_count,
1363            lookup_cached_entry_count,
1364            temporal_join_cache_miss_count,
1365            temporal_join_total_query_cache_count,
1366            temporal_join_cached_entry_count,
1367            backfill_snapshot_read_row_count,
1368            backfill_upstream_output_row_count,
1369            cdc_backfill_snapshot_read_row_count,
1370            cdc_backfill_upstream_output_row_count,
1371            snapshot_backfill_consume_row_count,
1372            over_window_cached_entry_count,
1373            over_window_cache_lookup_count,
1374            over_window_cache_miss_count,
1375            over_window_range_cache_entry_count,
1376            over_window_range_cache_lookup_count,
1377            over_window_range_cache_left_miss_count,
1378            over_window_range_cache_right_miss_count,
1379            over_window_accessed_entry_count,
1380            over_window_compute_count,
1381            over_window_same_output_count,
1382            barrier_inflight_latency,
1383            barrier_sync_latency,
1384            barrier_batch_size,
1385            barrier_manager_progress,
1386            kv_log_store_storage_write_count,
1387            kv_log_store_storage_write_size,
1388            kv_log_store_rewind_count,
1389            kv_log_store_rewind_delay,
1390            kv_log_store_storage_read_count,
1391            kv_log_store_storage_read_size,
1392            kv_log_store_buffer_unconsumed_item_count,
1393            kv_log_store_buffer_unconsumed_row_count,
1394            kv_log_store_buffer_unconsumed_epoch_count,
1395            kv_log_store_buffer_unconsumed_min_epoch,
1396            kv_log_store_buffer_memory_bytes,
1397            crossdb_last_consumed_min_epoch,
1398            sync_kv_log_store_read_count,
1399            sync_kv_log_store_read_size,
1400            sync_kv_log_store_write_pause_duration_ns,
1401            sync_kv_log_store_state,
1402            sync_kv_log_store_wait_next_poll_ns,
1403            sync_kv_log_store_storage_write_count,
1404            sync_kv_log_store_storage_write_size,
1405            sync_kv_log_store_buffer_unconsumed_item_count,
1406            sync_kv_log_store_buffer_unconsumed_row_count,
1407            sync_kv_log_store_buffer_unconsumed_epoch_count,
1408            sync_kv_log_store_buffer_unconsumed_min_epoch,
1409            sync_kv_log_store_buffer_memory_bytes,
1410            lru_runtime_loop_count,
1411            lru_latest_sequence,
1412            lru_watermark_sequence,
1413            lru_eviction_policy,
1414            jemalloc_allocated_bytes,
1415            jemalloc_active_bytes,
1416            jemalloc_resident_bytes,
1417            jemalloc_metadata_bytes,
1418            jvm_allocated_bytes,
1419            jvm_active_bytes,
1420            stream_memory_usage,
1421            materialize_cache_hit_count,
1422            materialize_data_exist_count,
1423            materialize_cache_total_count,
1424            materialize_input_row_count,
1425            materialize_current_epoch,
1426            pg_cdc_state_table_lsn,
1427            pg_cdc_jni_commit_offset_lsn,
1428            mysql_cdc_state_binlog_file_seq,
1429            mysql_cdc_state_binlog_position,
1430            sqlserver_cdc_state_change_lsn,
1431            sqlserver_cdc_state_commit_lsn,
1432            sqlserver_cdc_jni_commit_offset_lsn,
1433            gap_fill_generated_rows_count,
1434            state_table_iter_count,
1435            state_table_get_count,
1436            state_table_iter_vnode_pruned_count,
1437            state_table_get_vnode_pruned_count,
1438        }
1439    }
1440
1441    /// Create a new `StreamingMetrics` instance used in tests or other places.
1442    pub fn unused() -> Self {
1443        global_streaming_metrics(MetricLevel::Disabled)
1444    }
1445
1446    pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1447        let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1448        let actor_scheduled_duration = self
1449            .actor_scheduled_duration
1450            .with_guarded_label_values(label_list);
1451        let actor_scheduled_cnt = self
1452            .actor_scheduled_cnt
1453            .with_guarded_label_values(label_list);
1454        let actor_poll_duration = self
1455            .actor_poll_duration
1456            .with_guarded_label_values(label_list);
1457        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1458        let actor_idle_duration = self
1459            .actor_idle_duration
1460            .with_guarded_label_values(label_list);
1461        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1462        ActorMetrics {
1463            actor_scheduled_duration,
1464            actor_scheduled_cnt,
1465            actor_poll_duration,
1466            actor_poll_cnt,
1467            actor_idle_duration,
1468            actor_idle_cnt,
1469        }
1470    }
1471
1472    pub(crate) fn new_actor_input_metrics(
1473        &self,
1474        actor_id: ActorId,
1475        fragment_id: FragmentId,
1476        upstream_fragment_id: FragmentId,
1477    ) -> ActorInputMetrics {
1478        let actor_id_str = actor_id.to_string();
1479        let fragment_id_str = fragment_id.to_string();
1480        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1481        ActorInputMetrics {
1482            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1483                &actor_id_str,
1484                &fragment_id_str,
1485                &upstream_fragment_id_str,
1486            ]),
1487            actor_input_buffer_blocking_duration_ns: self
1488                .actor_input_buffer_blocking_duration_ns
1489                .with_guarded_label_values(&[
1490                    &actor_id_str,
1491                    &fragment_id_str,
1492                    &upstream_fragment_id_str,
1493                ]),
1494        }
1495    }
1496
1497    pub fn new_sink_exec_metrics(
1498        &self,
1499        id: SinkId,
1500        actor_id: ActorId,
1501        fragment_id: FragmentId,
1502    ) -> SinkExecutorMetrics {
1503        let label_list: &[&str; 3] = &[
1504            &id.to_string(),
1505            &actor_id.to_string(),
1506            &fragment_id.to_string(),
1507        ];
1508        SinkExecutorMetrics {
1509            sink_input_row_count: self
1510                .sink_input_row_count
1511                .with_guarded_label_values(label_list),
1512            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1513            sink_chunk_buffer_size: self
1514                .sink_chunk_buffer_size
1515                .with_guarded_label_values(label_list),
1516        }
1517    }
1518
1519    pub fn new_group_top_n_metrics(
1520        &self,
1521        table_id: TableId,
1522        actor_id: ActorId,
1523        fragment_id: FragmentId,
1524    ) -> GroupTopNMetrics {
1525        let label_list: &[&str; 3] = &[
1526            &table_id.to_string(),
1527            &actor_id.to_string(),
1528            &fragment_id.to_string(),
1529        ];
1530
1531        GroupTopNMetrics {
1532            group_top_n_cache_miss_count: self
1533                .group_top_n_cache_miss_count
1534                .with_guarded_label_values(label_list),
1535            group_top_n_total_query_cache_count: self
1536                .group_top_n_total_query_cache_count
1537                .with_guarded_label_values(label_list),
1538            group_top_n_cached_entry_count: self
1539                .group_top_n_cached_entry_count
1540                .with_guarded_label_values(label_list),
1541        }
1542    }
1543
1544    pub fn new_append_only_group_top_n_metrics(
1545        &self,
1546        table_id: TableId,
1547        actor_id: ActorId,
1548        fragment_id: FragmentId,
1549    ) -> GroupTopNMetrics {
1550        let label_list: &[&str; 3] = &[
1551            &table_id.to_string(),
1552            &actor_id.to_string(),
1553            &fragment_id.to_string(),
1554        ];
1555
1556        GroupTopNMetrics {
1557            group_top_n_cache_miss_count: self
1558                .group_top_n_appendonly_cache_miss_count
1559                .with_guarded_label_values(label_list),
1560            group_top_n_total_query_cache_count: self
1561                .group_top_n_appendonly_total_query_cache_count
1562                .with_guarded_label_values(label_list),
1563            group_top_n_cached_entry_count: self
1564                .group_top_n_appendonly_cached_entry_count
1565                .with_guarded_label_values(label_list),
1566        }
1567    }
1568
1569    pub fn new_lookup_executor_metrics(
1570        &self,
1571        table_id: TableId,
1572        actor_id: ActorId,
1573        fragment_id: FragmentId,
1574    ) -> LookupExecutorMetrics {
1575        let label_list: &[&str; 3] = &[
1576            &table_id.to_string(),
1577            &actor_id.to_string(),
1578            &fragment_id.to_string(),
1579        ];
1580
1581        LookupExecutorMetrics {
1582            lookup_cache_miss_count: self
1583                .lookup_cache_miss_count
1584                .with_guarded_label_values(label_list),
1585            lookup_total_query_cache_count: self
1586                .lookup_total_query_cache_count
1587                .with_guarded_label_values(label_list),
1588            lookup_cached_entry_count: self
1589                .lookup_cached_entry_count
1590                .with_guarded_label_values(label_list),
1591        }
1592    }
1593
1594    pub fn new_hash_agg_metrics(
1595        &self,
1596        table_id: TableId,
1597        actor_id: ActorId,
1598        fragment_id: FragmentId,
1599    ) -> HashAggMetrics {
1600        let label_list: &[&str; 3] = &[
1601            &table_id.to_string(),
1602            &actor_id.to_string(),
1603            &fragment_id.to_string(),
1604        ];
1605        HashAggMetrics {
1606            agg_lookup_miss_count: self
1607                .agg_lookup_miss_count
1608                .with_guarded_label_values(label_list),
1609            agg_total_lookup_count: self
1610                .agg_total_lookup_count
1611                .with_guarded_label_values(label_list),
1612            agg_cached_entry_count: self
1613                .agg_cached_entry_count
1614                .with_guarded_label_values(label_list),
1615            agg_chunk_lookup_miss_count: self
1616                .agg_chunk_lookup_miss_count
1617                .with_guarded_label_values(label_list),
1618            agg_chunk_total_lookup_count: self
1619                .agg_chunk_total_lookup_count
1620                .with_guarded_label_values(label_list),
1621            agg_dirty_groups_count: self
1622                .agg_dirty_groups_count
1623                .with_guarded_label_values(label_list),
1624            agg_dirty_groups_heap_size: self
1625                .agg_dirty_groups_heap_size
1626                .with_guarded_label_values(label_list),
1627            agg_state_cache_lookup_count: self
1628                .agg_state_cache_lookup_count
1629                .with_guarded_label_values(label_list),
1630            agg_state_cache_miss_count: self
1631                .agg_state_cache_miss_count
1632                .with_guarded_label_values(label_list),
1633        }
1634    }
1635
1636    pub fn new_agg_distinct_dedup_metrics(
1637        &self,
1638        table_id: TableId,
1639        actor_id: ActorId,
1640        fragment_id: FragmentId,
1641    ) -> AggDistinctDedupMetrics {
1642        let label_list: &[&str; 3] = &[
1643            &table_id.to_string(),
1644            &actor_id.to_string(),
1645            &fragment_id.to_string(),
1646        ];
1647        AggDistinctDedupMetrics {
1648            agg_distinct_cache_miss_count: self
1649                .agg_distinct_cache_miss_count
1650                .with_guarded_label_values(label_list),
1651            agg_distinct_total_cache_count: self
1652                .agg_distinct_total_cache_count
1653                .with_guarded_label_values(label_list),
1654            agg_distinct_cached_entry_count: self
1655                .agg_distinct_cached_entry_count
1656                .with_guarded_label_values(label_list),
1657        }
1658    }
1659
1660    pub fn new_temporal_join_metrics(
1661        &self,
1662        table_id: TableId,
1663        actor_id: ActorId,
1664        fragment_id: FragmentId,
1665    ) -> TemporalJoinMetrics {
1666        let label_list: &[&str; 3] = &[
1667            &table_id.to_string(),
1668            &actor_id.to_string(),
1669            &fragment_id.to_string(),
1670        ];
1671        TemporalJoinMetrics {
1672            temporal_join_cache_miss_count: self
1673                .temporal_join_cache_miss_count
1674                .with_guarded_label_values(label_list),
1675            temporal_join_total_query_cache_count: self
1676                .temporal_join_total_query_cache_count
1677                .with_guarded_label_values(label_list),
1678            temporal_join_cached_entry_count: self
1679                .temporal_join_cached_entry_count
1680                .with_guarded_label_values(label_list),
1681        }
1682    }
1683
1684    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1685        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1686        BackfillMetrics {
1687            backfill_snapshot_read_row_count: self
1688                .backfill_snapshot_read_row_count
1689                .with_guarded_label_values(label_list),
1690            backfill_upstream_output_row_count: self
1691                .backfill_upstream_output_row_count
1692                .with_guarded_label_values(label_list),
1693        }
1694    }
1695
1696    pub fn new_cdc_backfill_metrics(
1697        &self,
1698        table_id: TableId,
1699        actor_id: ActorId,
1700    ) -> CdcBackfillMetrics {
1701        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1702        CdcBackfillMetrics {
1703            cdc_backfill_snapshot_read_row_count: self
1704                .cdc_backfill_snapshot_read_row_count
1705                .with_guarded_label_values(label_list),
1706            cdc_backfill_upstream_output_row_count: self
1707                .cdc_backfill_upstream_output_row_count
1708                .with_guarded_label_values(label_list),
1709        }
1710    }
1711
1712    pub fn new_over_window_metrics(
1713        &self,
1714        table_id: TableId,
1715        actor_id: ActorId,
1716        fragment_id: FragmentId,
1717    ) -> OverWindowMetrics {
1718        let label_list: &[&str; 3] = &[
1719            &table_id.to_string(),
1720            &actor_id.to_string(),
1721            &fragment_id.to_string(),
1722        ];
1723        OverWindowMetrics {
1724            over_window_cached_entry_count: self
1725                .over_window_cached_entry_count
1726                .with_guarded_label_values(label_list),
1727            over_window_cache_lookup_count: self
1728                .over_window_cache_lookup_count
1729                .with_guarded_label_values(label_list),
1730            over_window_cache_miss_count: self
1731                .over_window_cache_miss_count
1732                .with_guarded_label_values(label_list),
1733            over_window_range_cache_entry_count: self
1734                .over_window_range_cache_entry_count
1735                .with_guarded_label_values(label_list),
1736            over_window_range_cache_lookup_count: self
1737                .over_window_range_cache_lookup_count
1738                .with_guarded_label_values(label_list),
1739            over_window_range_cache_left_miss_count: self
1740                .over_window_range_cache_left_miss_count
1741                .with_guarded_label_values(label_list),
1742            over_window_range_cache_right_miss_count: self
1743                .over_window_range_cache_right_miss_count
1744                .with_guarded_label_values(label_list),
1745            over_window_accessed_entry_count: self
1746                .over_window_accessed_entry_count
1747                .with_guarded_label_values(label_list),
1748            over_window_compute_count: self
1749                .over_window_compute_count
1750                .with_guarded_label_values(label_list),
1751            over_window_same_output_count: self
1752                .over_window_same_output_count
1753                .with_guarded_label_values(label_list),
1754        }
1755    }
1756
1757    pub fn new_materialize_cache_metrics(
1758        &self,
1759        table_id: TableId,
1760        actor_id: ActorId,
1761        fragment_id: FragmentId,
1762    ) -> MaterializeCacheMetrics {
1763        let label_list: &[&str; 3] = &[
1764            &actor_id.to_string(),
1765            &table_id.to_string(),
1766            &fragment_id.to_string(),
1767        ];
1768        MaterializeCacheMetrics {
1769            materialize_cache_hit_count: self
1770                .materialize_cache_hit_count
1771                .with_guarded_label_values(label_list),
1772            materialize_data_exist_count: self
1773                .materialize_data_exist_count
1774                .with_guarded_label_values(label_list),
1775            materialize_cache_total_count: self
1776                .materialize_cache_total_count
1777                .with_guarded_label_values(label_list),
1778        }
1779    }
1780
1781    pub fn new_materialize_metrics(
1782        &self,
1783        table_id: TableId,
1784        actor_id: ActorId,
1785        fragment_id: FragmentId,
1786    ) -> MaterializeMetrics {
1787        let label_list: &[&str; 3] = &[
1788            &actor_id.to_string(),
1789            &table_id.to_string(),
1790            &fragment_id.to_string(),
1791        ];
1792        MaterializeMetrics {
1793            materialize_input_row_count: self
1794                .materialize_input_row_count
1795                .with_guarded_label_values(label_list),
1796            materialize_current_epoch: self
1797                .materialize_current_epoch
1798                .with_guarded_label_values(label_list),
1799        }
1800    }
1801
1802    pub fn new_state_table_metrics(
1803        &self,
1804        table_id: TableId,
1805        actor_id: ActorId,
1806        fragment_id: FragmentId,
1807    ) -> StateTableMetrics {
1808        let label_list: &[&str; 3] = &[
1809            &actor_id.to_string(),
1810            &fragment_id.to_string(),
1811            &table_id.to_string(),
1812        ];
1813        StateTableMetrics {
1814            iter_count: self
1815                .state_table_iter_count
1816                .with_guarded_label_values(label_list),
1817            get_count: self
1818                .state_table_get_count
1819                .with_guarded_label_values(label_list),
1820            iter_vnode_pruned_count: self
1821                .state_table_iter_vnode_pruned_count
1822                .with_guarded_label_values(label_list),
1823            get_vnode_pruned_count: self
1824                .state_table_get_vnode_pruned_count
1825                .with_guarded_label_values(label_list),
1826        }
1827    }
1828}
1829
1830pub(crate) struct ActorInputMetrics {
1831    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
1832    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
1833}
1834
1835/// Tokio metrics for actors
1836pub struct ActorMetrics {
1837    pub actor_scheduled_duration: LabelGuardedIntCounter,
1838    pub actor_scheduled_cnt: LabelGuardedIntCounter,
1839    pub actor_poll_duration: LabelGuardedIntCounter,
1840    pub actor_poll_cnt: LabelGuardedIntCounter,
1841    pub actor_idle_duration: LabelGuardedIntCounter,
1842    pub actor_idle_cnt: LabelGuardedIntCounter,
1843}
1844
1845pub struct SinkExecutorMetrics {
1846    pub sink_input_row_count: LabelGuardedIntCounter,
1847    pub sink_input_bytes: LabelGuardedIntCounter,
1848    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
1849}
1850
1851pub struct MaterializeCacheMetrics {
1852    pub materialize_cache_hit_count: LabelGuardedIntCounter,
1853    pub materialize_data_exist_count: LabelGuardedIntCounter,
1854    pub materialize_cache_total_count: LabelGuardedIntCounter,
1855}
1856
1857pub struct MaterializeMetrics {
1858    pub materialize_input_row_count: LabelGuardedIntCounter,
1859    pub materialize_current_epoch: LabelGuardedIntGauge,
1860}
1861
1862pub struct GroupTopNMetrics {
1863    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
1864    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
1865    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
1866}
1867
1868pub struct LookupExecutorMetrics {
1869    pub lookup_cache_miss_count: LabelGuardedIntCounter,
1870    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
1871    pub lookup_cached_entry_count: LabelGuardedIntGauge,
1872}
1873
1874pub struct HashAggMetrics {
1875    pub agg_lookup_miss_count: LabelGuardedIntCounter,
1876    pub agg_total_lookup_count: LabelGuardedIntCounter,
1877    pub agg_cached_entry_count: LabelGuardedIntGauge,
1878    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
1879    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
1880    pub agg_dirty_groups_count: LabelGuardedIntGauge,
1881    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
1882    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
1883    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
1884}
1885
1886pub struct AggDistinctDedupMetrics {
1887    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
1888    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
1889    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
1890}
1891
1892pub struct TemporalJoinMetrics {
1893    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
1894    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
1895    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
1896}
1897
1898pub struct BackfillMetrics {
1899    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1900    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
1901}
1902
1903#[derive(Clone)]
1904pub struct CdcBackfillMetrics {
1905    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1906    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
1907}
1908
1909pub struct OverWindowMetrics {
1910    pub over_window_cached_entry_count: LabelGuardedIntGauge,
1911    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
1912    pub over_window_cache_miss_count: LabelGuardedIntCounter,
1913    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
1914    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
1915    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
1916    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
1917    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
1918    pub over_window_compute_count: LabelGuardedIntCounter,
1919    pub over_window_same_output_count: LabelGuardedIntCounter,
1920}
1921
1922pub struct StateTableMetrics {
1923    pub iter_count: LabelGuardedIntCounter,
1924    pub get_count: LabelGuardedIntCounter,
1925    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
1926    pub get_vnode_pruned_count: LabelGuardedIntCounter,
1927}